1 /**
2  * This is a low-level messaging API upon which more structured or restrictive
3  * APIs may be built.  The general idea is that every messageable entity is
4  * represented by a common handle type called a Tid, which allows messages to
5  * be sent to logical threads that are executing in both the current process
6  * and in external processes using the same interface.  This is an important
7  * aspect of scalability because it allows the components of a program to be
8  * spread across available resources with few to no changes to the actual
9  * implementation.
10  *
11  * A logical thread is an execution context that has its own stack and which
12  * runs asynchronously to other logical threads.  These may be preemptively
13  * scheduled kernel threads, fibers (cooperative user-space threads), or some
14  * other concept with similar behavior.
15  *
16  * The type of concurrency used when logical threads are created is determined
17  * by the Scheduler selected at initialization time.  The default behavior is
18  * currently to create a new kernel thread per call to spawn, but other
19  * schedulers are available that multiplex fibers across the main thread or
20  * use some combination of the two approaches.
21  *
22  * Note:
23  * Copied (almost verbatim) from Phobos at commit 3bfccf4f1 (2019-11-27)
24  * Changes are this notice, and the module rename, from `std.concurrency`
25  * to `geod24.concurrency`.
26  *
27  * Copyright: Copyright Sean Kelly 2009 - 2014.
28  * License:   <a href="http://www.boost.org/LICENSE_1_0.txt">Boost License 1.0</a>.
29  * Authors:   Sean Kelly, Alex Rønne Petersen, Martin Nowak
30  * Source:    $(PHOBOSSRC std/concurrency.d)
31  */
32 module geod24.concurrency;
33 
34 public import std.variant;
35 
36 import core.atomic;
37 import core.sync.condition;
38 import core.sync.mutex;
39 import core.sync.semaphore;
40 import core.thread;
41 import core.time : MonoTime;
42 import std.range.primitives;
43 import std.traits;
44 import std.algorithm;
45 import std.typecons;
46 import std.container : DList, SList;
47 import std.exception : assumeWontThrow;
48 
49 import geod24.RingBuffer;
50 
private
{
    /// Tells whether at least one of `Types` is reported as having local
    /// aliasing by `hasLocalAliasingImpl`.
    bool hasLocalAliasing(Types...)()
    {
        // The result is accumulated in a runtime variable rather than
        // returned from inside the `static foreach`, which works around
        // the "statement is not reachable" diagnostic.
        bool found = false;
        static foreach (Candidate; Types)
            found |= hasLocalAliasingImpl!Candidate;
        return found;
    }

    /// Per-type worker behind `hasLocalAliasing`.
    template hasLocalAliasingImpl (T)
    {
        import std.typecons : Rebindable;

        // A `Channel` is judged by the type of message it carries
        static if (is(T : Channel!Payload, Payload))
            immutable bool hasLocalAliasingImpl = hasLocalAliasing!Payload;
        // A `Rebindable` is judged by the type it wraps
        else static if (is(T : Rebindable!Wrapped, Wrapped))
            immutable bool hasLocalAliasingImpl = hasLocalAliasing!Wrapped;
        // A struct is judged field by field
        else static if (is(T == struct))
            immutable bool hasLocalAliasingImpl = hasLocalAliasing!(typeof(T.tupleof));
        // Anything else is delegated to Phobos
        else
            immutable bool hasLocalAliasingImpl = std.traits.hasUnsharedAliasing!(T);
    }

    @safe unittest
    {
        static struct Holder { Channel!int t; Channel!string m; }

        // A channel of mutable slices is flagged ...
        static assert( hasLocalAliasing!(Channel!(Channel!(int[]))));
        // ... while channels of values, nested channels and a struct
        // holding such channels are not
        static assert(!hasLocalAliasing!(Channel!(Channel!int), Channel!int, Holder, int));
    }

    @safe unittest
    {
        /* Issue 20097 */
        import std.datetime.systime : SysTime;
        static struct Holder { SysTime time; }
        static assert(!hasLocalAliasing!(SysTime, Holder));
    }
}
91 
92 // Exceptions
93 
/// Exception a Fiber passes to `FiberScheduler.yieldAndThrow` when it blocks
/// on a `FiberBlocker`. The scheduler keeps one cached instance and compares
/// by identity to recognise a blocked Fiber.
class FiberBlockedException : Exception
{
    /// Params:
    ///   msg = Message carried by the exception
    this(string msg = "Fiber is blocked") @safe pure nothrow @nogc
    {
        super(msg);
    }
}
101 
102 
103 // Thread Creation
104 
/// Backing storage for the `thisScheduler` getter/setter pair. Declared
/// without `shared` / `__gshared`, so every thread has its own copy;
/// `null` while no scheduler has been registered on the thread.
private FiberScheduler thisSchedulerStorage;
106 
107 /***************************************************************************
108 
109     Get current running FiberScheduler
110 
    Returns:
        A reference to the current scheduler, or `null` if no
        scheduler is running
114 
115     TODO: Support for nested schedulers
116 
117 ***************************************************************************/
118 
public FiberScheduler thisScheduler () nothrow
{
    // Plain read of the module-level storage; `null` unless a scheduler
    // has been registered through the setter overload
    return thisSchedulerStorage;
}
123 
124 /***************************************************************************
125 
126     Set current running FiberScheduler
127 
128     Params:
129         value = Reference to the current FiberScheduler
130 
131 ***************************************************************************/
132 
public void thisScheduler (FiberScheduler value) nothrow
{
    // Overwrites any previously registered scheduler; passing `null`
    // clears the registration
    thisSchedulerStorage = value;
}
137 
138 /**
139  * An example Scheduler using Fibers.
140  *
141  * This is an example scheduler that creates a new Fiber per call to spawn
142  * and multiplexes the execution of all fibers within the main thread.
143  */
public class FiberScheduler
{
    /***************************************************************************

        Default ctor

        Params:
            max_parked_fibers = Maximum number of terminated fibers that are
                                kept ("parked") for reuse instead of being
                                destroyed

    ***************************************************************************/

    public this (size_t max_parked_fibers = 8) nothrow
    {
        this.sem = assumeWontThrow(new Semaphore());
        this.blocked_ex = new FiberBlockedException();
        this.max_parked = max_parked_fibers;
    }

    /**
     * This creates a new Fiber for the supplied op and then starts the
     * dispatcher.
     *
     * The new Fiber is put at the front of the ready queue, so it runs
     * before anything queued earlier through `spawn` / `schedule`.
     * The call returns once neither the ready queue nor the wait list holds
     * a Fiber; a `Throwable` escaping from a Fiber is rethrown to the caller.
     *
     * Params:
     *   op = The delegate the first fiber should call
     */
    void start (void delegate() op)
    {
        this.create(op, true);
        // Make sure the just-created fiber is run first
        this.dispatch();
    }

    /**
     * This creates a new Fiber for the supplied op and adds it to the back
     * of the dispatch list, then yields if the caller is itself a Fiber.
     *
     * Params:
     *   op = The delegate the fiber should call
     */
    void spawn (void delegate() op) nothrow
    {
        this.create(op);
        FiberScheduler.yield();
    }

    /**************************************************************************

        Schedule a task to be run next time the scheduler yields

        Behaves similarly to `spawn`, but does not yield: it simply adds
        the task to the back of the ready queue and continues executing
        the current task.

        Params:
            op = Operation to run

    **************************************************************************/

    public void schedule (void delegate() op) nothrow
    {
        this.create(op);
    }

    /**
     * If the caller is a scheduled Fiber, this yields execution to another
     * scheduled Fiber. It is a no-op when called from outside a Fiber.
     */
    public static void yield () nothrow
    {
        // NOTE: It's possible that we should test whether the calling Fiber
        //       is an InfoFiber before yielding, but I think it's reasonable
        //       that any fiber should yield here.
        if (Fiber.getThis())
            Fiber.yield();
    }

