/**
 * This is a low-level messaging API upon which more structured or restrictive
 * APIs may be built. The general idea is that every messageable entity is
 * represented by a common handle type called a Tid, which allows messages to
 * be sent to logical threads that are executing in both the current process
 * and in external processes using the same interface. This is an important
 * aspect of scalability because it allows the components of a program to be
 * spread across available resources with few to no changes to the actual
 * implementation.
 *
 * A logical thread is an execution context that has its own stack and which
 * runs asynchronously to other logical threads. These may be preemptively
 * scheduled kernel threads, fibers (cooperative user-space threads), or some
 * other concept with similar behavior.
 *
 * The type of concurrency used when logical threads are created is determined
 * by the Scheduler selected at initialization time. The default behavior is
 * currently to create a new kernel thread per call to spawn, but other
 * schedulers are available that multiplex fibers across the main thread or
 * use some combination of the two approaches.
 *
 * Note:
 * Copied (almost verbatim) from Phobos at commit 3bfccf4f1 (2019-11-27)
 * Changes are this notice, and the module rename, from `std.concurrency`
 * to `geod24.concurrency`.
 *
 * Copyright: Copyright Sean Kelly 2009 - 2014.
 * License:   <a href="http://www.boost.org/LICENSE_1_0.txt">Boost License 1.0</a>.
 * Authors:   Sean Kelly, Alex Rønne Petersen, Martin Nowak
 * Source:    $(PHOBOSSRC std/concurrency.d)
 */
module geod24.concurrency;

public import std.variant;

import core.atomic;
import core.sync.condition;
import core.sync.mutex;
import core.sync.semaphore;
import core.thread;
import core.time : MonoTime;
import std.range.primitives;
import std.traits;
import std.algorithm;
import std.typecons;
import std.container : DList, SList;
import std.exception : assumeWontThrow;

import geod24.RingBuffer;

private
{
    bool hasLocalAliasing(Types...)()
    {
        // Works around "statement is not reachable"
        bool doesIt = false;
        static foreach (T; Types)
            doesIt |= hasLocalAliasingImpl!T;
        return doesIt;
    }

    template hasLocalAliasingImpl (T)
    {
        import std.typecons : Rebindable;

        static if (is(T : Channel!CT, CT))
            immutable bool hasLocalAliasingImpl = hasLocalAliasing!CT;
        else static if (is(T : Rebindable!R, R))
            immutable bool hasLocalAliasingImpl = hasLocalAliasing!R;
        else static if (is(T == struct))
            immutable bool hasLocalAliasingImpl = hasLocalAliasing!(typeof(T.tupleof));
        else
            immutable bool hasLocalAliasingImpl = std.traits.hasUnsharedAliasing!(T);
    }

    @safe unittest
    {
        static struct Container { Channel!int t; Channel!string m; }
        static assert(!hasLocalAliasing!(Channel!(Channel!int), Channel!int, Container, int));
        static assert( hasLocalAliasing!(Channel!(Channel!(int[]))));
    }

    @safe unittest
    {
        /* Issue 20097 */
        import std.datetime.systime : SysTime;
        static struct Container { SysTime time; }
        static assert(!hasLocalAliasing!(SysTime, Container));
    }
}

// Exceptions

class FiberBlockedException : Exception
{
    this (string msg = "Fiber is blocked") @safe pure nothrow @nogc
    {
        super(msg);
    }
}


// Thread Creation

private FiberScheduler thisSchedulerStorage;

/***************************************************************************

    Get current running FiberScheduler

    Returns:
        Returns a reference to the current scheduler or null if no
        scheduler is running

    TODO: Support for nested schedulers

***************************************************************************/

public FiberScheduler thisScheduler () nothrow
{
    return thisSchedulerStorage;
}

/***************************************************************************

    Set current running FiberScheduler

    Params:
        value = Reference to the current FiberScheduler

***************************************************************************/

public void thisScheduler (FiberScheduler value) nothrow
{
    thisSchedulerStorage = value;
}
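// Added usage sketch (not part of the upstream Phobos-derived code):
// `thisScheduler` is only populated while a `FiberScheduler` is dispatching,
// i.e. inside tasks started through `start`/`spawn`/`schedule`.
unittest
{
    assert(thisScheduler() is null);

    FiberScheduler sched = new FiberScheduler;
    sched.start({
        // Inside a running task the storage refers to the dispatching scheduler
        assert(thisScheduler() is sched);
    });

    // Cleared again once `dispatch()` returns
    assert(thisScheduler() is null);
}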
/**
 * An example Scheduler using Fibers.
 *
 * This is an example scheduler that creates a new Fiber per call to spawn
 * and multiplexes the execution of all fibers within the main thread.
 */
public class FiberScheduler
{
    /***************************************************************************

        Default ctor

        Params:
            max_parked_fibers = Maximum number of parked fibers

    ***************************************************************************/

    public this (size_t max_parked_fibers = 8) nothrow
    {
        this.sem = assumeWontThrow(new Semaphore());
        this.blocked_ex = new FiberBlockedException();
        this.max_parked = max_parked_fibers;
    }

    /**
     * This creates a new Fiber for the supplied op and then starts the
     * dispatcher.
     */
    void start (void delegate() op)
    {
        this.create(op, true);
        // Make sure the just-created fiber is run first
        this.dispatch();
    }

    /**
     * This creates a new Fiber for the supplied op and adds it to the
     * dispatch list.
     */
    void spawn (void delegate() op) nothrow
    {
        this.create(op);
        FiberScheduler.yield();
    }

    /**************************************************************************

        Schedule a task to be run next time the scheduler yields

        Behaves similarly to `spawn`, but instead of running the task
        immediately, it simply adds it to the queue and continues executing
        the current task.

        Params:
            op = Operation to run

    **************************************************************************/

    public void schedule (void delegate() op) nothrow
    {
        this.create(op);
    }

    /**
     * If the caller is a scheduled Fiber, this yields execution to another
     * scheduled Fiber.
     */
    public static void yield () nothrow
    {
        // NOTE: It's possible that we should test whether the calling Fiber
        //       is an InfoFiber before yielding, but I think it's reasonable
        //       that any fiber should yield here.
        if (Fiber.getThis())
            Fiber.yield();
    }

    /**
     * If the caller is a scheduled Fiber, this yields execution to another
     * scheduled Fiber.
     */
    public static void yieldAndThrow (Throwable t) nothrow
    {
        // NOTE: It's possible that we should test whether the calling Fiber
        //       is an InfoFiber before yielding, but I think it's reasonable
        //       that any fiber should yield here.
        if (Fiber.getThis())
            Fiber.yieldAndThrow(t);
    }

    /// Resource type that will be tracked by FiberScheduler
    protected interface Resource
    {
        ///
        void release () nothrow;
    }

    /***********************************************************************

        Add a Resource to the Resource list of the running Fiber

        Param:
            r = Resource instance

    ***********************************************************************/

    private void addResource (Resource r, InfoFiber info_fiber = cast(InfoFiber) Fiber.getThis()) nothrow
    {
        assert(info_fiber, "Called from outside of an InfoFiber");
        info_fiber.resources.insert(r);
    }

    /***********************************************************************

        Remove a Resource from the Resource list of the running Fiber

        Param:
            r = Resource instance

        Returns:
            Success/failure

    ***********************************************************************/

    private bool removeResource (Resource r, InfoFiber info_fiber = cast(InfoFiber) Fiber.getThis()) nothrow
    {
        assert(info_fiber, "Called from outside of an InfoFiber");
        // TODO: For some cases, search is not necessary. We can just pop the last element
        return assumeWontThrow(info_fiber.resources.linearRemoveElement(r));
    }

    /**
     * Creates a new Fiber which calls the given delegate.
     *
     * Params:
     *   op = The delegate the fiber should call
     *   insert_front = Fiber will be added to the front
     *                  of the ready queue to be run first
     */
    protected void create (void delegate() op, bool insert_front = false) nothrow
    {
        InfoFiber new_fiber;
        if (this.parked_count > 0)
        {
            new_fiber = this.parked_fibers.front();
            new_fiber.reuse(op);

            this.parked_fibers.removeFront();
            this.parked_count--;
        }
        else
        {
            new_fiber = new InfoFiber(op);
        }

        if (insert_front)
            this.readyq.insertFront(new_fiber);
        else
            this.readyq.insertBack(new_fiber);
    }

    /**
     * Fiber which embeds the necessary info for FiberScheduler
     */
    protected static class InfoFiber : Fiber
    {
        /// Semaphore reference that this Fiber is blocked on
        FiberBlocker blocker;

        /// List of Resources held by this Fiber
        DList!Resource resources;

        this (void delegate() op, size_t sz = 512 * 1024) nothrow
        {
            super(op, sz);
        }

        /***********************************************************************

            Reset the Fiber to be reused with a new delegate

            Param:
                op = Delegate

        ***********************************************************************/

        void reuse (void delegate() op) nothrow
        {
            assert(this.state == Fiber.State.TERM, "Cannot reuse a non-terminated Fiber");
            this.blocker = null;
            this.resources.clear();
            this.reset(op);
        }
    }

    public final class FiberBlocker
    {

        /***********************************************************************

            Associate `FiberBlocker` with the running `Fiber`

            `FiberScheduler` will check to see if the `Fiber` is blocking on a
            `FiberBlocker` to avoid rescheduling it unnecessarily

        ***********************************************************************/

        private void registerToInfoFiber (InfoFiber info_fiber = cast(InfoFiber) Fiber.getThis()) nothrow
        {
            assert(info_fiber !is null, "This Fiber does not belong to FiberScheduler");
            assert(info_fiber.blocker is null, "This Fiber already has a registered FiberBlocker");
            info_fiber.blocker = this;
        }

        /***********************************************************************

            Wait on the blocker with an optional timeout

            Params:
                period = Timeout period

        ***********************************************************************/

        bool wait (Duration period = Duration.init) nothrow
        {
            if (period != Duration.init)
                this.limit = MonoTime.currTime + period;

            if (this.shouldBlock())
            {
                this.registerToInfoFiber();
                FiberScheduler.yieldAndThrow(this.outer.blocked_ex);
            }

            this.limit = MonoTime.init;
            this.notified = false;
            return !this.hasTimedOut();
        }

        /***********************************************************************

            Unblock the Fiber waiting on this blocker

        ***********************************************************************/

        void notify () nothrow
        {
            this.stopTimer();
            this.notified = true;
            assumeWontThrow(this.outer.sem.notify());
        }

        /***********************************************************************

            Query if `FiberBlocker` should still block

            FiberBlocker will block the Fiber until it is notified or
            the specified timeout is reached.

        ***********************************************************************/

        bool shouldBlock () nothrow
        {
            bool timed_out = (this.limit != MonoTime.init
                                && MonoTime.currTime >= this.limit);

            if (timed_out)
                cas(&this.timer_state, TimerState.Running, TimerState.TimedOut);

            return atomicLoad(this.timer_state) != TimerState.TimedOut && !this.notified;
        }

        /***********************************************************************

            Try freezing the internal timer

        ***********************************************************************/

        bool stopTimer () nothrow
        {
            return cas(&this.timer_state, TimerState.Running, TimerState.Stopped);
        }

        /***********************************************************************

            Query if the internal timer has timed out

        ***********************************************************************/

        bool hasTimedOut () nothrow
        {
            return atomicLoad(this.timer_state) == TimerState.TimedOut;
        }

        MonoTime getTimeout () nothrow
        {
            return limit;
        }

    private:
        enum TimerState
        {
            Running,
            TimedOut,
            Stopped
        }

        /// Time limit that will eventually unblock the caller if a timeout is specified
        MonoTime limit = MonoTime.init;

        /// State of the blocker
        bool notified;

        /// State of the internal timer
        shared(TimerState) timer_state;
    }

    /***********************************************************************

        Start the scheduling loop

    ***********************************************************************/

    private void dispatch ()
    {
        thisScheduler(this);
        scope (exit) thisScheduler(null);

        MonoTime earliest_timeout;

        while (!this.readyq.empty() || !this.wait_list.empty())
        {
            while (!readyq.empty())
            {
                InfoFiber cur_fiber = this.readyq.front();
                if (this.critical_section_fiber &&
                    cur_fiber !is this.critical_section_fiber)
                    break;

                this.readyq.removeFront();

                assert(cur_fiber.state != Fiber.State.TERM);

                auto t = cur_fiber.call(Fiber.Rethrow.no);

                // Fibers that block on a FiberBlocker throw an
                // exception for the scheduler to catch
                if (t is this.blocked_ex)
                {
                    auto cur_timeout = cur_fiber.blocker.getTimeout();

                    // Keep track of the earliest timeout in the system
                    if (cur_timeout != MonoTime.init
                            && (earliest_timeout == MonoTime.init || cur_timeout < earliest_timeout))
                    {
                        earliest_timeout = cur_timeout;
                    }

                    this.wait_list.insert(cur_fiber);
                    continue;
                }
                else if (t)
                {
                    // We are exiting the dispatch loop prematurely, all resources
                    // held by Fibers should be released.
                    this.releaseResources(cur_fiber);
                    throw t;
                }

                if (cur_fiber.state != Fiber.State.TERM)
                {
                    if (this.critical_section_fiber is cur_fiber)
                        this.readyq.insertFront(cur_fiber);
                    else
                        this.readyq.insertBack(cur_fiber);
                }
                // Park terminated Fiber
                else if (this.parked_count < this.max_parked)
                {
                    this.parked_fibers.insertFront(cur_fiber);
                    this.parked_count++;
                }
                // Destroy the terminated Fiber to immediately reclaim
                // the stack space
                else
                {
                    destroy!false(cur_fiber);
                }

                // See if there are Fibers to be woken up if we reach a timeout
                // or the scheduler semaphore was notified
                if (MonoTime.currTime >= earliest_timeout || this.sem.tryWait())
                    earliest_timeout = wakeFibers();
            }

            if (!this.wait_list.empty())
            {
                Duration time_to_timeout = earliest_timeout - MonoTime.currTime;

                // Sleep until a timeout or an event
                if (earliest_timeout == MonoTime.init)
                    this.sem.wait();
                else if (time_to_timeout > 0.seconds)
                    this.sem.wait(time_to_timeout);
            }

            // The OS thread woke up: populate the ready queue
            earliest_timeout = wakeFibers();
        }
    }

    /***********************************************************************

        Move unblocked Fibers to the ready queue

        Return:
            Returns the earliest timeout left in the waiting list

    ***********************************************************************/

    private MonoTime wakeFibers ()
    {
        import std.range;
        MonoTime earliest_timeout;

        auto wait_range = this.wait_list[];
        while (!wait_range.empty)
        {
            auto fiber = wait_range.front;

            // Remove the unblocked Fiber from the wait list and
            // append it to the end of the ready queue
            if (!fiber.blocker.shouldBlock())
            {
                this.wait_list.popFirstOf(wait_range);
                fiber.blocker = null;
                if (this.critical_section_fiber is fiber)
                    this.readyq.insertFront(fiber);
                else
                    this.readyq.insertBack(fiber);
            }
            else
            {
                auto timeout = fiber.blocker.getTimeout();
                if (timeout != MonoTime.init
                        && (earliest_timeout == MonoTime.init || timeout < earliest_timeout))
                {
                    earliest_timeout = timeout;
                }
                wait_range.popFront();
            }
        }

        return earliest_timeout;
    }

    /***********************************************************************

        Release all resources currently held by all Fibers owned by this
        scheduler

        Param:
            cur_fiber = Running Fiber

    ***********************************************************************/

    private void releaseResources (InfoFiber cur_fiber)
    {
        foreach (ref resource; cur_fiber.resources)
            resource.release();
        foreach (ref fiber; this.readyq)
            foreach (ref resource; fiber.resources)
                resource.release();
        foreach (ref fiber; this.wait_list)
            foreach (ref resource; fiber.resources)
                resource.release();
    }

    /// Disables Fiber switching
    public void enterCriticalSection () nothrow
    {
        this.critical_section_fiber = cast(InfoFiber) Fiber.getThis();
        assert(this.critical_section_fiber !is null);
    }

    /// Enables Fiber switching
    public void exitCriticalSection () nothrow
    {
        assert(this.critical_section_fiber is cast(InfoFiber) Fiber.getThis());
        this.critical_section_fiber = null;
    }

private:

    /// OS semaphore for the scheduler to sleep on
    Semaphore sem;

    /// A FIFO queue of Fibers ready to run
    DList!InfoFiber readyq;

    /// List of Fibers waiting for an event
    DList!InfoFiber wait_list;

    /// Cached instance of FiberBlockedException
    FiberBlockedException blocked_ex;

    /// List of parked fibers to be reused
    SList!InfoFiber parked_fibers;

    /// Number of currently parked fibers
    size_t parked_count;

    /// Maximum number of parked fibers
    immutable size_t max_parked;

    /// InfoFiber that is in critical section
    InfoFiber critical_section_fiber;
}

/// Ensure argument to `start` is run first
unittest
{
    {
        scope sched = new FiberScheduler();
        bool startHasRun;
        sched.spawn(() => assert(startHasRun));
        sched.start(() { startHasRun = true; });
    }
    {
        scope sched = new FiberScheduler();
        bool startHasRun;
        sched.schedule(() => assert(startHasRun));
        sched.start(() { startHasRun = true; });
    }
}
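// Added usage sketch (not part of the upstream code): tasks scheduled on the
// same scheduler interleave cooperatively at `FiberScheduler.yield()` points.
unittest
{
    int[] order;
    scope sched = new FiberScheduler();

    sched.schedule(() { order ~= 1; FiberScheduler.yield(); order ~= 3; });
    sched.start(() { order ~= 0; FiberScheduler.yield(); order ~= 2; });

    // The `start` task runs first, then each yield hands over to the other task
    assert(order == [0, 1, 2, 3]);
}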
/***********************************************************************

    A common interface for objects that can be used in `select()`

***********************************************************************/

public interface Selectable
{
    /***********************************************************************

        Try to read/write to the `Selectable` without blocking. If the
        operation would block, queue it and link it with the `sel_state`

        Params:
            ptr = pointer to the data for the select operation
            sel_state = SelectState instance of the select call being executed
            sel_id = id of the select call being executed

    ***********************************************************************/

    void selectWrite (void* ptr, SelectState sel_state, int sel_id);

    /// Ditto
    void selectRead (void* ptr, SelectState sel_state, int sel_id);
}

/***********************************************************************

    An aggregate to hold the necessary information for a select operation

***********************************************************************/

public struct SelectEntry
{
    /// Reference to a Selectable object
    Selectable selectable;

    /// Pointer to the select data
    void* select_data;

    /***********************************************************************

        Default ctor

        Params:
            selectable = A selectable interface reference
            select_data = pointer to the data for the select operation

    ***********************************************************************/

    this (Selectable selectable, void* select_data) @safe pure nothrow @nogc
    {
        this.selectable = selectable;
        this.select_data = select_data;
    }
}

/// Consists of the id and result of the select operation that was completed
public alias SelectReturn = Tuple!(bool, "success", int, "id");

/***********************************************************************

    Block on multiple `Channel`s

    Only one operation is completed per `select()` call

    Params:
        read_list = List of `Channel`s to select for the read operation
        write_list = List of `Channel`s to select for the write operation
        timeout = Optional timeout

    Return:
        Returns the success/failure status of the operation and the index
        of the `Channel` that the operation was carried out on. The index is
        the position of the SelectEntry in `read_list ~ write_list`, i.e.
        the concatenated lists.

***********************************************************************/

public SelectReturn select (ref SelectEntry[] read_list, ref SelectEntry[] write_list,
    Duration timeout = Duration.init)
{
    import std.random : randomShuffle;

    auto ss = new SelectState(thisScheduler().new FiberBlocker());
    int sel_id = 0;
    thisScheduler().addResource(ss);
    scope (exit) thisScheduler().removeResource(ss);

    read_list = read_list.randomShuffle();
    write_list = write_list.randomShuffle();

    foreach (ref entry; read_list)
    {
        if (ss.isConsumed())
            break;
        entry.selectable.selectRead(entry.select_data, ss, sel_id++);
    }

    foreach (ref entry; write_list)
    {
        if (ss.isConsumed())
            break;
        entry.selectable.selectWrite(entry.select_data, ss, sel_id++);
    }

    if (!ss.blocker.wait(timeout))
        return SelectReturn(false, -1); // Timed out

    return SelectReturn(ss.success, ss.id);
}
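// Added usage sketch (not part of the upstream code): wait on whichever of two
// channels becomes readable first. `select` must run inside a scheduler task.
unittest
{
    scope sched = new FiberScheduler();
    auto chn_a = new Channel!int(1);
    auto chn_b = new Channel!int(1);

    // The writer is scheduled after the selecting task, so the select blocks first
    sched.schedule(() { chn_a.write(42); });

    sched.start(
        () {
            int val_a, val_b;
            SelectEntry[] read_list = [SelectEntry(chn_a, &val_a), SelectEntry(chn_b, &val_b)];
            SelectEntry[] write_list;

            auto res = select(read_list, write_list);

            assert(res.success);
            // Only `chn_a` was written to, so only `val_a` was filled in
            assert(val_a == 42 && val_b == 0);
        }
    );
}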
/***********************************************************************

    Holds the state of a group of `selectRead`/`selectWrite` calls.
    Synchronizes peers that will consume those calls, so that only one
    `selectRead`/`selectWrite` call is completed.

***********************************************************************/

final private class SelectState : FiberScheduler.Resource
{
    /// Shared blocker object for multiple ChannelQueueEntry objects
    FiberScheduler.FiberBlocker blocker;

    /***********************************************************************

        Default constructor

    ***********************************************************************/

    this (FiberScheduler.FiberBlocker blocker) @safe pure nothrow @nogc
    {
        this.blocker = blocker;
    }

    /***********************************************************************

        Tries to atomically consume a `SelectState` and sets the `id` and
        `success` fields

        Param:
            id = ID of the `selectRead`/`selectWrite` call that is consuming
                 this `SelectState`
            success_in = Success/failure of the select call

        Return:
            Returns true if `SelectState` was not already consumed, false otherwise

    ***********************************************************************/

    bool tryConsume (int id, bool success_in = true) nothrow
    {
        if (cas(&this.consumed, false, true))
        {
            this.id = id;
            this.success = success_in;
            return true;
        }
        return false;
    }

    /***********************************************************************

        Returns whether this `SelectState` is already consumed or not

    ***********************************************************************/

    bool isConsumed () nothrow
    {
        return atomicLoad(this.consumed);
    }

    /***********************************************************************

        Consume the SelectState so that it is neutralized

    ***********************************************************************/

    void release () nothrow
    {
        this.tryConsume(-1, false);
    }

    /// ID of the select call that consumed this `SelectState`
    int id;

    /// Success/failure state of the select call with ID of `id`
    bool success;

private:
    /// Indicates if this `SelectState` is consumed or not
    shared(bool) consumed;
}

/***********************************************************************

    A Golang-style channel implementation with buffered and unbuffered
    operation modes

    Intended to be used between Fibers

    Param:
        T = Type of the messages carried across the `Channel`. Currently
            all reference and value types are supported.

***********************************************************************/

final public class Channel (T) : Selectable
{
    /***********************************************************************

        Constructs a Channel

        Param:
            max_size = Maximum amount of T a Channel can buffer
                       (0 -> Unbuffered operation,
                        Positive integer -> Buffered operation)

    ***********************************************************************/

    this (ulong max_size = 0) nothrow
    {
        this.max_size = max_size;
        this.lock = new FiberMutex;
        if (max_size)
            this.buffer = new RingBuffer!T(max_size);
    }

    /***********************************************************************

        Write a message to the `Channel` with an optional timeout

        Unbuffered mode:

            If a reader is already blocked on the `Channel`, the writer copies
            the message to the reader's buffer and wakes up the reader by
            yielding

            If no reader is ready in the wait queue, the writer appends itself
            to the write wait queue and blocks

        Buffered mode:

            If a reader is already blocked on the `Channel`, the writer copies
            the message to the reader's buffer, wakes up the reader and returns
            immediately.

            If the buffer is not full, the writer puts the message in the
            `Channel` buffer and returns immediately

            If the buffer is full, the writer appends itself to the write wait
            queue and blocks

        If the `Channel` is closed, it returns immediately with a failure,
        regardless of the operation mode

        Param:
            val = Message to write to the `Channel`
            duration = Timeout duration

        Returns:
            Success/Failure - Fails when the `Channel` is closed or the
            timeout is reached.

    ***********************************************************************/

    bool write () (auto ref T val, Duration duration = Duration.init) nothrow
    {
        this.lock.lock_nothrow();

        bool success = tryWrite(val);

        if (!success && !this.isClosed())
        {
            ChannelQueueEntry q_ent = this.enqueueEntry(this.writeq, &val);
            thisScheduler().addResource(q_ent);
            scope (exit) thisScheduler().removeResource(q_ent);

            this.lock.unlock_nothrow();
            return q_ent.blocker.wait(duration) && q_ent.success;
        }

        this.lock.unlock_nothrow();
        return success;
    }

    /***********************************************************************

        Try to write a message to the `Channel` without blocking

        tryWrite writes a message if it is possible to do so without blocking.

        If the tryWrite is being executed in a select call, it tries to consume
        the `caller_sel` with the given `caller_sel_id`. It only proceeds if it
        can successfully consume the `caller_sel`

        Param:
            val = Message to write to the `Channel`
            caller_sel = SelectState instance of the select call being executed
            caller_sel_id = id of the select call being executed

        Returns:
            Success/Failure

    ***********************************************************************/

    private bool tryWrite () (auto ref T val, SelectState caller_sel = null, int caller_sel_id = 0) nothrow
    {
        if (this.isClosed())
        {
            if (caller_sel)
                caller_sel.tryConsume(caller_sel_id, false);
            return false;
        }

        if (ChannelQueueEntry readq_ent = this.dequeueEntry(this.readq, caller_sel, caller_sel_id))
        {
            *readq_ent.pVal = val;
            readq_ent.blocker.notify();
            return true;
        }

        if (this.max_size > 0 // this.max_size > 0 = buffered
                && this.buffer.length < this.max_size
                && (!caller_sel || caller_sel.tryConsume(caller_sel_id)))
        {
            this.buffer.insert(val);
            return true;
        }

        return false;
    }

    /***********************************************************************

        Try to write a message to the Channel without blocking and if it fails,
        create a write queue entry using the given `sel_state` and `sel_id`

        Param:
            ptr = Message to write to the channel
            sel_state = SelectState instance of the select call being executed
            sel_id = id of the select call being executed

    ***********************************************************************/

    void selectWrite (void* ptr, SelectState sel_state, int sel_id) nothrow
    {
        assert(ptr !is null);
        assert(sel_state !is null);
        T* val = cast(T*) ptr;

        this.lock.lock_nothrow();

        bool success = tryWrite(*val, sel_state, sel_id);

        if (!sel_state.isConsumed())
            this.enqueueEntry(this.writeq, val, sel_state, sel_id);

        if (success || this.isClosed() || sel_state.id == -1)
        {
            this.lock.unlock_nothrow();
            sel_state.blocker.notify();
        }
        else
            this.lock.unlock_nothrow();
    }

    /***********************************************************************

        Read a message from the Channel with an optional timeout

        Unbuffered mode:

            If a writer is already blocked on the Channel, the reader copies
            the value to `output` and wakes up the writer by yielding

            If no writer is ready in the wait queue, the reader appends itself
            to the read wait queue and blocks

            If the channel is closed, it returns immediately with a failure

        Buffered mode:

            If there are existing messages in the buffer, the reader pops one
            off the buffer and returns immediately with success, regardless of
            the Channel being closed or not

            If there are no messages in the buffer it behaves exactly like the
            unbuffered operation

        Param:
            output = Reference to the output variable
            duration = Timeout duration

        Returns:
            Success/Failure - Fails when the channel is closed and there are
            no existing messages to be read. Fails when the timeout is reached.

    ***********************************************************************/

    bool read (ref T output, Duration duration = Duration.init) nothrow
    {
        this.lock.lock_nothrow();

        bool success = tryRead(output);

        if (!success && !this.isClosed())
        {
            ChannelQueueEntry q_ent = this.enqueueEntry(this.readq, &output);
            thisScheduler().addResource(q_ent);
            scope (exit) thisScheduler().removeResource(q_ent);

            this.lock.unlock_nothrow();
            return q_ent.blocker.wait(duration) && q_ent.success;
        }

        this.lock.unlock_nothrow();
        return success;
    }

    /***********************************************************************

        Try to read a message from the Channel without blocking

        tryRead reads a message if it is possible to do so without blocking.

        If the tryRead is being executed in a select call, it tries to consume
        the `caller_sel` with the given `caller_sel_id`. It only proceeds if it
        can successfully consume the `caller_sel`

        Param:
            output = Field to write the message to
            caller_sel = SelectState instance of the select call being executed
            caller_sel_id = id of the select call being executed

        Returns:
            Success/Failure

    ***********************************************************************/

    private bool tryRead (ref T output, SelectState caller_sel = null, int caller_sel_id = 0) nothrow
    {
        ChannelQueueEntry write_ent = this.dequeueEntry(this.writeq, caller_sel, caller_sel_id);

        if (this.max_size > 0 && !this.buffer.empty())
        {
            // if dequeueEntry fails, we will try to consume caller_sel again.
            if (!caller_sel || write_ent || caller_sel.tryConsume(caller_sel_id))
            {
                output = this.buffer.front();
                this.buffer.popFront();

                if (write_ent)
                {
                    this.buffer.insert(*write_ent.pVal);
                }
            }
            else
            {
                return false;
            }
        }
        // if dequeueEntry returns a valid entry, it always successfully consumes the related select states.
        // the race between 2 select calls is resolved in dequeueEntry.
        else if (write_ent)
        {
            output = *write_ent.pVal;
        }
        else
        {
            if (this.isClosed() && caller_sel)
                caller_sel.tryConsume(caller_sel_id, false);
            return false;
        }

        if (write_ent)
            write_ent.blocker.notify();
        return true;
    }

    /***********************************************************************

        Try to read a message from the Channel without blocking and if it fails,
        create a read queue entry using the given `sel_state` and `sel_id`

        Param:
            ptr = Buffer to write the message to
            sel_state = SelectState instance of the select call being executed
            sel_id = id of the select call being executed

    ***********************************************************************/

    void selectRead (void* ptr, SelectState sel_state, int sel_id) nothrow
    {
        assert(ptr !is null);
        assert(sel_state !is null);
        T* val = cast(T*) ptr;

        this.lock.lock_nothrow();

        bool success = tryRead(*val, sel_state, sel_id);

        if (!sel_state.isConsumed())
            this.enqueueEntry(this.readq, val, sel_state, sel_id);

        if (success || this.isClosed() || sel_state.id == -1)
        {
            this.lock.unlock_nothrow();
            sel_state.blocker.notify();
        }
        else
            this.lock.unlock_nothrow();
    }

    /***********************************************************************

        Close the channel

        Closes the channel by marking it closed and flushing all the wait
        queues

    ***********************************************************************/

    void close () nothrow
    {
        if (cas(&this.closed, false, true))
        {
            this.lock.lock_nothrow();
            scope (exit) this.lock.unlock_nothrow();

            // Wake blocked Fibers up, report the failure
            foreach (ref entry; this.readq)
            {
                entry.terminate();
            }
            foreach (ref entry; this.writeq)
            {
                entry.terminate();
            }

            this.readq.clear();
            this.writeq.clear();
        }
    }

    /***********************************************************************

        Return the length of the internal buffer

    ***********************************************************************/

    size_t length () nothrow
    {
        this.lock.lock_nothrow();
        scope (exit) this.lock.unlock_nothrow();
        return this.buffer.length;
    }

    /***********************************************************************

        Return the closed status of the `Channel`

    ***********************************************************************/

    bool isClosed () const @safe pure nothrow @nogc scope
    {
        return atomicLoad(this.closed);
    }

    /***********************************************************************

        An aggregate of the necessary information to block a Fiber and record
        its request

    ***********************************************************************/

    private static class ChannelQueueEntry : FiberScheduler.Resource
    {
        /// FiberBlocker blocking the `Fiber`
        FiberScheduler.FiberBlocker blocker;

        /// Pointer to the variable that we will read to/from
        T* pVal;

        /// Result of the blocking read/write call
        bool success = true;

        /// State of the select call that this entry was created for
        SelectState select_state;

        /// Id of the select call that this entry was created for
        int sel_id;

        this (T* pVal, SelectState select_state = null, int sel_id = 0) nothrow
        {
            this.pVal = pVal;
            this.select_state = select_state;
            this.sel_id = sel_id;

            if (this.select_state)
                this.blocker = this.select_state.blocker;
            else
                this.blocker = thisScheduler().new FiberBlocker();
        }

        /***********************************************************************

            Terminate a `ChannelQueueEntry` by waking up the blocked Fiber
            and reporting the failure

            This is called on all the `ChannelQueueEntry` instances still in
            the wait queues when the Channel is closed

        ***********************************************************************/

        void terminate () nothrow
        {
            this.success = false;
            if (!this.select_state || this.select_state.tryConsume(this.sel_id, this.success))
                this.blocker.notify();
        }


        /***********************************************************************

            Terminate the ChannelQueueEntry so that it is neutralized

        ***********************************************************************/

        void release () nothrow
        {
            if (this.blocker.stopTimer())
                this.pVal = null; // Sanitize pVal so that we can catch illegal accesses
        }
    }

    /***********************************************************************

        Create and enqueue a `ChannelQueueEntry` to the given entryq

        Param:
            entryq = Queue to append the new ChannelQueueEntry to
            pVal = Pointer to the message buffer
            sel_state = SelectState object to associate with the
                        newly created ChannelQueueEntry
            sel_id = id of the select call creating the new ChannelQueueEntry

        Return:
            the newly created ChannelQueueEntry

    ***********************************************************************/

    private ChannelQueueEntry enqueueEntry (ref DList!ChannelQueueEntry entryq, T* pVal,
        SelectState sel_state = null, int sel_id = 0) nothrow
    {
        assert(pVal !is null);

        ChannelQueueEntry q_ent = new ChannelQueueEntry(pVal, sel_state, sel_id);
        entryq.insert(q_ent);

        return q_ent;
    }

    /***********************************************************************

        Dequeue a `ChannelQueueEntry` from the given `entryq`

        Walks the `entryq` until it finds a suitable entry or the queue
        empties. If `dequeueEntry` is called from a select, it tries to
        consume the `caller_sel` if the `peer_sel` is not currently consumed.
        If it fails to consume the `caller_sel`, it returns with a failure.

        If the selected queue entry is part of a select operation, it is also
        consumed. If it consumes `caller_sel` but `peer_sel` was already
        consumed, the whole select operation fails and the caller needs to try
        again. This should be a rare case, where the `peer_sel` gets consumed
        by someone else between the first check which verifies that it is not
        consumed and the point we actually try to consume it.

        Param:
            entryq = Queue to dequeue a ChannelQueueEntry from
            caller_sel = SelectState instance of the select call being executed
            caller_sel_id = id of the select call being executed

        Return:
            a valid ChannelQueueEntry or null

    ***********************************************************************/

    private ChannelQueueEntry dequeueEntry (ref DList!ChannelQueueEntry entryq,
        SelectState caller_sel = null, int caller_sel_id = 0) nothrow
    {
        while (!entryq.empty())
        {
            ChannelQueueEntry qent = entryq.front();
            auto peer_sel = qent.select_state;

            if ((!peer_sel || !peer_sel.isConsumed()) && qent.blocker.shouldBlock())
            {
                // If we are in a select call, try to consume the caller select
                // if we can't consume the caller select, no need to continue
                if (caller_sel && !caller_sel.tryConsume(caller_sel_id))
                {
                    return null;
                }

                // At this point, caller select is consumed.
                // Try to consume the peer select if it exists
                // If peer_sel was consumed by someone else, tough luck
                // In that case, whole select will fail since we consumed the caller_sel
                if ((!peer_sel || peer_sel.tryConsume(qent.sel_id)) && qent.blocker.stopTimer())
                {
                    entryq.removeFront();
                    return qent;
                }
                else if (caller_sel)
                {
                    // Mark caller_sel failed
                    caller_sel.id = -1;
                    caller_sel.success = false;
                    return null;
                }
            }

            entryq.removeFront();
        }

        return null;
    }

private:
    /// Internal data storage
    RingBuffer!T buffer;

    /// Closed flag
    bool closed;

    /// Per channel lock
    FiberMutex lock;

    /// List of fibers blocked on read()
    DList!ChannelQueueEntry readq;

    /// List of fibers blocked on write()
    DList!ChannelQueueEntry writeq;

public:
    /// Maximum amount of T a Channel can buffer
    immutable ulong max_size;
}

// Test non blocking operation
@system unittest
{
    string str = "DEADBEEF";
    string rcv_str;
    FiberScheduler scheduler = new FiberScheduler;
    auto chn = new Channel!string(2);

    scheduler.start({
        chn.write(str ~ " 1");
        assert(chn.length() == 1);
        chn.write(str ~ " 2");
        assert(chn.length() == 2);

        assert(chn.read(rcv_str));
        assert(rcv_str == str ~ " 1");
        assert(chn.length() == 1);

        chn.write(str ~ " 3");
        assert(chn.length() == 2);

        assert(chn.read(rcv_str));
        assert(rcv_str == str ~ " 2");
        assert(chn.length() == 1);

        assert(chn.read(rcv_str));
        assert(rcv_str == str ~ " 3");
        assert(chn.length() == 0);
    });
}
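// Added usage sketch (not part of the upstream code): timed-out reads report
// failure while the channel itself stays open and usable afterwards.
@system unittest
{
    FiberScheduler scheduler = new FiberScheduler;
    auto chn = new Channel!int(1);

    scheduler.start({
        int val;
        // Nothing has been written yet, so a timed read gives up and fails
        assert(!chn.read(val, 50.msecs));

        // The channel is still open: a buffered write and a read both succeed
        assert(chn.write(7));
        assert(chn.read(val, 50.msecs));
        assert(val == 7);
    });
}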
// Test unbuffered blocking operation with multiple receivers
// Receiver should read every message in the order they were sent
@system unittest
{
    FiberScheduler scheduler = new FiberScheduler;
    auto chn = new Channel!int();
    int n = 1000;
    long sum;

    scheduler.spawn(
        () {
            int val, prev;
            bool ret = chn.read(prev);
            sum += prev;
            assert(ret);

            while (chn.read(val))
            {
                sum += val;
                assert(ret);
                assert(prev < val);
                prev = val;
            }
        }
    );

    scheduler.spawn(
        () {
            int val, prev;
            bool ret = chn.read(prev);
            sum += prev;
            assert(ret);

            while (chn.read(val))
            {
                sum += val;
                assert(ret);
                assert(prev < val);
                prev = val;
            }
        }
    );

    scheduler.start(
        () {
            for (int i = 0; i <= n; i++)
            {
                assert(chn.write(i));
            }
            chn.close();

            assert(!chn.write(0));
        }
    );

    // Sum of [0..1000]
    assert(sum == n*(n+1)/2);
}

// Test that writer is not blocked until buffer is full and a read unblocks the writer
// Reader should be able to read remaining messages after chn is closed
@system unittest
{
    FiberScheduler scheduler = new FiberScheduler;
    auto chn = new Channel!int(5);

    scheduler.spawn(
        () {
            int val;
            assert(chn.max_size == chn.length);
            assert(chn.read(val));
            chn.close();
            // Read remaining messages after channel is closed
            for (int i = 0; i < chn.max_size; i++)
            {
                assert(chn.read(val));
            }
            // No more messages to read on closed chn
            assert(!chn.read(val));
        }
    );

    scheduler.start(
        () {
            for (int i = 0; i < chn.max_size; i++)
            {
                assert(chn.write(i));
            }
            assert(chn.max_size == chn.length);
            // Wait for read.
            assert(chn.write(42));
            // Reader already closed the channel
            assert(!chn.write(0));
        }
    );
}

@system unittest
{
    struct HyperLoopMessage
    {
        int id;
        MonoTime time;
    }

    FiberScheduler scheduler = new FiberScheduler;
    auto chn1 = new Channel!HyperLoopMessage();
    auto chn2 = new Channel!HyperLoopMessage();
    auto chn3 = new Channel!HyperLoopMessage();

    scheduler.spawn(
        () {
            HyperLoopMessage msg;

            for (int i = 0; i < 1000; ++i)
            {
                assert(chn2.read(msg));
                assert(msg.id % 3 == 1);
                msg.id++;
                msg.time = MonoTime.currTime;
                assert(chn3.write(msg));
            }
        }
    );

    scheduler.spawn(
        () {
            HyperLoopMessage msg;

            for (int i = 0; i < 1000; ++i)
            {
                assert(chn1.read(msg));
                assert(msg.id % 3 == 0);
                msg.id++;
                msg.time = MonoTime.currTime;
                assert(chn2.write(msg));
            }
        }
    );

    scheduler.start(
        () {
            HyperLoopMessage msg = {
                id : 0,
                time : MonoTime.currTime
            };

            for (int i = 0; i < 1000; ++i)
            {
                assert(chn1.write(msg));
                assert(chn3.read(msg));
                assert(msg.id % 3 == 2);
                msg.id++;
                msg.time = MonoTime.currTime;
            }
        }
    );
}

// Multiple writer threads writing to a buffered channel
// Reader should receive all messages
@system unittest
{
    FiberScheduler scheduler = new FiberScheduler;
    immutable int n = 5000;
    auto chn1 = new Channel!int(n/10);

    shared int sharedVal = 0;
    shared int writer_sum = 0;

    auto t1 = new Thread({
        FiberScheduler scheduler = new FiberScheduler();
        scheduler.start(
            () {
                int val = atomicOp!"+="(sharedVal, 1);
                while (chn1.write(val))
                {
                    atomicOp!"+="(writer_sum, val);
                    val = atomicOp!"+="(sharedVal, 1);
                }
            }
        );
    });
    t1.start();

    auto t2 = new Thread({
        FiberScheduler scheduler = new FiberScheduler();
        scheduler.start(
            () {
                int val = atomicOp!"+="(sharedVal, 1);
                while (chn1.write(val))
                {
                    atomicOp!"+="(writer_sum, val);
                    val = atomicOp!"+="(sharedVal, 1);
                }
            }
        );
    });
    t2.start();

    scheduler.start(
        () {
            int reader_sum, readVal, count;

            while (chn1.read(readVal))
            {
                reader_sum += readVal;
                if (count++ == n) chn1.close();
            }

            thread_joinAll();
            assert(reader_sum == writer_sum);
        }
    );
}

@system unittest
{
    FiberScheduler scheduler = new FiberScheduler;
    auto chn1 = new Channel!int();
    auto chn2 = new Channel!int(1);
    auto chn3 = new Channel!int();

    scheduler.spawn(
        () {
            chn1.write(42);
            chn1.close();
            chn3.write(37);
        }
    );

    scheduler.spawn(
        () {
            chn2.write(44);
            chn2.close();
        }
    );

    scheduler.start(
        () {
            bool[3] chn_closed;
            int[3] read_val;
            for (int i = 0; i < 5; ++i)
            {
                SelectEntry[] read_list;
                SelectEntry[] write_list;

                if (!chn_closed[0])
                    read_list ~= SelectEntry(chn1, &read_val[0]);
                if (!chn_closed[1])
                    read_list ~= SelectEntry(chn2, &read_val[1]);
                read_list ~= SelectEntry(chn3, &read_val[2]);

                auto select_return = select(read_list, write_list);

                if (!select_return.success)
                {
                    if (!chn_closed[0]) chn_closed[0] = true;
                    else if (!chn_closed[1]) chn_closed[1] = true;
                    else chn_closed[2] = true;
                }
            }
            assert(read_val[0] == 42 && read_val[1] == 44 && read_val[2] == 37);
            assert(chn_closed[0] && chn_closed[1] && !chn_closed[2]);
        }
    );
}

@system unittest
{
    FiberScheduler scheduler = new FiberScheduler;

    auto chn1 = new Channel!int(20);
    auto chn2 = new Channel!int();
    auto chn3 = new Channel!int(20);
    auto chn4 = new Channel!int();

    void thread_func (T) (ref T write_chn, ref T read_chn, int _tid)
    {
        FiberScheduler scheduler = new FiberScheduler;
        int read_val, write_val;
        int prev_read = -1;
        int n = 10000;

        scheduler.start(
            () {
                while (read_val < n || write_val <= n)
                {
                    int a;
                    SelectEntry[] read_list;
                    SelectEntry[] write_list;

                    if (write_val <= n)
                        write_list ~= SelectEntry(write_chn, &write_val);

                    if (read_val < n)
                        read_list ~= SelectEntry(read_chn, &read_val);

                    auto select_return = select(read_list, write_list);

                    if (select_return.success)
                    {
                        if (read_list.length > 0 && select_return.id == 0)
                        {
                            assert(prev_read + 1 == read_val);
                            prev_read = read_val;
                        }
                        else
                        {
                            write_val++;
                        }
                    }
                }
            }
        );
    }

    auto t1 = new Thread({
        thread_func(chn1, chn2, 0);
    });
    t1.start();

    auto t2 = new Thread({
        thread_func(chn2, chn3, 1);
    });
    t2.start();

    auto t3 = new Thread({
        thread_func(chn3, chn4, 2);
    });
    t3.start();

    thread_func(chn4, chn1, 3);

    thread_joinAll();
}

@system unittest
{
    FiberScheduler scheduler = new FiberScheduler;
    auto chn1 = new Channel!int();
    auto chn2 = new Channel!int();

    scheduler.spawn(
        () {
            FiberScheduler.yield();
            chn1.close();
        }
    );

    scheduler.spawn(
        () {
            FiberScheduler.yield();
            chn2.close();
        }
    );

    scheduler.spawn(
        () {
            for (int i = 0; i < 2; ++i)
            {
                int write_val = 42;
                SelectEntry[] read_list;
                SelectEntry[] write_list;
                write_list ~= SelectEntry(chn1, &write_val);
                auto select_return = select(read_list, write_list);

                assert(select_return.id == 0);
                assert(!select_return.success);
            }
        }
    );

    scheduler.start(
        () {
            for (int i = 0; i < 2; ++i)
            {
                int read_val;
                SelectEntry[] read_list;
                SelectEntry[] write_list;
                read_list ~= SelectEntry(chn2, &read_val);
                auto select_return = select(read_list, write_list);

                assert(select_return.id == 0);
                assert(!select_return.success);
            }
        }
    );
}

@system unittest
{
    import core.sync.semaphore;
    FiberScheduler scheduler = new FiberScheduler;
    auto chn1 = new Channel!int();

    auto start = MonoTime.currTime;
    Semaphore writer_sem = new Semaphore();
    Semaphore reader_sem = new Semaphore();

    auto t1 = new Thread({
        FiberScheduler scheduler = new FiberScheduler;
        scheduler.start(
            () {
                writer_sem.wait();
                assert(chn1.write(42));
            }
        );
    });
    t1.start();

    auto t2 = new Thread({
        FiberScheduler scheduler = new FiberScheduler;
        scheduler.start(
            () {
                int read_val;
                reader_sem.wait();
                assert(chn1.read(read_val));
                assert(read_val == 43);
            }
        );
    });
    t2.start();

    scheduler.start(
        () {
            int read_val;

            scope (failure) {
                reader_sem.notify();
                writer_sem.notify();
                chn1.close();
            }

            assert(!chn1.read(read_val, 1000.msecs));
            assert(MonoTime.currTime - start >= 1000.msecs);

            writer_sem.notify();
            assert(chn1.read(read_val, 1000.msecs));
            assert(read_val == 42);

            start = MonoTime.currTime;

            assert(!chn1.write(read_val + 1, 1000.msecs));
            assert(MonoTime.currTime - start >= 1000.msecs);

            reader_sem.notify();
            assert(chn1.write(read_val + 1, 1000.msecs));
        }
    );

    thread_joinAll();
}

/// A simple spinlock
struct SpinLock
{
    /// Spin until lock is free
    void lock() nothrow
    {
        while (!cas(&locked, false, true)) { }
    }

    /// Atomically unlock
    void unlock() nothrow
    {
        atomicStore!(MemoryOrder.rel)(locked, false);
    }

    /// Lock state
    private shared(bool) locked;
}

/// A Fiber level Semaphore
class FiberSemaphore
{

    /***********************************************************************

        Ctor

        Param:
            count = Initial count of FiberSemaphore

    ************************************************************************/

    this (size_t count = 0) nothrow
    {
        this.count = count;
    }

    /***********************************************************************

        Wait for FiberSemaphore count to be greater than 0

    ************************************************************************/

    void wait () nothrow
    {
        this.slock.lock();

        if (this.count > 0)
        {
            this.count--;
            this.slock.unlock();
            return;
        }
        auto entry = new SemaphoreQueueEntry();
        thisScheduler().addResource(entry);
        scope (exit) thisScheduler().removeResource(entry);
        this.queue.insert(entry);

        this.slock.unlock();
        entry.blocker.wait();
    }

    /***********************************************************************

        Increment the FiberSemaphore count

    ************************************************************************/

    void notify () nothrow
    {
        this.slock.lock();
        if (auto entry = this.dequeueEntry())
            entry.blocker.notify();
        else
            this.count++;
        this.slock.unlock();
    }

    ///
    private class SemaphoreQueueEntry : FiberScheduler.Resource
    {

        ///
        this () nothrow
        {
            assert(thisScheduler(), "Can not block with no FiberScheduler running!");
            this.blocker = thisScheduler().new FiberBlocker();
        }

        /***********************************************************************

            Terminate the SemaphoreQueueEntry so that it is neutralized

        ************************************************************************/

        void release () nothrow
        {
            if (!blocker.stopTimer())
                this.outer.notify();
        }

        /// FiberBlocker blocking the `Fiber`
        FiberScheduler.FiberBlocker blocker;
    }

    /***********************************************************************

        Dequeue a `SemaphoreQueueEntry` from the waiting queue

        Return:
            a valid SemaphoreQueueEntry or null

    ***********************************************************************/

    private SemaphoreQueueEntry dequeueEntry () nothrow
    {
        while (!this.queue.empty())
        {
            auto entry = this.queue.front();
            this.queue.removeFront();
            if (entry.blocker.shouldBlock() && entry.blocker.stopTimer())
            {
                return entry;
            }
        }
        return null;
    }

    ///
    private SpinLock slock;

    /// Waiting queue for Fibers
    private DList!SemaphoreQueueEntry queue;

    /// Current semaphore count
    private size_t count;
}

/// A Fiber level Mutex, essentially a binary FiberSemaphore
class FiberMutex : FiberSemaphore
{
    this () nothrow
    {
        super(1);
    }

    ///
    alias lock = wait;

    ///
    alias unlock = notify;

    ///
    alias lock_nothrow = lock;

    ///
    alias unlock_nothrow = unlock;
}
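// Added usage sketch (not part of the upstream code): a FiberSemaphore used as
// a one-shot signal between two fibers running on the same scheduler.
unittest
{
    FiberSemaphore sem = new FiberSemaphore(0);
    FiberScheduler scheduler = new FiberScheduler;
    int state;

    // Scheduled after the waiter, so the waiter blocks first
    scheduler.schedule(
        () {
            state = 1;
            sem.notify();
        }
    );

    scheduler.start(
        () {
            sem.wait();
            assert(state == 1);
        }
    );
}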
// Test releasing a queue entry
@system unittest
{
    FiberMutex mtx = new FiberMutex();
    int sharedVal;

    auto t1 = new Thread({
        FiberScheduler scheduler = new FiberScheduler;
        scheduler.start(
            () {
                mtx.lock();
                Thread.sleep(400.msecs);
                sharedVal += 1;
                mtx.unlock();
            }
        );
    });
    t1.start();

    auto t2 = new Thread({
        FiberScheduler scheduler = new FiberScheduler;
        Thread.sleep(100.msecs);

        scheduler.spawn(
            () {
                Thread.sleep(200.msecs);
                throw new Exception("");
            }
        );

        try
        {
            scheduler.start(
                () {
                    mtx.lock();
                    sharedVal += 1;
                    mtx.unlock();
                }
            );
        } catch (Exception e) { }
    });
    t2.start();

    auto t3 = new Thread({
        FiberScheduler scheduler = new FiberScheduler;
        scheduler.start(
            () {
                Thread.sleep(200.msecs);
                mtx.lock();
                assert(sharedVal == 1);
                sharedVal += 1;
                mtx.unlock();
            }
        );
    });
    t3.start();

    thread_joinAll();
}

@system unittest
{
    import std;
    FiberScheduler scheduler = new FiberScheduler;
    int count;

    scheduler.spawn(
        () {
            assert(count == 2);
            count++;
        }
    );

    scheduler.start(
        () {
            thisScheduler().enterCriticalSection();

            // this will yield
            scheduler.spawn(
                () {
                    assert(count == 3);
                    count++;
                }
            );

            thisScheduler().yield();
            // execution should still come back to us
            assert(count == 0);
            count++;

            scope blocker = thisScheduler().new FiberBlocker();
            blocker.wait(10.msecs);

            assert(count == 1);
            count++;

            thisScheduler().exitCriticalSection();
            thisScheduler().yield();
            assert(count == 4);
        }
    );
}