/**
 * This is a low-level messaging API upon which more structured or restrictive
 * APIs may be built. The general idea is that every messageable entity is
 * represented by a common handle type called a Tid, which allows messages to
 * be sent to logical threads that are executing in both the current process
 * and in external processes using the same interface. This is an important
 * aspect of scalability because it allows the components of a program to be
 * spread across available resources with few to no changes to the actual
 * implementation.
 *
 * A logical thread is an execution context that has its own stack and which
 * runs asynchronously to other logical threads. These may be preemptively
 * scheduled kernel threads, fibers (cooperative user-space threads), or some
 * other concept with similar behavior.
 *
 * The type of concurrency used when logical threads are created is determined
 * by the Scheduler selected at initialization time. The default behavior is
 * currently to create a new kernel thread per call to spawn, but other
 * schedulers are available that multiplex fibers across the main thread or
 * use some combination of the two approaches.
 *
 * Note:
 * Copied (almost verbatim) from Phobos at commit 3bfccf4f1 (2019-11-27)
 * Changes are this notice, and the module rename, from `std.concurrency`
 * to `geod24.concurrency`.
 *
 * Copyright: Copyright Sean Kelly 2009 - 2014.
 * License: <a href="http://www.boost.org/LICENSE_1_0.txt">Boost License 1.0</a>.
29 * Authors: Sean Kelly, Alex Rønne Petersen, Martin Nowak 30 * Source: $(PHOBOSSRC std/concurrency.d) 31 */ 32 module geod24.concurrency; 33 34 public import std.variant; 35 36 import core.atomic; 37 import core.sync.condition; 38 import core.sync.mutex; 39 import core.sync.semaphore; 40 import core.thread; 41 import core.time : MonoTime; 42 import std.range.primitives; 43 import std.traits; 44 import std.algorithm; 45 import std.typecons; 46 import std.container : DList, SList; 47 import std.exception : assumeWontThrow; 48 49 import geod24.RingBuffer; 50 51 private 52 { 53 bool hasLocalAliasing(Types...)() 54 { 55 // Works around "statement is not reachable" 56 bool doesIt = false; 57 static foreach (T; Types) 58 doesIt |= hasLocalAliasingImpl!T; 59 return doesIt; 60 } 61 62 template hasLocalAliasingImpl (T) 63 { 64 import std.typecons : Rebindable; 65 66 static if (is(T : Channel!CT, CT)) 67 immutable bool hasLocalAliasingImpl = hasLocalAliasing!CT; 68 else static if (is(T : Rebindable!R, R)) 69 immutable bool hasLocalAliasingImpl = hasLocalAliasing!R; 70 else static if (is(T == struct)) 71 immutable bool hasLocalAliasingImpl = hasLocalAliasing!(typeof(T.tupleof)); 72 else 73 immutable bool hasLocalAliasingImpl = std.traits.hasUnsharedAliasing!(T); 74 } 75 76 @safe unittest 77 { 78 static struct Container { Channel!int t; Channel!string m; } 79 static assert(!hasLocalAliasing!(Channel!(Channel!int), Channel!int, Container, int)); 80 static assert( hasLocalAliasing!(Channel!(Channel!(int[])))); 81 } 82 83 @safe unittest 84 { 85 /* Issue 20097 */ 86 import std.datetime.systime : SysTime; 87 static struct Container { SysTime time; } 88 static assert(!hasLocalAliasing!(SysTime, Container)); 89 } 90 } 91 92 // Exceptions 93 94 class FiberBlockedException : Exception 95 { 96 this(string msg = "Fiber is blocked") @safe pure nothrow @nogc 97 { 98 super(msg); 99 } 100 } 101 102 103 // Thread Creation 104 105 private FiberScheduler thisSchedulerStorage; 106 107 
/***************************************************************************

    Get current running FiberScheduler

    Returns:
        A reference to the current scheduler, or `null` if no
        scheduler is running

    TODO: Support for nested schedulers

***************************************************************************/

public FiberScheduler thisScheduler () nothrow
{
    return thisSchedulerStorage;
}

/***************************************************************************

    Set current running FiberScheduler

    Params:
        value = Reference to the current FiberScheduler
                (`null` to mark that no scheduler is running)

***************************************************************************/

public void thisScheduler (FiberScheduler value) nothrow
{
    thisSchedulerStorage = value;
}

/**
 * An example Scheduler using Fibers.
 *
 * This is an example scheduler that creates a new Fiber per call to spawn
 * and multiplexes the execution of all fibers within the main thread.
 */
public class FiberScheduler
{
    /***************************************************************************

        Default ctor

        Params:
            max_parked_fibers = Maximum number of terminated fibers that are
                                kept around ("parked") for later reuse

    ***************************************************************************/

    public this (size_t max_parked_fibers = 8) nothrow
    {
        this.max_parked = max_parked_fibers;
        this.blocked_ex = new FiberBlockedException();
        this.sem = assumeWontThrow(new Semaphore());
    }

    /**
     * This creates a new Fiber for the supplied op and then starts the
     * dispatcher.
     *
     * The new Fiber is placed at the front of the ready queue, so it is
     * the first one the dispatcher runs.
     */
    void start (void delegate() op)
    {
        // Make sure the just-created fiber is run first
        this.create(op, true);
        this.dispatch();
    }

    /**
     * This creates a new Fiber for the supplied op and adds it to the
     * dispatch list.
176 */ 177 void spawn (void delegate() op) nothrow 178 { 179 this.create(op); 180 FiberScheduler.yield(); 181 } 182 183 /************************************************************************** 184 185 Schedule a task to be run next time the scheduler yields 186 187 Behave similarly to `spawn`, but instead of running the task 188 immediately, it simply adds it to the queue and continue executing 189 the current task. 190 191 Params: 192 op = Operation to run 193 194 **************************************************************************/ 195 196 public void schedule (void delegate() op) nothrow 197 { 198 this.create(op); 199 } 200 201 /** 202 * If the caller is a scheduled Fiber, this yields execution to another 203 * scheduled Fiber. 204 */ 205 public static void yield () nothrow 206 { 207 // NOTE: It's possible that we should test whether the calling Fiber 208 // is an InfoFiber before yielding, but I think it's reasonable 209 // that any fiber should yield here. 210 if (Fiber.getThis()) 211 Fiber.yield(); 212 } 213 214 /** 215 * If the caller is a scheduled Fiber, this yields execution to another 216 * scheduled Fiber. 217 */ 218 public static void yieldAndThrow (Throwable t) nothrow 219 { 220 // NOTE: It's possible that we should test whether the calling Fiber 221 // is an InfoFiber before yielding, but I think it's reasonable 222 // that any fiber should yield here. 
223 if (Fiber.getThis()) 224 Fiber.yieldAndThrow(t); 225 } 226 227 /// Resource type that will be tracked by FiberScheduler 228 protected interface Resource 229 { 230 /// 231 void release () nothrow; 232 } 233 234 /*********************************************************************** 235 236 Add Resource to the Resource list of runnning Fiber 237 238 Param: 239 r = Resource instace 240 241 ***********************************************************************/ 242 243 private void addResource (Resource r, InfoFiber info_fiber = cast(InfoFiber) Fiber.getThis()) nothrow 244 { 245 assert(info_fiber, "Called from outside of an InfoFiber"); 246 info_fiber.resources.insert(r); 247 } 248 249 /*********************************************************************** 250 251 Remove Resource from the Resource list of runnning Fiber 252 253 Param: 254 r = Resource instance 255 256 Returns: 257 Success/failure 258 259 ***********************************************************************/ 260 261 private bool removeResource (Resource r, InfoFiber info_fiber = cast(InfoFiber) Fiber.getThis()) nothrow 262 { 263 assert(info_fiber, "Called from outside of an InfoFiber"); 264 // TODO: For some cases, search is not neccesary. We can just pop the last element 265 return assumeWontThrow(info_fiber.resources.linearRemoveElement(r)); 266 } 267 268 /** 269 * Creates a new Fiber which calls the given delegate. 
270 * 271 * Params: 272 * op = The delegate the fiber should call 273 * insert_front = Fiber will be added to the front 274 * of the ready queue to be run first 275 */ 276 protected void create (void delegate() op, bool insert_front = false) nothrow 277 { 278 InfoFiber new_fiber; 279 if (this.parked_count > 0) 280 { 281 new_fiber = this.parked_fibers.front(); 282 new_fiber.reuse(op); 283 284 this.parked_fibers.removeFront(); 285 this.parked_count--; 286 } 287 else 288 { 289 new_fiber = new InfoFiber(op); 290 } 291 292 if (insert_front) 293 this.readyq.insertFront(new_fiber); 294 else 295 this.readyq.insertBack(new_fiber); 296 } 297 298 /** 299 * Fiber which embeds neccessary info for FiberScheduler 300 */ 301 protected static class InfoFiber : Fiber 302 { 303 /// Semaphore reference that this Fiber is blocked on 304 FiberBlocker blocker; 305 306 /// List of Resources held by this Fiber 307 DList!Resource resources; 308 309 this (void delegate() op, size_t sz = 512 * 1024) nothrow 310 { 311 super(op, sz); 312 } 313 314 /*********************************************************************** 315 316 Reset the Fiber to be reused with a new delegate 317 318 Param: 319 op = Delegate 320 321 ***********************************************************************/ 322 323 void reuse (void delegate() op) nothrow 324 { 325 assert(this.state == Fiber.State.TERM, "Can not reuse a non terminated Fiber"); 326 this.blocker = null; 327 this.resources.clear(); 328 this.reset(op); 329 } 330 } 331 332 public final class FiberBlocker 333 { 334 335 /*********************************************************************** 336 337 Associate `FiberBlocker` with the running `Fiber` 338 339 `FiberScheduler` will check to see if the `Fiber` is blocking on a 340 `FiberBlocker` to avoid rescheduling it unnecessarily 341 342 ***********************************************************************/ 343 344 private void registerToInfoFiber (InfoFiber info_fiber = cast(InfoFiber) Fiber.getThis()) 
nothrow 345 { 346 assert(info_fiber !is null, "This Fiber does not belong to FiberScheduler"); 347 assert(info_fiber.blocker is null, "This Fiber already has a registered FiberBlocker"); 348 info_fiber.blocker = this; 349 350 } 351 352 /*********************************************************************** 353 354 Wait on the blocker with optional timeout 355 356 Params: 357 period = Timeout period 358 359 ***********************************************************************/ 360 361 bool wait (Duration period = Duration.init) nothrow 362 { 363 if (period != Duration.init) 364 this.limit = MonoTime.currTime + period; 365 366 if (this.shouldBlock()) 367 { 368 this.registerToInfoFiber(); 369 FiberScheduler.yieldAndThrow(this.outer.blocked_ex); 370 } 371 372 this.limit = MonoTime.init; 373 this.notified = false; 374 return !this.hasTimedOut(); 375 } 376 377 /*********************************************************************** 378 379 Unblock the Fiber waiting on this blocker 380 381 ***********************************************************************/ 382 383 void notify () nothrow 384 { 385 this.stopTimer(); 386 this.notified = true; 387 assumeWontThrow(this.outer.sem.notify()); 388 } 389 390 /*********************************************************************** 391 392 Query if `FiberBlocker` should still block 393 394 FiberBlocker will block the Fiber until it is notified or 395 the specified timeout is reached. 
396 397 ***********************************************************************/ 398 399 bool shouldBlock () nothrow 400 { 401 bool timed_out = (this.limit != MonoTime.init 402 && MonoTime.currTime >= this.limit); 403 404 if (timed_out) 405 cas(&this.timer_state, TimerState.Running, TimerState.TimedOut); 406 407 return atomicLoad(this.timer_state) != TimerState.TimedOut && !this.notified; 408 } 409 410 /*********************************************************************** 411 412 Try freezing the internal timer 413 414 ***********************************************************************/ 415 416 bool stopTimer () nothrow 417 { 418 return cas(&this.timer_state, TimerState.Running, TimerState.Stopped); 419 } 420 421 /*********************************************************************** 422 423 Query if the internal timer has timed out 424 425 ***********************************************************************/ 426 427 bool hasTimedOut () nothrow 428 { 429 return atomicLoad(this.timer_state) == TimerState.TimedOut; 430 } 431 432 MonoTime getTimeout () nothrow 433 { 434 return limit; 435 } 436 437 private: 438 enum TimerState 439 { 440 Running, 441 TimedOut, 442 Stopped 443 } 444 445 /// Time limit that will eventually unblock the caller if a timeout is specified 446 MonoTime limit = MonoTime.init; 447 448 /// State of the blocker 449 bool notified; 450 451 /// State of the internal timer 452 shared(TimerState) timer_state; 453 } 454 455 /*********************************************************************** 456 457 Start the scheduling loop 458 459 ***********************************************************************/ 460 461 private void dispatch () 462 { 463 thisScheduler(this); 464 scope (exit) thisScheduler(null); 465 466 MonoTime earliest_timeout; 467 468 while (!this.readyq.empty() || !this.wait_list.empty()) 469 { 470 while (!readyq.empty()) 471 { 472 InfoFiber cur_fiber = this.readyq.front(); 473 this.readyq.removeFront(); 474 475 
assert(cur_fiber.state != Fiber.State.TERM); 476 477 auto t = cur_fiber.call(Fiber.Rethrow.no); 478 479 // Fibers that block on a FiberBlocker throw an 480 // exception for scheduler to catch 481 if (t is this.blocked_ex) 482 { 483 auto cur_timeout = cur_fiber.blocker.getTimeout(); 484 485 // Keep track of the earliest timeout in the system 486 if (cur_timeout != MonoTime.init 487 && (earliest_timeout == MonoTime.init || cur_timeout < earliest_timeout)) 488 { 489 earliest_timeout = cur_timeout; 490 } 491 492 this.wait_list.insert(cur_fiber); 493 continue; 494 } 495 else if (t) 496 { 497 // We are exiting the dispatch loop prematurely, all resources 498 // held by Fibers should be released. 499 this.releaseResources(cur_fiber); 500 throw t; 501 } 502 503 if (cur_fiber.state != Fiber.State.TERM) 504 { 505 this.readyq.insert(cur_fiber); 506 } 507 // Park terminated Fiber 508 else if (this.parked_count < this.max_parked) 509 { 510 this.parked_fibers.insertFront(cur_fiber); 511 this.parked_count++; 512 } 513 // Destroy the terminated Fiber to immediately reclaim 514 // the stack space 515 else 516 { 517 destroy!false(cur_fiber); 518 } 519 520 // See if there are Fibers to be woken up if we reach a timeout 521 // or the scheduler semaphore was notified 522 if (MonoTime.currTime >= earliest_timeout || this.sem.tryWait()) 523 earliest_timeout = wakeFibers(); 524 } 525 526 if (!this.wait_list.empty()) 527 { 528 Duration time_to_timeout = earliest_timeout - MonoTime.currTime; 529 530 // Sleep until a timeout or an event 531 if (earliest_timeout == MonoTime.init) 532 this.sem.wait(); 533 else if (time_to_timeout > 0.seconds) 534 this.sem.wait(time_to_timeout); 535 } 536 537 // OS Thread woke up populate ready queue 538 earliest_timeout = wakeFibers(); 539 } 540 } 541 542 /*********************************************************************** 543 544 Move unblocked Fibers to ready queue 545 546 Return: 547 Returns the earliest timeout left in the waiting list 548 549 
***********************************************************************/ 550 551 private MonoTime wakeFibers() 552 { 553 import std.range; 554 MonoTime earliest_timeout; 555 556 auto wait_range = this.wait_list[]; 557 while (!wait_range.empty) 558 { 559 auto fiber = wait_range.front; 560 561 // Remove the unblocked Fiber from wait list and 562 // append it to the end ofready queue 563 if (!fiber.blocker.shouldBlock()) 564 { 565 this.wait_list.popFirstOf(wait_range); 566 fiber.blocker = null; 567 this.readyq.insert(fiber); 568 } 569 else 570 { 571 auto timeout = fiber.blocker.getTimeout(); 572 if (timeout != MonoTime.init 573 && (earliest_timeout == MonoTime.init || timeout < earliest_timeout)) 574 { 575 earliest_timeout = timeout; 576 } 577 wait_range.popFront(); 578 } 579 } 580 581 return earliest_timeout; 582 } 583 584 /*********************************************************************** 585 586 Release all resources currently held by all Fibers owned by this 587 scheduler 588 589 Param: 590 cur_fiber = Running Fiber 591 592 ***********************************************************************/ 593 594 private void releaseResources (InfoFiber cur_fiber) 595 { 596 foreach (ref resource; cur_fiber.resources) 597 resource.release(); 598 foreach (ref fiber; this.readyq) 599 foreach (ref resource; fiber.resources) 600 resource.release(); 601 foreach (ref fiber; this.wait_list) 602 foreach (ref resource; fiber.resources) 603 resource.release(); 604 } 605 606 private: 607 608 /// OS semaphore for scheduler to sleep on 609 Semaphore sem; 610 611 /// A FIFO Queue of Fibers ready to run 612 DList!InfoFiber readyq; 613 614 /// List of Fibers waiting for an event 615 DList!InfoFiber wait_list; 616 617 /// Cached instance of FiberBlockedException 618 FiberBlockedException blocked_ex; 619 620 /// List of parked fibers to be reused 621 SList!InfoFiber parked_fibers; 622 623 /// Number of currently parked fibers 624 size_t parked_count; 625 626 /// Maximum number of parked 
fibers 627 immutable size_t max_parked; 628 } 629 630 /// Ensure argument to `start` is run first 631 unittest 632 { 633 { 634 scope sched = new FiberScheduler(); 635 bool startHasRun; 636 sched.spawn(() => assert(startHasRun)); 637 sched.start(() { startHasRun = true; }); 638 } 639 { 640 scope sched = new FiberScheduler(); 641 bool startHasRun; 642 sched.schedule(() => assert(startHasRun)); 643 sched.start(() { startHasRun = true; }); 644 } 645 } 646 647 /*********************************************************************** 648 649 A common interface for objects that can be used in `select()` 650 651 ***********************************************************************/ 652 653 public interface Selectable 654 { 655 /*********************************************************************** 656 657 Try to read/write to the `Selectable` without blocking. If the 658 operation would block, queue and link it with the `sel_state` 659 660 Params: 661 ptr = pointer to the data for the select operation 662 sel_state = SelectState instace of the select call being executed 663 sel_id = id of the select call being executed 664 665 ***********************************************************************/ 666 667 void selectWrite (void* ptr, SelectState sel_state, int sel_id); 668 669 /// Ditto 670 void selectRead (void* ptr, SelectState sel_state, int sel_id); 671 } 672 673 /*********************************************************************** 674 675 An aggregate to hold neccessary information for a select operation 676 677 ***********************************************************************/ 678 679 public struct SelectEntry 680 { 681 /// Reference to a Selectable object 682 Selectable selectable; 683 684 /// Pointer to the select data 685 void* select_data; 686 687 /*********************************************************************** 688 689 Default ctor 690 691 Params: 692 selectable = A selectable interface reference 693 select_data = pointer to the data for the 
select operation 694 695 ***********************************************************************/ 696 697 this (Selectable selectable, void* select_data) @safe pure nothrow @nogc 698 { 699 this.selectable = selectable; 700 this.select_data = select_data; 701 } 702 } 703 704 /// Consists of the id and result of the select operation that was completed 705 public alias SelectReturn = Tuple!(bool, "success", int, "id"); 706 707 /*********************************************************************** 708 709 Block on multiple `Channel`s 710 711 Only one operation is completed per `select()` call 712 713 Params: 714 read_list = List of `Channel`s to select for read operation 715 write_list = List of `Channel`s to select for write operation 716 timeout = Optional timeout 717 718 Return: 719 Returns success/failure status of the operation and the index 720 of the `Channel` that the operation was carried on. The index is 721 the position of the SelectEntry in `read_list ~ write_list`, ie 722 concatenated lists. 

***********************************************************************/

public SelectReturn select (ref SelectEntry[] read_list, ref SelectEntry[] write_list,
    Duration timeout = Duration.init)
{
    import std.random : randomShuffle;

    auto sched = thisScheduler();
    auto ss = new SelectState(sched.new FiberBlocker());

    // Track the state as a resource of this Fiber for the whole call
    sched.addResource(ss);
    scope (exit) sched.removeResource(ss);

    // Both lists are shuffled in place (they are `ref`), so ids refer to
    // the shuffled order (presumably so no entry is systematically favoured)
    read_list = read_list.randomShuffle();
    write_list = write_list.randomShuffle();

    // Ids are handed out consecutively: readers first, then writers
    int next_id = 0;

    foreach (ref reader; read_list)
    {
        // Stop offering operations once one of them went through
        if (ss.isConsumed())
            break;
        reader.selectable.selectRead(reader.select_data, ss, next_id++);
    }

    foreach (ref writer; write_list)
    {
        if (ss.isConsumed())
            break;
        writer.selectable.selectWrite(writer.select_data, ss, next_id++);
    }

    const completed = ss.blocker.wait(timeout);
    if (!completed)
        return SelectReturn(false, -1); // Timed out

    return SelectReturn(ss.success, ss.id);
}

/***********************************************************************

    Holds the state of a group of `selectRead`/`selectWrite` calls.
    Synchronizes peers that will consume those calls, so that only one
    `selectRead`/`selectWrite` call is completed.

***********************************************************************/

final private class SelectState : FiberScheduler.Resource
{
    /// Shared blocker object for multiple ChannelQueueEntry objects
    FiberScheduler.FiberBlocker blocker;

    /***********************************************************************

        Default constructor

        Params:
            blocker = Blocker the selecting Fiber waits on

    ***********************************************************************/

    this (FiberScheduler.FiberBlocker blocker) @safe pure nothrow @nogc
    {
        this.blocker = blocker;
    }

    /***********************************************************************

        Tries to atomically consume a `SelectState` and sets `id` and `success`
        fields

        Params:
            id = ID of the `selectRead`/`selectWrite` call that is consuming
                 this `SelectState`
            success_in = Success/Failure of the select call

        Returns:
            Returns true if `SelectState` was not already consumed, false otherwise

    ***********************************************************************/

    bool tryConsume (int id, bool success_in = true) nothrow
    {
        // Whoever flips the flag first wins; everybody else backs off
        if (!cas(&this.consumed, false, true))
            return false;

        this.success = success_in;
        this.id = id;
        return true;
    }

    /***********************************************************************

        Returns if `SelectState` is already consumed or not

    ***********************************************************************/

    bool isConsumed () nothrow
    {
        return atomicLoad(this.consumed);
    }

    /***********************************************************************

        Consume SelectState so that it is neutralized

        If it was not consumed yet it ends up with an `id` of -1 and
        `success` set to `false`.

    ***********************************************************************/

    void release () nothrow
    {
        this.tryConsume(-1, false);
    }

    /// ID of the select call that consumed this `SelectState`
    int id;

    /// Success/failure state of the select call with ID of `id`
    bool success;

private:
    /// Indicates if this `SelectState` is consumed or not
    shared(bool) consumed;
}

/***********************************************************************

    A golang style channel implementation with buffered and unbuffered
    operation modes

    Intended to be used between Fibers

    Params:
        T = Type of the messages carried across the `Channel`. Currently
            all reference and values types are supported.

***********************************************************************/

final public class Channel (T) : Selectable
{
    /***********************************************************************

        Constructs a Channel

        Params:
            max_size = Maximum amount of T a Channel can buffer
                       (0 -> Unbuffered operation,
                        Positive integer -> Buffered operation)

    ***********************************************************************/

    this (ulong max_size = 0) nothrow
    {
        this.lock = new FiberMutex;
        this.max_size = max_size;
        // The buffer only exists in buffered mode
        if (max_size > 0)
            this.buffer = new RingBuffer!T(max_size);
    }

    /***********************************************************************

        Write a message to the `Channel` with an optional timeout

        Unbuffered mode:

            If a reader is already blocked on the `Channel`, writer copies the
            message to reader's buffer and wakes up the reader by yielding

            If no reader is ready in the wait queue, writer appends itself
            to write wait queue and blocks

        Buffered mode:

            If a reader is already blocked on the `Channel`, writer copies the
            message to reader's buffer and wakes up the reader and returns
            immediately.
893 894 If the buffer is not full writer puts the message in the `Channel` 895 buffer and returns immediately 896 897 If buffer is full writer appends itself to write wait queue and blocks 898 899 If `Channel` is closed, it returns immediately with a failure, 900 regardless of the operation mode 901 902 Param: 903 val = Message to write to `Channel` 904 duration = Timeout duration 905 906 Returns: 907 Success/Failure - Fails when `Channel` is closed or timeout is reached. 908 909 ***********************************************************************/ 910 911 bool write () (auto ref T val, Duration duration = Duration.init) nothrow 912 { 913 this.lock.lock_nothrow(); 914 915 bool success = tryWrite(val); 916 917 if (!success && !this.isClosed()) 918 { 919 ChannelQueueEntry q_ent = this.enqueueEntry(this.writeq, &val); 920 thisScheduler().addResource(q_ent); 921 scope (exit) thisScheduler().removeResource(q_ent); 922 923 this.lock.unlock_nothrow(); 924 return q_ent.blocker.wait(duration) && q_ent.success; 925 } 926 927 this.lock.unlock_nothrow(); 928 return success; 929 } 930 931 /*********************************************************************** 932 933 Try to write a message to `Channel` without blocking 934 935 tryWrite writes a message if it is possible to do so without blocking. 936 937 If the tryWrite is being executed in a select call, it tries to consume the 938 `caller_sel` with the given `caller_sel_id`. 
It only proceeds if it can 939 successfully consume the `caller_sel` 940 941 Param: 942 val = Message to write to `Channel` 943 caller_sel = SelectState instace of the select call being executed 944 caller_sel_id = id of the select call being executed 945 946 Returns: 947 Success/Failure 948 949 ***********************************************************************/ 950 951 private bool tryWrite () (auto ref T val, SelectState caller_sel = null, int caller_sel_id = 0) nothrow 952 { 953 if (this.isClosed()) 954 { 955 if (caller_sel) 956 caller_sel.tryConsume(caller_sel_id, false); 957 return false; 958 } 959 960 if (ChannelQueueEntry readq_ent = this.dequeueEntry(this.readq, caller_sel, caller_sel_id)) 961 { 962 *readq_ent.pVal = val; 963 readq_ent.blocker.notify(); 964 return true; 965 } 966 967 if (this.max_size > 0 // this.max_size > 0 = buffered 968 && this.buffer.length < this.max_size 969 && (!caller_sel || caller_sel.tryConsume(caller_sel_id))) 970 { 971 this.buffer.insert(val); 972 return true; 973 } 974 975 return false; 976 } 977 978 /*********************************************************************** 979 980 Try to write a message to Channel without blocking and if it fails, 981 create a write queue entry using the given `sel_state` and `sel_id` 982 983 Param: 984 ptr = Message to write to channel 985 sel_state = SelectState instace of the select call being executed 986 sel_id = id of the select call being executed 987 988 ***********************************************************************/ 989 990 void selectWrite (void* ptr, SelectState sel_state, int sel_id) nothrow 991 { 992 assert(ptr !is null); 993 assert(sel_state !is null); 994 T* val = cast(T*) ptr; 995 996 this.lock.lock_nothrow(); 997 998 bool success = tryWrite(*val, sel_state, sel_id); 999 1000 if (!sel_state.isConsumed()) 1001 this.enqueueEntry(this.writeq, val, sel_state, sel_id); 1002 1003 if (success || this.isClosed() || sel_state.id == -1) 1004 { 1005 
this.lock.unlock_nothrow(); 1006 sel_state.blocker.notify(); 1007 } 1008 else 1009 this.lock.unlock_nothrow(); 1010 } 1011 1012 /*********************************************************************** 1013 1014 Read a message from the Channel with an optional timeout 1015 1016 Unbuffered mode: 1017 1018 If a writer is already blocked on the Channel, reader copies the 1019 value to `output` and wakes up the writer by yielding 1020 1021 If no writer is ready in the wait queue, reader appends itself 1022 to read wait queue and blocks 1023 1024 If channel is closed, it returns immediatly with a failure 1025 1026 Buffered mode: 1027 1028 If there are existing messages in the buffer, reader pops one off 1029 the buffer and returns immediatly with success, regardless of the 1030 Channel being closed or not 1031 1032 If there are no messages in the buffer it behaves exactly like the 1033 unbuffered operation 1034 1035 Param: 1036 output = Reference to output variable 1037 duration = Timeout duration 1038 1039 Returns: 1040 Success/Failure - Fails when channel is closed and there are 1041 no existing messages to be read. Fails when timeout is reached. 
1042 1043 ***********************************************************************/ 1044 1045 bool read (ref T output, Duration duration = Duration.init) nothrow 1046 { 1047 this.lock.lock_nothrow(); 1048 1049 bool success = tryRead(output); 1050 1051 if (!success && !this.isClosed()) 1052 { 1053 ChannelQueueEntry q_ent = this.enqueueEntry(this.readq, &output); 1054 thisScheduler().addResource(q_ent); 1055 scope (exit) thisScheduler().removeResource(q_ent); 1056 1057 this.lock.unlock_nothrow(); 1058 return q_ent.blocker.wait(duration) && q_ent.success; 1059 } 1060 1061 this.lock.unlock_nothrow(); 1062 return success; 1063 } 1064 1065 /*********************************************************************** 1066 1067 Try to read a message from Channel without blocking 1068 1069 tryRead reads a message if it is possible to do so without blocking. 1070 1071 If the tryRead is being executed in a select call, it tries to consume the 1072 `caller_sel` with the given `caller_sel_id`. It only proceeds if it can 1073 successfully consume the `caller_sel` 1074 1075 Param: 1076 output = Field to write the message to 1077 caller_sel = SelectState instace of the select call being executed 1078 caller_sel_id = id of the select call being executed\ 1079 1080 Returns: 1081 Success/Failure 1082 1083 ***********************************************************************/ 1084 1085 private bool tryRead (ref T output, SelectState caller_sel = null, int caller_sel_id = 0) nothrow 1086 { 1087 ChannelQueueEntry write_ent = this.dequeueEntry(this.writeq, caller_sel, caller_sel_id); 1088 1089 if (this.max_size > 0 && !this.buffer.empty()) 1090 { 1091 // if dequeueEntry fails, we will try to consume caller_sel again. 
1092 if (!caller_sel || write_ent || caller_sel.tryConsume(caller_sel_id)) 1093 { 1094 output = this.buffer.front(); 1095 this.buffer.popFront(); 1096 1097 if (write_ent) 1098 { 1099 this.buffer.insert(*write_ent.pVal); 1100 } 1101 } 1102 else 1103 { 1104 return false; 1105 } 1106 } 1107 // if dequeueEntry returns a valid entry, it always successfully consumes the related select states. 1108 // the race between 2 select calls is resolved in dequeueEntry. 1109 else if (write_ent) 1110 { 1111 output = *write_ent.pVal; 1112 } 1113 else 1114 { 1115 if (this.isClosed() && caller_sel) 1116 caller_sel.tryConsume(caller_sel_id, false); 1117 return false; 1118 } 1119 1120 if (write_ent) 1121 write_ent.blocker.notify(); 1122 return true; 1123 } 1124 1125 /*********************************************************************** 1126 1127 Try to read a message from Channel without blocking and if it fails, 1128 create a read queue entry using the given `sel_state` and `sel_id` 1129 1130 Param: 1131 ptr = Buffer to write the message to 1132 sel_state = SelectState instace of the select call being executed 1133 sel_id = id of the select call being executed 1134 1135 ***********************************************************************/ 1136 1137 void selectRead (void* ptr, SelectState sel_state, int sel_id) nothrow 1138 { 1139 assert(ptr !is null); 1140 assert(sel_state !is null); 1141 T* val = cast(T*) ptr; 1142 1143 this.lock.lock_nothrow(); 1144 1145 bool success = tryRead(*val, sel_state, sel_id); 1146 1147 if (!sel_state.isConsumed()) 1148 this.enqueueEntry(this.readq, val, sel_state, sel_id); 1149 1150 if (success || this.isClosed() || sel_state.id == -1) 1151 { 1152 this.lock.unlock_nothrow(); 1153 sel_state.blocker.notify(); 1154 } 1155 else 1156 this.lock.unlock_nothrow(); 1157 } 1158 1159 /*********************************************************************** 1160 1161 Close the channel 1162 1163 Closes the channel by marking it closed and flushing all the wait 1164 
queues 1165 1166 ***********************************************************************/ 1167 1168 void close () nothrow 1169 { 1170 if (cas(&this.closed, false, true)) 1171 { 1172 this.lock.lock_nothrow(); 1173 scope (exit) this.lock.unlock_nothrow(); 1174 1175 // Wake blocked Fibers up, report the failure 1176 foreach (ref entry; this.readq) 1177 { 1178 entry.terminate(); 1179 } 1180 foreach (ref entry; this.writeq) 1181 { 1182 entry.terminate(); 1183 } 1184 1185 this.readq.clear(); 1186 this.writeq.clear(); 1187 } 1188 } 1189 1190 /*********************************************************************** 1191 1192 Return the length of the internal buffer 1193 1194 ***********************************************************************/ 1195 1196 size_t length () nothrow 1197 { 1198 this.lock.lock_nothrow(); 1199 scope (exit) this.lock.unlock_nothrow(); 1200 return this.buffer.length; 1201 } 1202 1203 /*********************************************************************** 1204 1205 Return the closed status of the `Channel` 1206 1207 ***********************************************************************/ 1208 1209 bool isClosed () const @safe pure nothrow @nogc scope 1210 { 1211 return atomicLoad(this.closed); 1212 } 1213 1214 /*********************************************************************** 1215 1216 An aggrate of neccessary information to block a Fiber and record 1217 their request 1218 1219 ***********************************************************************/ 1220 1221 private static class ChannelQueueEntry : FiberScheduler.Resource 1222 { 1223 /// FiberBlocker blocking the `Fiber` 1224 FiberScheduler.FiberBlocker blocker; 1225 1226 /// Pointer to the variable that we will read to/from 1227 T* pVal; 1228 1229 /// Result of the blocking read/write call 1230 bool success = true; 1231 1232 /// State of the select call that this entry was created for 1233 SelectState select_state; 1234 1235 /// Id of the select call that this entry was created for 1236 
int sel_id; 1237 1238 this (T* pVal, SelectState select_state = null, int sel_id = 0) nothrow 1239 { 1240 this.pVal = pVal; 1241 this.select_state = select_state; 1242 this.sel_id = sel_id; 1243 1244 if (this.select_state) 1245 this.blocker = this.select_state.blocker; 1246 else 1247 this.blocker = thisScheduler().new FiberBlocker(); 1248 } 1249 1250 /*********************************************************************** 1251 1252 Terminate a `ChannelQueueEntry` by waking up the blocked Fiber 1253 and reporting the failure 1254 1255 This is called on all the `ChannelQueueEntry` instances still in 1256 the wait queues when Channel is closed 1257 1258 ***********************************************************************/ 1259 1260 void terminate () nothrow 1261 { 1262 this.success = false; 1263 if (!this.select_state || this.select_state.tryConsume(this.sel_id, this.success)) 1264 this.blocker.notify(); 1265 } 1266 1267 1268 /*********************************************************************** 1269 1270 Terminate ChannelQueueEntry so that it is neutralized 1271 1272 ***********************************************************************/ 1273 1274 void release () nothrow 1275 { 1276 if (this.blocker.stopTimer()) 1277 this.pVal = null; // Sanitize pVal so that we can catch illegal accesses 1278 } 1279 } 1280 1281 /*********************************************************************** 1282 1283 Create and enqueue a `ChannelQueueEntry` to the given entryq 1284 1285 Param: 1286 entryq = Queue to append the new ChannelQueueEntry 1287 pVal = Pointer to the message buffer 1288 sel_state = SelectState object to associate with the 1289 newly created ChannelQueueEntry 1290 sel_id = id of the select call creating the new ChannelQueueEntry 1291 1292 Return: 1293 newly created ChannelQueueEntry 1294 1295 ***********************************************************************/ 1296 1297 private ChannelQueueEntry enqueueEntry (ref DList!ChannelQueueEntry entryq, T* pVal, 
1298 SelectState sel_state = null, int sel_id = 0) nothrow 1299 { 1300 assert(pVal !is null); 1301 1302 ChannelQueueEntry q_ent = new ChannelQueueEntry(pVal, sel_state, sel_id); 1303 entryq.insert(q_ent); 1304 1305 return q_ent; 1306 } 1307 1308 /*********************************************************************** 1309 1310 Dequeue a `ChannelQueueEntry` from the given `entryq` 1311 1312 Walks the `entryq` until it finds a suitable entry or the queue 1313 empties. If `dequeueEntry` is called from a select, it tries to 1314 consume the `caller_sel` if the `peer_sel` is not currently consumed. 1315 If it fails to consume the `caller_sel`, returns with a failure. 1316 1317 If selected queue entry is part of a select operation, it is also 1318 consumed. If it consumes `caller_sel` but `peer_sel` was already 1319 consumed whole select operation would fail and caller would need to try 1320 again. This should be a rare case, where the `peer_sel` gets consumed by 1321 someone else between the first if check which verifies that it is not 1322 consumed and the point we actually try to consume it. 
1323 1324 Param: 1325 entryq = Queue to append the new ChannelQueueEntry 1326 caller_sel = SelectState instace of the select call being executed 1327 caller_sel_id = id of the select call being executed 1328 1329 Return: 1330 a valid ChannelQueueEntry or null 1331 1332 ***********************************************************************/ 1333 1334 private ChannelQueueEntry dequeueEntry (ref DList!ChannelQueueEntry entryq, 1335 SelectState caller_sel = null, int caller_sel_id = 0) nothrow 1336 { 1337 while (!entryq.empty()) 1338 { 1339 ChannelQueueEntry qent = entryq.front(); 1340 auto peer_sel = qent.select_state; 1341 1342 if ((!peer_sel || !peer_sel.isConsumed()) && qent.blocker.shouldBlock()) 1343 { 1344 // If we are in a select call, try to consume the caller select 1345 // if we can't consume the caller select, no need to continue 1346 if (caller_sel && !caller_sel.tryConsume(caller_sel_id)) 1347 { 1348 return null; 1349 } 1350 1351 // At this point, caller select is consumed. 1352 // Try to consume the peer select if it exists 1353 // If peer_sel was consumed by someone else, tough luck 1354 // In that case, whole select will fail since we consumed the caller_sel 1355 if ((!peer_sel || peer_sel.tryConsume(qent.sel_id)) && qent.blocker.stopTimer()) 1356 { 1357 entryq.removeFront(); 1358 return qent; 1359 } 1360 else if (caller_sel) 1361 { 1362 // Mark caller_sel failed 1363 caller_sel.id = -1; 1364 caller_sel.success = false; 1365 return null; 1366 } 1367 } 1368 1369 entryq.removeFront(); 1370 } 1371 1372 return null; 1373 } 1374 1375 private: 1376 /// Internal data storage 1377 RingBuffer!T buffer; 1378 1379 /// Closed flag 1380 bool closed; 1381 1382 /// Per channel lock 1383 FiberMutex lock; 1384 1385 /// List of fibers blocked on read() 1386 DList!ChannelQueueEntry readq; 1387 1388 /// List of fibers blocked on write() 1389 DList!ChannelQueueEntry writeq; 1390 1391 public: 1392 /// Maximum amount of T a Channel can buffer 1393 immutable ulong max_size; 
}

// Test non blocking operation
// With a buffer of 2, writes and reads interleaved so that the buffer never
// overflows must all complete immediately, and `length` must track the
// number of buffered messages
@system unittest
{
    string str = "DEADBEEF";
    string rcv_str;
    FiberScheduler scheduler = new FiberScheduler;
    auto chn = new Channel!string(2);

    scheduler.start({
        chn.write(str ~ " 1");
        assert(chn.length() == 1);
        chn.write(str ~ " 2");
        assert(chn.length() == 2);

        assert(chn.read(rcv_str));
        assert(rcv_str == str ~ " 1");
        assert(chn.length() == 1);

        chn.write(str ~ " 3");
        assert(chn.length() == 2);

        assert(chn.read(rcv_str));
        assert(rcv_str == str ~ " 2");
        assert(chn.length() == 1);

        assert(chn.read(rcv_str));
        assert(rcv_str == str ~ " 3");
        assert(chn.length() == 0);
    });
}

// Test unbuffered blocking operation with multiple receivers
// Receiver should read every message in the order they were sent
@system unittest
{
    FiberScheduler scheduler = new FiberScheduler;
    auto chn = new Channel!int();
    int n = 1000;
    long sum;

    // Both receivers run the same logic: every value a receiver gets must be
    // larger than the previous one it got, and all values are added to `sum`
    auto receiver = () {
        int val, prev;
        bool ret = chn.read(prev);
        sum += prev;
        assert(ret);

        // Loop ends when the writer closes the channel
        while (chn.read(val))
        {
            sum += val;
            assert(prev < val);
            prev = val;
        }
    };

    scheduler.spawn(receiver);
    scheduler.spawn(receiver);

    scheduler.start(
        () {
            for (int i = 0; i <= n; i++)
            {
                assert(chn.write(i));
            }
            chn.close();

            // Writing to a closed channel must fail
            assert(!chn.write(0));
        }
    );

    // Sum of [0..1000]
    assert(sum == n*(n+1)/2);
}

// Test that writer is not blocked until buffer is full and a read unblocks the writer
// Reader should be able to read remaining messages after chn is closed
@system unittest
{
    FiberScheduler scheduler = new FiberScheduler;
    auto chn = new Channel!int(5);

    scheduler.spawn(
        () {
            int val;
            // Reader only gets to run once the writer has filled the buffer
            assert(chn.max_size == chn.length);
            assert(chn.read(val));
            chn.close();
            // Read remaining messages after channel is closed
            for (int i = 0; i < chn.max_size; i++)
            {
                assert(chn.read(val));
            }
            // No more messages to read on closed chn
            assert(!chn.read(val));
        }
    );

    scheduler.start(
        () {
            for (int i = 0; i < chn.max_size; i++)
            {
                assert(chn.write(i));
            }
            assert(chn.max_size == chn.length);
            // Wait for read.
            assert(chn.write(42));
            // Reader already closed the channel
            assert(!chn.write(0));
        }
    );
}

// Three fibers pass a message around a loop of unbuffered channels
// (chn1 -> chn2 -> chn3 -> chn1), each incrementing `id` by one.
// Every fiber checks that it only ever sees the ids it is supposed to see.
@system unittest
{
    struct HyperLoopMessage
    {
        int id;
        MonoTime time;
    }

    FiberScheduler scheduler = new FiberScheduler;
    auto chn1 = new Channel!HyperLoopMessage();
    auto chn2 = new Channel!HyperLoopMessage();
    auto chn3 = new Channel!HyperLoopMessage();

    // Second hop: chn2 -> chn3
    scheduler.spawn(
        () {
            HyperLoopMessage msg;

            for (int i = 0; i < 1000; ++i)
            {
                assert(chn2.read(msg));
                assert(msg.id % 3 == 1);
                msg.id++;
                msg.time = MonoTime.currTime;
                assert(chn3.write(msg));
            }
        }
    );

    // First hop: chn1 -> chn2
    scheduler.spawn(
        () {
            HyperLoopMessage msg;

            for (int i = 0; i < 1000; ++i)
            {
                assert(chn1.read(msg));
                assert(msg.id % 3 == 0);
                msg.id++;
                msg.time = MonoTime.currTime;
                assert(chn2.write(msg));
            }
        }
    );

    // Originator: injects the message into chn1 and gets it back from chn3
    scheduler.start(
        () {
            HyperLoopMessage msg = {
                id : 0,
                time : MonoTime.currTime
            };

            for (int i = 0; i < 1000; ++i)
            {
                assert(chn1.write(msg));
                assert(chn3.read(msg));
                assert(msg.id % 3 == 2);
                msg.id++;
                msg.time = MonoTime.currTime;
            }
        }
    );
}

// Multiple writer threads writing to a buffered channel
// Reader should receive all messages
@system unittest
{
    FiberScheduler scheduler = new FiberScheduler;
    immutable int n = 5000;
    auto chn1 = new Channel!int(n/10);

    shared int sharedVal = 0;
    shared int writer_sum = 0;

    // Body shared by both writer threads: keep writing fresh values taken
    // from `sharedVal` until the channel is closed, adding every value that
    // was successfully written to `writer_sum`
    auto writer_body = () {
        FiberScheduler scheduler = new FiberScheduler();
        scheduler.start(
            () {
                int val = atomicOp!"+="(sharedVal, 1);
                while (chn1.write(val))
                {
                    atomicOp!"+="(writer_sum, val);
                    val = atomicOp!"+="(sharedVal, 1);
                }
            }
        );
    };

    auto t1 = new Thread(writer_body);
    t1.start();

    auto t2 = new Thread(writer_body);
    t2.start();

    scheduler.start(
        () {
            int reader_sum, readVal, count;

            while(chn1.read(readVal))
            {
                reader_sum += readVal;
                if (count++ == n) chn1.close();
            }

            // Writers stop once the channel is closed
            thread_joinAll();
            assert(reader_sum == writer_sum);
        }
    );
}

// Select on channels that get closed by their writers
// After five rounds every written value must have been read, and chn1 and
// chn2 (but not chn3) must have been recorded as closed
@system unittest
{
    FiberScheduler scheduler = new FiberScheduler;
    auto chn1 = new Channel!int();
    auto chn2 = new Channel!int(1);
    auto chn3 = new Channel!int();

    scheduler.spawn(
        () {
            chn1.write(42);
            chn1.close();
            chn3.write(37);
        }
    );

    scheduler.spawn(
        () {
            chn2.write(44);
            chn2.close();
        }
    );

    scheduler.start(
        () {
            bool[3] chn_closed;
            int[3] read_val;
            for (int i = 0; i < 5; ++i)
            {
                SelectEntry[] read_list;
                SelectEntry[] write_list;

                // Channels already seen as closed are left out of the select
                if (!chn_closed[0])
                    read_list ~= SelectEntry(chn1, &read_val[0]);
                if (!chn_closed[1])
                    read_list ~= SelectEntry(chn2, &read_val[1]);
                read_list ~= SelectEntry(chn3, &read_val[2]);

                auto select_return = select(read_list, write_list);

                // A failed select is attributed to the first channel
                // that is not yet marked closed
                if (!select_return.success)
                {
                    if (!chn_closed[0]) chn_closed[0] = true;
                    else if (!chn_closed[1]) chn_closed[1] = true;
                    else chn_closed[2] = true;
                }
            }
            assert(read_val[0] == 42 && read_val[1] == 44 && read_val[2] == 37);
            assert(chn_closed[0] && chn_closed[1] && !chn_closed[2]);
        }
    );
}

// Four threads connected in a ring by four channels (two buffered, two
// unbuffered). Each thread uses `select` to write the values 0..n to its
// outgoing channel while reading from its incoming one, and checks that
// the values it reads arrive in sequence.
@system unittest
{
    FiberScheduler scheduler = new FiberScheduler;

    auto chn1 = new Channel!int(20);
    auto chn2 = new Channel!int();
    auto chn3 = new Channel!int(20);
    auto chn4 = new Channel!int();

    void thread_func (T) (ref T write_chn, ref T read_chn, int _tid)
    {
        FiberScheduler scheduler = new FiberScheduler;
        int read_val, write_val;
        int prev_read = -1;
        int n = 10000;

        scheduler.start(
            () {
                while(read_val < n || write_val <= n)
                {
                    SelectEntry[] read_list;
                    SelectEntry[] write_list;

                    if (write_val <= n)
                        write_list ~= SelectEntry(write_chn, &write_val);

                    if (read_val < n)
                        read_list ~= SelectEntry(read_chn, &read_val);

                    auto select_return = select(read_list, write_list);

                    if (select_return.success)
                    {
                        // The test treats id 0 with a non-empty read list
                        // as "the read entry was selected"
                        if (read_list.length > 0 && select_return.id == 0)
                        {
                            assert(prev_read + 1 == read_val);
                            prev_read = read_val;
                        }
                        else
                        {
                            write_val++;
                        }
                    }
                }
            }
        );
    }

    auto t1 = new Thread({
        thread_func(chn1, chn2, 0);
    });
    t1.start();

    auto t2 = new Thread({
        thread_func(chn2, chn3, 1);
    });
    t2.start();

    auto t3 = new Thread({
        thread_func(chn3, chn4, 2);
    });
    t3.start();

    // The calling thread is the fourth member of the ring
    thread_func(chn4, chn1, 3);

    thread_joinAll();
}

// Select (both for writing and for reading) on a channel that gets closed
// by another fiber must return with a failure for entry 0
@system unittest
{
    FiberScheduler scheduler = new FiberScheduler;
    auto chn1 = new Channel!int();
    auto chn2 = new Channel!int();

    scheduler.spawn(
        () {
            FiberScheduler.yield();
            chn1.close();
        }
    );

    scheduler.spawn(
        () {
            FiberScheduler.yield();
            chn2.close();
        }
    );

    // Select-write on chn1
    scheduler.spawn(
        () {
            for (int i = 0; i < 2; ++i)
            {
                int write_val = 42;
                SelectEntry[] read_list;
                SelectEntry[] write_list;
                write_list ~= SelectEntry(chn1, &write_val);
                auto select_return = select(read_list, write_list);

                assert(select_return.id == 0);
                assert(!select_return.success);
            }
        }
    );

    // Select-read on chn2
    scheduler.start(
        () {
            for (int i = 0; i < 2; ++i)
            {
                int read_val;
                SelectEntry[] read_list;
                SelectEntry[] write_list;
                read_list ~= SelectEntry(chn2, &read_val);
                auto select_return = select(read_list, write_list);

                assert(select_return.id == 0);
                assert(!select_return.success);
            }
        }
    );
}

// Test read/write timeouts on an unbuffered channel
// The peer threads are held back by semaphores, so the first read and the
// first write must time out; once the peer is released the same operation
// must succeed within the timeout
@system unittest
{
    import core.sync.semaphore;
    FiberScheduler scheduler = new FiberScheduler;
    auto chn1 = new Channel!int();

    auto start = MonoTime.currTime;
    Semaphore writer_sem = new Semaphore();
    Semaphore reader_sem = new Semaphore();

    auto t1 = new Thread({
        FiberScheduler scheduler = new FiberScheduler;
        scheduler.start(
            () {
                writer_sem.wait();
                assert(chn1.write(42));
            }
        );
    });
    t1.start();

    auto t2 = new Thread({
        FiberScheduler scheduler = new FiberScheduler;
        scheduler.start(
            () {
                int read_val;
                reader_sem.wait();
                assert(chn1.read(read_val));
                assert(read_val == 43);
            }
        );
    });
    t2.start();

    scheduler.start(
        () {
            int read_val;

            // Do not leave the peer threads blocked if an assert fails
            scope (failure) {
                reader_sem.notify();
                writer_sem.notify();
                chn1.close();
            }

            // No writer yet: read must time out
            assert(!chn1.read(read_val, 1000.msecs));
            assert(MonoTime.currTime - start >= 1000.msecs);

            writer_sem.notify();
            assert(chn1.read(read_val, 1000.msecs));
            assert(read_val == 42);

            start = MonoTime.currTime;

            // No reader yet: write must time out
            assert(!chn1.write(read_val + 1, 1000.msecs));
            assert(MonoTime.currTime - start >= 1000.msecs);

            reader_sem.notify();
            assert(chn1.write(read_val + 1, 1000.msecs));
        }
    );

    thread_joinAll();
}

/// A simple spinlock
struct SpinLock
{
    /// Spin (busy-wait) until the flag could be flipped from `false` to `true`
    void lock() nothrow
    {
        while (!cas(&locked, false, true)) { }
    }

    /// Atomically unlock
    void unlock() nothrow
    {
        atomicStore!(MemoryOrder.rel)(locked, false);
    }

    /// Lock state
    private shared(bool) locked;
}

/// A Fiber level Semaphore
class FiberSemaphore
{

    /***********************************************************************

        Ctor

        Param:
            count = Initial count of FiberSemaphore

    ************************************************************************/

    this (size_t count = 0) nothrow
    {
        this.count = count;
    }

    /***********************************************************************

        Wait for FiberSemaphore count to be greater than 0

        If the count is positive it is decremented and the call returns
        immediately. Otherwise the caller is appended to the waiting queue
        and blocked until a `notify` hands the semaphore over to it.

    ************************************************************************/

    void wait () nothrow
    {
        this.slock.lock();

        if (this.count > 0)
        {
            this.count--;
            this.slock.unlock();
            return;
        }
        auto entry = new SemaphoreQueueEntry();
        thisScheduler().addResource(entry);
        scope (exit) thisScheduler().removeResource(entry);
        this.queue.insert(entry);

        // The spinlock must be released before blocking
        this.slock.unlock();
        entry.blocker.wait();
    }

    /***********************************************************************

        Increment the FiberSemaphore count

        If there is a waiter that can be woken up, it is notified instead
        and the count is left untouched.

    ************************************************************************/

    void notify () nothrow
    {
        this.slock.lock();
        if (auto entry = this.dequeueEntry())
            entry.blocker.notify();
        else
            this.count++;
        this.slock.unlock();
    }

    ///
    private class SemaphoreQueueEntry : FiberScheduler.Resource
    {

        ///
        this () nothrow
        {
            assert(thisScheduler(), "Can not block with no FiberScheduler running!");
            this.blocker = thisScheduler(). new FiberBlocker();
        }

        /***********************************************************************

            Terminate SemaphoreQueueEntry so that it is neutralized

            If the blocker's timer could not be stopped, the enclosing
            semaphore is notified.
            NOTE(review): presumably this passes on a notification that was
            already handed to this entry - confirm against
            `FiberBlocker.stopTimer`.

        ************************************************************************/

        void release () nothrow
        {
            if (!blocker.stopTimer())
                this.outer.notify();
        }

        /// FiberBlocker blocking the `Fiber`
        FiberScheduler.FiberBlocker blocker;
    }

    /***********************************************************************

        Dequeue a `SemaphoreQueueEntry` from the waiting queue

        Entries are removed from the front of the queue until one is found
        whose blocker should still block and whose timer could be stopped;
        the others are discarded.

        Return:
            a valid SemaphoreQueueEntry or null

    ***********************************************************************/

    private SemaphoreQueueEntry dequeueEntry () nothrow
    {
        while (!this.queue.empty())
        {
            auto entry = this.queue.front();
            this.queue.removeFront();
            if (entry.blocker.shouldBlock() && entry.blocker.stopTimer())
            {
                return entry;
            }
        }
        return null;
    }

    /// Protects `queue` and `count`
    private SpinLock slock;

    /// Waiting queue for Fibers
    private DList!SemaphoreQueueEntry queue;

    /// Current semaphore count
    private size_t count;
}

/// A Fiber level Mutex, essentially a binary FiberSemaphore
class FiberMutex : FiberSemaphore
{
    /// Starts unlocked (semaphore count of 1)
    this () nothrow
    {
        super(1);
    }

    ///
    alias lock = wait;

    ///
    alias unlock = notify;

    ///
    alias lock_nothrow = lock;

    ///
    alias unlock_nothrow = unlock;
}

// Test releasing a queue entry
// t1 holds the mutex for 400ms. Meanwhile a fiber in t2 queues up for the
// mutex, but another fiber of t2's scheduler throws. When t3 finally gets
// the mutex it expects only t1's increment to have happened.
// NOTE(review): this presumably relies on the scheduler releasing the
// resources (queue entries) of its fibers when it is torn down by the
// exception - confirm against FiberScheduler.
@system unittest
{
    FiberMutex mtx = new FiberMutex();
    int sharedVal;

    auto t1 = new Thread({
        FiberScheduler scheduler = new FiberScheduler;
        scheduler.start(
            () {
                mtx.lock();
                Thread.sleep(400.msecs);
                sharedVal += 1;
                mtx.unlock();
            }
        );
    });
    t1.start();

    auto t2 = new Thread({
        FiberScheduler scheduler = new FiberScheduler;
        Thread.sleep(100.msecs);

        scheduler.spawn(
            () {
                Thread.sleep(200.msecs);
                throw new Exception("");
            }
        );

        try
        {
            scheduler.start(
                () {
                    mtx.lock();
                    sharedVal += 1;
                    mtx.unlock();
                }
            );
        } catch (Exception e) { }
    });
    t2.start();

    auto t3 = new Thread({
        FiberScheduler scheduler = new FiberScheduler;
        scheduler.start(
            () {
                Thread.sleep(200.msecs);
                mtx.lock();
                assert(sharedVal == 1);
                sharedVal += 1;
                mtx.unlock();
            }
        );
    });
    t3.start();

    thread_joinAll();
}