    /**
     * If the caller is a Fiber, this yields execution and hands `t` to
     * whoever resumed the Fiber. The dispatcher receives it as the result
     * of `Fiber.call` and uses it to tell a blocked Fiber from a failed one.
     * It is a no-op when called from outside a Fiber.
     *
     * Params:
     *   t = The Throwable to hand over to the resumer
     */
    public static void yieldAndThrow (Throwable t) nothrow
    {
        // NOTE: It's possible that we should test whether the calling Fiber
        //       is an InfoFiber before yielding, but I think it's reasonable
        //       that any fiber should yield here.
        if (Fiber.getThis())
            Fiber.yieldAndThrow(t);
    }

    /// Resource type that will be tracked by FiberScheduler
    protected interface Resource
    {
        /// Called on every tracked resource when the dispatch loop is
        /// abandoned because a Fiber threw (see `releaseResources`)
        void release () nothrow;
    }

    /***********************************************************************

        Add Resource to the Resource list of the running Fiber

        Params:
            r = Resource instance
            info_fiber = Fiber that will hold the resource, by default the
                         calling Fiber (which must be an `InfoFiber`)

    ***********************************************************************/

    private void addResource (Resource r, InfoFiber info_fiber = cast(InfoFiber) Fiber.getThis()) nothrow
    {
        assert(info_fiber, "Called from outside of an InfoFiber");
        info_fiber.resources.insert(r);
    }

    /***********************************************************************

        Remove Resource from the Resource list of the running Fiber

        Params:
            r = Resource instance
            info_fiber = Fiber holding the resource, by default the calling
                         Fiber (which must be an `InfoFiber`)

        Returns:
            Success/failure

    ***********************************************************************/

    private bool removeResource (Resource r, InfoFiber info_fiber = cast(InfoFiber) Fiber.getThis()) nothrow
    {
        assert(info_fiber, "Called from outside of an InfoFiber");
        // TODO: For some cases, search is not necessary. We can just pop the last element
        return assumeWontThrow(info_fiber.resources.linearRemoveElement(r));
    }

    /**
     * Creates a new Fiber which calls the given delegate.
     *
     * A parked (terminated) Fiber is reused when one is available,
     * otherwise a new one is allocated.
     *
     * Params:
     *   op = The delegate the fiber should call
     *   insert_front = Fiber will be added to the front
     *                  of the ready queue to be run first
     */
    protected void create (void delegate() op, bool insert_front = false) nothrow
    {
        InfoFiber new_fiber;
        if (this.parked_count > 0)
        {
            new_fiber = this.parked_fibers.front();
            new_fiber.reuse(op);

            this.parked_fibers.removeFront();
            this.parked_count--;
        }
        else
        {
            new_fiber = new InfoFiber(op);
        }

        if (insert_front)
            this.readyq.insertFront(new_fiber);
        else
            this.readyq.insertBack(new_fiber);
    }

    /**
     * Fiber which embeds necessary info for FiberScheduler
     */
    protected static class InfoFiber : Fiber
    {
        /// FiberBlocker that this Fiber is blocked on, `null` when it is
        /// not blocked
        FiberBlocker blocker;

        /// List of Resources held by this Fiber
        DList!Resource resources;

        /// Params:
        ///   op = Delegate the Fiber runs
        ///   sz = Stack size forwarded to the `Fiber` constructor
        this (void delegate() op, size_t sz = 512 * 1024) nothrow
        {
            super(op, sz);
        }

        /***********************************************************************

            Reset the Fiber to be reused with a new delegate

            The Fiber must have terminated. Its blocker and resource list
            are cleared.

            Params:
                op = Delegate

        ***********************************************************************/

        void reuse (void delegate() op) nothrow
        {
            assert(this.state == Fiber.State.TERM, "Can not reuse a non terminated Fiber");
            this.blocker = null;
            this.resources.clear();
            this.reset(op);
        }
    }

    /// Object a Fiber waits on until it is notified or an optional timeout
    /// expires. Instances are bound to the scheduler that created them.
    public final class FiberBlocker
    {

        /***********************************************************************

            Associate `FiberBlocker` with the running `Fiber`

            `FiberScheduler` will check to see if the `Fiber` is blocking on a
            `FiberBlocker` to avoid rescheduling it unnecessarily

            Params:
                info_fiber = Fiber to register with, by default the calling
                             Fiber (which must be an `InfoFiber` that has
                             no blocker registered yet)

        ***********************************************************************/

        private void registerToInfoFiber (InfoFiber info_fiber = cast(InfoFiber) Fiber.getThis()) nothrow
        {
            assert(info_fiber !is null, "This Fiber does not belong to FiberScheduler");
            assert(info_fiber.blocker is null, "This Fiber already has a registered FiberBlocker");
            info_fiber.blocker = this;

        }

        /***********************************************************************

            Wait on the blocker with optional timeout

            Returns immediately if the blocker was already notified.

            Params:
                period = Timeout period, `Duration.init` meaning no timeout

            Returns:
                `false` if the internal timer is in the timed-out state,
                `true` otherwise

        ***********************************************************************/

        bool wait (Duration period = Duration.init) nothrow
        {
            if (period != Duration.init)
                this.limit = MonoTime.currTime + period;

            if (this.shouldBlock())
            {
                this.registerToInfoFiber();
                // Hand control back to the dispatcher, which recognises the
                // cached exception instance and parks this Fiber
                FiberScheduler.yieldAndThrow(this.outer.blocked_ex);
            }

            this.limit = MonoTime.init;
            this.notified = false;
            return !this.hasTimedOut();
        }

        /***********************************************************************

            Unblock the Fiber waiting on this blocker

            Stops the timer, marks the blocker as notified and signals the
            scheduler's semaphore so that the dispatcher rescans its
            wait list.

        ***********************************************************************/

        void notify () nothrow
        {
            this.stopTimer();
            this.notified = true;
            assumeWontThrow(this.outer.sem.notify());
        }

        /***********************************************************************

            Query if `FiberBlocker` should still block

            FiberBlocker will block the Fiber until it is notified or
            the specified timeout is reached.

            As a side effect, a timer that is still running is moved to the
            timed-out state once the time limit has passed.

        ***********************************************************************/

        bool shouldBlock () nothrow
        {
            bool timed_out = (this.limit != MonoTime.init
                                && MonoTime.currTime >= this.limit);

            // Only a timer that is still `Running` may time out; a timer
            // that was stopped keeps its `Stopped` state
            if (timed_out)
                cas(&this.timer_state, TimerState.Running, TimerState.TimedOut);

            return atomicLoad(this.timer_state) != TimerState.TimedOut && !this.notified;
        }

        /***********************************************************************

            Try freezing the internal timer

            Returns:
                `true` if the timer was running and is now stopped,
                `false` if it had already timed out or been stopped

        ***********************************************************************/

        bool stopTimer () nothrow
        {
            return cas(&this.timer_state, TimerState.Running, TimerState.Stopped);
        }

        /***********************************************************************

            Query if the internal timer has timed out

        ***********************************************************************/

        bool hasTimedOut () nothrow
        {
            return atomicLoad(this.timer_state) == TimerState.TimedOut;
        }

        /// Returns the current time limit, `MonoTime.init` if none is set
        MonoTime getTimeout () nothrow
        {
            return limit;
        }

    private:
        /// States of the internal timer
        enum TimerState
        {
            Running,
            TimedOut,
            Stopped
        }

        /// Time limit that will eventually unblock the caller if a timeout is specified
        MonoTime limit = MonoTime.init;

        /// State of the blocker
        bool notified;

        /// State of the internal timer
        shared(TimerState) timer_state;
    }

    /***********************************************************************

        Start the scheduling loop

        Registers this scheduler as `thisScheduler` for the duration of the
        loop. Runs until both the ready queue and the wait list are empty.
        If a Fiber terminates with a `Throwable`, all tracked resources are
        released and the `Throwable` is rethrown.

    ***********************************************************************/

