1 /**
2  * This is a low-level messaging API upon which more structured or restrictive
3  * APIs may be built.  The general idea is that every messageable entity is
4  * represented by a common handle type called a Tid, which allows messages to
5  * be sent to logical threads that are executing in both the current process
6  * and in external processes using the same interface.  This is an important
7  * aspect of scalability because it allows the components of a program to be
8  * spread across available resources with few to no changes to the actual
9  * implementation.
10  *
11  * A logical thread is an execution context that has its own stack and which
12  * runs asynchronously to other logical threads.  These may be preemptively
13  * scheduled kernel threads, fibers (cooperative user-space threads), or some
14  * other concept with similar behavior.
15  *
16  * The type of concurrency used when logical threads are created is determined
17  * by the Scheduler selected at initialization time.  The default behavior is
18  * currently to create a new kernel thread per call to spawn, but other
19  * schedulers are available that multiplex fibers across the main thread or
20  * use some combination of the two approaches.
21  *
22  * Note:
23  * Copied (almost verbatim) from Phobos at commit 3bfccf4f1 (2019-11-27)
24  * Changes are this notice, and the module rename, from `std.concurrency`
25  * to `geod24.concurrency`.
26  *
27  * Copyright: Copyright Sean Kelly 2009 - 2014.
28  * License:   <a href="http://www.boost.org/LICENSE_1_0.txt">Boost License 1.0</a>.
29  * Authors:   Sean Kelly, Alex Rønne Petersen, Martin Nowak
30  * Source:    $(PHOBOSSRC std/concurrency.d)
31  */
32 module geod24.concurrency;
33 
34 public import std.variant;
35 
36 import core.atomic;
37 import core.sync.condition;
38 import core.sync.mutex;
39 import core.sync.semaphore;
40 import core.thread;
41 import core.time : MonoTime;
42 import std.range.primitives;
43 import std.traits;
44 import std.algorithm;
45 import std.typecons;
46 import std.container : DList, SList;
47 import std.exception : assumeWontThrow;
48 
49 import geod24.RingBuffer;
50 
51 private
52 {
53     bool hasLocalAliasing(Types...)()
54     {
55         // Works around "statement is not reachable"
56         bool doesIt = false;
57         static foreach (T; Types)
58             doesIt |= hasLocalAliasingImpl!T;
59         return doesIt;
60     }
61 
62     template hasLocalAliasingImpl (T)
63     {
64         import std.typecons : Rebindable;
65 
66         static if (is(T : Channel!CT, CT))
67             immutable bool hasLocalAliasingImpl = hasLocalAliasing!CT;
68         else static if (is(T : Rebindable!R, R))
69             immutable bool hasLocalAliasingImpl = hasLocalAliasing!R;
70         else static if (is(T == struct))
71             immutable bool hasLocalAliasingImpl = hasLocalAliasing!(typeof(T.tupleof));
72         else
73             immutable bool hasLocalAliasingImpl = std.traits.hasUnsharedAliasing!(T);
74     }
75 
76     @safe unittest
77     {
78         static struct Container { Channel!int t; Channel!string m; }
79         static assert(!hasLocalAliasing!(Channel!(Channel!int), Channel!int, Container, int));
80         static assert( hasLocalAliasing!(Channel!(Channel!(int[]))));
81     }
82 
83     @safe unittest
84     {
85         /* Issue 20097 */
86         import std.datetime.systime : SysTime;
87         static struct Container { SysTime time; }
88         static assert(!hasLocalAliasing!(SysTime, Container));
89     }
90 }
91 
92 // Exceptions
93 
94 class FiberBlockedException : Exception
95 {
96     this(string msg = "Fiber is blocked") @safe pure nothrow @nogc
97     {
98         super(msg);
99     }
100 }
101 
102 
103 // Thread Creation
104 
105 private FiberScheduler thisSchedulerStorage;
106 
107 /***************************************************************************
108 
109     Get current running FiberScheduler
110 
111     Returns:
112         Returns a reference to the current scheduler or null no
113         scheduler is running
114 
115     TODO: Support for nested schedulers
116 
117 ***************************************************************************/
118 
119 public FiberScheduler thisScheduler () nothrow
120 {
121     return thisSchedulerStorage;
122 }
123 
124 /***************************************************************************
125 
126     Set current running FiberScheduler
127 
128     Params:
129         value = Reference to the current FiberScheduler
130 
131 ***************************************************************************/
132 
133 public void thisScheduler (FiberScheduler value) nothrow
134 {
135     thisSchedulerStorage = value;
136 }
137 
138 /**
139  * An example Scheduler using Fibers.
140  *
141  * This is an example scheduler that creates a new Fiber per call to spawn
142  * and multiplexes the execution of all fibers within the main thread.
143  */
144 public class FiberScheduler
145 {
146     /***************************************************************************
147 
148         Default ctor
149 
150         Params:
151             max_parked_fibers = Maximum number of parked fibers
152 
153     ***************************************************************************/
154 
155     public this (size_t max_parked_fibers = 8) nothrow
156     {
157         this.sem = assumeWontThrow(new Semaphore());
158         this.blocked_ex = new FiberBlockedException();
159         this.max_parked = max_parked_fibers;
160     }
161 
162     /**
163      * This creates a new Fiber for the supplied op and then starts the
164      * dispatcher.
165      */
166     void start (void delegate() op)
167     {
168         this.create(op, true);
169         // Make sure the just-created fiber is run first
170         this.dispatch();
171     }
172 
173     /**
174      * This created a new Fiber for the supplied op and adds it to the
175      * dispatch list.
176      */
177     void spawn (void delegate() op) nothrow
178     {
179         this.create(op);
180         FiberScheduler.yield();
181     }
182 
183     /**************************************************************************
184 
185         Schedule a task to be run next time the scheduler yields
186 
187         Behave similarly to `spawn`, but instead of running the task
188         immediately, it simply adds it to the queue and continue executing
189         the current task.
190 
191         Params:
192             op = Operation to run
193 
194     **************************************************************************/
195 
196     public void schedule (void delegate() op) nothrow
197     {
198         this.create(op);
199     }
200 
201     /**
202      * If the caller is a scheduled Fiber, this yields execution to another
203      * scheduled Fiber.
204      */
205     public static void yield () nothrow
206     {
207         // NOTE: It's possible that we should test whether the calling Fiber
208         //       is an InfoFiber before yielding, but I think it's reasonable
209         //       that any fiber should yield here.
210         if (Fiber.getThis())
211             Fiber.yield();
212     }
213 
214     /**
215      * If the caller is a scheduled Fiber, this yields execution to another
216      * scheduled Fiber.
217      */
218     public static void yieldAndThrow (Throwable t) nothrow
219     {
220         // NOTE: It's possible that we should test whether the calling Fiber
221         //       is an InfoFiber before yielding, but I think it's reasonable
222         //       that any fiber should yield here.
223         if (Fiber.getThis())
224             Fiber.yieldAndThrow(t);
225     }
226 
227     /// Resource type that will be tracked by FiberScheduler
228     protected interface Resource
229     {
230         ///
231         void release () nothrow;
232     }
233 
234     /***********************************************************************
235 
236         Add Resource to the Resource list of runnning Fiber
237 
238         Param:
239             r = Resource instace
240 
241     ***********************************************************************/
242 
243     private void addResource (Resource r, InfoFiber info_fiber = cast(InfoFiber) Fiber.getThis()) nothrow
244     {
245         assert(info_fiber, "Called from outside of an InfoFiber");
246         info_fiber.resources.insert(r);
247     }
248 
249     /***********************************************************************
250 
251         Remove Resource from the Resource list of runnning Fiber
252 
253         Param:
254             r = Resource instance
255 
256         Returns:
257             Success/failure
258 
259     ***********************************************************************/
260 
261     private bool removeResource (Resource r, InfoFiber info_fiber = cast(InfoFiber) Fiber.getThis()) nothrow
262     {
263         assert(info_fiber, "Called from outside of an InfoFiber");
264         // TODO: For some cases, search is not neccesary. We can just pop the last element
265         return assumeWontThrow(info_fiber.resources.linearRemoveElement(r));
266     }
267 
268     /**
269      * Creates a new Fiber which calls the given delegate.
270      *
271      * Params:
272      *   op = The delegate the fiber should call
273      *   insert_front = Fiber will be added to the front
274      *                  of the ready queue to be run first
275      */
276     protected void create (void delegate() op, bool insert_front = false) nothrow
277     {
278         InfoFiber new_fiber;
279         if (this.parked_count > 0)
280         {
281             new_fiber = this.parked_fibers.front();
282             new_fiber.reuse(op);
283 
284             this.parked_fibers.removeFront();
285             this.parked_count--;
286         }
287         else
288         {
289             new_fiber = new InfoFiber(op);
290         }
291 
292         if (insert_front)
293             this.readyq.insertFront(new_fiber);
294         else
295             this.readyq.insertBack(new_fiber);
296     }
297 
298     /**
299      * Fiber which embeds neccessary info for FiberScheduler
300      */
301     protected static class InfoFiber : Fiber
302     {
303         /// Semaphore reference that this Fiber is blocked on
304         FiberBlocker blocker;
305 
306         /// List of Resources held by this Fiber
307         DList!Resource resources;
308 
309         this (void delegate() op, size_t sz = 512 * 1024) nothrow
310         {
311             super(op, sz);
312         }
313 
314         /***********************************************************************
315 
316             Reset the Fiber to be reused with a new delegate
317 
318             Param:
319                 op = Delegate
320 
321         ***********************************************************************/
322 
323         void reuse (void delegate() op) nothrow
324         {
325             assert(this.state == Fiber.State.TERM, "Can not reuse a non terminated Fiber");
326             this.blocker = null;
327             this.resources.clear();
328             this.reset(op);
329         }
330     }
331 
332     public final class FiberBlocker
333     {
334 
335         /***********************************************************************
336 
337             Associate `FiberBlocker` with the running `Fiber`
338 
339             `FiberScheduler` will check to see if the `Fiber` is blocking on a
340             `FiberBlocker` to avoid rescheduling it unnecessarily
341 
342         ***********************************************************************/
343 
344         private void registerToInfoFiber (InfoFiber info_fiber = cast(InfoFiber) Fiber.getThis()) nothrow
345         {
346             assert(info_fiber !is null, "This Fiber does not belong to FiberScheduler");
347             assert(info_fiber.blocker is null, "This Fiber already has a registered FiberBlocker");
348             info_fiber.blocker = this;
349 
350         }
351 
352         /***********************************************************************
353 
354             Wait on the blocker with optional timeout
355 
356             Params:
357                 period = Timeout period
358 
359         ***********************************************************************/
360 
361         bool wait (Duration period = Duration.init) nothrow
362         {
363             if (period != Duration.init)
364                 this.limit = MonoTime.currTime + period;
365 
366             if (this.shouldBlock())
367             {
368                 this.registerToInfoFiber();
369                 FiberScheduler.yieldAndThrow(this.outer.blocked_ex);
370             }
371 
372             this.limit = MonoTime.init;
373             this.notified = false;
374             return !this.hasTimedOut();
375         }
376 
377         /***********************************************************************
378 
379             Unblock the Fiber waiting on this blocker
380 
381         ***********************************************************************/
382 
383         void notify () nothrow
384         {
385             this.stopTimer();
386             this.notified = true;
387             assumeWontThrow(this.outer.sem.notify());
388         }
389 
390         /***********************************************************************
391 
392             Query if `FiberBlocker` should still block
393 
394             FiberBlocker will block the Fiber until it is notified or
395             the specified timeout is reached.
396 
397         ***********************************************************************/
398 
399         bool shouldBlock () nothrow
400         {
401             bool timed_out = (this.limit != MonoTime.init
402                                 && MonoTime.currTime >= this.limit);
403 
404             if (timed_out)
405                 cas(&this.timer_state, TimerState.Running, TimerState.TimedOut);
406 
407             return atomicLoad(this.timer_state) != TimerState.TimedOut && !this.notified;
408         }
409 
410         /***********************************************************************
411 
412             Try freezing the internal timer
413 
414         ***********************************************************************/
415 
416         bool stopTimer () nothrow
417         {
418             return cas(&this.timer_state, TimerState.Running, TimerState.Stopped);
419         }
420 
421         /***********************************************************************
422 
423             Query if the internal timer has timed out
424 
425         ***********************************************************************/
426 
427         bool hasTimedOut () nothrow
428         {
429             return atomicLoad(this.timer_state) == TimerState.TimedOut;
430         }
431 
432         MonoTime getTimeout () nothrow
433         {
434             return limit;
435         }
436 
437     private:
438         enum TimerState
439         {
440             Running,
441             TimedOut,
442             Stopped
443         }
444 
445         /// Time limit that will eventually unblock the caller if a timeout is specified
446         MonoTime limit = MonoTime.init;
447 
448         /// State of the blocker
449         bool notified;
450 
451         /// State of the internal timer
452         shared(TimerState) timer_state;
453     }
454 
455     /***********************************************************************
456 
457         Start the scheduling loop
458 
459     ***********************************************************************/
460 
461     private void dispatch ()
462     {
463         thisScheduler(this);
464         scope (exit) thisScheduler(null);
465 
466         MonoTime earliest_timeout;
467 
468         while (!this.readyq.empty() || !this.wait_list.empty())
469         {
470             while (!readyq.empty())
471             {
472                 InfoFiber cur_fiber = this.readyq.front();
473                 this.readyq.removeFront();
474 
475                 assert(cur_fiber.state != Fiber.State.TERM);
476 
477                 auto t = cur_fiber.call(Fiber.Rethrow.no);
478 
479                 // Fibers that block on a FiberBlocker throw an
480                 // exception for scheduler to catch
481                 if (t is this.blocked_ex)
482                 {
483                     auto cur_timeout = cur_fiber.blocker.getTimeout();
484 
485                     // Keep track of the earliest timeout in the system
486                     if (cur_timeout != MonoTime.init
487                             && (earliest_timeout == MonoTime.init || cur_timeout < earliest_timeout))
488                     {
489                         earliest_timeout = cur_timeout;
490                     }
491 
492                     this.wait_list.insert(cur_fiber);
493                     continue;
494                 }
495                 else if (t)
496                 {
497                     // We are exiting the dispatch loop prematurely, all resources
498                     // held by Fibers should be released.
499                     this.releaseResources(cur_fiber);
500                     throw t;
501                 }
502 
503                 if (cur_fiber.state != Fiber.State.TERM)
504                 {
505                     this.readyq.insert(cur_fiber);
506                 }
507                 // Park terminated Fiber
508                 else if (this.parked_count < this.max_parked)
509                 {
510                     this.parked_fibers.insertFront(cur_fiber);
511                     this.parked_count++;
512                 }
513                 // Destroy the terminated Fiber to immediately reclaim
514                 // the stack space
515                 else
516                 {
517                     destroy!false(cur_fiber);
518                 }
519 
520                 // See if there are Fibers to be woken up if we reach a timeout
521                 // or the scheduler semaphore was notified
522                 if (MonoTime.currTime >= earliest_timeout || this.sem.tryWait())
523                     earliest_timeout = wakeFibers();
524             }
525 
526             if (!this.wait_list.empty())
527             {
528                 Duration time_to_timeout = earliest_timeout - MonoTime.currTime;
529 
530                 // Sleep until a timeout or an event
531                 if (earliest_timeout == MonoTime.init)
532                     this.sem.wait();
533                 else if (time_to_timeout > 0.seconds)
534                     this.sem.wait(time_to_timeout);
535             }
536 
537             // OS Thread woke up populate ready queue
538             earliest_timeout = wakeFibers();
539         }
540     }
541 
542     /***********************************************************************
543 
544         Move unblocked Fibers to ready queue
545 
546         Return:
547             Returns the earliest timeout left in the waiting list
548 
549     ***********************************************************************/
550 
551     private MonoTime wakeFibers()
552     {
553         import std.range;
554         MonoTime earliest_timeout;
555 
556         auto wait_range = this.wait_list[];
557         while (!wait_range.empty)
558         {
559             auto fiber = wait_range.front;
560 
561             // Remove the unblocked Fiber from wait list and
562             // append it to the end ofready queue
563             if (!fiber.blocker.shouldBlock())
564             {
565                 this.wait_list.popFirstOf(wait_range);
566                 fiber.blocker = null;
567                 this.readyq.insert(fiber);
568             }
569             else
570             {
571                 auto timeout = fiber.blocker.getTimeout();
572                 if (timeout != MonoTime.init
573                         && (earliest_timeout == MonoTime.init || timeout < earliest_timeout))
574                 {
575                     earliest_timeout = timeout;
576                 }
577                 wait_range.popFront();
578             }
579         }
580 
581         return earliest_timeout;
582     }
583 
584     /***********************************************************************
585 
586         Release all resources currently held by all Fibers owned by this
587         scheduler
588 
589         Param:
590             cur_fiber = Running Fiber
591 
592     ***********************************************************************/
593 
594     private void releaseResources (InfoFiber cur_fiber)
595     {
596         foreach (ref resource; cur_fiber.resources)
597             resource.release();
598         foreach (ref fiber; this.readyq)
599             foreach (ref resource; fiber.resources)
600                 resource.release();
601         foreach (ref fiber; this.wait_list)
602             foreach (ref resource; fiber.resources)
603                 resource.release();
604     }
605 
606 private:
607 
608     /// OS semaphore for scheduler to sleep on
609     Semaphore sem;
610 
611     /// A FIFO Queue of Fibers ready to run
612     DList!InfoFiber readyq;
613 
614     /// List of Fibers waiting for an event
615     DList!InfoFiber wait_list;
616 
617     /// Cached instance of FiberBlockedException
618     FiberBlockedException blocked_ex;
619 
620     /// List of parked fibers to be reused
621     SList!InfoFiber parked_fibers;
622 
623     /// Number of currently parked fibers
624     size_t parked_count;
625 
626     /// Maximum number of parked fibers
627     immutable size_t max_parked;
628 }
629 
630 /// Ensure argument to `start` is run first
631 unittest
632 {
633     {
634         scope sched = new FiberScheduler();
635         bool startHasRun;
636         sched.spawn(() => assert(startHasRun));
637         sched.start(() { startHasRun = true; });
638     }
639     {
640         scope sched = new FiberScheduler();
641         bool startHasRun;
642         sched.schedule(() => assert(startHasRun));
643         sched.start(() { startHasRun = true; });
644     }
645 }
646 
647 /***********************************************************************
648 
649     A common interface for objects that can be used in `select()`
650 
651 ***********************************************************************/
652 
653 public interface Selectable
654 {
655     /***********************************************************************
656 
657         Try to read/write to the `Selectable` without blocking. If the
658         operation would block, queue and link it with the `sel_state`
659 
660         Params:
661             ptr = pointer to the data for the select operation
662             sel_state = SelectState instace of the select call being executed
663             sel_id = id of the select call being executed
664 
665     ***********************************************************************/
666 
667     void selectWrite (void* ptr, SelectState sel_state, int sel_id);
668 
669     /// Ditto
670     void selectRead (void* ptr, SelectState sel_state, int sel_id);
671 }
672 
673 /***********************************************************************
674 
675     An aggregate to hold neccessary information for a select operation
676 
677 ***********************************************************************/
678 
679 public struct SelectEntry
680 {
681     /// Reference to a Selectable object
682     Selectable selectable;
683 
684     /// Pointer to the select data
685     void* select_data;
686 
687     /***********************************************************************
688 
689         Default ctor
690 
691         Params:
692             selectable = A selectable interface reference
693             select_data = pointer to the data for the select operation
694 
695     ***********************************************************************/
696 
697     this (Selectable selectable, void* select_data) @safe pure nothrow @nogc
698     {
699         this.selectable = selectable;
700         this.select_data = select_data;
701     }
702 }
703 
704 /// Consists of the id and result of the select operation that was completed
705 public alias SelectReturn = Tuple!(bool, "success", int, "id");
706 
707 /***********************************************************************
708 
709     Block on multiple `Channel`s
710 
711     Only one operation is completed per `select()` call
712 
713     Params:
714         read_list = List of `Channel`s to select for read operation
715         write_list = List of `Channel`s to select for write operation
716         timeout = Optional timeout
717 
718     Return:
719         Returns success/failure status of the operation and the index
720         of the `Channel` that the operation was carried on. The index is
721         the position of the SelectEntry in `read_list ~ write_list`, ie
722         concatenated lists.
723 
724 ***********************************************************************/
725 
726 public SelectReturn select (ref SelectEntry[] read_list, ref SelectEntry[] write_list,
727     Duration timeout = Duration.init)
728 {
729     import std.random : randomShuffle;
730 
731     auto ss = new SelectState(thisScheduler().new FiberBlocker());
732     int sel_id = 0;
733     thisScheduler().addResource(ss);
734     scope (exit) thisScheduler().removeResource(ss);
735 
736     read_list = read_list.randomShuffle();
737     write_list = write_list.randomShuffle();
738 
739     foreach (ref entry; read_list)
740     {
741         if (ss.isConsumed())
742             break;
743         entry.selectable.selectRead(entry.select_data, ss, sel_id++);
744     }
745 
746     foreach (ref entry; write_list)
747     {
748         if (ss.isConsumed())
749             break;
750         entry.selectable.selectWrite(entry.select_data, ss, sel_id++);
751     }
752 
753     if (!ss.blocker.wait(timeout))
754         return SelectReturn(false, -1); // Timed out
755 
756     return SelectReturn(ss.success, ss.id);
757 }
758 
759 /***********************************************************************
760 
761     Holds the state of a group of `selectRead`/`selectWrite` calls.
762     Synchronizes peers that will consume those calls, so that only one
763     `selectRead`/`selectWrite` call is completed.
764 
765 ***********************************************************************/
766 
767 final private class SelectState : FiberScheduler.Resource
768 {
769     /// Shared blocker object for multiple ChannelQueueEntry objects
770     FiberScheduler.FiberBlocker blocker;
771 
772     /***********************************************************************
773 
774         Default constructor
775 
776     ***********************************************************************/
777 
778     this (FiberScheduler.FiberBlocker blocker) @safe pure nothrow @nogc
779     {
780         this.blocker = blocker;
781     }
782 
783     /***********************************************************************
784 
785         Tries to atomically consume a `SelectState` and sets `id` and `success`
786         fields
787 
788         Param:
789             id = ID of the `selectRead`/`selectWrite` call that is consuming
790                  this `SelectState`
791             success_in = Success/Failure of the select call
792 
793         Return:
794             Returns true if `SelectState` was not already consumed, false otherwise
795 
796     ***********************************************************************/
797 
798     bool tryConsume (int id, bool success_in = true) nothrow
799     {
800         if (cas(&this.consumed, false, true))
801         {
802             this.id = id;
803             this.success = success_in;
804             return true;
805         }
806         return false;
807     }
808 
809     /***********************************************************************
810 
811         Returns if `SelectState` is already consumed or not
812 
813     ***********************************************************************/
814 
815     bool isConsumed () nothrow
816     {
817         return atomicLoad(this.consumed);
818     }
819 
820     /***********************************************************************
821 
822         Consume SelectState so that it is neutralized
823 
824     ***********************************************************************/
825 
826     void release () nothrow
827     {
828         this.tryConsume(-1, false);
829     }
830 
831     /// ID of the select call that consumed this `SelectState`
832     int id;
833 
834     /// Success/failure state of the select call with ID of `id`
835     bool success;
836 
837 private:
838     /// Indicates if this `SelectState` is consumed or not
839     shared(bool) consumed;
840 }
841 
842 /***********************************************************************
843 
844     A golang style channel implementation with buffered and unbuffered
845     operation modes
846 
847     Intended to be used between Fibers
848 
849     Param:
850         T = Type of the messages carried accross the `Channel`. Currently
851             all reference and values types are supported.
852 
853 ***********************************************************************/
854 
855 final public class Channel (T) : Selectable
856 {
857     /***********************************************************************
858 
859         Constructs a Channel
860 
861         Param:
862             max_size = Maximum amount of T a Channel can buffer
863                        (0 -> Unbuffered operation,
864                         Positive integer -> Buffered operation)
865 
866     ***********************************************************************/
867 
868     this (ulong max_size = 0) nothrow
869     {
870         this.max_size = max_size;
871         this.lock = new FiberMutex;
872         if (max_size)
873             this.buffer = new RingBuffer!T(max_size);
874     }
875 
876     /***********************************************************************
877 
878         Write a message to the `Channel` with an optional timeout
879 
880         Unbuffered mode:
881 
882             If a reader is already blocked on the `Channel`, writer copies the
883             message to reader's buffer and wakes up the reader by yielding
884 
885             If no reader is ready in the wait queue, writer appends itself
886             to write wait queue and blocks
887 
888         Buffered mode:
889 
890             If a reader is already blocked on the `Channel`, writer copies the
891             message to reader's buffer and wakes up the reader and returns
892             immediately.
893 
894             If the buffer is not full writer puts the message in the `Channel`
895             buffer and returns immediately
896 
897             If buffer is full writer appends itself to write wait queue and blocks
898 
899         If `Channel` is closed, it returns immediately with a failure,
900         regardless of the operation mode
901 
902         Param:
903             val = Message to write to `Channel`
904             duration = Timeout duration
905 
906         Returns:
907             Success/Failure - Fails when `Channel` is closed or timeout is reached.
908 
909     ***********************************************************************/
910 
911     bool write () (auto ref T val, Duration duration = Duration.init) nothrow
912     {
913         this.lock.lock_nothrow();
914 
915         bool success = tryWrite(val);
916 
917         if (!success && !this.isClosed())
918         {
919             ChannelQueueEntry q_ent = this.enqueueEntry(this.writeq, &val);
920             thisScheduler().addResource(q_ent);
921             scope (exit) thisScheduler().removeResource(q_ent);
922 
923             this.lock.unlock_nothrow();
924             return q_ent.blocker.wait(duration) && q_ent.success;
925         }
926 
927         this.lock.unlock_nothrow();
928         return success;
929     }
930 
931     /***********************************************************************
932 
933         Try to write a message to `Channel` without blocking
934 
935         tryWrite writes a message if it is possible to do so without blocking.
936 
937         If the tryWrite is being executed in a select call, it tries to consume the
938         `caller_sel` with the given `caller_sel_id`. It only proceeds if it can
939         successfully consume the `caller_sel`
940 
941         Param:
942             val = Message to write to `Channel`
943             caller_sel = SelectState instace of the select call being executed
944             caller_sel_id = id of the select call being executed
945 
946         Returns:
947             Success/Failure
948 
949     ***********************************************************************/
950 
951     private bool tryWrite () (auto ref T val, SelectState caller_sel = null, int caller_sel_id = 0) nothrow
952     {
953         if (this.isClosed())
954         {
955             if (caller_sel)
956                 caller_sel.tryConsume(caller_sel_id, false);
957             return false;
958         }
959 
960         if (ChannelQueueEntry readq_ent = this.dequeueEntry(this.readq, caller_sel, caller_sel_id))
961         {
962             *readq_ent.pVal = val;
963             readq_ent.blocker.notify();
964             return true;
965         }
966 
967         if (this.max_size > 0 // this.max_size > 0 = buffered
968                     && this.buffer.length < this.max_size
969                     && (!caller_sel || caller_sel.tryConsume(caller_sel_id)))
970         {
971             this.buffer.insert(val);
972             return true;
973         }
974 
975         return false;
976     }
977 
978     /***********************************************************************
979 
980         Try to write a message to Channel without blocking and if it fails,
981         create a write queue entry using the given `sel_state` and `sel_id`
982 
983         Param:
984             ptr = Message to write to channel
985             sel_state = SelectState instace of the select call being executed
986             sel_id = id of the select call being executed
987 
988     ***********************************************************************/
989 
990     void selectWrite (void* ptr, SelectState sel_state, int sel_id) nothrow
991     {
992         assert(ptr !is null);
993         assert(sel_state !is null);
994         T* val = cast(T*) ptr;
995 
996         this.lock.lock_nothrow();
997 
998         bool success = tryWrite(*val, sel_state, sel_id);
999 
1000         if (!sel_state.isConsumed())
1001             this.enqueueEntry(this.writeq, val, sel_state, sel_id);
1002 
1003         if (success || this.isClosed() || sel_state.id == -1)
1004         {
1005             this.lock.unlock_nothrow();
1006             sel_state.blocker.notify();
1007         }
1008         else
1009             this.lock.unlock_nothrow();
1010     }
1011 
1012     /***********************************************************************
1013 
1014         Read a message from the Channel with an optional timeout
1015 
1016         Unbuffered mode:
1017 
1018             If a writer is already blocked on the Channel, reader copies the
1019             value to `output` and wakes up the writer by yielding
1020 
1021             If no writer is ready in the wait queue, reader appends itself
1022             to read wait queue and blocks
1023 
1024             If channel is closed, it returns immediatly with a failure
1025 
1026         Buffered mode:
1027 
1028             If there are existing messages in the buffer, reader pops one off
1029             the buffer and returns immediatly with success, regardless of the
1030             Channel being closed or not
1031 
1032             If there are no messages in the buffer it behaves exactly like the
1033             unbuffered operation
1034 
1035         Param:
1036             output = Reference to output variable
1037             duration = Timeout duration
1038 
1039         Returns:
1040             Success/Failure - Fails when channel is closed and there are
1041             no existing messages to be read. Fails when timeout is reached.
1042 
1043     ***********************************************************************/
1044 
1045     bool read (ref T output, Duration duration = Duration.init) nothrow
1046     {
1047         this.lock.lock_nothrow();
1048 
1049         bool success = tryRead(output);
1050 
1051         if (!success && !this.isClosed())
1052         {
1053             ChannelQueueEntry q_ent = this.enqueueEntry(this.readq, &output);
1054             thisScheduler().addResource(q_ent);
1055             scope (exit) thisScheduler().removeResource(q_ent);
1056 
1057             this.lock.unlock_nothrow();
1058             return q_ent.blocker.wait(duration) && q_ent.success;
1059         }
1060 
1061         this.lock.unlock_nothrow();
1062         return success;
1063     }
1064 
1065     /***********************************************************************
1066 
1067         Try to read a message from Channel without blocking
1068 
1069         tryRead reads a message if it is possible to do so without blocking.
1070 
1071         If the tryRead is being executed in a select call, it tries to consume the
1072         `caller_sel` with the given `caller_sel_id`. It only proceeds if it can
1073         successfully consume the `caller_sel`
1074 
1075         Param:
1076             output = Field to write the message to
1077             caller_sel = SelectState instace of the select call being executed
1078             caller_sel_id = id of the select call being executed\
1079 
1080         Returns:
1081             Success/Failure
1082 
1083     ***********************************************************************/
1084 
1085     private bool tryRead (ref T output, SelectState caller_sel = null, int caller_sel_id = 0) nothrow
1086     {
1087         ChannelQueueEntry write_ent = this.dequeueEntry(this.writeq, caller_sel, caller_sel_id);
1088 
1089         if (this.max_size > 0 && !this.buffer.empty())
1090         {
1091             // if dequeueEntry fails, we will try to consume caller_sel again.
1092             if (!caller_sel || write_ent || caller_sel.tryConsume(caller_sel_id))
1093             {
1094                 output = this.buffer.front();
1095                 this.buffer.popFront();
1096 
1097                 if (write_ent)
1098                 {
1099                     this.buffer.insert(*write_ent.pVal);
1100                 }
1101             }
1102             else
1103             {
1104                 return false;
1105             }
1106         }
1107         // if dequeueEntry returns a valid entry, it always successfully consumes the related select states.
1108         // the race between 2 select calls is resolved in dequeueEntry.
1109         else if (write_ent)
1110         {
1111             output = *write_ent.pVal;
1112         }
1113         else
1114         {
1115             if (this.isClosed() && caller_sel)
1116                 caller_sel.tryConsume(caller_sel_id, false);
1117             return false;
1118         }
1119 
1120         if (write_ent)
1121             write_ent.blocker.notify();
1122         return true;
1123     }
1124 
1125     /***********************************************************************
1126 
1127         Try to read a message from Channel without blocking and if it fails,
1128         create a read queue entry using the given `sel_state` and `sel_id`
1129 
1130         Param:
1131             ptr = Buffer to write the message to
1132             sel_state = SelectState instace of the select call being executed
1133             sel_id = id of the select call being executed
1134 
1135     ***********************************************************************/
1136 
1137     void selectRead (void* ptr, SelectState sel_state, int sel_id) nothrow
1138     {
1139         assert(ptr !is null);
1140         assert(sel_state !is null);
1141         T* val = cast(T*) ptr;
1142 
1143         this.lock.lock_nothrow();
1144 
1145         bool success = tryRead(*val, sel_state, sel_id);
1146 
1147         if (!sel_state.isConsumed())
1148             this.enqueueEntry(this.readq, val, sel_state, sel_id);
1149 
1150         if (success || this.isClosed() || sel_state.id == -1)
1151         {
1152             this.lock.unlock_nothrow();
1153             sel_state.blocker.notify();
1154         }
1155         else
1156             this.lock.unlock_nothrow();
1157     }
1158 
1159     /***********************************************************************
1160 
1161         Close the channel
1162 
1163         Closes the channel by marking it closed and flushing all the wait
1164         queues
1165 
1166     ***********************************************************************/
1167 
1168     void close () nothrow
1169     {
1170         if (cas(&this.closed, false, true))
1171         {
1172             this.lock.lock_nothrow();
1173             scope (exit) this.lock.unlock_nothrow();
1174 
1175             // Wake blocked Fibers up, report the failure
1176             foreach (ref entry; this.readq)
1177             {
1178                 entry.terminate();
1179             }
1180             foreach (ref entry; this.writeq)
1181             {
1182                 entry.terminate();
1183             }
1184 
1185             this.readq.clear();
1186             this.writeq.clear();
1187         }
1188     }
1189 
1190     /***********************************************************************
1191 
1192         Return the length of the internal buffer
1193 
1194     ***********************************************************************/
1195 
1196     size_t length () nothrow
1197     {
1198         this.lock.lock_nothrow();
1199         scope (exit) this.lock.unlock_nothrow();
1200         return this.buffer.length;
1201     }
1202 
1203     /***********************************************************************
1204 
1205         Return the closed status of the `Channel`
1206 
1207     ***********************************************************************/
1208 
1209     bool isClosed () const @safe pure nothrow @nogc scope
1210     {
1211         return atomicLoad(this.closed);
1212     }
1213 
1214     /***********************************************************************
1215 
1216         An aggrate of neccessary information to block a Fiber and record
1217         their request
1218 
1219     ***********************************************************************/
1220 
1221     private static class ChannelQueueEntry : FiberScheduler.Resource
1222     {
1223         /// FiberBlocker blocking the `Fiber`
1224         FiberScheduler.FiberBlocker blocker;
1225 
1226         /// Pointer to the variable that we will read to/from
1227         T* pVal;
1228 
1229         /// Result of the blocking read/write call
1230         bool success = true;
1231 
1232         /// State of the select call that this entry was created for
1233         SelectState select_state;
1234 
1235         /// Id of the select call that this entry was created for
1236         int sel_id;
1237 
1238         this (T* pVal, SelectState select_state = null, int sel_id = 0) nothrow
1239         {
1240             this.pVal = pVal;
1241             this.select_state = select_state;
1242             this.sel_id = sel_id;
1243 
1244             if (this.select_state)
1245                 this.blocker = this.select_state.blocker;
1246             else
1247                 this.blocker = thisScheduler().new FiberBlocker();
1248         }
1249 
1250         /***********************************************************************
1251 
1252             Terminate a `ChannelQueueEntry` by waking up the blocked Fiber
1253             and reporting the failure
1254 
1255             This is called on all the `ChannelQueueEntry` instances still in
1256             the wait queues when Channel is closed
1257 
1258         ***********************************************************************/
1259 
1260         void terminate () nothrow
1261         {
1262             this.success = false;
1263             if (!this.select_state || this.select_state.tryConsume(this.sel_id, this.success))
1264                 this.blocker.notify();
1265         }
1266 
1267 
1268         /***********************************************************************
1269 
1270             Terminate ChannelQueueEntry so that it is neutralized
1271 
1272         ***********************************************************************/
1273 
1274         void release () nothrow
1275         {
1276             if (this.blocker.stopTimer())
1277                 this.pVal = null; // Sanitize pVal so that we can catch illegal accesses
1278         }
1279     }
1280 
1281     /***********************************************************************
1282 
1283         Create and enqueue a `ChannelQueueEntry` to the given entryq
1284 
1285         Param:
1286             entryq = Queue to append the new ChannelQueueEntry
1287             pVal =  Pointer to the message buffer
1288             sel_state = SelectState object to associate with the
1289                         newly created ChannelQueueEntry
1290             sel_id = id of the select call creating the new ChannelQueueEntry
1291 
1292         Return:
1293             newly created ChannelQueueEntry
1294 
1295     ***********************************************************************/
1296 
1297     private ChannelQueueEntry enqueueEntry (ref DList!ChannelQueueEntry entryq, T* pVal,
1298                                             SelectState sel_state = null, int sel_id = 0) nothrow
1299     {
1300         assert(pVal !is null);
1301 
1302         ChannelQueueEntry q_ent = new ChannelQueueEntry(pVal, sel_state, sel_id);
1303         entryq.insert(q_ent);
1304 
1305         return q_ent;
1306     }
1307 
1308     /***********************************************************************
1309 
1310         Dequeue a `ChannelQueueEntry` from the given `entryq`
1311 
1312         Walks the `entryq` until it finds a suitable entry or the queue
1313         empties. If `dequeueEntry` is called from a select, it tries to
1314         consume the `caller_sel` if the `peer_sel` is not currently consumed.
1315         If it fails to consume the `caller_sel`, returns with a failure.
1316 
1317         If selected queue entry is part of a select operation, it is also
1318         consumed. If it consumes `caller_sel` but `peer_sel` was already
1319         consumed whole select operation would fail and caller would need to try
1320         again. This should be a rare case, where the `peer_sel` gets consumed by
1321         someone else between the first if check which verifies that it is not
1322         consumed and the point we actually try to consume it.
1323 
1324         Param:
1325             entryq = Queue to append the new ChannelQueueEntry
1326             caller_sel = SelectState instace of the select call being executed
1327             caller_sel_id = id of the select call being executed
1328 
1329         Return:
1330             a valid ChannelQueueEntry or null
1331 
1332     ***********************************************************************/
1333 
1334     private ChannelQueueEntry dequeueEntry (ref DList!ChannelQueueEntry entryq,
1335                                             SelectState caller_sel = null, int caller_sel_id = 0) nothrow
1336     {
1337         while (!entryq.empty())
1338         {
1339             ChannelQueueEntry qent = entryq.front();
1340             auto peer_sel = qent.select_state;
1341 
1342             if ((!peer_sel || !peer_sel.isConsumed()) && qent.blocker.shouldBlock())
1343             {
1344                 // If we are in a select call, try to consume the caller select
1345                 // if we can't consume the caller select, no need to continue
1346                 if (caller_sel && !caller_sel.tryConsume(caller_sel_id))
1347                 {
1348                     return null;
1349                 }
1350 
1351                 // At this point, caller select is consumed.
1352                 // Try to consume the peer select if it exists
1353                 // If peer_sel was consumed by someone else, tough luck
1354                 // In that case, whole select will fail since we consumed the caller_sel
1355                 if ((!peer_sel || peer_sel.tryConsume(qent.sel_id)) && qent.blocker.stopTimer())
1356                 {
1357                     entryq.removeFront();
1358                     return qent;
1359                 }
1360                 else if (caller_sel)
1361                 {
1362                     // Mark caller_sel failed
1363                     caller_sel.id = -1;
1364                     caller_sel.success = false;
1365                     return null;
1366                 }
1367             }
1368 
1369             entryq.removeFront();
1370         }
1371 
1372         return null;
1373     }
1374 
1375 private:
1376     /// Internal data storage
1377     RingBuffer!T buffer;
1378 
1379     /// Closed flag
1380     bool closed;
1381 
1382     /// Per channel lock
1383     FiberMutex lock;
1384 
1385     /// List of fibers blocked on read()
1386     DList!ChannelQueueEntry readq;
1387 
1388     /// List of fibers blocked on write()
1389     DList!ChannelQueueEntry writeq;
1390 
1391 public:
1392     /// Maximum amount of T a Channel can buffer
1393     immutable ulong max_size;
1394 }
1395 
1396 // Test non blocking operation
1397 @system unittest
1398 {
1399     string str = "DEADBEEF";
1400     string rcv_str;
1401     FiberScheduler scheduler = new FiberScheduler;
1402     auto chn = new Channel!string(2);
1403 
1404     scheduler.start({
1405         chn.write(str ~ " 1");
1406         assert(chn.length() == 1);
1407         chn.write(str ~ " 2");
1408         assert(chn.length() == 2);
1409 
1410         assert(chn.read(rcv_str));
1411         assert(rcv_str == str ~ " 1");
1412         assert(chn.length() == 1);
1413 
1414         chn.write(str ~ " 3");
1415         assert(chn.length() == 2);
1416 
1417         assert(chn.read(rcv_str));
1418         assert(rcv_str == str ~ " 2");
1419         assert(chn.length() == 1);
1420 
1421         assert(chn.read(rcv_str));
1422         assert(rcv_str == str ~ " 3");
1423         assert(chn.length() == 0);
1424     });
1425 }
1426 
1427 // Test unbuffered blocking operation with multiple receivers
1428 // Receiver should read every message in the order they were sent
1429 @system unittest
1430 {
1431     FiberScheduler scheduler = new FiberScheduler;
1432     auto chn = new Channel!int();
1433     int n = 1000;
1434     long sum;
1435 
1436     scheduler.spawn(
1437         () {
1438             int val, prev;
1439             bool ret = chn.read(prev);
1440             sum += prev;
1441             assert(ret);
1442 
1443             while (chn.read(val))
1444             {
1445                 sum += val;
1446                 assert(ret);
1447                 assert(prev < val);
1448                 prev = val;
1449             }
1450         }
1451     );
1452 
1453     scheduler.spawn(
1454         () {
1455             int val, prev;
1456             bool ret = chn.read(prev);
1457             sum += prev;
1458             assert(ret);
1459 
1460             while (chn.read(val))
1461             {
1462                 sum += val;
1463                 assert(ret);
1464                 assert(prev < val);
1465                 prev = val;
1466             }
1467         }
1468     );
1469 
1470     scheduler.start(
1471         () {
1472             for (int i = 0; i <= n; i++)
1473             {
1474                 assert(chn.write(i));
1475             }
1476             chn.close();
1477 
1478             assert(!chn.write(0));
1479         }
1480     );
1481 
1482     // Sum of [0..1000]
1483     assert(sum == n*(n+1)/2);
1484 }
1485 
1486 // Test that writer is not blocked until buffer is full and a read unblocks the writer
1487 // Reader should be able to read remaining messages after chn is closed
1488 @system unittest
1489 {
1490     FiberScheduler scheduler = new FiberScheduler;
1491     auto chn = new Channel!int(5);
1492 
1493     scheduler.spawn(
1494         () {
1495             int val;
1496             assert(chn.max_size == chn.length);
1497             assert(chn.read(val));
1498             chn.close();
1499             // Read remaining messages after channel is closed
1500             for (int i = 0; i < chn.max_size; i++)
1501             {
1502                 assert(chn.read(val));
1503             }
1504             // No more messages to read on closed chn
1505             assert(!chn.read(val));
1506         }
1507     );
1508 
1509     scheduler.start(
1510         () {
1511             for (int i = 0; i < chn.max_size; i++)
1512             {
1513                 assert(chn.write(i));
1514             }
1515             assert(chn.max_size == chn.length);
1516             // Wait for read.
1517             assert(chn.write(42));
1518             // Reader already closed the channel
1519             assert(!chn.write(0));
1520         }
1521     );
1522 }
1523 
1524 @system unittest
1525 {
1526     struct HyperLoopMessage
1527     {
1528         int id;
1529         MonoTime time;
1530     }
1531 
1532     FiberScheduler scheduler = new FiberScheduler;
1533     auto chn1 = new Channel!HyperLoopMessage();
1534     auto chn2 = new Channel!HyperLoopMessage();
1535     auto chn3 = new Channel!HyperLoopMessage();
1536 
1537     scheduler.spawn(
1538         () {
1539             HyperLoopMessage msg;
1540 
1541             for (int i = 0; i < 1000; ++i)
1542             {
1543                 assert(chn2.read(msg));
1544                 assert(msg.id % 3 == 1);
1545                 msg.id++;
1546                 msg.time = MonoTime.currTime;
1547                 assert(chn3.write(msg));
1548             }
1549         }
1550     );
1551 
1552     scheduler.spawn(
1553         () {
1554             HyperLoopMessage msg;
1555 
1556             for (int i = 0; i < 1000; ++i)
1557             {
1558                 assert(chn1.read(msg));
1559                 assert(msg.id % 3 == 0);
1560                 msg.id++;
1561                 msg.time = MonoTime.currTime;
1562                 assert(chn2.write(msg));
1563             }
1564         }
1565     );
1566 
1567     scheduler.start(
1568         () {
1569             HyperLoopMessage msg = {
1570                 id : 0,
1571                 time : MonoTime.currTime
1572             };
1573 
1574             for (int i = 0; i < 1000; ++i)
1575             {
1576                 assert(chn1.write(msg));
1577                 assert(chn3.read(msg));
1578                 assert(msg.id % 3 == 2);
1579                 msg.id++;
1580                 msg.time = MonoTime.currTime;
1581             }
1582         }
1583     );
1584 }
1585 
1586 // Multiple writer threads writing to a buffered channel
1587 // Reader should receive all messages
1588 @system unittest
1589 {
1590     FiberScheduler scheduler = new FiberScheduler;
1591     immutable int n = 5000;
1592     auto chn1 = new Channel!int(n/10);
1593 
1594     shared int sharedVal = 0;
1595     shared int writer_sum = 0;
1596 
1597     auto t1 = new Thread({
1598         FiberScheduler scheduler = new FiberScheduler();
1599         scheduler.start(
1600             () {
1601                 int val = atomicOp!"+="(sharedVal, 1);
1602                 while (chn1.write(val))
1603                 {
1604                     atomicOp!"+="(writer_sum, val);
1605                     val = atomicOp!"+="(sharedVal, 1);
1606                 }
1607             }
1608         );
1609     });
1610     t1.start();
1611 
1612     auto t2 = new Thread({
1613         FiberScheduler scheduler = new FiberScheduler();
1614         scheduler.start(
1615             () {
1616                 int val = atomicOp!"+="(sharedVal, 1);
1617                 while (chn1.write(val))
1618                 {
1619                     atomicOp!"+="(writer_sum, val);
1620                     val = atomicOp!"+="(sharedVal, 1);
1621                 }
1622             }
1623         );
1624     });
1625     t2.start();
1626 
1627     scheduler.start(
1628         () {
1629             int reader_sum, readVal, count;
1630 
1631             while(chn1.read(readVal))
1632             {
1633                 reader_sum += readVal;
1634                 if (count++ == n) chn1.close();
1635             }
1636 
1637             thread_joinAll();
1638             assert(reader_sum == writer_sum);
1639         }
1640     );
1641 }
1642 
1643 @system unittest
1644 {
1645     FiberScheduler scheduler = new FiberScheduler;
1646     auto chn1 = new Channel!int();
1647     auto chn2 = new Channel!int(1);
1648     auto chn3 = new Channel!int();
1649 
1650     scheduler.spawn(
1651         () {
1652             chn1.write(42);
1653             chn1.close();
1654             chn3.write(37);
1655         }
1656     );
1657 
1658     scheduler.spawn(
1659         () {
1660             chn2.write(44);
1661             chn2.close();
1662         }
1663     );
1664 
1665     scheduler.start(
1666         () {
1667             bool[3] chn_closed;
1668             int[3] read_val;
1669             for (int i = 0; i < 5; ++i)
1670             {
1671                 SelectEntry[] read_list;
1672                 SelectEntry[] write_list;
1673 
1674                 if (!chn_closed[0])
1675                     read_list ~= SelectEntry(chn1, &read_val[0]);
1676                 if (!chn_closed[1])
1677                     read_list ~= SelectEntry(chn2, &read_val[1]);
1678                 read_list ~= SelectEntry(chn3, &read_val[2]);
1679 
1680                 auto select_return = select(read_list, write_list);
1681 
1682                 if (!select_return.success)
1683                 {
1684                     if (!chn_closed[0])  chn_closed[0] = true;
1685                     else if (!chn_closed[1])  chn_closed[1] = true;
1686                     else chn_closed[2] = true;
1687                 }
1688             }
1689             assert(read_val[0] == 42 && read_val[1] == 44 && read_val[2] == 37);
1690             assert(chn_closed[0] && chn_closed[1] && !chn_closed[2]);
1691         }
1692     );
1693 }
1694 
1695 @system unittest
1696 {
1697     FiberScheduler scheduler = new FiberScheduler;
1698 
1699     auto chn1 = new Channel!int(20);
1700     auto chn2 = new Channel!int();
1701     auto chn3 = new Channel!int(20);
1702     auto chn4 = new Channel!int();
1703 
1704     void thread_func (T) (ref T write_chn, ref T read_chn, int _tid)
1705     {
1706         FiberScheduler scheduler = new FiberScheduler;
1707         int read_val, write_val;
1708         int prev_read = -1;
1709         int n = 10000;
1710 
1711         scheduler.start(
1712             () {
1713                 while(read_val < n || write_val <= n)
1714                 {
1715                     int a;
1716                     SelectEntry[] read_list;
1717                     SelectEntry[] write_list;
1718 
1719                     if (write_val <= n)
1720                         write_list ~= SelectEntry(write_chn, &write_val);
1721 
1722                     if (read_val < n)
1723                         read_list ~= SelectEntry(read_chn, &read_val);
1724 
1725                     auto select_return = select(read_list, write_list);
1726 
1727                     if (select_return.success)
1728                     {
1729                         if (read_list.length > 0 && select_return.id == 0)
1730                         {
1731                             assert(prev_read + 1 == read_val);
1732                             prev_read = read_val;
1733                         }
1734                         else
1735                         {
1736                             write_val++;
1737                         }
1738                     }
1739                 }
1740             }
1741         );
1742     }
1743 
1744     auto t1 = new Thread({
1745         thread_func(chn1, chn2, 0);
1746     });
1747     t1.start();
1748 
1749     auto t2 = new Thread({
1750         thread_func(chn2, chn3, 1);
1751     });
1752     t2.start();
1753 
1754     auto t3 = new Thread({
1755         thread_func(chn3, chn4, 2);
1756     });
1757     t3.start();
1758 
1759     thread_func(chn4, chn1, 3);
1760 
1761     thread_joinAll();
1762 }
1763 
1764 @system unittest
1765 {
1766     FiberScheduler scheduler = new FiberScheduler;
1767     auto chn1 = new Channel!int();
1768     auto chn2 = new Channel!int();
1769 
1770     scheduler.spawn(
1771         () {
1772             FiberScheduler.yield();
1773             chn1.close();
1774         }
1775     );
1776 
1777     scheduler.spawn(
1778         () {
1779             FiberScheduler.yield();
1780             chn2.close();
1781         }
1782     );
1783 
1784     scheduler.spawn(
1785         () {
1786             for (int i = 0; i < 2; ++i)
1787             {
1788                 int write_val = 42;
1789                 SelectEntry[] read_list;
1790                 SelectEntry[] write_list;
1791                 write_list ~= SelectEntry(chn1, &write_val);
1792                 auto select_return = select(read_list, write_list);
1793 
1794                 assert(select_return.id == 0);
1795                 assert(!select_return.success);
1796             }
1797         }
1798     );
1799 
1800     scheduler.start(
1801         () {
1802             for (int i = 0; i < 2; ++i)
1803             {
1804                 int read_val;
1805                 SelectEntry[] read_list;
1806                 SelectEntry[] write_list;
1807                 read_list ~= SelectEntry(chn2, &read_val);
1808                 auto select_return = select(read_list, write_list);
1809 
1810                 assert(select_return.id == 0);
1811                 assert(!select_return.success);
1812             }
1813         }
1814     );
1815 }
1816 
1817 @system unittest
1818 {
1819     import core.sync.semaphore;
1820     FiberScheduler scheduler = new FiberScheduler;
1821     auto chn1 = new Channel!int();
1822 
1823     auto start = MonoTime.currTime;
1824     Semaphore writer_sem = new Semaphore();
1825     Semaphore reader_sem = new Semaphore();
1826 
1827     auto t1 = new Thread({
1828         FiberScheduler scheduler = new FiberScheduler;
1829         scheduler.start(
1830             () {
1831                 writer_sem.wait();
1832                 assert(chn1.write(42));
1833             }
1834         );
1835     });
1836     t1.start();
1837 
1838     auto t2 = new Thread({
1839         FiberScheduler scheduler = new FiberScheduler;
1840         scheduler.start(
1841             () {
1842                 int read_val;
1843                 reader_sem.wait();
1844                 assert(chn1.read(read_val));
1845                 assert(read_val == 43);
1846             }
1847         );
1848     });
1849     t2.start();
1850 
1851     scheduler.start(
1852         () {
1853             int read_val;
1854 
1855             scope (failure) {
1856                 reader_sem.notify();
1857                 writer_sem.notify();
1858                 chn1.close();
1859             }
1860 
1861             assert(!chn1.read(read_val, 1000.msecs));
1862             assert(MonoTime.currTime - start >= 1000.msecs);
1863 
1864             writer_sem.notify();
1865             assert(chn1.read(read_val, 1000.msecs));
1866             assert(read_val == 42);
1867 
1868             start = MonoTime.currTime;
1869 
1870             assert(!chn1.write(read_val + 1, 1000.msecs));
1871             assert(MonoTime.currTime - start >= 1000.msecs);
1872 
1873             reader_sem.notify();
1874             assert(chn1.write(read_val + 1, 1000.msecs));
1875         }
1876     );
1877 
1878     thread_joinAll();
1879 }
1880 
1881 /// A simple spinlock
1882 struct SpinLock
1883 {
1884     /// Spin until lock is free
1885     void lock() nothrow
1886     {
1887         while (!cas(&locked, false, true)) { }
1888     }
1889 
1890     /// Atomically unlock
1891     void unlock() nothrow
1892     {
1893         atomicStore!(MemoryOrder.rel)(locked, false);
1894     }
1895 
1896     /// Lock state
1897     private shared(bool) locked;
1898 }
1899 
1900 /// A Fiber level Semaphore
1901 class FiberSemaphore
1902 {
1903 
1904     /***********************************************************************
1905 
1906         Ctor
1907 
1908         Param:
1909             count = Initial count of FiberSemaphore
1910 
1911     ************************************************************************/
1912 
1913     this (size_t count = 0) nothrow
1914     {
1915         this.count = count;
1916     }
1917 
1918     /***********************************************************************
1919 
1920         Wait for FiberSemaphore count to be greater than 0
1921 
1922     ************************************************************************/
1923 
1924     void wait () nothrow
1925     {
1926         this.slock.lock();
1927 
1928         if (this.count > 0)
1929         {
1930             this.count--;
1931             this.slock.unlock();
1932             return;
1933         }
1934         auto entry = new SemaphoreQueueEntry();
1935         thisScheduler().addResource(entry);
1936         scope (exit) thisScheduler().removeResource(entry);
1937         this.queue.insert(entry);
1938 
1939         this.slock.unlock();
1940         entry.blocker.wait();
1941     }
1942 
1943     /***********************************************************************
1944 
1945         Increment the FiberSemaphore count
1946 
1947     ************************************************************************/
1948 
1949     void notify () nothrow
1950     {
1951         this.slock.lock();
1952         if (auto entry = this.dequeueEntry())
1953             entry.blocker.notify();
1954         else
1955             this.count++;
1956         this.slock.unlock();
1957     }
1958 
1959     ///
1960     private class SemaphoreQueueEntry : FiberScheduler.Resource
1961     {
1962 
1963         ///
1964         this () nothrow
1965         {
1966             assert(thisScheduler(), "Can not block with no FiberScheduler running!");
1967             this.blocker = thisScheduler(). new FiberBlocker();
1968         }
1969 
1970         /***********************************************************************
1971 
1972             Terminate SemaphoreQueueEntry so that it is neutralized
1973 
1974         ************************************************************************/
1975 
1976         void release () nothrow
1977         {
1978             if (!blocker.stopTimer())
1979                 this.outer.notify();
1980         }
1981 
1982         /// FiberBlocker blocking the `Fiber`
1983         FiberScheduler.FiberBlocker blocker;
1984     }
1985 
1986     /***********************************************************************
1987 
1988         Dequeue a `SemaphoreQueueEntry` from the waiting queue
1989 
1990         Return:
1991             a valid SemaphoreQueueEntry or null
1992 
1993     ***********************************************************************/
1994 
1995     private SemaphoreQueueEntry dequeueEntry () nothrow
1996     {
1997         while (!this.queue.empty())
1998         {
1999             auto entry = this.queue.front();
2000             this.queue.removeFront();
2001             if (entry.blocker.shouldBlock() && entry.blocker.stopTimer())
2002             {
2003                 return entry;
2004             }
2005         }
2006         return null;
2007     }
2008 
2009     ///
2010     private SpinLock slock;
2011 
2012     /// Waiting queue for Fibers
2013     private DList!SemaphoreQueueEntry queue;
2014 
2015     /// Current semaphore count
2016     private size_t count;
2017 }
2018 
2019 /// A Fiber level Mutex, essentially a binary FiberSemaphore
2020 class FiberMutex : FiberSemaphore
2021 {
2022     this () nothrow
2023     {
2024         super(1);
2025     }
2026 
2027     ///
2028     alias lock = wait;
2029 
2030     ///
2031     alias unlock = notify;
2032 
2033     ///
2034     alias lock_nothrow = lock;
2035 
2036     ///
2037     alias unlock_nothrow = unlock;
2038 }
2039 
2040 // Test releasing a queue entry
2041 @system unittest
2042 {
2043     FiberMutex mtx = new FiberMutex();
2044     int sharedVal;
2045 
2046     auto t1 = new Thread({
2047         FiberScheduler scheduler = new FiberScheduler;
2048         scheduler.start(
2049             () {
2050                 mtx.lock();
2051                 Thread.sleep(400.msecs);
2052                 sharedVal += 1;
2053                 mtx.unlock();
2054             }
2055         );
2056     });
2057     t1.start();
2058 
2059     auto t2 = new Thread({
2060         FiberScheduler scheduler = new FiberScheduler;
2061         Thread.sleep(100.msecs);
2062 
2063         scheduler.spawn(
2064             () {
2065                 Thread.sleep(200.msecs);
2066                 throw new Exception("");
2067             }
2068         );
2069 
2070         try
2071         {
2072             scheduler.start(
2073                 () {
2074                     mtx.lock();
2075                     sharedVal += 1;
2076                     mtx.unlock();
2077                 }
2078             );
2079         } catch (Exception e) { }
2080     });
2081     t2.start();
2082 
2083     auto t3 = new Thread({
2084         FiberScheduler scheduler = new FiberScheduler;
2085         scheduler.start(
2086             () {
2087                 Thread.sleep(200.msecs);
2088                 mtx.lock();
2089                 assert(sharedVal == 1);
2090                 sharedVal += 1;
2091                 mtx.unlock();
2092             }
2093         );
2094     });
2095     t3.start();
2096 
2097     thread_joinAll();
2098 }