1 /**
2  * This is a low-level messaging API upon which more structured or restrictive
3  * APIs may be built.  The general idea is that every messageable entity is
4  * represented by a common handle type called a Tid, which allows messages to
5  * be sent to logical threads that are executing in both the current process
6  * and in external processes using the same interface.  This is an important
7  * aspect of scalability because it allows the components of a program to be
8  * spread across available resources with few to no changes to the actual
9  * implementation.
10  *
11  * A logical thread is an execution context that has its own stack and which
12  * runs asynchronously to other logical threads.  These may be preemptively
13  * scheduled kernel threads, fibers (cooperative user-space threads), or some
14  * other concept with similar behavior.
15  *
16  * The type of concurrency used when logical threads are created is determined
17  * by the Scheduler selected at initialization time.  The default behavior is
18  * currently to create a new kernel thread per call to spawn, but other
19  * schedulers are available that multiplex fibers across the main thread or
20  * use some combination of the two approaches.
21  *
22  * Note:
23  * Copied (almost verbatim) from Phobos at commit 3bfccf4f1 (2019-11-27)
24  * Changes are this notice, and the module rename, from `std.concurrency`
25  * to `geod24.concurrency`.
26  *
27  * Copyright: Copyright Sean Kelly 2009 - 2014.
28  * License:   <a href="http://www.boost.org/LICENSE_1_0.txt">Boost License 1.0</a>.
29  * Authors:   Sean Kelly, Alex Rønne Petersen, Martin Nowak
30  * Source:    $(PHOBOSSRC std/concurrency.d)
31  */
32 /*          Copyright Sean Kelly 2009 - 2014.
33  * Distributed under the Boost Software License, Version 1.0.
34  *    (See accompanying file LICENSE_1_0.txt or copy at
35  *          http://www.boost.org/LICENSE_1_0.txt)
36  */
37 module geod24.concurrency;
38 
39 public import std.variant;
40 
41 import core.atomic;
42 import core.sync.condition;
43 import core.sync.mutex;
44 import core.thread;
45 import core.time : MonoTime;
46 import std.range.primitives;
47 import std.traits;
48 
49 ///
@system unittest
{
    // Written by the child thread, inspected by the owner once the reply
    // has arrived.
    __gshared string received;

    static void spawnedFunc(Tid self, Tid ownerTid)
    {
        import std.conv : text;

        // Wait for an integer from the owner thread.
        self.receive((int number) {
            received = text("Received the number ", number);

            // Tell the owner thread that everything went fine.
            ownerTid.send(true);
        });
    }

    // Run spawnedFunc in a new thread, handing it our own Tid.
    auto self = thisTid();
    auto childTid = spawn(&spawnedFunc, self);

    // Hand the number 42 to the new thread.
    childTid.send(42);

    // Wait for the result code.
    const wasSuccessful = self.receiveOnly!bool;
    assert(wasSuccessful);
    assert(received == "Received the number 42");
}
78 
79 private
80 {
81     bool hasLocalAliasing(Types...)()
82     {
83         import std.typecons : Rebindable;
84 
85         // Works around "statement is not reachable"
86         bool doesIt = false;
87         static foreach (T; Types)
88         {
89             static if (is(T == Tid))
90             { /* Allowed */ }
91             else static if (is(T : Rebindable!R, R))
92                 doesIt |= hasLocalAliasing!R;
93             else static if (is(T == struct))
94                 doesIt |= hasLocalAliasing!(typeof(T.tupleof));
95             else
96                 doesIt |= std.traits.hasUnsharedAliasing!(T);
97         }
98         return doesIt;
99     }
100 
101     @safe unittest
102     {
103         static struct Container { Tid t; }
104         static assert(!hasLocalAliasing!(Tid, Container, int));
105     }
106 
107     @safe unittest
108     {
109         /* Issue 20097 */
110         import std.datetime.systime : SysTime;
111         static struct Container { SysTime time; }
112         static assert(!hasLocalAliasing!(SysTime, Container));
113     }
114 
115     struct Message
116     {
117         Variant data;
118 
119         this(T...)(T vals) if (T.length > 0)
120         {
121             static if (T.length == 1)
122             {
123                 data = vals[0];
124             }
125             else
126             {
127                 import std.typecons : Tuple;
128 
129                 data = Tuple!(T)(vals);
130             }
131         }
132 
133         @property auto convertsTo(T...)()
134         {
135             static if (T.length == 1)
136             {
137                 return is(T[0] == Variant) || data.convertsTo!(T);
138             }
139             else
140             {
141                 import std.typecons : Tuple;
142                 return data.convertsTo!(Tuple!(T));
143             }
144         }
145 
146         @property auto get(T...)()
147         {
148             static if (T.length == 1)
149             {
150                 static if (is(T[0] == Variant))
151                     return data;
152                 else
153                     return data.get!(T);
154             }
155             else
156             {
157                 import std.typecons : Tuple;
158                 return data.get!(Tuple!(T));
159             }
160         }
161 
162         auto map(Op)(Op op)
163         {
164             alias Args = Parameters!(Op);
165 
166             static if (Args.length == 1)
167             {
168                 static if (is(Args[0] == Variant))
169                     return op(data);
170                 else
171                     return op(data.get!(Args));
172             }
173             else
174             {
175                 import std.typecons : Tuple;
176                 return op(data.get!(Tuple!(Args)).expand);
177             }
178         }
179     }
180 
181     void checkops(T...)(T ops)
182     {
183         import std.format : format;
184 
185         foreach (i, t1; T)
186         {
187             static assert(isFunctionPointer!t1 || isDelegate!t1,
188                     format!"T %d is not a function pointer or delegates"(i));
189             alias a1 = Parameters!(t1);
190             alias r1 = ReturnType!(t1);
191 
192             static if (i < T.length - 1 && is(r1 == void))
193             {
194                 static assert(a1.length != 1 || !is(a1[0] == Variant),
195                               "function with arguments " ~ a1.stringof ~
196                               " occludes successive function");
197 
198                 foreach (t2; T[i + 1 .. $])
199                 {
200                     alias a2 = Parameters!(t2);
201 
202                     static assert(!is(a1 == a2),
203                         "function with arguments " ~ a1.stringof ~ " occludes successive function");
204                 }
205             }
206         }
207     }
208 
209     @property ref ThreadInfo thisInfo() nothrow
210     {
211         auto t = cast(InfoThread)Thread.getThis();
212 
213         if (t !is null)
214             return t.info;
215 
216         return ThreadInfo.thisInfo;
217     }
218 }
219 
// Thread-local module destructor: runs cleanup() on the calling thread's
// ThreadInfo, which closes that thread's mailbox if it has one.
static ~this()
{
    thisInfo.cleanup();
}
224 
225 // Exceptions
226 
/**
 * Thrown on calls to `receiveOnly` if a message other than the type
 * the receiving thread expected is sent.
 */
class MessageMismatch : Exception
{
    /**
     * Params:
     *   msg = Description of the mismatch; defaults to a generic text.
     */
    this(string msg = "Unexpected message type") @safe pure nothrow @nogc
    {
        super(msg);
    }
}
239 
/**
 * Thrown when a Tid is missing, e.g. when `ownerTid` doesn't
 * find an owner thread.
 *
 * NOTE(review): `ownerTid` is not defined in the visible part of this
 * module; confirm that this example still applies.
 */
class TidMissingException : Exception
{
    import std.exception : basicExceptionCtors;
    /// Constructors supplied by `std.exception.basicExceptionCtors`.
    mixin basicExceptionCtors;
}
250 
251 
252 // Thread ID
253 
254 
255 /**
256  * An opaque type used to represent a logical thread.
257  */
struct Tid
{
package:
    /// Mailbox backing this handle; `null` for a default-initialized Tid.
    MessageBox mbox;

    /// Wraps an existing mailbox.
    this(MessageBox m) @safe pure nothrow @nogc
    {
        this.mbox = m;
    }

public:

    /**
     * Writes a short identifying string for this Tid to `sink`.
     *
     * The text is derived from the address of the mailbox, so it is only
     * suitable for telling apart Tids that are alive at the same time
     * (logging, debugging).  A Tid created later may produce the same text
     * as one that has already terminated.
     */
    void toString(scope void delegate(const(char)[]) sink)
    {
        import std.format : formattedWrite;

        auto address = cast(void*) mbox;
        formattedWrite(sink, "Tid(%x)", address);
    }

}
284 
@system unittest
{
    // text!Tid is @system
    import std.conv : text;

    // A default-initialized Tid has no mailbox.
    Tid blank;
    assert(text(blank) == "Tid(0)");

    // The calling thread's Tid is backed by a mailbox.
    auto mine = thisTid;
    assert(text(mine) != "Tid(0)");

    // A copy shares the mailbox and therefore formats identically.
    auto duplicate = mine;
    assert(text(mine) == text(duplicate));
}
296 
297 /**
298  * Returns: The $(LREF Tid) of the caller's thread.
299  */
@property Tid thisTid() @safe
{
    // TODO: remove when concurrency is safe
    static Tid fetchOrCreate() @trusted
    {
        // Create the mailbox the first time the thread asks for its Tid.
        if (thisInfo.ident == Tid.init)
            thisInfo.ident = Tid(new MessageBox);
        return thisInfo.ident;
    }

    return fetchOrCreate();
}
313 
314 // Thread Creation
315 
/// Whether `F` can be started by `spawn` with arguments of types `T`:
/// it must be callable, return `void`, accept `(Tid, T)` and, unless it is
/// a plain function pointer, have no unshared aliasing.
private template isSpawnable(F, T...)
{
    // True when every parameter of F2 implicitly converts to the parameter
    // of F1 at the same position (checked recursively from index `i`).
    template isParamsImplicitlyConvertible(F1, F2, int i = 0)
    {
        alias wanted = Parameters!F1;
        alias given = Parameters!F2;

        static if (wanted.length != given.length)
            enum isParamsImplicitlyConvertible = false;
        else static if (i == wanted.length)
            enum isParamsImplicitlyConvertible = true;
        else static if (!isImplicitlyConvertible!(given[i], wanted[i]))
            enum isParamsImplicitlyConvertible = false;
        else
            enum isParamsImplicitlyConvertible =
                isParamsImplicitlyConvertible!(F1, F2, i + 1);
    }

    enum isSpawnable = isCallable!F && is(ReturnType!F == void)
            && isParamsImplicitlyConvertible!(F, void function(Tid, T))
            && (isFunctionPointer!F || !hasUnsharedAliasing!F);
}
337 
/**
 * Starts `fn` in a new logical thread.
 *
 * Executes the supplied function in a new logical thread represented by
 * `Tid`.  The `Tid` of the new thread is passed to `fn` as its first
 * argument, followed by `args`; the call made in the new thread is
 * therefore `fn(tid, args)`.
 *
 * Params:
 *  fn   = The function to execute.  Its first parameter must accept a
 *         `Tid`; the remaining parameters must accept `args`.
 *  args = Arguments passed to the function after the `Tid`.
 *
 * Returns:
 *  A Tid representing the new logical thread.
 *
 * Notes:
 *  `args` must not have unshared aliasing.  In other words, all arguments
 *  to `fn` must either be `shared` or `immutable` or have no
 *  pointer indirection.  This is necessary for enforcing isolation among
 *  threads.
 */
Tid spawn(F, T...)(F fn, T args)
if (isSpawnable!(F, T))
{
    // Reject arguments that would share mutable thread-local data.
    static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");
    return _spawn(fn, args);
}
363 
364 ///
@system unittest
{
    // The first parameter receives the Tid of the newly created thread.
    static void greet(Tid self, string msg)
    {
        assert(msg == "Hello World");
    }

    auto tid = spawn(&greet, "Hello World");
}
374 
375 /// Fails: char[] has mutable aliasing.
@system unittest
{
    string msg = "Hello, World!";

    // A string parameter accepts an immutable copy but not a mutable one.
    static void takesString(Tid self, string msg) {}
    static assert( __traits(compiles, spawn(&takesString, msg.idup)));
    static assert(!__traits(compiles, spawn(&takesString, msg.dup)));

    // A char[] parameter cannot be spawned with either kind of copy.
    static void takesChars(Tid self, char[] msg) {}
    static assert(!__traits(compiles, spawn(&takesChars, msg.idup)));
    static assert(!__traits(compiles, spawn(&takesChars, msg.dup)));
}
388 
389 /// New thread with anonymous function
@system unittest
{
    auto self = thisTid();

    // The function literal runs in the new thread and answers its caller.
    spawn((Tid me, Tid caller) {
        caller.send("This is so great!");
    }, self);

    const reply = self.receiveOnly!string;
    assert(reply == "This is so great!");
}
398 
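/*
 * Implementation behind `spawn`: creates the mailbox and `Tid` of the new
 * thread, then starts an `InfoThread` which records that `Tid` as its own
 * identity and calls `fn(tid, args)`.
 */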
private Tid _spawn(F, T...)(F fn, T args)
if (isSpawnable!(F, T))
{
    // TODO: MessageList and &exec should be shared.
    auto spawnTid = Tid(new MessageBox);

    // Body of the new thread: adopt the freshly created Tid as this
    // thread's identity, then run the user function with it.
    // TODO: MessageList and &exec should be shared.
    auto worker = new InfoThread(() {
        thisInfo.ident = spawnTid;
        fn(spawnTid, args);
    });
    worker.start();

    return spawnTid;
}
419 
@system unittest
{
    // Function pointers: the argument count after the Tid has to match.
    void function(Tid) fn1;
    static assert( __traits(compiles, spawn(fn1)));
    static assert(!__traits(compiles, spawn(fn1, 1)));

    void function(Tid, int) fn2;
    static assert( __traits(compiles, spawn(fn2, 2)));
    static assert(!__traits(compiles, spawn(fn2)));

    // Delegates: shared and immutable ones are accepted ...
    void delegate(Tid, int) shared dg1;
    static assert(__traits(compiles, spawn(dg1, 1)));

    shared(void delegate(Tid, int)) dg2;
    static assert(__traits(compiles, spawn(dg2, 2)));

    shared(void delegate(Tid, long) shared) dg3;
    static assert(__traits(compiles, spawn(dg3, 3)));

    shared(void delegate(Tid, real, int, long) shared) dg4;
    static assert(__traits(compiles, spawn(dg4, 4, 4, 4)));

    void delegate(Tid, int) immutable dg5;
    static assert(__traits(compiles, spawn(dg5, 5)));

    // ... an unqualified delegate is not.
    void delegate(Tid, int) dg6;
    static assert(!__traits(compiles, spawn(dg6, 6)));

    // Objects with opCall: every combination of object and method qualifier.
    auto callable1  = new class{ void opCall(Tid, int) shared {} };
    static assert(!__traits(compiles, spawn(callable1,  1)));

    auto callable2  = cast(shared) new class{ void opCall(Tid, int) shared {} };
    static assert( __traits(compiles, spawn(callable2,  2)));

    auto callable3  = new class{ void opCall(Tid, int) immutable {} };
    static assert(!__traits(compiles, spawn(callable3,  3)));

    auto callable4  = cast(immutable) new class{ void opCall(Tid, int) immutable {} };
    static assert( __traits(compiles, spawn(callable4,  4)));

    auto callable5  = new class{ void opCall(Tid, int) {} };
    static assert(!__traits(compiles, spawn(callable5,  5)));

    auto callable6  = cast(shared) new class{ void opCall(Tid, int) immutable {} };
    static assert(!__traits(compiles, spawn(callable6,  6)));

    auto callable7  = cast(immutable) new class{ void opCall(Tid, int) shared {} };
    static assert(!__traits(compiles, spawn(callable7,  7)));

    auto callable8  = cast(shared) new class{ void opCall(Tid, int) const shared {} };
    static assert( __traits(compiles, spawn(callable8,  8)));

    auto callable9  = cast(const shared) new class{ void opCall(Tid, int) shared {} };
    static assert(!__traits(compiles, spawn(callable9,  9)));

    auto callable10 = cast(const shared) new class{ void opCall(Tid, int) const shared {} };
    static assert( __traits(compiles, spawn(callable10, 10)));

    auto callable11 = cast(immutable) new class{ void opCall(Tid, int) const shared {} };
    static assert( __traits(compiles, spawn(callable11, 11)));
}
465 
/**
 * Places the values as a message at the back of tid's message queue.
 *
 * Sends the supplied value to the thread represented by tid.  As with
 * $(LREF spawn), `T` must not have unshared aliasing.
 *
 * Sending to a `Tid` whose mailbox has been closed is a fatal error
 * (`assert(0)`); use $(LREF trySend) to get a `bool` result instead.
 *
 * Params:
 *  tid  = The thread to send to.  It must refer to a mailbox, i.e. it must
 *         not be a default-initialized `Tid`.
 *  vals = The values making up the message.
 */
void send(T...)(Tid tid, T vals)
in
{
    // Same style of guard as the receive functions: fail with a readable
    // message instead of dereferencing a null mailbox.
    assert(tid.mbox !is null,
        "Cannot send a message to a Tid that has no mailbox (default-initialized Tid).");
}
do
{
    static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");
    auto msg = Message(vals);
    if (!tid.mbox.put(msg))
        assert(0, "MessageBox is closed");
}
478 
/**
 * Places the values as a message at the back of tid's message queue, but
 * reports failure instead of asserting.
 *
 * Params:
 *  tid  = The thread to send to.
 *  vals = The values making up the message.
 *
 * Returns:
 *  The result of the mailbox's `put`, or `false` if `tid` has no mailbox
 *  at all (a default-initialized `Tid`).
 */
bool trySend(T...)(Tid tid, T vals)
{
    static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");

    // A Tid without a mailbox cannot accept anything; report that as a
    // failed send rather than dereferencing null.
    if (tid.mbox is null)
        return false;

    auto msg = Message(vals);
    return tid.mbox.put(msg);
}
486 
487 /**
488  * Receives a message from another thread.
489  *
490  * Receive a message from another thread, or block if no messages of the
491  * specified types are available.  This function works by pattern matching
492  * a message against a set of delegates and executing the first match found.
493  *
494  * If a delegate that accepts a $(REF Variant, std,variant) is included as
495  * the last argument to `receive`, it will match any message that was not
496  * matched by an earlier delegate.  If more than one argument is sent,
497  * the `Variant` will contain a $(REF Tuple, std,typecons) of all values
498  * sent.
499  */
void receive(T...)(Tid self, T ops )
in
{
    assert(self.mbox !is null,
        "Cannot receive a message until a thread was spawned or thisTid was passed to a running thread.");
}
do
{
    // Validate the handler list at compile time.
    checkops(ops);

    // Hand the handlers to the mailbox's untimed get.
    self.mbox.getUntimed(ops);
}
512 
513 ///
@system unittest
{
    import std.variant : Variant;

    // Answers 1, 2 or 3 depending on which handler matched the message.
    auto process = (Tid self, Tid caller)
    {
        self.receive(
            (int i) { caller.send(1); },
            (double f) { caller.send(2); },
            (Variant v) { caller.send(3); }
        );
    };

    auto self = thisTid();

    // An int is taken by the first handler.
    auto intWorker = spawn(process, self);
    intWorker.send(42);
    assert(self.receiveOnly!int == 1);

    // A double is taken by the second handler.
    auto doubleWorker = spawn(process, self);
    doubleWorker.send(3.14);
    assert(self.receiveOnly!int == 2);

    // Anything else falls through to the Variant handler.
    auto otherWorker = spawn(process, self);
    otherWorker.send("something else");
    assert(self.receiveOnly!int == 3);
}
546 
@safe unittest
{
    // A Variant handler is fine on its own or in last position.
    static assert(__traits(compiles, {
        receive(Tid.init, (Variant x) {} );
    }));
    static assert(__traits(compiles, {
        receive(Tid.init, (int x) {}, (Variant x) {} );
    }));

    // A Variant handler in front would hide the handler after it.
    static assert(!__traits(compiles, {
        receive(Tid.init, (Variant x) {}, (int x) {} );
    }));

    // Two handlers with the same parameter list: the second is unreachable.
    static assert(!__traits(compiles, {
        receive(Tid.init, (int x) {}, (int x) {} );
    }));
}
565 
566 // Make sure receive() works with free functions as well.
version (unittest)
{
    // Module-level handler used by the test below.
    private void receiveFunction(int x) {}
}
@safe unittest
{
    // A pointer to a free function is accepted alone ...
    static assert(__traits(compiles, {
        receive(Tid.init, &receiveFunction );
    }));

    // ... and in combination with a trailing Variant handler.
    static assert(__traits(compiles, {
        receive(Tid.init, &receiveFunction, (Variant x) {} );
    }));
}
579 
580 
/// Return type of `receiveOnly`: the type itself when exactly one type is
/// requested, a `Tuple` of the requested types otherwise.
private template receiveOnlyRet(T...)
{
    static if (T.length != 1)
    {
        import std.typecons : Tuple;
        alias receiveOnlyRet = Tuple!(T);
    }
    else
    {
        alias receiveOnlyRet = T[0];
    }
}
593 
594 /**
595  * Receives only messages with arguments of types `T`.
596  *
597  * Throws:  `MessageMismatch` if a message of types other than `T`
598  *          is received.
599  *
600  * Returns: The received message.  If `T.length` is greater than one,
601  *          the message will be packed into a $(REF Tuple, std,typecons).
602  */
receiveOnlyRet!(T) receiveOnly(T...)(Tid self)
in
{
    assert(self.mbox !is null,
        "Cannot receive a message until a thread was spawned or thisTid was passed to a running thread.");
}
do
{
    import std.format : format;
    import std.typecons : Tuple;

    // Filled in by the first handler when a message of the expected types
    // arrives.
    Tuple!(T) ret;

    self.mbox.getUntimed(
        // Expected message: keep its values.
        (T val) {
            static if (T.length)
                ret.field = val;
        },
        // Anything else: report what was expected and what arrived.
        (Variant val) {
            static if (T.length > 1)
                enum expected = T.stringof;
            else
                enum expected = T[0].stringof;

            throw new MessageMismatch(
                format("Unexpected message type: expected '%s', got '%s'", expected, val.type.toString()));
        });

    // A single requested type is returned bare, several as a Tuple.
    static if (T.length != 1)
        return ret;
    else
        return ret[0];
}
634 
635 ///
@system unittest
{
    // The child waits for exactly one int.
    auto child = spawn((Tid self) {
        const got = self.receiveOnly!int;
        assert(got == 42);
    });
    child.send(42);
}
644 
645 ///
@system unittest
{
    // The child waits for exactly one string.
    auto child = spawn((Tid self) {
        const got = self.receiveOnly!string;
        assert(got == "text");
    });
    child.send("text");
}
654 
655 ///
@system unittest
{
    struct Record { string name; int age; }

    // Requesting several types yields a tuple holding one value per type.
    auto child = spawn((Tid self) {
        auto msg = self.receiveOnly!(double, Record);
        assert(msg[0] == 0.5);

        auto person = msg[1];
        assert(person.name == "Alice");
        assert(person.age == 31);
    });

    child.send(0.5, Record("Alice", 31));
}
670 
@system unittest
{
    // Expects a string; answers with "" on success and with the text of
    // whatever was thrown otherwise.
    static void expectString(Tid self, Tid mainTid)
    {
        try
        {
            self.receiveOnly!string();
            mainTid.send("");
        }
        catch (Throwable th)
        {
            mainTid.send(th.msg);
        }
    }

    auto self = thisTid();
    auto child = spawn(&expectString, self);

    // Deliberately send an int where a string is expected.
    child.send(1);

    string result = self.receiveOnly!string();
    assert(result == "Unexpected message type: expected 'string', got 'int'");
}
692 
693 /**
694  * Tries to receive but will give up if no matches arrive within duration.
695  * Won't wait at all if provided $(REF Duration, core,time) is negative.
696  *
697  * Same as `receive` except that rather than wait forever for a message,
698  * it waits until either it receives a message or the given
699  * $(REF Duration, core,time) has passed. It returns `true` if it received a
700  * message and `false` if it timed out waiting for one.
701  */
bool receiveTimeout(T...)(Tid self, Duration duration, T ops)
in
{
    assert(self.mbox !is null,
        "Cannot receive a message until a thread was spawned or thisTid was passed to a running thread.");
}
do
{
    // Validate the handler list at compile time.
    checkops(ops);

    // The mailbox reports whether a message was handled in time.
    const handled = self.mbox.get(duration, ops);
    return handled;
}
713 
@safe unittest
{
    // A Variant handler is fine on its own or in last position.
    static assert(__traits(compiles, {
        receiveTimeout(Tid.init, msecs(0), (Variant x) {});
    }));
    static assert(__traits(compiles, {
        receiveTimeout(Tid.init, msecs(0), (int x) {}, (Variant x) {});
    }));

    // A Variant handler in front would hide the handler after it.
    static assert(!__traits(compiles, {
        receiveTimeout(Tid.init, msecs(0), (Variant x) {}, (int x) {});
    }));

    // Two handlers with the same parameter list: the second is unreachable.
    static assert(!__traits(compiles, {
        receiveTimeout(Tid.init, msecs(0), (int x) {}, (int x) {});
    }));

    // A non-zero timeout is accepted as well.
    static assert(__traits(compiles, {
        receiveTimeout(Tid.init, msecs(10), (int x) {}, (Variant x) {});
    }));
}
733 
734 
735 /**
736  * Encapsulates all implementation-level data needed for scheduling.
737  *
738  * When defining a Scheduler, an instance of this struct must be associated
739  * with each logical thread.  It contains all implementation-level information
740  * needed by the internal API.
741  */
struct ThreadInfo
{
    /// Tid (and with it the mailbox) of the logical thread described here.
    Tid ident;

    /**
     * Gets a thread-local instance of ThreadInfo.
     *
     * This is the default instance, to be used when info is requested for
     * a thread that was not created by the Scheduler.
     */
    static @property ref thisInfo() nothrow
    {
        static ThreadInfo instance;
        return instance;
    }

    /**
     * Cleans up this ThreadInfo.
     *
     * This must be called when a scheduled thread terminates.  It closes
     * the mailbox of `ident`, if there is one.
     */
    void cleanup()
    {
        if (ident.mbox is null)
            return;

        ident.mbox.close();
    }
}
772 
773 
/***************************************************************************

    A `Thread` that carries its own `ThreadInfo`.

    Keeping the `ThreadInfo` inside the thread object avoids storing it in
    a global variable.

***************************************************************************/
780 
public class InfoThread : Thread
{
    /// Messaging state belonging to this thread.
    public ThreadInfo info;

    /***************************************************************************

        Initializes a thread object which runs a function pointer

        Params:
            fn = The thread function.
            sz = The stack size for this thread.

    ***************************************************************************/

    this (void function() fn, size_t sz = 0) @safe pure nothrow @nogc
    {
        super(fn, sz);
    }


    /***************************************************************************

        Initializes a thread object which runs a delegate

        Params:
            dg = The thread function.
            sz = The stack size for this thread.

    ***************************************************************************/

    this (void delegate() dg, size_t sz = 0) @safe pure nothrow @nogc
    {
        super(dg, sz);
    }
}
816 
817 
818 /**
819  * An example Scheduler using Fibers.
820  *
821  * This is an example scheduler that creates a new Fiber per call to spawn
822  * and multiplexes the execution of all fibers within the main thread.
823  */
class FiberScheduler
{
    /**
     * Creates a new Fiber for the supplied op, then runs the dispatcher
     * beginning with that Fiber.
     */
    void start(void delegate() op)
    {
        create(op);

        // The Fiber created above is the last one in the list; run it first.
        const newest = this.m_fibers.length - 1;
        dispatch(newest);
    }

    /**
     * Creates a new Fiber for the supplied op, adds it to the dispatch
     * list and yields.
     */
    void spawn(void delegate() op) nothrow
    {
        create(op);
        FiberScheduler.yield();
    }

    /**************************************************************************

        Schedule a task to be run next time the scheduler yields

        Behaves like `spawn`, except that the caller does not yield: the
        task is only added to the queue and the current task keeps running.

        Params:
            op = Operation to run

    **************************************************************************/

    void schedule(void delegate() op) nothrow
    {
        this.create(op);
    }

    /**
     * If the caller is running inside a Fiber, yields execution; otherwise
     * does nothing.
     */
    static void yield() nothrow
    {
        // NOTE: It's possible that we should test whether the calling Fiber
        //       is an InfoFiber before yielding, but I think it's reasonable
        //       that any fiber should yield here.
        if (Fiber.getThis() !is null)
            Fiber.yield();
    }

    /**
     * Returns an appropriate ThreadInfo instance.
     *
     * The lookup order is: the calling `InfoFiber`, then the calling
     * `InfoThread`, then `ThreadInfo.thisInfo`.
     */
    @property ref ThreadInfo thisInfo() nothrow
    {
        if (auto fiber = cast(InfoFiber) Fiber.getThis())
            return fiber.info;

        if (auto thread = cast(InfoThread) Thread.getThis())
            return thread.info;

        return ThreadInfo.thisInfo;
    }

protected:
    /**
     * Creates a new Fiber which calls the given delegate and appends it to
     * the list of fibers.
     *
     * Params:
     *   op = The delegate the fiber should call
     */
    void create(void delegate() op) nothrow
    {
        void wrap()
        {
            // Clean up the ThreadInfo however op ends.
            scope (exit) thisInfo.cleanup();
            op();
        }

        auto fiber = new InfoFiber(&wrap);
        m_fibers ~= fiber;
    }

    /**
     * Fiber which embeds a ThreadInfo
     */
    static class InfoFiber : Fiber
    {
        ThreadInfo info;

        /// Semaphore this Fiber is blocked on, `null` when it is not blocked
        FiberBinarySemaphore sem;

        this(void delegate() op, size_t sz = 512 * 1024) nothrow
        {
            super(op, sz);
        }
    }

    protected class FiberBinarySemaphore
    {

        /***********************************************************************

            Associate `FiberBinarySemaphore` with the running `Fiber`

            The dispatcher inspects the semaphore registered on a `Fiber`
            and skips that `Fiber` for as long as the semaphore blocks.

        ***********************************************************************/

        private void registerToInfoFiber(InfoFiber info_fiber = cast(InfoFiber) Fiber.getThis()) nothrow
        {
            assert(info_fiber !is null, "This Fiber does not belong to FiberScheduler");
            assert(info_fiber.sem is null, "This Fiber already has a registered FiberBinarySemaphore");
            info_fiber.sem = this;
        }

        /// Registers with the calling Fiber, yields, and clears the
        /// notification flag once the Fiber is resumed.
        void wait() nothrow
        {
            this.registerToInfoFiber();
            FiberScheduler.yield();
            this.notified = false;
        }

        /// Like `wait()`, with a time limit of `period` from now.
        /// Returns: whether the semaphore had been notified on resumption.
        bool wait(Duration period) nothrow
        {
            this.registerToInfoFiber();
            this.limit = MonoTime.currTime + period;
            FiberScheduler.yield();

            // Remember the outcome, then reset the state for the next wait.
            const wasNotified = this.notified;
            this.limit = MonoTime.init;
            this.notified = false;
            return wasNotified;
        }

        /// Marks the semaphore as notified and yields.
        void notify() nothrow
        {
            this.notified = true;
            FiberScheduler.yield();
        }

        /***********************************************************************

            Query if `FiberBinarySemaphore` should still block

            The semaphore blocks until it has been notified or, when a time
            limit is set, until that limit has been reached.

        ***********************************************************************/

        bool shouldBlock() nothrow
        {
            const hasLimit = limit != MonoTime.init;
            const timed_out = hasLimit && MonoTime.currTime >= limit;

            return !(timed_out || notified);
        }

    private:
        /// Time limit that will eventually unblock the caller if a timeout is specified
        MonoTime limit = MonoTime.init;

        /// State of the semaphore
        bool notified;
    }

private:

    /***********************************************************************

        Run the scheduling loop until no Fiber is left

        Params:
            pos = m_fibers index of the Fiber to execute first

    ***********************************************************************/

    void dispatch(size_t pos = 0)
    {
        import std.algorithm.mutation : remove;

        assert(pos < m_fibers.length);
        while (m_fibers.length > 0)
        {
            auto candidate = m_fibers[pos];

            // Is the Fiber waiting on a FiberBinarySemaphore?
            if (candidate.sem !is null)
            {
                if (candidate.sem.shouldBlock())
                {
                    // Still blocked: move on to the next Fiber, wrapping
                    // around at the end of the list.
                    pos = (pos + 1 < m_fibers.length) ? pos + 1 : 0;
                    continue;
                }

                // No longer blocked: clear the semaphore and run the Fiber.
                candidate.sem = null;
            }

            // Run the Fiber; anything it threw is rethrown from here.
            auto thrown = candidate.call(Fiber.Rethrow.no);
            if (thrown !is null)
                throw thrown;

            if (m_fibers[pos].state == Fiber.State.TERM)
            {
                // Finished: drop it.  `pos` now refers to the Fiber that
                // followed it, unless it was the last one in the list.
                m_fibers = remove(m_fibers, pos);
                if (pos >= m_fibers.length)
                    pos = 0;
            }
            else
            {
                pos = (pos + 1 < m_fibers.length) ? pos + 1 : 0;
            }
        }
    }

private:
    /// List of `InfoFiber`s currently in the system
    InfoFiber[] m_fibers;
}
1066 
/// Ensure argument to `start` is run first
unittest
{
    // A fiber queued through `spawn` must observe the effect of the `start` delegate
    {
        scope scheduler = new FiberScheduler();
        bool started;
        scheduler.spawn(() { assert(started); });
        scheduler.start(() { started = true; });
    }

    // Same expectation for a fiber queued through `schedule`
    {
        scope scheduler = new FiberScheduler();
        bool started;
        scheduler.schedule(() { assert(started); });
        scheduler.start(() { started = true; });
    }
}
1083 
1084 /*
1085  * A MessageBox is a message queue for one thread.  Other threads may send
1086  * messages to this owner by calling put(), and the owner receives them by
1087  * calling get().  The put() call is therefore effectively shared and the
1088  * get() call is effectively local.  setMaxMsgs may be used by any thread
1089  * to limit the size of the message queue.
1090  */
1091 package class MessageBox
1092 {
1093     this() @safe nothrow
1094     {
1095         m_lock = new Mutex;
1096         m_closed = false;
1097 
1098         m_putMsg = new Condition(m_lock);
1099     }
1100 
1101     ///
1102     final @property bool isClosed() @safe @nogc pure
1103     {
1104         synchronized (m_lock)
1105         {
1106             return m_closed;
1107         }
1108     }
1109 
1110     /*
1111      * If maxMsgs is not set, the message is added to the queue and the
1112      * owner is notified. If the queue is full, onCrowdingDoThis is called.
1113      * If the routine returns true, this call will block until
1114      * the owner has made space available in the queue.  If it returns
1115      * false, this call will abort.
1116      *
1117      * Params:
1118      *  msg = The message to put in the queue.
1119      *
1120      * Returns:
1121      *  `false` if the message box is closed, `true` otherwise
1122      */
1123     final bool put (ref Message msg)
1124     {
1125         synchronized (m_lock)
1126         {
1127             if (m_closed)
1128                 return false;
1129             m_sharedBox.put(msg);
1130             m_putMsg.notify();
1131         }
1132         return true;
1133     }
1134 
1135     /*
1136      * Matches ops against each message in turn until a match is found.
1137      *
1138      * Params:
1139      *  ops = The operations to match.  Each may return a bool to indicate
1140      *        whether a message with a matching type is truly a match.
1141      *
1142      * Returns:
1143      *  true if a message was retrieved and false if not (such as if a
1144      *  timeout occurred).
1145      */
1146     bool getUntimed(Ops...)(scope Ops ops)
1147     {
1148         return this.get(Duration.init, ops);
1149     }
1150 
1151     bool get(Ops...)(Duration period, scope Ops ops)
1152     {
1153         immutable timedWait = period !is Duration.init;
1154         MonoTime limit = timedWait ? MonoTime.currTime + period : MonoTime.init;
1155 
1156         bool onStandardMsg(ref Message msg)
1157         {
1158             foreach (i, t; Ops)
1159             {
1160                 alias Args = Parameters!(t);
1161                 auto op = ops[i];
1162 
1163                 if (msg.convertsTo!(Args))
1164                 {
1165                     static if (is(ReturnType!(t) == bool))
1166                     {
1167                         return msg.map(op);
1168                     }
1169                     else
1170                     {
1171                         msg.map(op);
1172                         return true;
1173                     }
1174                 }
1175             }
1176             return false;
1177         }
1178 
1179         bool scan(ref ListT list)
1180         {
1181             for (auto range = list[]; !range.empty;)
1182             {
1183                 // Only the message handler will throw, so if this occurs
1184                 // we can be certain that the message was handled.
1185                 scope (failure)
1186                     list.removeAt(range);
1187 
1188                 if (onStandardMsg(range.front))
1189                 {
1190                     list.removeAt(range);
1191                     return true;
1192                 }
1193                 range.popFront();
1194                 continue;
1195             }
1196             return false;
1197         }
1198 
1199         while (true)
1200         {
1201             ListT arrived;
1202 
1203             if (scan(m_localBox))
1204             {
1205                 return true;
1206             }
1207             FiberScheduler.yield();
1208             synchronized (m_lock)
1209             {
1210                 while (m_sharedBox.empty)
1211                 {
1212                     if (timedWait)
1213                     {
1214                         if (period <= Duration.zero || !m_putMsg.wait(period))
1215                             return false;
1216                     }
1217                     else
1218                     {
1219                         m_putMsg.wait();
1220                     }
1221                 }
1222                 arrived.put(m_sharedBox);
1223             }
1224             scope (exit) m_localBox.put(arrived);
1225             if (scan(arrived))
1226                 return true;
1227 
1228             if (timedWait)
1229                 period = limit - MonoTime.currTime;
1230         }
1231     }
1232 
1233     /*
1234      * Called on thread termination.
1235      *
1236      * This routine clears out message queues and sets a flag to reject
1237      * any future messages.
1238      */
1239     final void close()
1240     {
1241         synchronized (m_lock)
1242             m_closed = true;
1243         m_localBox.clear();
1244     }
1245 
1246 private:
1247 
1248     alias ListT = List!(Message);
1249 
1250     ListT m_localBox;
1251 
1252     Mutex m_lock;
1253     Condition m_putMsg;
1254     ListT m_sharedBox;
1255     bool m_closed;
1256 }
1257 
1258 
1259 ///
1260 package struct List (T)
1261 {
1262     struct Range
1263     {
1264         import std.exception : enforce;
1265 
1266         @property bool empty() const
1267         {
1268             return !m_prev.next;
1269         }
1270 
1271         @property ref T front()
1272         {
1273             enforce(m_prev.next, "invalid list node");
1274             return m_prev.next.val;
1275         }
1276 
1277         @property void front(T val)
1278         {
1279             enforce(m_prev.next, "invalid list node");
1280             m_prev.next.val = val;
1281         }
1282 
1283         void popFront()
1284         {
1285             enforce(m_prev.next, "invalid list node");
1286             m_prev = m_prev.next;
1287         }
1288 
1289         private this(Node* p)
1290         {
1291             m_prev = p;
1292         }
1293 
1294         private Node* m_prev;
1295     }
1296 
1297     void put(T val)
1298     {
1299         put(newNode(val));
1300     }
1301 
1302     void put(ref List!(T) rhs)
1303     {
1304         if (!rhs.empty)
1305         {
1306             put(rhs.m_first);
1307             while (m_last.next !is null)
1308             {
1309                 m_last = m_last.next;
1310                 m_count++;
1311             }
1312             rhs.m_first = null;
1313             rhs.m_last = null;
1314             rhs.m_count = 0;
1315         }
1316     }
1317 
1318     Range opSlice()
1319     {
1320         return Range(cast(Node*)&m_first);
1321     }
1322 
1323     void removeAt(Range r)
1324     {
1325         import std.exception : enforce;
1326 
1327         assert(m_count, "Can not remove from empty Range");
1328         Node* n = r.m_prev;
1329         enforce(n && n.next, "attempting to remove invalid list node");
1330 
1331         if (m_last is m_first)
1332             m_last = null;
1333         else if (m_last is n.next)
1334             m_last = n; // nocoverage
1335         Node* to_free = n.next;
1336         n.next = n.next.next;
1337         freeNode(to_free);
1338         m_count--;
1339     }
1340 
1341     @property size_t length()
1342     {
1343         return m_count;
1344     }
1345 
1346     void clear()
1347     {
1348         m_first = m_last = null;
1349         m_count = 0;
1350     }
1351 
1352     @property bool empty()
1353     {
1354         return m_first is null;
1355     }
1356 
1357 private:
1358     struct Node
1359     {
1360         Node* next;
1361         T val;
1362 
1363         this(T v)
1364         {
1365             val = v;
1366         }
1367     }
1368 
1369     static shared struct SpinLock
1370     {
1371         void lock() { while (!cas(&locked, false, true)) { Thread.yield(); } }
1372         void unlock() { atomicStore!(MemoryOrder.rel)(locked, false); }
1373         bool locked;
1374     }
1375 
1376     static shared SpinLock sm_lock;
1377     static shared Node* sm_head;
1378 
1379     Node* newNode(T v)
1380     {
1381         Node* n;
1382         {
1383             sm_lock.lock();
1384             scope (exit) sm_lock.unlock();
1385 
1386             if (sm_head)
1387             {
1388                 n = cast(Node*) sm_head;
1389                 sm_head = sm_head.next;
1390             }
1391         }
1392         if (n)
1393         {
1394             import std.conv : emplace;
1395             emplace!Node(n, v);
1396         }
1397         else
1398         {
1399             n = new Node(v);
1400         }
1401         return n;
1402     }
1403 
1404     void freeNode(Node* n)
1405     {
1406         // destroy val to free any owned GC memory
1407         destroy(n.val);
1408 
1409         sm_lock.lock();
1410         scope (exit) sm_lock.unlock();
1411 
1412         auto sn = cast(shared(Node)*) n;
1413         sn.next = sm_head;
1414         sm_head = sn;
1415     }
1416 
1417     void put(Node* n)
1418     {
1419         m_count++;
1420         if (!empty)
1421         {
1422             m_last.next = n;
1423             m_last = n;
1424             return;
1425         }
1426         m_first = n;
1427         m_last = n;
1428     }
1429 
1430     Node* m_first;
1431     Node* m_last;
1432     size_t m_count;
1433 }
1434 
// Check that a `shared` array can be sent as a message: the spawned thread
// writes into the array it received and the spawning thread observes the write
@system unittest
{
    static shared int[] payload = new shared(int)[1];

    auto worker = spawn((Tid self, Tid caller) {
        auto received = self.receiveOnly!(shared(int)[]);
        received[0] = 5;
        // Tell the spawning thread that the write has happened
        caller.send(true);
    }, thisTid);

    worker.send(payload);

    // Wait for the acknowledgement before looking at the array
    auto me = thisTid();
    me.receiveOnly!(bool);
    assert(payload[0] == 5);
}