    private void dispatch ()
    {
        thisScheduler(this);
        scope (exit) thisScheduler(null);

        // `MonoTime.init` means that no timeout is currently pending
        MonoTime earliest_timeout;

        while (!this.readyq.empty() || !this.wait_list.empty())
        {
            while (!readyq.empty())
            {
                InfoFiber cur_fiber = this.readyq.front();
                // While a critical section is active only its owner may run
                if (this.critical_section_fiber &&
                    cur_fiber !is this.critical_section_fiber)
                    break;

                this.readyq.removeFront();

                assert(cur_fiber.state != Fiber.State.TERM);

                auto t = cur_fiber.call(Fiber.Rethrow.no);

                // Fibers that block on a FiberBlocker throw an
                // exception for scheduler to catch
                if (t is this.blocked_ex)
                {
                    auto cur_timeout = cur_fiber.blocker.getTimeout();

                    // Keep track of the earliest timeout in the system
                    if (cur_timeout != MonoTime.init
                            && (earliest_timeout == MonoTime.init || cur_timeout < earliest_timeout))
                    {
                        earliest_timeout = cur_timeout;
                    }

                    this.wait_list.insert(cur_fiber);
                    continue;
                }
                else if (t)
                {
                    // We are exiting the dispatch loop prematurely, all resources
                    // held by Fibers should be released.
                    this.releaseResources(cur_fiber);
                    throw t;
                }

                if (cur_fiber.state != Fiber.State.TERM)
                {
                    if (this.critical_section_fiber is cur_fiber)
                        this.readyq.insertFront(cur_fiber);
                    else
                        this.readyq.insertBack(cur_fiber);
                }
                // Park terminated Fiber
                else if (this.parked_count < this.max_parked)
                {
                    this.parked_fibers.insertFront(cur_fiber);
                    this.parked_count++;
                }
                // Destroy the terminated Fiber to immediately reclaim
                // the stack space
                else
                {
                    destroy!false(cur_fiber);
                }

                // See if there are Fibers to be woken up if we reach a timeout
                // or the scheduler semaphore was notified.
                // The timeout test only applies while a timeout is pending:
                // comparing the clock against `MonoTime.init` is always true,
                // which would rescan the wait list after every Fiber switch
                // and never poll the semaphore.
                const timeout_reached = earliest_timeout != MonoTime.init
                    && MonoTime.currTime >= earliest_timeout;
                if (timeout_reached || this.sem.tryWait())
                    earliest_timeout = wakeFibers();
            }

            if (!this.wait_list.empty())
            {
                Duration time_to_timeout = earliest_timeout - MonoTime.currTime;

                // Sleep until a timeout or an event
                if (earliest_timeout == MonoTime.init)
                    this.sem.wait();
                else if (time_to_timeout > 0.seconds)
                    this.sem.wait(time_to_timeout);
            }

            // OS Thread woke up populate ready queue
            earliest_timeout = wakeFibers();
        }
    }

    /***********************************************************************

        Move unblocked Fibers to ready queue

        Returns:
            Returns the earliest timeout left in the waiting list,
            `MonoTime.init` if no waiting Fiber has a timeout

    ***********************************************************************/

    private MonoTime wakeFibers()
    {
        MonoTime earliest_timeout;

        auto wait_range = this.wait_list[];
        while (!wait_range.empty)
        {
            auto fiber = wait_range.front;

            // Remove the unblocked Fiber from wait list and
            // append it to the end of ready queue
            if (!fiber.blocker.shouldBlock())
            {
                // `popFirstOf` unlinks the element and advances the range
                this.wait_list.popFirstOf(wait_range);
                fiber.blocker = null;
                if (this.critical_section_fiber is fiber)
                    this.readyq.insertFront(fiber);
                else
                    this.readyq.insertBack(fiber);
            }
            else
            {
                auto timeout = fiber.blocker.getTimeout();
                if (timeout != MonoTime.init
                        && (earliest_timeout == MonoTime.init || timeout < earliest_timeout))
                {
                    earliest_timeout = timeout;
                }
                wait_range.popFront();
            }
        }

        return earliest_timeout;
    }

    /***********************************************************************

        Release all resources currently held by all Fibers owned by this
        scheduler

        Params:
            cur_fiber = Running Fiber; it is in neither queue at this
                        point, so its resources are released separately

    ***********************************************************************/

    private void releaseResources (InfoFiber cur_fiber)
    {
        foreach (ref resource; cur_fiber.resources)
            resource.release();
        foreach (ref fiber; this.readyq)
            foreach (ref resource; fiber.resources)
                resource.release();
        foreach (ref fiber; this.wait_list)
            foreach (ref resource; fiber.resources)
                resource.release();
    }

    /// Disables Fiber switching: until `exitCriticalSection` is called the
    /// dispatcher only runs the calling Fiber (which must be an `InfoFiber`)
    public void enterCriticalSection () nothrow
    {
        this.critical_section_fiber = cast(InfoFiber) Fiber.getThis();
        assert(this.critical_section_fiber !is null);
    }

    /// Enables Fiber switching; must be called by the Fiber that entered
    /// the critical section
    public void exitCriticalSection () nothrow
    {
        assert(this.critical_section_fiber is cast(InfoFiber) Fiber.getThis());
        this.critical_section_fiber = null;
    }

private:

    /// OS semaphore for scheduler to sleep on
    Semaphore sem;

    /// A FIFO Queue of Fibers ready to run
    DList!InfoFiber readyq;

    /// List of Fibers waiting for an event
    DList!InfoFiber wait_list;

    /// Cached instance of FiberBlockedException
    FiberBlockedException blocked_ex;

    /// List of parked fibers to be reused
    SList!InfoFiber parked_fibers;

    /// Number of currently parked fibers
    size_t parked_count;

    /// Maximum number of parked fibers
    immutable size_t max_parked;

    /// InfoFiber that is in critical section
    InfoFiber critical_section_fiber;
}
656 
657 /// Ensure argument to `start` is run first
unittest
{
    // A task queued with `spawn` before `start` must run after the
    // delegate passed to `start`
    {
        scope scheduler = new FiberScheduler();
        bool started;
        scheduler.spawn(() { assert(started); });
        scheduler.start(() { started = true; });
    }

    // Same guarantee for a task queued with `schedule`
    {
        scope scheduler = new FiberScheduler();
        bool started;
        scheduler.schedule(() { assert(started); });
        scheduler.start(() { started = true; });
    }
}
673 
674 /***********************************************************************
675 
676     A common interface for objects that can be used in `select()`
677 
678 ***********************************************************************/
679 
public interface Selectable
{
    /***********************************************************************

        Try to read/write to the `Selectable` without blocking. If the
        operation would block, queue and link it with the `sel_state`

        Neither method returns a value: `select()` reads the outcome back
        from the `success` and `id` fields of `sel_state`.

        Params:
            ptr = pointer to the data for the select operation
            sel_state = SelectState instance of the select call being executed
            sel_id = id of this operation within the select call being
                     executed (`select()` passes a running counter)

    ***********************************************************************/

    void selectWrite (void* ptr, SelectState sel_state, int sel_id);

    /// Ditto
    void selectRead (void* ptr, SelectState sel_state, int sel_id);
}
699 
700 /***********************************************************************
701 
    An aggregate to hold necessary information for a select operation
703 
704 ***********************************************************************/
705 
public struct SelectEntry
{
    /// The `Selectable` the operation is attempted on
    Selectable selectable;

    /// Data pointer that `select()` forwards to `selectRead` / `selectWrite`
    void* select_data;

    /***********************************************************************

        Builds an entry from its two components

        Params:
            selectable = A selectable interface reference
            select_data = pointer to the data for the select operation

    ***********************************************************************/

