1 /** 2 * This is a low-level messaging API upon which more structured or restrictive 3 * APIs may be built. The general idea is that every messageable entity is 4 * represented by a common handle type called a Tid, which allows messages to 5 * be sent to logical threads that are executing in both the current process 6 * and in external processes using the same interface. This is an important 7 * aspect of scalability because it allows the components of a program to be 8 * spread across available resources with few to no changes to the actual 9 * implementation. 10 * 11 * A logical thread is an execution context that has its own stack and which 12 * runs asynchronously to other logical threads. These may be preemptively 13 * scheduled kernel threads, fibers (cooperative user-space threads), or some 14 * other concept with similar behavior. 15 * 16 * The type of concurrency used when logical threads are created is determined 17 * by the Scheduler selected at initialization time. The default behavior is 18 * currently to create a new kernel thread per call to spawn, but other 19 * schedulers are available that multiplex fibers across the main thread or 20 * use some combination of the two approaches. 21 * 22 * Note: 23 * Copied (almost verbatim) from Phobos at commit 3bfccf4f1 (2019-11-27) 24 * Changes are this notice, and the module rename, from `std.concurrency` 25 * to `geod24.concurrency`. 26 * 27 * Copyright: Copyright Sean Kelly 2009 - 2014. 28 * License: <a href="http://www.boost.org/LICENSE_1_0.txt">Boost License 1.0</a>. 29 * Authors: Sean Kelly, Alex Rønne Petersen, Martin Nowak 30 * Source: $(PHOBOSSRC std/concurrency.d) 31 */ 32 /* Copyright Sean Kelly 2009 - 2014. 33 * Distributed under the Boost Software License, Version 1.0. 34 * (See accompanying file LICENSE_1_0.txt or copy at 35 * http://www.boost.org/LICENSE_1_0.txt) 36 */ 37 module geod24.concurrency; 38 39 public import std.variant; 40 41 import core.atomic; 42 import core.sync.condition; 43 import core.sync.mutex; 44 import core.thread; 45 import core.time : MonoTime; 46 import std.range.primitives; 47 import std.traits; 48 49 /// 50 @system unittest 51 { 52 __gshared string received; 53 static void spawnedFunc(Tid self, Tid ownerTid) 54 { 55 import std.conv : text; 56 // Receive a message from the owner thread. 57 self.receive((int i){ 58 received = text("Received the number ", i); 59 60 // Send a message back to the owner thread 61 // indicating success. 62 send(ownerTid, true); 63 }); 64 } 65 66 // Start spawnedFunc in a new thread. 67 auto childTid = spawn(&spawnedFunc, thisTid); 68 auto self = thisTid(); 69 70 // Send the number 42 to this new thread. 71 send(childTid, 42); 72 73 // Receive the result code. 74 auto wasSuccessful = self.receiveOnly!(bool); 75 assert(wasSuccessful); 76 assert(received == "Received the number 42"); 77 } 78 79 private 80 { 81 bool hasLocalAliasing(Types...)() 82 { 83 import std.typecons : Rebindable; 84 85 // Works around "statement is not reachable" 86 bool doesIt = false; 87 static foreach (T; Types) 88 { 89 static if (is(T == Tid)) 90 { /* Allowed */ } 91 else static if (is(T : Rebindable!R, R)) 92 doesIt |= hasLocalAliasing!R; 93 else static if (is(T == struct)) 94 doesIt |= hasLocalAliasing!(typeof(T.tupleof)); 95 else 96 doesIt |= std.traits.hasUnsharedAliasing!(T); 97 } 98 return doesIt; 99 } 100 101 @safe unittest 102 { 103 static struct Container { Tid t; } 104 static assert(!hasLocalAliasing!(Tid, Container, int)); 105 } 106 107 @safe unittest 108 { 109 /* Issue 20097 */ 110 import std.datetime.systime : SysTime; 111 static struct Container { SysTime time; } 112 static assert(!hasLocalAliasing!(SysTime, Container)); 113 } 114 115 struct Message 116 { 117 Variant data; 118 119 this(T...)(T vals) if (T.length > 0) 120 { 121 static if (T.length == 1) 122 { 123 data = vals[0]; 124 } 125 else 126 { 127 import std.typecons : Tuple; 128 129 data = Tuple!(T)(vals); 130 } 131 } 132 133 @property auto convertsTo(T...)() 134 { 135 static if (T.length == 1) 136 { 137 return is(T[0] == Variant) || data.convertsTo!(T); 138 } 139 else 140 { 141 import std.typecons : Tuple; 142 return data.convertsTo!(Tuple!(T)); 143 } 144 } 145 146 @property auto get(T...)() 147 { 148 static if (T.length == 1) 149 { 150 static if (is(T[0] == Variant)) 151 return data; 152 else 153 return data.get!(T); 154 } 155 else 156 { 157 import std.typecons : Tuple; 158 return data.get!(Tuple!(T)); 159 } 160 } 161 162 auto map(Op)(Op op) 163 { 164 alias Args = Parameters!(Op); 165 166 static if (Args.length == 1) 167 { 168 static if (is(Args[0] == Variant)) 169 return op(data); 170 else 171 return op(data.get!(Args)); 172 } 173 else 174 { 175 import std.typecons : Tuple; 176 return op(data.get!(Tuple!(Args)).expand); 177 } 178 } 179 } 180 181 void checkops(T...)(T ops) 182 { 183 import std.format : format; 184 185 foreach (i, t1; T) 186 { 187 static assert(isFunctionPointer!t1 || isDelegate!t1, 188 format!"T %d is not a function pointer or delegates"(i)); 189 alias a1 = Parameters!(t1); 190 alias r1 = ReturnType!(t1); 191 192 static if (i < T.length - 1 && is(r1 == void)) 193 { 194 static assert(a1.length != 1 || !is(a1[0] == Variant), 195 "function with arguments " ~ a1.stringof ~ 196 " occludes successive function"); 197 198 foreach (t2; T[i + 1 .. $]) 199 { 200 alias a2 = Parameters!(t2); 201 202 static assert(!is(a1 == a2), 203 "function with arguments " ~ a1.stringof ~ " occludes successive function"); 204 } 205 } 206 } 207 } 208 209 @property ref ThreadInfo thisInfo() nothrow 210 { 211 auto t = cast(InfoThread)Thread.getThis(); 212 213 if (t !is null) 214 return t.info; 215 216 return ThreadInfo.thisInfo; 217 } 218 } 219 220 static ~this() 221 { 222 thisInfo.cleanup(); 223 } 224 225 // Exceptions 226 227 /** 228 * Thrown on calls to `receiveOnly` if a message other than the type 229 * the receiving thread expected is sent. 230 */ 231 class MessageMismatch : Exception 232 { 233 /// 234 this(string msg = "Unexpected message type") @safe pure nothrow @nogc 235 { 236 super(msg); 237 } 238 } 239 240 /** 241 * Thrown when a Tid is missing, e.g. when `ownerTid` doesn't 242 * find an owner thread. 243 */ 244 class TidMissingException : Exception 245 { 246 import std.exception : basicExceptionCtors; 247 /// 248 mixin basicExceptionCtors; 249 } 250 251 252 // Thread ID 253 254 255 /** 256 * An opaque type used to represent a logical thread. 257 */ 258 struct Tid 259 { 260 package: 261 this(MessageBox m) @safe pure nothrow @nogc 262 { 263 mbox = m; 264 } 265 266 MessageBox mbox; 267 268 public: 269 270 /** 271 * Generate a convenient string for identifying this Tid. This is only 272 * useful to see if Tid's that are currently executing are the same or 273 * different, e.g. for logging and debugging. It is potentially possible 274 * that a Tid executed in the future will have the same toString() output 275 * as another Tid that has already terminated. 276 */ 277 void toString(scope void delegate(const(char)[]) sink) 278 { 279 import std.format : formattedWrite; 280 formattedWrite(sink, "Tid(%x)", cast(void*) mbox); 281 } 282 283 } 284 285 @system unittest 286 { 287 // text!Tid is @system 288 import std.conv : text; 289 Tid tid; 290 assert(text(tid) == "Tid(0)"); 291 auto tid2 = thisTid; 292 assert(text(tid2) != "Tid(0)"); 293 auto tid3 = tid2; 294 assert(text(tid2) == text(tid3)); 295 } 296 297 /** 298 * Returns: The $(LREF Tid) of the caller's thread. 299 */ 300 @property Tid thisTid() @safe 301 { 302 // TODO: remove when concurrency is safe 303 static auto trus() @trusted 304 { 305 if (thisInfo.ident != Tid.init) 306 return thisInfo.ident; 307 thisInfo.ident = Tid(new MessageBox); 308 return thisInfo.ident; 309 } 310 311 return trus(); 312 } 313 314 // Thread Creation 315 316 private template isSpawnable(F, T...) 317 { 318 template isParamsImplicitlyConvertible(F1, F2, int i = 0) 319 { 320 alias param1 = Parameters!F1; 321 alias param2 = Parameters!F2; 322 static if (param1.length != param2.length) 323 enum isParamsImplicitlyConvertible = false; 324 else static if (param1.length == i) 325 enum isParamsImplicitlyConvertible = true; 326 else static if (isImplicitlyConvertible!(param2[i], param1[i])) 327 enum isParamsImplicitlyConvertible = isParamsImplicitlyConvertible!(F1, 328 F2, i + 1); 329 else 330 enum isParamsImplicitlyConvertible = false; 331 } 332 333 enum isSpawnable = isCallable!F && is(ReturnType!F == void) 334 && isParamsImplicitlyConvertible!(F, void function(Tid, T)) 335 && (isFunctionPointer!F || !hasUnsharedAliasing!F); 336 } 337 338 /** 339 * Starts fn(args) in a new logical thread. 340 * 341 * Executes the supplied function in a new logical thread represented by 342 * `Tid`. 343 * 344 * Params: 345 * fn = The function to execute. 346 * args = Arguments to the function. 347 * 348 * Returns: 349 * A Tid representing the new logical thread. 350 * 351 * Notes: 352 * `args` must not have unshared aliasing. In other words, all arguments 353 * to `fn` must either be `shared` or `immutable` or have no 354 * pointer indirection. This is necessary for enforcing isolation among 355 * threads. 356 */ 357 Tid spawn(F, T...)(F fn, T args) 358 if (isSpawnable!(F, T)) 359 { 360 static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed."); 361 return _spawn(fn, args); 362 } 363 364 /// 365 @system unittest 366 { 367 static void f(Tid self, string msg) 368 { 369 assert(msg == "Hello World"); 370 } 371 372 auto tid = spawn(&f, "Hello World"); 373 } 374 375 /// Fails: char[] has mutable aliasing. 376 @system unittest 377 { 378 string msg = "Hello, World!"; 379 380 static void f1(Tid self, string msg) {} 381 static assert(!__traits(compiles, spawn(&f1, msg.dup))); 382 static assert( __traits(compiles, spawn(&f1, msg.idup))); 383 384 static void f2(Tid self, char[] msg) {} 385 static assert(!__traits(compiles, spawn(&f2, msg.dup))); 386 static assert(!__traits(compiles, spawn(&f2, msg.idup))); 387 } 388 389 /// New thread with anonymous function 390 @system unittest 391 { 392 auto self = thisTid(); 393 spawn((Tid self, Tid caller) { 394 caller.send("This is so great!"); 395 }, self); 396 assert(self.receiveOnly!string == "This is so great!"); 397 } 398 399 /* 400 * 401 */ 402 private Tid _spawn(F, T...)(F fn, T args) 403 if (isSpawnable!(F, T)) 404 { 405 // TODO: MessageList and &exec should be shared. 406 auto spawnTid = Tid(new MessageBox); 407 408 void exec() 409 { 410 thisInfo.ident = spawnTid; 411 fn(spawnTid, args); 412 } 413 414 // TODO: MessageList and &exec should be shared. 415 auto t = new InfoThread(&exec); 416 t.start(); 417 return spawnTid; 418 } 419 420 @system unittest 421 { 422 void function(Tid) fn1; 423 void function(Tid, int) fn2; 424 static assert(__traits(compiles, spawn(fn1))); 425 static assert(__traits(compiles, spawn(fn2, 2))); 426 static assert(!__traits(compiles, spawn(fn1, 1))); 427 static assert(!__traits(compiles, spawn(fn2))); 428 429 void delegate(Tid, int) shared dg1; 430 shared(void delegate(Tid, int)) dg2; 431 shared(void delegate(Tid, long) shared) dg3; 432 shared(void delegate(Tid, real, int, long) shared) dg4; 433 void delegate(Tid, int) immutable dg5; 434 void delegate(Tid, int) dg6; 435 static assert(__traits(compiles, spawn(dg1, 1))); 436 static assert(__traits(compiles, spawn(dg2, 2))); 437 static assert(__traits(compiles, spawn(dg3, 3))); 438 static assert(__traits(compiles, spawn(dg4, 4, 4, 4))); 439 static assert(__traits(compiles, spawn(dg5, 5))); 440 static assert(!__traits(compiles, spawn(dg6, 6))); 441 442 auto callable1 = new class{ void opCall(Tid, int) shared {} }; 443 auto callable2 = cast(shared) new class{ void opCall(Tid, int) shared {} }; 444 auto callable3 = new class{ void opCall(Tid, int) immutable {} }; 445 auto callable4 = cast(immutable) new class{ void opCall(Tid, int) immutable {} }; 446 auto callable5 = new class{ void opCall(Tid, int) {} }; 447 auto callable6 = cast(shared) new class{ void opCall(Tid, int) immutable {} }; 448 auto callable7 = cast(immutable) new class{ void opCall(Tid, int) shared {} }; 449 auto callable8 = cast(shared) new class{ void opCall(Tid, int) const shared {} }; 450 auto callable9 = cast(const shared) new class{ void opCall(Tid, int) shared {} }; 451 auto callable10 = cast(const shared) new class{ void opCall(Tid, int) const shared {} }; 452 auto callable11 = cast(immutable) new class{ void opCall(Tid, int) const shared {} }; 453 static assert(!__traits(compiles, spawn(callable1, 1))); 454 static assert( __traits(compiles, spawn(callable2, 2))); 455 static assert(!__traits(compiles, spawn(callable3, 3))); 456 static assert( __traits(compiles, spawn(callable4, 4))); 457 static assert(!__traits(compiles, spawn(callable5, 5))); 458 static assert(!__traits(compiles, spawn(callable6, 6))); 459 static assert(!__traits(compiles, spawn(callable7, 7))); 460 static assert( __traits(compiles, spawn(callable8, 8))); 461 static assert(!__traits(compiles, spawn(callable9, 9))); 462 static assert( __traits(compiles, spawn(callable10, 10))); 463 static assert( __traits(compiles, spawn(callable11, 11))); 464 } 465 466 /** 467 * Places the values as a message at the back of tid's message queue. 468 * 469 * Sends the supplied value to the thread represented by tid. As with 470 * $(REF spawn, std,concurrency), `T` must not have unshared aliasing. 471 */ 472 void send(T...)(Tid tid, T vals) 473 { 474 static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed."); 475 auto msg = Message(vals); 476 tid.mbox.put(msg) || assert(0, "MessageBox is closed"); 477 } 478 479 /// Ditto, but do not assert in case of failure 480 bool trySend(T...)(Tid tid, T vals) 481 { 482 static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed."); 483 auto msg = Message(vals); 484 return tid.mbox.put(msg); 485 } 486 487 /** 488 * Receives a message from another thread. 489 * 490 * Receive a message from another thread, or block if no messages of the 491 * specified types are available. This function works by pattern matching 492 * a message against a set of delegates and executing the first match found. 493 * 494 * If a delegate that accepts a $(REF Variant, std,variant) is included as 495 * the last argument to `receive`, it will match any message that was not 496 * matched by an earlier delegate. If more than one argument is sent, 497 * the `Variant` will contain a $(REF Tuple, std,typecons) of all values 498 * sent. 499 */ 500 void receive(T...)(Tid self, T ops ) 501 in 502 { 503 assert(self.mbox !is null, 504 "Cannot receive a message until a thread was spawned " 505 ~ "or thisTid was passed to a running thread."); 506 } 507 do 508 { 509 checkops( ops ); 510 self.mbox.getUntimed(ops); 511 } 512 513 /// 514 @system unittest 515 { 516 import std.variant : Variant; 517 518 auto process = (Tid self, Tid caller) 519 { 520 self.receive( 521 (int i) { caller.send(1); }, 522 (double f) { caller.send(2); }, 523 (Variant v) { caller.send(3); } 524 ); 525 }; 526 527 auto self = thisTid(); 528 { 529 auto tid = spawn(process, self); 530 send(tid, 42); 531 assert(self.receiveOnly!int == 1); 532 } 533 534 { 535 auto tid = spawn(process, self); 536 send(tid, 3.14); 537 assert(self.receiveOnly!int == 2); 538 } 539 540 { 541 auto tid = spawn(process, self); 542 send(tid, "something else"); 543 assert(self.receiveOnly!int == 3); 544 } 545 } 546 547 @safe unittest 548 { 549 static assert( __traits( compiles, 550 { 551 receive(Tid.init, (Variant x) {} ); 552 receive(Tid.init, (int x) {}, (Variant x) {} ); 553 } ) ); 554 555 static assert( !__traits( compiles, 556 { 557 receive(Tid.init, (Variant x) {}, (int x) {} ); 558 } ) ); 559 560 static assert( !__traits( compiles, 561 { 562 receive(Tid.init, (int x) {}, (int x) {} ); 563 } ) ); 564 } 565 566 // Make sure receive() works with free functions as well. 567 version (unittest) 568 { 569 private void receiveFunction(int x) {} 570 } 571 @safe unittest 572 { 573 static assert( __traits( compiles, 574 { 575 receive(Tid.init, &receiveFunction ); 576 receive(Tid.init, &receiveFunction, (Variant x) {} ); 577 } ) ); 578 } 579 580 581 private template receiveOnlyRet(T...) 582 { 583 static if ( T.length == 1 ) 584 { 585 alias receiveOnlyRet = T[0]; 586 } 587 else 588 { 589 import std.typecons : Tuple; 590 alias receiveOnlyRet = Tuple!(T); 591 } 592 } 593 594 /** 595 * Receives only messages with arguments of types `T`. 596 * 597 * Throws: `MessageMismatch` if a message of types other than `T` 598 * is received. 599 * 600 * Returns: The received message. If `T.length` is greater than one, 601 * the message will be packed into a $(REF Tuple, std,typecons). 602 */ 603 receiveOnlyRet!(T) receiveOnly(T...)(Tid self) 604 in 605 { 606 assert(self.mbox !is null, 607 "Cannot receive a message until a thread was spawned or thisTid was passed to a running thread."); 608 } 609 do 610 { 611 import std.format : format; 612 import std.typecons : Tuple; 613 614 Tuple!(T) ret; 615 616 self.mbox.getUntimed((T val) { 617 static if (T.length) 618 ret.field = val; 619 }, 620 (Variant val) { 621 static if (T.length > 1) 622 string exp = T.stringof; 623 else 624 string exp = T[0].stringof; 625 626 throw new MessageMismatch( 627 format("Unexpected message type: expected '%s', got '%s'", exp, val.type.toString())); 628 }); 629 static if (T.length == 1) 630 return ret[0]; 631 else 632 return ret; 633 } 634 635 /// 636 @system unittest 637 { 638 auto tid = spawn( 639 (Tid self) { 640 assert(self.receiveOnly!int == 42); 641 }); 642 send(tid, 42); 643 } 644 645 /// 646 @system unittest 647 { 648 auto tid = spawn( 649 (Tid self) { 650 assert(self.receiveOnly!string == "text"); 651 }); 652 send(tid, "text"); 653 } 654 655 /// 656 @system unittest 657 { 658 struct Record { string name; int age; } 659 660 auto tid = spawn( 661 (Tid self) { 662 auto msg = self.receiveOnly!(double, Record); 663 assert(msg[0] == 0.5); 664 assert(msg[1].name == "Alice"); 665 assert(msg[1].age == 31); 666 }); 667 668 send(tid, 0.5, Record("Alice", 31)); 669 } 670 671 @system unittest 672 { 673 static void t1(Tid self, Tid mainTid) 674 { 675 try 676 { 677 self.receiveOnly!string(); 678 mainTid.send(""); 679 } 680 catch (Throwable th) 681 { 682 mainTid.send(th.msg); 683 } 684 } 685 686 auto self = thisTid(); 687 auto tid = spawn(&t1, self); 688 tid.send(1); 689 string result = self.receiveOnly!string(); 690 assert(result == "Unexpected message type: expected 'string', got 'int'"); 691 } 692 693 /** 694 * Tries to receive but will give up if no matches arrive within duration. 695 * Won't wait at all if provided $(REF Duration, core,time) is negative. 696 * 697 * Same as `receive` except that rather than wait forever for a message, 698 * it waits until either it receives a message or the given 699 * $(REF Duration, core,time) has passed. It returns `true` if it received a 700 * message and `false` if it timed out waiting for one. 701 */ 702 bool receiveTimeout(T...)(Tid self, Duration duration, T ops) 703 in 704 { 705 assert(self.mbox !is null, 706 "Cannot receive a message until a thread was spawned or thisTid was passed to a running thread."); 707 } 708 do 709 { 710 checkops(ops); 711 return self.mbox.get(duration, ops); 712 } 713 714 @safe unittest 715 { 716 static assert(__traits(compiles, { 717 receiveTimeout(Tid.init, msecs(0), (Variant x) {}); 718 receiveTimeout(Tid.init, msecs(0), (int x) {}, (Variant x) {}); 719 })); 720 721 static assert(!__traits(compiles, { 722 receiveTimeout(Tid.init, msecs(0), (Variant x) {}, (int x) {}); 723 })); 724 725 static assert(!__traits(compiles, { 726 receiveTimeout(Tid.init, msecs(0), (int x) {}, (int x) {}); 727 })); 728 729 static assert(__traits(compiles, { 730 receiveTimeout(Tid.init, msecs(10), (int x) {}, (Variant x) {}); 731 })); 732 } 733 734 735 /** 736 * Encapsulates all implementation-level data needed for scheduling. 737 * 738 * When defining a Scheduler, an instance of this struct must be associated 739 * with each logical thread. It contains all implementation-level information 740 * needed by the internal API. 741 */ 742 struct ThreadInfo 743 { 744 Tid ident; 745 746 /** 747 * Gets a thread-local instance of ThreadInfo. 748 * 749 * Gets a thread-local instance of ThreadInfo, which should be used as the 750 * default instance when info is requested for a thread not created by the 751 * Scheduler. 752 */ 753 static @property ref thisInfo() nothrow 754 { 755 static ThreadInfo val; 756 return val; 757 } 758 759 /** 760 * Cleans up this ThreadInfo. 761 * 762 * This must be called when a scheduled thread terminates. It tears down 763 * the messaging system for the thread and notifies interested parties of 764 * the thread's termination. 765 */ 766 void cleanup() 767 { 768 if (ident.mbox !is null) 769 ident.mbox.close(); 770 } 771 } 772 773 774 /*************************************************************************** 775 776 Thread with ThreadInfo, 777 This is implemented to avoid using global variables. 778 779 ***************************************************************************/ 780 781 public class InfoThread : Thread 782 { 783 public ThreadInfo info; 784 785 /*************************************************************************** 786 787 Initializes a thread object which is associated with a static 788 789 Params: 790 fn = The thread function. 791 sz = The stack size for this thread. 792 793 ***************************************************************************/ 794 795 this (void function() fn, size_t sz = 0) @safe pure nothrow @nogc 796 { 797 super(fn, sz); 798 } 799 800 801 /*************************************************************************** 802 803 Initializes a thread object which is associated with a dynamic 804 805 Params: 806 dg = The thread function. 807 sz = The stack size for this thread. 808 809 ***************************************************************************/ 810 811 this (void delegate() dg, size_t sz = 0) @safe pure nothrow @nogc 812 { 813 super(dg, sz); 814 } 815 } 816 817 818 /** 819 * An example Scheduler using Fibers. 820 * 821 * This is an example scheduler that creates a new Fiber per call to spawn 822 * and multiplexes the execution of all fibers within the main thread. 823 */ 824 class FiberScheduler 825 { 826 /** 827 * This creates a new Fiber for the supplied op and then starts the 828 * dispatcher. 829 */ 830 void start(void delegate() op) 831 { 832 create(op); 833 // Make sure the just-created fiber is run first 834 dispatch(this.m_fibers.length - 1); 835 } 836 837 /** 838 * This created a new Fiber for the supplied op and adds it to the 839 * dispatch list. 840 */ 841 void spawn(void delegate() op) nothrow 842 { 843 create(op); 844 FiberScheduler.yield(); 845 } 846 847 /************************************************************************** 848 849 Schedule a task to be run next time the scheduler yields 850 851 Behave similarly to `spawn`, but instead of running the task 852 immediately, it simply adds it to the queue and continue executing 853 the current task. 854 855 Params: 856 op = Operation to run 857 858 **************************************************************************/ 859 860 void schedule(void delegate() op) nothrow 861 { 862 this.create(op); 863 } 864 865 /** 866 * If the caller is a scheduled Fiber, this yields execution to another 867 * scheduled Fiber. 868 */ 869 static void yield() nothrow 870 { 871 // NOTE: It's possible that we should test whether the calling Fiber 872 // is an InfoFiber before yielding, but I think it's reasonable 873 // that any fiber should yield here. 874 if (Fiber.getThis()) 875 Fiber.yield(); 876 } 877 878 /** 879 * Returns an appropriate ThreadInfo instance. 880 * 881 * Returns a ThreadInfo instance specific to the calling Fiber if the 882 * Fiber was created by this dispatcher, otherwise it returns 883 * ThreadInfo.thisInfo. 884 */ 885 @property ref ThreadInfo thisInfo() nothrow 886 { 887 auto f = cast(InfoFiber) Fiber.getThis(); 888 889 if (f !is null) 890 return f.info; 891 892 auto t = cast(InfoThread)Thread.getThis(); 893 894 if (t !is null) 895 return t.info; 896 897 return ThreadInfo.thisInfo; 898 } 899 900 protected: 901 /** 902 * Creates a new Fiber which calls the given delegate. 903 * 904 * Params: 905 * op = The delegate the fiber should call 906 */ 907 void create(void delegate() op) nothrow 908 { 909 void wrap() 910 { 911 scope (exit) 912 { 913 thisInfo.cleanup(); 914 } 915 op(); 916 } 917 918 m_fibers ~= new InfoFiber(&wrap); 919 } 920 921 /** 922 * Fiber which embeds a ThreadInfo 923 */ 924 static class InfoFiber : Fiber 925 { 926 ThreadInfo info; 927 928 /// Semaphore reference that this Fiber is blocked on 929 FiberBinarySemaphore sem; 930 931 this(void delegate() op, size_t sz = 512 * 1024) nothrow 932 { 933 super(op, sz); 934 } 935 } 936 937 protected class FiberBinarySemaphore 938 { 939 940 /*********************************************************************** 941 942 Associate `FiberBinarySemaphore` with the running `Fiber` 943 944 `FiberScheduler` will check to see if the `Fiber` is blocking on a 945 `FiberBinarySemaphore` to avoid rescheduling it unnecessarily 946 947 ***********************************************************************/ 948 949 private void registerToInfoFiber(InfoFiber info_fiber = cast(InfoFiber) Fiber.getThis()) nothrow 950 { 951 assert(info_fiber !is null, "This Fiber does not belong to FiberScheduler"); 952 assert(info_fiber.sem is null, "This Fiber already has a registered FiberBinarySemaphore"); 953 info_fiber.sem = this; 954 955 } 956 957 void wait() nothrow 958 { 959 this.registerToInfoFiber(); 960 FiberScheduler.yield(); 961 scope (exit) this.notified = false; 962 } 963 964 bool wait(Duration period) nothrow 965 { 966 this.registerToInfoFiber(); 967 this.limit = MonoTime.currTime + period; 968 FiberScheduler.yield(); 969 970 scope (exit) 971 { 972 this.limit = MonoTime.init; 973 this.notified = false; 974 } 975 return this.notified; 976 } 977 978 void notify() nothrow 979 { 980 this.notified = true; 981 FiberScheduler.yield(); 982 } 983 984 /*********************************************************************** 985 986 Query if `FiberBinarySemaphore` should still block 987 988 FiberBinarySemaphore will block the Fiber until it is notified or 989 the specified timeout is reached. 990 991 ***********************************************************************/ 992 993 bool shouldBlock() nothrow 994 { 995 bool timed_out = (limit != MonoTime.init 996 && MonoTime.currTime >= limit); 997 998 return !timed_out && !notified; 999 } 1000 1001 private: 1002 /// Time limit that will eventually unblock the caller if a timeout is specified 1003 MonoTime limit = MonoTime.init; 1004 1005 /// State of the semaphore 1006 bool notified; 1007 } 1008 1009 private: 1010 1011 /*********************************************************************** 1012 1013 Start the scheduling loop 1014 1015 Param: 1016 pos = m_fibers index of the Fiber to execute first 1017 1018 ***********************************************************************/ 1019 1020 void dispatch(size_t pos = 0) 1021 { 1022 import std.algorithm.mutation : remove; 1023 1024 assert(pos < m_fibers.length); 1025 while (m_fibers.length > 0) 1026 { 1027 // Is Fiber waiting on a FiberBinarySemaphore? 1028 if (auto sem = m_fibers[pos].sem) 1029 { 1030 // Is condition met? 1031 // TRUE: Clear the sem and schedule the fiber 1032 // FALSE: Skip it 1033 if (sem.shouldBlock()) 1034 { 1035 if (pos++ >= m_fibers.length - 1) 1036 pos = 0; 1037 continue; 1038 } 1039 else 1040 { 1041 m_fibers[pos].sem = null; 1042 } 1043 } 1044 1045 auto t = m_fibers[pos].call(Fiber.Rethrow.no); 1046 if (t !is null) 1047 { 1048 throw t; 1049 } 1050 if (m_fibers[pos].state == Fiber.State.TERM) 1051 { 1052 if (pos >= (m_fibers = remove(m_fibers, pos)).length) 1053 pos = 0; 1054 } 1055 else if (pos++ >= m_fibers.length - 1) 1056 { 1057 pos = 0; 1058 } 1059 } 1060 } 1061 1062 private: 1063 /// List of `InfoFiber`s currently in the system 1064 InfoFiber[] m_fibers; 1065 } 1066 1067 /// Ensure argument to `start` is run first 1068 unittest 1069 { 1070 { 1071 scope sched = new FiberScheduler(); 1072 bool startHasRun; 1073 sched.spawn(() => assert(startHasRun)); 1074 sched.start(() { startHasRun = true; }); 1075 } 1076 { 1077 scope sched = new FiberScheduler(); 1078 bool startHasRun; 1079 sched.schedule(() => assert(startHasRun)); 1080 sched.start(() { startHasRun = true; }); 1081 } 1082 } 1083 1084 /* 1085 * A MessageBox is a message queue for one thread. Other threads may send 1086 * messages to this owner by calling put(), and the owner receives them by 1087 * calling get(). The put() call is therefore effectively shared and the 1088 * get() call is effectively local. setMaxMsgs may be used by any thread 1089 * to limit the size of the message queue. 1090 */ 1091 package class MessageBox 1092 { 1093 this() @safe nothrow 1094 { 1095 m_lock = new Mutex; 1096 m_closed = false; 1097 1098 m_putMsg = new Condition(m_lock); 1099 } 1100 1101 /// 1102 final @property bool isClosed() @safe @nogc pure 1103 { 1104 synchronized (m_lock) 1105 { 1106 return m_closed; 1107 } 1108 } 1109 1110 /* 1111 * If maxMsgs is not set, the message is added to the queue and the 1112 * owner is notified. If the queue is full, onCrowdingDoThis is called. 1113 * If the routine returns true, this call will block until 1114 * the owner has made space available in the queue. If it returns 1115 * false, this call will abort. 1116 * 1117 * Params: 1118 * msg = The message to put in the queue. 1119 * 1120 * Returns: 1121 * `false` if the message box is closed, `true` otherwise 1122 */ 1123 final bool put (ref Message msg) 1124 { 1125 synchronized (m_lock) 1126 { 1127 if (m_closed) 1128 return false; 1129 m_sharedBox.put(msg); 1130 m_putMsg.notify(); 1131 } 1132 return true; 1133 } 1134 1135 /* 1136 * Matches ops against each message in turn until a match is found. 1137 * 1138 * Params: 1139 * ops = The operations to match. Each may return a bool to indicate 1140 * whether a message with a matching type is truly a match. 1141 * 1142 * Returns: 1143 * true if a message was retrieved and false if not (such as if a 1144 * timeout occurred). 1145 */ 1146 bool getUntimed(Ops...)(scope Ops ops) 1147 { 1148 return this.get(Duration.init, ops); 1149 } 1150 1151 bool get(Ops...)(Duration period, scope Ops ops) 1152 { 1153 immutable timedWait = period !is Duration.init; 1154 MonoTime limit = timedWait ? MonoTime.currTime + period : MonoTime.init; 1155 1156 bool onStandardMsg(ref Message msg) 1157 { 1158 foreach (i, t; Ops) 1159 { 1160 alias Args = Parameters!(t); 1161 auto op = ops[i]; 1162 1163 if (msg.convertsTo!(Args)) 1164 { 1165 static if (is(ReturnType!(t) == bool)) 1166 { 1167 return msg.map(op); 1168 } 1169 else 1170 { 1171 msg.map(op); 1172 return true; 1173 } 1174 } 1175 } 1176 return false; 1177 } 1178 1179 bool scan(ref ListT list) 1180 { 1181 for (auto range = list[]; !range.empty;) 1182 { 1183 // Only the message handler will throw, so if this occurs 1184 // we can be certain that the message was handled. 1185 scope (failure) 1186 list.removeAt(range); 1187 1188 if (onStandardMsg(range.front)) 1189 { 1190 list.removeAt(range); 1191 return true; 1192 } 1193 range.popFront(); 1194 continue; 1195 } 1196 return false; 1197 } 1198 1199 while (true) 1200 { 1201 ListT arrived; 1202 1203 if (scan(m_localBox)) 1204 { 1205 return true; 1206 } 1207 FiberScheduler.yield(); 1208 synchronized (m_lock) 1209 { 1210 while (m_sharedBox.empty) 1211 { 1212 if (timedWait) 1213 { 1214 if (period <= Duration.zero || !m_putMsg.wait(period)) 1215 return false; 1216 } 1217 else 1218 { 1219 m_putMsg.wait(); 1220 } 1221 } 1222 arrived.put(m_sharedBox); 1223 } 1224 scope (exit) m_localBox.put(arrived); 1225 if (scan(arrived)) 1226 return true; 1227 1228 if (timedWait) 1229 period = limit - MonoTime.currTime; 1230 } 1231 } 1232 1233 /* 1234 * Called on thread termination. 1235 * 1236 * This routine clears out message queues and sets a flag to reject 1237 * any future messages. 1238 */ 1239 final void close() 1240 { 1241 synchronized (m_lock) 1242 m_closed = true; 1243 m_localBox.clear(); 1244 } 1245 1246 private: 1247 1248 alias ListT = List!(Message); 1249 1250 ListT m_localBox; 1251 1252 Mutex m_lock; 1253 Condition m_putMsg; 1254 ListT m_sharedBox; 1255 bool m_closed; 1256 } 1257 1258 1259 /// 1260 package struct List (T) 1261 { 1262 struct Range 1263 { 1264 import std.exception : enforce; 1265 1266 @property bool empty() const 1267 { 1268 return !m_prev.next; 1269 } 1270 1271 @property ref T front() 1272 { 1273 enforce(m_prev.next, "invalid list node"); 1274 return m_prev.next.val; 1275 } 1276 1277 @property void front(T val) 1278 { 1279 enforce(m_prev.next, "invalid list node"); 1280 m_prev.next.val = val; 1281 } 1282 1283 void popFront() 1284 { 1285 enforce(m_prev.next, "invalid list node"); 1286 m_prev = m_prev.next; 1287 } 1288 1289 private this(Node* p) 1290 { 1291 m_prev = p; 1292 } 1293 1294 private Node* m_prev; 1295 } 1296 1297 void put(T val) 1298 { 1299 put(newNode(val)); 1300 } 1301 1302 void put(ref List!(T) rhs) 1303 { 1304 if (!rhs.empty) 1305 { 1306 put(rhs.m_first); 1307 while (m_last.next !is null) 1308 { 1309 m_last = m_last.next; 1310 m_count++; 1311 } 1312 rhs.m_first = null; 1313 rhs.m_last = null; 1314 rhs.m_count = 0; 1315 } 1316 } 1317 1318 Range opSlice() 1319 { 1320 return Range(cast(Node*)&m_first); 1321 } 1322 1323 void removeAt(Range r) 1324 { 1325 import std.exception : enforce; 1326 1327 assert(m_count, "Can not remove from empty Range"); 1328 Node* n = r.m_prev; 1329 enforce(n && n.next, "attempting to remove invalid list node"); 1330 1331 if (m_last is m_first) 1332 m_last = null; 1333 else if (m_last is n.next) 1334 m_last = n; // nocoverage 1335 Node* to_free = n.next; 1336 n.next = n.next.next; 1337 freeNode(to_free); 1338 m_count--; 1339 } 1340 1341 @property size_t length() 1342 { 1343 return m_count; 1344 } 1345 1346 void clear() 1347 { 1348 m_first = m_last = null; 1349 m_count = 0; 1350 } 1351 1352 @property bool empty() 1353 { 1354 return m_first is null; 1355 } 1356 1357 private: 1358 struct Node 1359 { 1360 Node* next; 1361 T val; 1362 1363 this(T v) 1364 { 1365 val = v; 1366 } 1367 } 1368 1369 static shared struct SpinLock 1370 { 1371 void lock() { while (!cas(&locked, false, true)) { Thread.yield(); } } 1372 void unlock() { atomicStore!(MemoryOrder.rel)(locked, false); } 1373 bool locked; 1374 } 1375 1376 static shared SpinLock sm_lock; 1377 static shared Node* sm_head; 1378 1379 Node* newNode(T v) 1380 { 1381 Node* n; 1382 { 1383 sm_lock.lock(); 1384 scope (exit) sm_lock.unlock(); 1385 1386 if (sm_head) 1387 { 1388 n = cast(Node*) sm_head; 1389 sm_head = sm_head.next; 1390 } 1391 } 1392 if (n) 1393 { 1394 import std.conv : emplace; 1395 emplace!Node(n, v); 1396 } 1397 else 1398 { 1399 n = new Node(v); 1400 } 1401 return n; 1402 } 1403 1404 void freeNode(Node* n) 1405 { 1406 // destroy val to free any owned GC memory 1407 destroy(n.val); 1408 1409 sm_lock.lock(); 1410 scope (exit) sm_lock.unlock(); 1411 1412 auto sn = cast(shared(Node)*) n; 1413 sn.next = sm_head; 1414 sm_head = sn; 1415 } 1416 1417 void put(Node* n) 1418 { 1419 m_count++; 1420 if (!empty) 1421 { 1422 m_last.next = n; 1423 m_last = n; 1424 return; 1425 } 1426 m_first = n; 1427 m_last = n; 1428 } 1429 1430 Node* m_first; 1431 Node* m_last; 1432 size_t m_count; 1433 } 1434 1435 // test ability to send shared arrays 1436 @system unittest 1437 { 1438 static shared int[] x = new shared(int)[1]; 1439 auto tid = spawn((Tid self, Tid caller) { 1440 auto arr = self.receiveOnly!(shared(int)[]); 1441 arr[0] = 5; 1442 caller.send(true); 1443 }, thisTid); 1444 tid.send(x); 1445 auto self = thisTid(); 1446 self.receiveOnly!(bool); 1447 assert(x[0] == 5); 1448 }