    this (Selectable selectable, void* select_data) @safe pure nothrow @nogc
    {
        this.select_data = select_data;
        this.selectable = selectable;
    }
}
730 
/// Consists of the id and result of the select operation that was completed.
/// `success` tells whether the operation succeeded and `id` identifies the
/// entry it was carried out on; `select()` returns `(false, -1)` on timeout.
public alias SelectReturn = Tuple!(bool, "success", int, "id");
733 
734 /***********************************************************************
735 
736     Block on multiple `Channel`s
737 
738     Only one operation is completed per `select()` call
739 
740     Params:
741         read_list = List of `Channel`s to select for read operation
742         write_list = List of `Channel`s to select for write operation
743         timeout = Optional timeout
744 
    Return:
        Returns success/failure status of the operation and the index
        of the `Channel` that the operation was carried on. The index is
        the position of the SelectEntry in `read_list ~ write_list`, ie
        concatenated lists, in the order they have after `select` has
        shuffled both lists in place. On timeout, `(false, -1)` is returned.
750 
751 ***********************************************************************/
752 
public SelectReturn select (ref SelectEntry[] read_list, ref SelectEntry[] write_list,
    Duration timeout = Duration.init)
{
    import std.random : randomShuffle;

    // One shared state (and blocker) for every entry of this call; it is
    // tracked as a resource of the running Fiber until the call returns
    auto state = new SelectState(thisScheduler().new FiberBlocker());
    int next_id = 0;
    thisScheduler().addResource(state);
    scope (exit) thisScheduler().removeResource(state);

    // Both lists are shuffled in place, so the caller observes the new
    // order and the ids handed out below refer to it
    read_list = read_list.randomShuffle();
    write_list = write_list.randomShuffle();

    // Offer the reads first, stopping as soon as one operation went through
    foreach (ref reader; read_list)
    {
        if (!state.isConsumed())
            reader.selectable.selectRead(reader.select_data, state, next_id++);
        else
            break;
    }

    // Then the writes, under the same rule; ids continue after the reads
    foreach (ref writer; write_list)
    {
        if (!state.isConsumed())
            writer.selectable.selectWrite(writer.select_data, state, next_id++);
        else
            break;
    }

    const in_time = state.blocker.wait(timeout);
    if (in_time)
        return SelectReturn(state.success, state.id);

    return SelectReturn(false, -1); // Timed out
}
785 
786 /***********************************************************************
787 
788     Holds the state of a group of `selectRead`/`selectWrite` calls.
789     Synchronizes peers that will consume those calls, so that only one
790     `selectRead`/`selectWrite` call is completed.
791 
792 ***********************************************************************/
793 
final private class SelectState : FiberScheduler.Resource
{
    /// Shared blocker object for multiple ChannelQueueEntry objects
    FiberScheduler.FiberBlocker blocker;

    /***********************************************************************

        Creates a state that has not been consumed yet

        Params:
            blocker = Blocker shared by all operations of the select call

    ***********************************************************************/

    this (FiberScheduler.FiberBlocker blocker) @safe pure nothrow @nogc
    {
        this.blocker = blocker;
    }

    /***********************************************************************

        Atomically claims the `SelectState`; the winner records `id` and
        `success`

        Params:
            id = ID of the `selectRead`/`selectWrite` call that is consuming
                 this `SelectState`
            success_in = Success/Failure of the select call

        Returns:
            `true` if this call consumed the `SelectState`, `false` if it
            had already been consumed (the fields are then left untouched)

    ***********************************************************************/

    bool tryConsume (int id, bool success_in = true) nothrow
    {
        // Only the caller that flips the flag may record a result
        if (!cas(&this.consumed, false, true))
            return false;

        this.id = id;
        this.success = success_in;
        return true;
    }

    /***********************************************************************

        Tells whether the `SelectState` has already been consumed

    ***********************************************************************/

    bool isConsumed () nothrow
    {
        const already_taken = atomicLoad(this.consumed);
        return already_taken;
    }

    /***********************************************************************

        Neutralizes the `SelectState` by consuming it as a failure with
        id `-1`; does nothing if it has already been consumed

    ***********************************************************************/

    void release () nothrow
    {
        this.tryConsume(-1, false);
    }

    /// ID of the select call that consumed this `SelectState`
    int id;

    /// Success/failure state of the select call with ID of `id`
    bool success;

private:
    /// Set once the `SelectState` has been consumed
    shared(bool) consumed;
}
868 
869 /***********************************************************************
870 
871     A golang style channel implementation with buffered and unbuffered
872     operation modes
873 
874     Intended to be used between Fibers
875 
    Param:
        T = Type of the messages carried across the `Channel`. Currently
            all reference and value types are supported.
879 
880 ***********************************************************************/
881 
final public class Channel (T) : Selectable
{
    /***********************************************************************

        Constructs a Channel

        Params:
            max_size = Maximum amount of T a Channel can buffer
                       (0 -> Unbuffered operation,
                        Positive integer -> Buffered operation)

    ***********************************************************************/

    this (ulong max_size = 0) nothrow
    {
        this.max_size = max_size;
        this.lock = new FiberMutex;
        // The ring buffer is only allocated in buffered mode; for an
        // unbuffered channel `this.buffer` stays `null`.
        if (max_size)
            this.buffer = new RingBuffer!T(max_size);
    }

    /***********************************************************************

        Write a message to the `Channel` with an optional timeout

        Unbuffered mode:

            If a reader is already blocked on the `Channel`, writer copies the
            message to reader's buffer and wakes up the reader by yielding

            If no reader is ready in the wait queue, writer appends itself
            to write wait queue and blocks

        Buffered mode:

            If a reader is already blocked on the `Channel`, writer copies the
            message to reader's buffer and wakes up the reader and returns
            immediately.

            If the buffer is not full writer puts the message in the `Channel`
            buffer and returns immediately

            If buffer is full writer appends itself to write wait queue and blocks

        If `Channel` is closed, it returns immediately with a failure,
        regardless of the operation mode

        Params:
            val = Message to write to `Channel`
            duration = Timeout duration, forwarded as-is to the blocker's
                       `wait` when the writer has to block

        Returns:
            Success/Failure - Fails when `Channel` is closed or timeout is reached.

    ***********************************************************************/

    bool write () (auto ref T val, Duration duration = Duration.init) nothrow
    {
        this.lock.lock_nothrow();

        bool success = tryWrite(val);

        if (!success && !this.isClosed())
        {
            // Could not complete immediately: park in the write queue and
            // register the entry with the scheduler for the duration of the wait
            ChannelQueueEntry q_ent = this.enqueueEntry(this.writeq, &val);
            thisScheduler().addResource(q_ent);
            scope (exit) thisScheduler().removeResource(q_ent);

            // The lock must be released before blocking
            this.lock.unlock_nothrow();
            return q_ent.blocker.wait(duration) && q_ent.success;
        }

        this.lock.unlock_nothrow();
        return success;
    }

    /***********************************************************************

        Try to write a message to `Channel` without blocking

        tryWrite writes a message if it is possible to do so without blocking.

        If the tryWrite is being executed in a select call, it tries to consume the
        `caller_sel` with the given `caller_sel_id`. It only proceeds if it can
        successfully consume the `caller_sel`

        Params:
            val = Message to write to `Channel`
            caller_sel = SelectState instance of the select call being executed
            caller_sel_id = id of the select call being executed

        Returns:
            Success/Failure

    ***********************************************************************/

    private bool tryWrite () (auto ref T val, SelectState caller_sel = null, int caller_sel_id = 0) nothrow
    {
        if (this.isClosed())
        {
            // Report the closed channel to the select call (as a failure)
            if (caller_sel)
                caller_sel.tryConsume(caller_sel_id, false);
            return false;
        }

        // A waiting reader takes precedence: hand the value over directly
        if (ChannelQueueEntry readq_ent = this.dequeueEntry(this.readq, caller_sel, caller_sel_id))
        {
            *readq_ent.pVal = val;
            readq_ent.blocker.notify();
            return true;
        }

        if (this.max_size > 0 // this.max_size > 0 = buffered
                    && this.buffer.length < this.max_size
                    && (!caller_sel || caller_sel.tryConsume(caller_sel_id)))
        {
            this.buffer.insert(val);
            return true;
        }

        return false;
    }

    /***********************************************************************

        Try to write a message to Channel without blocking and if it fails,
        create a write queue entry using the given `sel_state` and `sel_id`

        Params:
            ptr = Message to write to channel
            sel_state = SelectState instance of the select call being executed
            sel_id = id of the select call being executed

    ***********************************************************************/

    void selectWrite (void* ptr, SelectState sel_state, int sel_id) nothrow
    {
        assert(ptr !is null);
        assert(sel_state !is null);
        T* val = cast(T*) ptr;

        this.lock.lock_nothrow();

        bool success = tryWrite(*val, sel_state, sel_id);

        // Nobody consumed the select state yet: wait in the write queue
        if (!sel_state.isConsumed())
            this.enqueueEntry(this.writeq, val, sel_state, sel_id);

        // Decide while holding the lock, notify only after releasing it
        const bool wake_caller = success || this.isClosed() || sel_state.id == -1;
        this.lock.unlock_nothrow();

        if (wake_caller)
            sel_state.blocker.notify();
    }

    /***********************************************************************

        Read a message from the Channel with an optional timeout

        Unbuffered mode:

            If a writer is already blocked on the Channel, reader copies the
            value to `output` and wakes up the writer by yielding

            If no writer is ready in the wait queue, reader appends itself
            to read wait queue and blocks

            If channel is closed, it returns immediately with a failure

        Buffered mode:

            If there are existing messages in the buffer, reader pops one off
            the buffer and returns immediately with success, regardless of the
            Channel being closed or not

            If there are no messages in the buffer it behaves exactly like the
            unbuffered operation

        Params:
            output = Reference to output variable
            duration = Timeout duration, forwarded as-is to the blocker's
                       `wait` when the reader has to block

        Returns:
            Success/Failure - Fails when channel is closed and there are
            no existing messages to be read. Fails when timeout is reached.

    ***********************************************************************/

    bool read (ref T output, Duration duration = Duration.init) nothrow
    {
        this.lock.lock_nothrow();

        bool success = tryRead(output);

        if (!success && !this.isClosed())
        {
            // Nothing to read right now: park in the read queue and register
            // the entry with the scheduler for the duration of the wait
            ChannelQueueEntry q_ent = this.enqueueEntry(this.readq, &output);
            thisScheduler().addResource(q_ent);
            scope (exit) thisScheduler().removeResource(q_ent);

            // The lock must be released before blocking
            this.lock.unlock_nothrow();
            return q_ent.blocker.wait(duration) && q_ent.success;
        }

        this.lock.unlock_nothrow();
        return success;
    }

    /***********************************************************************

        Try to read a message from Channel without blocking

        tryRead reads a message if it is possible to do so without blocking.

        If the tryRead is being executed in a select call, it tries to consume the
        `caller_sel` with the given `caller_sel_id`. It only proceeds if it can
        successfully consume the `caller_sel`

        Params:
            output = Field to write the message to
            caller_sel = SelectState instance of the select call being executed
            caller_sel_id = id of the select call being executed

        Returns:
            Success/Failure

    ***********************************************************************/

    private bool tryRead (ref T output, SelectState caller_sel = null, int caller_sel_id = 0) nothrow
    {
        ChannelQueueEntry write_ent = this.dequeueEntry(this.writeq, caller_sel, caller_sel_id);

        if (this.max_size > 0 && !this.buffer.empty())
        {
            // if dequeueEntry fails, we will try to consume caller_sel again.
            if (!caller_sel || write_ent || caller_sel.tryConsume(caller_sel_id))
            {
                output = this.buffer.front();
                this.buffer.popFront();

                // A slot was just freed: move the blocked writer's value in
                if (write_ent)
                {
                    this.buffer.insert(*write_ent.pVal);
                }
            }
            else
            {
                return false;
            }
        }
        // if dequeueEntry returns a valid entry, it always successfully consumes the related select states.
        // the race between 2 select calls is resolved in dequeueEntry.
        else if (write_ent)
        {
            output = *write_ent.pVal;
        }
        else
        {
            // Report the closed channel to the select call (as a failure)
            if (this.isClosed() && caller_sel)
                caller_sel.tryConsume(caller_sel_id, false);
            return false;
        }

        if (write_ent)
            write_ent.blocker.notify();
        return true;
    }

    /***********************************************************************

        Try to read a message from Channel without blocking and if it fails,
        create a read queue entry using the given `sel_state` and `sel_id`

        Params:
            ptr = Buffer to write the message to
            sel_state = SelectState instance of the select call being executed
            sel_id = id of the select call being executed

    ***********************************************************************/

    void selectRead (void* ptr, SelectState sel_state, int sel_id) nothrow
    {
        assert(ptr !is null);
        assert(sel_state !is null);
        T* val = cast(T*) ptr;

        this.lock.lock_nothrow();

        bool success = tryRead(*val, sel_state, sel_id);

        // Nobody consumed the select state yet: wait in the read queue
        if (!sel_state.isConsumed())
            this.enqueueEntry(this.readq, val, sel_state, sel_id);

        // Decide while holding the lock, notify only after releasing it
        const bool wake_caller = success || this.isClosed() || sel_state.id == -1;
        this.lock.unlock_nothrow();

        if (wake_caller)
            sel_state.blocker.notify();
    }

    /***********************************************************************

        Close the channel

        Closes the channel by marking it closed and flushing all the wait
        queues. Only the call that actually flips the closed flag flushes
        the queues; later calls are no-ops.

    ***********************************************************************/

    void close () nothrow
    {
        if (cas(&this.closed, false, true))
        {
            this.lock.lock_nothrow();
            scope (exit) this.lock.unlock_nothrow();

            // Wake blocked Fibers up, report the failure
            foreach (ref entry; this.readq)
            {
                entry.terminate();
            }
            foreach (ref entry; this.writeq)
            {
                entry.terminate();
            }

            this.readq.clear();
            this.writeq.clear();
        }
    }

    /***********************************************************************

        Return the length of the internal buffer

        Returns:
            Number of messages currently buffered. An unbuffered `Channel`
            (`max_size == 0`) has no internal buffer and always reports 0.

    ***********************************************************************/

    size_t length () nothrow
    {
        // `buffer` is never allocated for unbuffered channels (see the
        // constructor), so it must not be dereferenced in that mode.
        if (this.max_size == 0)
            return 0;

        this.lock.lock_nothrow();
        scope (exit) this.lock.unlock_nothrow();
        return this.buffer.length;
    }

    /***********************************************************************

        Return the closed status of the `Channel`

    ***********************************************************************/

    bool isClosed () const @safe pure nothrow @nogc scope
    {
        return atomicLoad(this.closed);
    }

    /***********************************************************************

        An aggregate of necessary information to block a Fiber and record
        their request

    ***********************************************************************/

    private static class ChannelQueueEntry : FiberScheduler.Resource
    {
        /// FiberBlocker blocking the `Fiber`
        FiberScheduler.FiberBlocker blocker;

        /// Pointer to the variable that we will read to/from
        T* pVal;

        /// Result of the blocking read/write call
        bool success = true;

        /// State of the select call that this entry was created for
        SelectState select_state;

        /// Id of the select call that this entry was created for
        int sel_id;

        /***********************************************************************

            Params:
                pVal = Pointer to the variable that we will read to/from
                select_state = State of the select call this entry belongs
                               to, or `null` for a plain read/write
                sel_id = Id of the select call this entry belongs to

        ***********************************************************************/

        this (T* pVal, SelectState select_state = null, int sel_id = 0) nothrow
        {
            this.pVal = pVal;
            this.select_state = select_state;
            this.sel_id = sel_id;

            // Entries created by a select call share the select's blocker,
            // plain read/write entries get a blocker of their own
            if (this.select_state)
                this.blocker = this.select_state.blocker;
            else
                this.blocker = thisScheduler().new FiberBlocker();
        }

        /***********************************************************************

            Terminate a `ChannelQueueEntry` by waking up the blocked Fiber
            and reporting the failure

            This is called on all the `ChannelQueueEntry` instances still in
            the wait queues when Channel is closed

        ***********************************************************************/

        void terminate () nothrow
        {
            this.success = false;
            if (!this.select_state || this.select_state.tryConsume(this.sel_id, this.success))
                this.blocker.notify();
        }


        /***********************************************************************

            Terminate ChannelQueueEntry so that it is neutralized

        ***********************************************************************/

        void release () nothrow
        {
            if (this.blocker.stopTimer())
                this.pVal = null; // Sanitize pVal so that we can catch illegal accesses
        }
    }

    /***********************************************************************

        Create and enqueue a `ChannelQueueEntry` to the given entryq

        Params:
            entryq = Queue to append the new ChannelQueueEntry
            pVal =  Pointer to the message buffer
            sel_state = SelectState object to associate with the
                        newly created ChannelQueueEntry
            sel_id = id of the select call creating the new ChannelQueueEntry

        Returns:
            newly created ChannelQueueEntry

    ***********************************************************************/

    private ChannelQueueEntry enqueueEntry (ref DList!ChannelQueueEntry entryq, T* pVal,
                                            SelectState sel_state = null, int sel_id = 0) nothrow
    {
        assert(pVal !is null);

        ChannelQueueEntry q_ent = new ChannelQueueEntry(pVal, sel_state, sel_id);
        entryq.insert(q_ent);

        return q_ent;
    }

    /***********************************************************************

        Dequeue a `ChannelQueueEntry` from the given `entryq`

        Walks the `entryq` until it finds a suitable entry or the queue
        empties. If `dequeueEntry` is called from a select, it tries to
        consume the `caller_sel` if the `peer_sel` is not currently consumed.
        If it fails to consume the `caller_sel`, returns with a failure.

        If selected queue entry is part of a select operation, it is also
        consumed. If it consumes `caller_sel` but `peer_sel` was already
        consumed whole select operation would fail and caller would need to try
        again. This should be a rare case, where the `peer_sel` gets consumed by
        someone else between the first if check which verifies that it is not
        consumed and the point we actually try to consume it.

        Params:
            entryq = Queue to take the ChannelQueueEntry from
            caller_sel = SelectState instance of the select call being executed
            caller_sel_id = id of the select call being executed

        Returns:
            a valid ChannelQueueEntry or null

    ***********************************************************************/

    private ChannelQueueEntry dequeueEntry (ref DList!ChannelQueueEntry entryq,
                                            SelectState caller_sel = null, int caller_sel_id = 0) nothrow
    {
        while (!entryq.empty())
        {
            ChannelQueueEntry qent = entryq.front();
            auto peer_sel = qent.select_state;

            if ((!peer_sel || !peer_sel.isConsumed()) && qent.blocker.shouldBlock())
            {
                // If we are in a select call, try to consume the caller select
                // if we can't consume the caller select, no need to continue
                if (caller_sel && !caller_sel.tryConsume(caller_sel_id))
                {
                    return null;
                }

                // At this point, caller select is consumed.
                // Try to consume the peer select if it exists
                // If peer_sel was consumed by someone else, tough luck
                // In that case, whole select will fail since we consumed the caller_sel
                if ((!peer_sel || peer_sel.tryConsume(qent.sel_id)) && qent.blocker.stopTimer())
                {
                    entryq.removeFront();
                    return qent;
                }
                else if (caller_sel)
                {
                    // Mark caller_sel failed
                    caller_sel.id = -1;
                    caller_sel.success = false;
                    return null;
                }
            }

            // Entry is not usable: drop it and look at the next one
            entryq.removeFront();
        }

        return null;
    }

private:
    /// Internal data storage (`null` when the channel is unbuffered)
    RingBuffer!T buffer;

    /// Closed flag
    bool closed;

    /// Per channel lock
    FiberMutex lock;

    /// List of fibers blocked on read()
    DList!ChannelQueueEntry readq;

    /// List of fibers blocked on write()
    DList!ChannelQueueEntry writeq;

public:
    /// Maximum amount of T a Channel can buffer
    immutable ulong max_size;
}
1422 
// Test non blocking operation: with a capacity of 2, interleaved writes and
// reads never have to wait and messages come out in insertion order
@system unittest
{
    auto sched = new FiberScheduler;
    auto channel = new Channel!string(2);
    string prefix = "DEADBEEF";

    // Write `prefix ~ suffix` and check how many messages are buffered afterwards
    void push (string suffix, size_t buffered)
    {
        channel.write(prefix ~ suffix);
        assert(channel.length() == buffered);
    }

    // Read one message, check its content and how many messages remain buffered
    void pop (string suffix, size_t buffered)
    {
        string received;
        assert(channel.read(received));
        assert(received == prefix ~ suffix);
        assert(channel.length() == buffered);
    }

    sched.start({
        push(" 1", 1);
        push(" 2", 2);

        pop(" 1", 1);

        push(" 3", 2);

        pop(" 2", 1);
        pop(" 3", 0);
    });
}
1453 
// Test unbuffered blocking operation with multiple receivers
// Each receiver must see the messages it gets in strictly increasing order,
// and together the receivers must get every message exactly once
@system unittest
{
    auto sched = new FiberScheduler;
    auto channel = new Channel!int();
    int last = 1000;
    long total;

    // Body shared by both receiver fibers
    auto receiver = () {
        int current, previous;
        bool got_first = channel.read(previous);
        total += previous;
        assert(got_first);

        // Loop ends once the channel is closed by the writer
        while (channel.read(current))
        {
            total += current;
            assert(got_first);
            assert(previous < current);
            previous = current;
        }
    };

    sched.spawn(receiver);
    sched.spawn(receiver);

    sched.start(
        () {
            foreach (int i; 0 .. last + 1)
                assert(channel.write(i));
            channel.close();

            // Writing to a closed channel fails
            assert(!channel.write(0));
        }
    );

    // Sum of [0..1000]
    assert(total == last * (last + 1) / 2);
}
1512 
// Test that writer is not blocked until buffer is full and a read unblocks the writer
// Reader should be able to read remaining messages after the channel is closed
@system unittest
{
    auto sched = new FiberScheduler;
    auto channel = new Channel!int(5);

    // Reader fiber
    sched.spawn(
        () {
            int received;
            assert(channel.length == channel.max_size);
            assert(channel.read(received));
            channel.close();

            // Read remaining messages after channel is closed
            foreach (ulong _; 0 .. channel.max_size)
                assert(channel.read(received));

            // No more messages to read on closed channel
            assert(!channel.read(received));
        }
    );

    // Writer fiber
    sched.start(
        () {
            const int capacity = cast(int) channel.max_size;

            foreach (int i; 0 .. capacity)
                assert(channel.write(i));
            assert(channel.length == channel.max_size);

            // Wait for read.
            assert(channel.write(42));
            // Reader already closed the channel
            assert(!channel.write(0));
        }
    );
}
1550 
// Pass a message around a loop of three unbuffered channels
// (chn1 -> chn2 -> chn3 -> chn1), incrementing its id at every hop
@system unittest
{
    struct HyperLoopMessage
    {
        int id;
        MonoTime time;
    }

    alias Chan = Channel!HyperLoopMessage;
    enum rounds = 1000;

    auto sched = new FiberScheduler;
    auto chn1 = new Chan();
    auto chn2 = new Chan();
    auto chn3 = new Chan();

    // Build a fiber body which forwards messages from `input` to `output`,
    // checking that every incoming id is congruent to `residue` modulo 3
    auto relay (Chan input, Chan output, int residue)
    {
        return () {
            HyperLoopMessage msg;

            foreach (_; 0 .. rounds)
            {
                assert(input.read(msg));
                assert(msg.id % 3 == residue);
                msg.id++;
                msg.time = MonoTime.currTime;
                assert(output.write(msg));
            }
        };
    }

    sched.spawn(relay(chn2, chn3, 1));
    sched.spawn(relay(chn1, chn2, 0));

    // The main fiber injects the message and closes the loop
    sched.start(
        () {
            auto msg = HyperLoopMessage(0, MonoTime.currTime);

            foreach (_; 0 .. rounds)
            {
                assert(chn1.write(msg));
                assert(chn3.read(msg));
                assert(msg.id % 3 == 2);
                msg.id++;
                msg.time = MonoTime.currTime;
            }
        }
    );
}
1612 
// Multiple writer threads writing to a buffered channel
// Reader should receive all messages
@system unittest
{
    auto sched = new FiberScheduler;
    immutable int n = 5000;
    auto channel = new Channel!int(n/10);

    shared int next_value = 0;
    shared int writer_sum = 0;

    // Body of each writer thread: keep writing fresh values until the write
    // fails, accumulating every successfully written value in `writer_sum`
    void writerThread ()
    {
        auto local_sched = new FiberScheduler();
        local_sched.start(
            () {
                for (int val = atomicOp!"+="(next_value, 1);
                     channel.write(val);
                     val = atomicOp!"+="(next_value, 1))
                {
                    atomicOp!"+="(writer_sum, val);
                }
            }
        );
    }

    auto t1 = new Thread(&writerThread);
    t1.start();

    auto t2 = new Thread(&writerThread);
    t2.start();

    sched.start(
        () {
            int reader_sum, incoming, received;

            // Close the channel from the reader side once enough messages
            // arrived, then keep draining until read fails
            while (channel.read(incoming))
            {
                reader_sum += incoming;
                if (received++ == n) channel.close();
            }

            thread_joinAll();
            assert(reader_sum == writer_sum);
        }
    );
}
1669 
// Select over three channels for reading while two of them get closed
@system unittest
{
    auto sched = new FiberScheduler;
    auto first = new Channel!int();
    auto second = new Channel!int(1);
    auto third = new Channel!int();

    sched.spawn(
        () {
            first.write(42);
            first.close();
            third.write(37);
        }
    );

    sched.spawn(
        () {
            second.write(44);
            second.close();
        }
    );

    sched.start(
        () {
            bool[3] closed;
            int[3] values;

            foreach (_; 0 .. 5)
            {
                SelectEntry[] readers;
                SelectEntry[] writers;

                // Channels already seen as closed are left out of the select
                if (!closed[0])
                    readers ~= SelectEntry(first, &values[0]);
                if (!closed[1])
                    readers ~= SelectEntry(second, &values[1]);
                readers ~= SelectEntry(third, &values[2]);

                auto outcome = select(readers, writers);
                if (outcome.success)
                    continue;

                // On failure, flag the first channel not yet flagged
                // (falling back to the last one)
                size_t slot = 0;
                while (slot < 2 && closed[slot])
                    ++slot;
                closed[slot] = true;
            }

            assert(values[0] == 42 && values[1] == 44 && values[2] == 37);
            assert(closed[0] && closed[1] && !closed[2]);
        }
    );
}
1721 
// Four threads connected in a ring of channels (alternating buffered and
// unbuffered), each one using select to both write to the next peer and
// read from the previous one
@system unittest
{
    FiberScheduler scheduler = new FiberScheduler;

    auto chn1 = new Channel!int(20);
    auto chn2 = new Channel!int();
    auto chn3 = new Channel!int(20);
    auto chn4 = new Channel!int();

    // Writes 0..limit (inclusive) to `outbound` and reads from `inbound`
    // until `limit` is received, checking that reads arrive in sequence
    void runPeer (C) (ref C outbound, ref C inbound, int _tid)
    {
        auto local_sched = new FiberScheduler;
        int received, to_send;
        int last_seen = -1;
        int limit = 10000;

        local_sched.start(
            () {
                while (!(received >= limit && to_send > limit))
                {
                    SelectEntry[] readers;
                    SelectEntry[] writers;

                    if (to_send <= limit)
                        writers ~= SelectEntry(outbound, &to_send);

                    if (received < limit)
                        readers ~= SelectEntry(inbound, &received);

                    auto outcome = select(readers, writers);
                    if (!outcome.success)
                        continue;

                    if (readers.length > 0 && outcome.id == 0)
                    {
                        assert(last_seen + 1 == received);
                        last_seen = received;
                    }
                    else
                    {
                        to_send++;
                    }
                }
            }
        );
    }

    auto t1 = new Thread({
        runPeer(chn1, chn2, 0);
    });
    t1.start();

    auto t2 = new Thread({
        runPeer(chn2, chn3, 1);
    });
    t2.start();

    auto t3 = new Thread({
        runPeer(chn3, chn4, 2);
    });
    t3.start();

    // The calling thread is the fourth peer and closes the ring
    runPeer(chn4, chn1, 3);

    thread_joinAll();
}
1790 
// Select calls on channels that get closed by other fibers must report
// a failure for the (only) entry they were given
@system unittest
{
    auto sched = new FiberScheduler;
    auto write_target = new Channel!int();
    auto read_target = new Channel!int();

    // Build a fiber body which yields once and then closes `target`
    auto closeAfterYield (Channel!int target)
    {
        return () {
            FiberScheduler.yield();
            target.close();
        };
    }

    sched.spawn(closeAfterYield(write_target));
    sched.spawn(closeAfterYield(read_target));

    // Selecting for write
    sched.spawn(
        () {
            foreach (_; 0 .. 2)
            {
                int payload = 42;
                SelectEntry[] readers;
                SelectEntry[] writers;
                writers ~= SelectEntry(write_target, &payload);
                auto outcome = select(readers, writers);

                assert(outcome.id == 0);
                assert(!outcome.success);
            }
        }
    );

    // Selecting for read
    sched.start(
        () {
            foreach (_; 0 .. 2)
            {
                int received;
                SelectEntry[] readers;
                SelectEntry[] writers;
                readers ~= SelectEntry(read_target, &received);
                auto outcome = select(readers, writers);

                assert(outcome.id == 0);
                assert(!outcome.success);
            }
        }
    );
}
1843 
// Timed read/write: the operation must fail once the timeout elapses when
// there is no peer, and succeed when a peer shows up in time
@system unittest
{
    import core.sync.semaphore;

    enum timeout = 1000.msecs;

    auto sched = new FiberScheduler;
    auto channel = new Channel!int();

    auto started_at = MonoTime.currTime;
    auto writer_sem = new Semaphore();
    auto reader_sem = new Semaphore();

    // Writer thread: waits for its semaphore, then writes a single value
    auto writer = new Thread({
        auto local_sched = new FiberScheduler;
        local_sched.start(
            () {
                writer_sem.wait();
                assert(channel.write(42));
            }
        );
    });
    writer.start();

    // Reader thread: waits for its semaphore, then reads a single value
    auto reader = new Thread({
        auto local_sched = new FiberScheduler;
        local_sched.start(
            () {
                int received;
                reader_sem.wait();
                assert(channel.read(received));
                assert(received == 43);
            }
        );
    });
    reader.start();

    sched.start(
        () {
            int received;

            // Unblock the helper threads if anything below fails
            scope (failure) {
                reader_sem.notify();
                writer_sem.notify();
                channel.close();
            }

            // No writer yet: the read times out
            assert(!channel.read(received, timeout));
            assert(MonoTime.currTime - started_at >= timeout);

            // Release the writer: the read succeeds
            writer_sem.notify();
            assert(channel.read(received, timeout));
            assert(received == 42);

            started_at = MonoTime.currTime;

            // No reader yet: the write times out
            assert(!channel.write(received + 1, timeout));
            assert(MonoTime.currTime - started_at >= timeout);

            // Release the reader: the write succeeds
            reader_sem.notify();
            assert(channel.write(received + 1, timeout));
        }
    );

    thread_joinAll();
}
1907 
/// A minimal busy-waiting lock built on a single atomic flag
struct SpinLock
{
    /// Busy-wait until the flag could be switched from `false` to `true`
    void lock() nothrow
    {
        for (;;)
        {
            // The compare-and-swap succeeds only for the caller that
            // observes the flag cleared, making that caller the owner.
            if (cas(&locked, false, true))
                return;
        }
    }

    /// Clear the flag with a release-ordered atomic store
    void unlock() nothrow
    {
        atomicStore!(MemoryOrder.rel)(locked, false);
    }

    /// `true` while the lock is held
    private shared(bool) locked;
}
1926 
/// A Fiber level Semaphore
///
/// Keeps a counter plus a FIFO queue of waiting entries. Both are guarded by
/// an internal `SpinLock`; a waiter that finds the counter at zero is parked
/// on a `FiberScheduler.FiberBlocker` instead of blocking its thread on the
/// lock.
class FiberSemaphore
{

    /***********************************************************************

        Ctor

        Params:
            count = Initial count of FiberSemaphore

    ************************************************************************/

    this (size_t count = 0) nothrow
    {
        this.count = count;
    }

    /***********************************************************************

        Wait for FiberSemaphore count to be greater than 0

        If the count is already positive it is decremented and the call
        returns immediately. Otherwise a queue entry is created, registered
        as a resource of the current scheduler, appended to the waiting
        queue, and the caller blocks on the entry's `FiberBlocker`.

        Must be called with a `FiberScheduler` running (asserted by the
        `SemaphoreQueueEntry` constructor).

    ************************************************************************/

    void wait () nothrow
    {
        this.slock.lock();

        // Fast path: a unit is available, take it without queueing
        if (this.count > 0)
        {
            this.count--;
            this.slock.unlock();
            return;
        }
        auto entry = new SemaphoreQueueEntry();
        // Register the entry with the scheduler for the duration of the wait;
        // presumably this lets the scheduler call `release` on it if the
        // fiber is torn down while waiting -- confirm against FiberScheduler.
        thisScheduler().addResource(entry);
        scope (exit) thisScheduler().removeResource(entry);
        this.queue.insert(entry);

        // The spinlock is released before blocking, so `notify` can proceed
        this.slock.unlock();
        entry.blocker.wait();
    }

    /***********************************************************************

        Increment the FiberSemaphore count

        If a usable waiter is queued (see `dequeueEntry`), that waiter is
        notified instead and the count is left unchanged.

    ************************************************************************/

    void notify () nothrow
    {
        this.slock.lock();
        if (auto entry = this.dequeueEntry())
            entry.blocker.notify();
        else
            this.count++;
        this.slock.unlock();
    }

    /// Waiting-queue element: pairs one waiter with the `FiberBlocker` it
    /// sleeps on. Being a nested class, it reaches the owning semaphore
    /// through `this.outer`.
    private class SemaphoreQueueEntry : FiberScheduler.Resource
    {

        /// Creates the blocker from the scheduler of the calling thread
        this () nothrow
        {
            assert(thisScheduler(), "Can not block with no FiberScheduler running!");
            this.blocker = thisScheduler(). new FiberBlocker();
        }

        /***********************************************************************

            Terminate SemaphoreQueueEntry so that it is neutralized

            If `stopTimer` on the blocker fails, the owning semaphore is
            notified again.

        ************************************************************************/

        void release () nothrow
        {
            // NOTE(review): a failing `stopTimer` presumably means this entry
            // was already picked by `notify`, so the wake-up is handed on to
            // the semaphore rather than lost -- confirm against FiberBlocker.
            if (!blocker.stopTimer())
                this.outer.notify();
        }

        /// FiberBlocker blocking the `Fiber`
        FiberScheduler.FiberBlocker blocker;
    }

    /***********************************************************************

        Dequeue a `SemaphoreQueueEntry` from the waiting queue

        Entries are popped from the front until one is found for which both
        `blocker.shouldBlock()` and `blocker.stopTimer()` succeed; entries
        failing either check are dropped from the queue. The only caller in
        this class, `notify`, invokes it while holding `slock`.

        Returns:
            a valid SemaphoreQueueEntry or null

    ***********************************************************************/

    private SemaphoreQueueEntry dequeueEntry () nothrow
    {
        while (!this.queue.empty())
        {
            auto entry = this.queue.front();
            this.queue.removeFront();
            if (entry.blocker.shouldBlock() && entry.blocker.stopTimer())
            {
                return entry;
            }
        }
        return null;
    }

    /// Guards `queue` and `count`
    private SpinLock slock;

    /// Waiting queue for Fibers
    private DList!SemaphoreQueueEntry queue;

    /// Current semaphore count
    private size_t count;
}
2045 
/// A Fiber level Mutex, essentially a `FiberSemaphore` whose count starts at 1
class FiberMutex : FiberSemaphore
{
    /// Acquire the mutex; this is `FiberSemaphore.wait` under another name
    alias lock = wait;

    /// Release the mutex; this is `FiberSemaphore.notify` under another name
    alias unlock = notify;

    /// Same function as `lock`
    alias lock_nothrow = wait;

    /// Same function as `unlock`
    alias unlock_nothrow = notify;

    /// Construct the mutex with one unit available, i.e. initially unlocked
    this () nothrow
    {
        super(1);
    }
}
2066 
// Test releasing a queue entry
//
// Three threads contend for one `FiberMutex`; the sleeps lay out this
// intended timeline:
//   ~0ms    t1 locks the mutex and holds it for 400ms
//   ~100ms  t2 starts its scheduler: its main fiber calls `mtx.lock()`, and
//           a second fiber sleeps 200ms and then throws
//   ~200ms  t3 calls `mtx.lock()`
//   ~400ms  t1 increments `sharedVal` and unlocks
// t3 asserts `sharedVal == 1` once it holds the mutex, i.e. t2's increment
// must never have happened even though t2 queued for the mutex before t3.
// NOTE(review): this presumably relies on the exception tearing down t2's
// scheduler and releasing its pending queue entry so the unlock goes to t3
// -- confirm against FiberScheduler's resource handling.
@system unittest
{
    FiberMutex mtx = new FiberMutex();
    int sharedVal;

    // t1: takes the mutex first and holds it across a 400ms sleep
    auto t1 = new Thread({
        FiberScheduler scheduler = new FiberScheduler;
        scheduler.start(
            () {
                mtx.lock();
                Thread.sleep(400.msecs);
                sharedVal += 1;
                mtx.unlock();
            }
        );
    });
    t1.start();

    // t2: queues for the mutex, then has a sibling fiber throw
    auto t2 = new Thread({
        FiberScheduler scheduler = new FiberScheduler;
        Thread.sleep(100.msecs);

        scheduler.spawn(
            () {
                Thread.sleep(200.msecs);
                throw new Exception("");
            }
        );

        try
        {
            scheduler.start(
                () {
                    mtx.lock();
                    sharedVal += 1;
                    mtx.unlock();
                }
            );
        // The exception thrown above is expected here; it is deliberately
        // ignored so the thread ends normally.
        } catch (Exception e) { }
    });
    t2.start();

    // t3: requests the mutex after t2 and checks that only t1 has
    // incremented `sharedVal` by the time it gets the lock
    auto t3 = new Thread({
        FiberScheduler scheduler = new FiberScheduler;
        scheduler.start(
            () {
                Thread.sleep(200.msecs);
                mtx.lock();
                assert(sharedVal == 1);
                sharedVal += 1;
                mtx.unlock();
            }
        );
    });
    t3.start();

    thread_joinAll();
}
2126 
// Test `enterCriticalSection` / `exitCriticalSection`
//
// `count` is used as a sequence number: every fiber asserts the value it
// expects to see and then increments it, so the assertions encode the
// required execution order:
//   0, 1  main fiber, while inside the critical section (after `yield` and
//         after a timed `FiberBlocker.wait`)
//   2     fiber spawned before `start`
//   3     fiber spawned from inside the critical section
//   4     value seen by the main fiber after leaving the critical section
//         and yielding
@system unittest
{
    // Only `msecs` is needed; a selective import avoids pulling all of
    // Phobos (including `std.concurrency`, which this module forks) into
    // the scope of this test.
    import core.time : msecs;
    FiberScheduler scheduler = new FiberScheduler;
    int count;

    // Spawned before the scheduler starts; must not run until the main
    // fiber has left its critical section.
    scheduler.spawn(
        () {
            assert(count == 2);
            count++;
        }
    );

    scheduler.start(
        () {
            thisScheduler().enterCriticalSection();

            // this will yield
            scheduler.spawn(
                () {
                    assert(count == 3);
                    count++;
                }
            );

            thisScheduler().yield();
            // execution should still come back to us
            assert(count == 0);
            count++;

            // A timed wait inside the critical section must not let the
            // other fibers run either: `count` is still 1 afterwards.
            scope blocker = thisScheduler().new FiberBlocker();
            blocker.wait(10.msecs);

            assert(count == 1);
            count++;

            // Once the critical section is left, a yield lets both spawned
            // fibers run before control returns here.
            thisScheduler().exitCriticalSection();
            thisScheduler().yield();
            assert(count == 4);
        }
    );
}