1 /******************************************************************************* 2 3 Provides utilities to mock a network in unittests 4 5 This module is based on the idea that D `interface`s can be used 6 to represent a server's API, and that D `class` inheriting this `interface` 7 are used to define the server's business code, 8 abstracting away the communication layer. 9 10 For example, a server that exposes an API to concatenate two strings would 11 define the following code: 12 --- 13 interface API { public string concat (string a, string b); } 14 class Server : API 15 { 16 public override string concat (string a, string b) 17 { 18 return a ~ b; 19 } 20 } 21 --- 22 23 Then one can use "generators" to define how multiple process communicate 24 together. One such generator, that pioneered this design is `vibe.web.rest`, 25 which allows to expose such servers as REST APIs. 26 27 `localrest` is another generator, which uses message passing and threads 28 to create a local "network". 29 The motivation was to create a testing library that could be used to 30 model a network at a much cheaper cost than spawning processes 31 (and containers) would be, when doing integration tests. 32 33 Control_Interface: 34 When instantiating a `RemoteAPI`, one has the ability to call foreign 35 implementations through auto-generated `override`s of the `interface`. 36 In addition to that, as this library is intended for testing, 37 a few extra functionalities are offered under a control interface, 38 accessible under the `ctrl` namespace in the instance. 39 The control interface allows to make the node unresponsive to one or all 40 methods, for some defined time or until unblocked, as well as trigger 41 shutdowns or restart. See the methods for more details. 42 The `withTimeout` control method can be used to spawn a scoped copy 43 of the RemoteAPI with a custom configured timeout. The user-provided 44 delegate will be called with this scoped copy that uses the new timeout. 45 46 Shutdown: 47 The control interface has a shutdown method that can be used to terminate 48 a node gracefully. When the shutdown request is handled by the node, 49 the event loop will exit and the thread will terminate. While the destructor 50 of the node will be called, it might not usable for some actions, for example 51 because D destructors may not allocate GC memory, or one may want 52 to perform some test-specific operations, such a logging some data in case of failure. 53 Therefore, you may provide a shutdown routine in the call to `shutdown`. 54 It must accept a single argument of the interface type, and will be called 55 with the implementation object just before the node is destroyed. 56 If this routine throws, LocalRest will log the error in the console and 57 proceed with destroying the stack-allocated node. 58 Note that control requests are asynchronous, hence requests from the node 59 might be processed / send by the node until the request is actually processed. 60 There is also a `restart` method which accepts the same callback argument. 61 62 Event_Loop: 63 Server process usually needs to perform some action in an asynchronous way. 64 Additionally, some actions needs to be completed at a semi-regular interval, 65 for example based on a timer. 66 For those use cases, a node should call `runTask` or `sleep`, respectively. 67 Note that this has the same name (and purpose) as Vibe.d's core primitives. 68 Users should only ever call Vibe's `runTask` / `sleep` with `vibe.web.rest`, 69 or only call LocalRest's `runTask` / `sleep` with `RemoteAPI`. 70 71 Implementation: 72 In order for tests to simulate an asynchronous system accurately, 73 multiple nodes need to be able to run concurrently and asynchronously. 74 75 There are two common solutions to this, to use either fibers or threads. 76 Fibers have the advantage of being simpler to implement and predictable. 77 Threads have the advantage of more accurately describing an asynchronous 78 system and thus have the ability to uncover more issues. 79 80 When spawning a node, a thread is spawned, a node is instantiated with 81 the provided arguments, and an event loop waits for messages sent 82 to the Tid. Messages consist of the sender's Tid, the mangled name 83 of the function to call (to support overloading) and the arguments, 84 serialized as a JSON string. 85 86 Note: 87 While this module's original motivation was to test REST nodes, 88 the only dependency to Vibe.d is actually to it's JSON module, 89 as Vibe.d is the only available JSON module known to the author 90 to provide an interface to deserialize composite types. 91 This dependency is however not enforced by the dub project file, 92 as users can provide their own serializer (see `geod24.Serialization`). 93 If the default parameter for serialization is never used, 94 one's project need not depend on `vibe-d:data`. 95 96 Author: Mathias 'Geod24' Lang 97 License: MIT (See LICENSE.txt) 98 Copyright: Copyright (c) 2018-2019 Mathias Lang. All rights reserved. 99 100 *******************************************************************************/ 101 102 module geod24.LocalRest; 103 104 static import C = geod24.concurrency; 105 import geod24.Serialization; 106 import std.datetime.systime : Clock, SysTime; 107 import std.format; 108 import std.meta : AliasSeq; 109 import std.traits : fullyQualifiedName, Parameters, ReturnType; 110 111 import core.thread; 112 import core.time; 113 114 /// Request / Response ID 115 private struct ID 116 { 117 /// A node may restart, in which case it will spawn a new request scheduler 118 size_t sched_id; 119 /// In order to support re-entrancy, every request contains an id 120 /// which should be copied in the `Response` 121 /// Initialized to `size_t.max` so not setting it crashes the program 122 size_t id = size_t.max; 123 } 124 125 /// Data sent by the caller 126 private struct Command 127 { 128 /// Tid of the sender thread (cannot be JSON serialized) 129 C.Tid sender; 130 /// ID used for request re-entrancy (See ID definition) 131 ID id; 132 /// Method to call 133 string method; 134 /// Serialized arguments to the method 135 SerializedData args; 136 } 137 138 /// Ask the node to exhibit a certain behavior for a given time 139 private struct TimeCommand 140 { 141 /// For how long our remote node apply this behavior 142 Duration dur; 143 /// Whether or not affected messages should be dropped 144 bool drop = false; 145 } 146 147 /// Ask the node to shut down 148 private struct ShutdownCommand (API) 149 { 150 /// Any callback to call before the Node's destructor is called 151 void function (API) callback; 152 153 /// Whether we're restarting or really shutting down 154 bool restart; 155 } 156 157 /// Filter out requests before they reach a node 158 private struct FilterAPI 159 { 160 /// the mangled symbol name of the function to filter 161 string func_mangleof; 162 163 /// used for debugging 164 string pretty_func; 165 } 166 167 /// Status of a request 168 private enum Status 169 { 170 /// Request failed 171 Failed, 172 173 /// The request failed to to a client error (4xx style error code) 174 ClientFailed, 175 176 /// Request timed-out 177 Timeout, 178 179 /// Request succeeded 180 Success 181 } 182 183 /// Data sent by the callee back to the caller 184 private struct Response 185 { 186 /// Final status of a request (failed, timeout, success, etc) 187 Status status; 188 /// ID used for request re-entrancy (See ID definition) 189 ID id; 190 /// If `status == Status.Success`, the serialized return value. 191 /// Otherwise, it contains `Exception.toString()`. 192 SerializedData data; 193 } 194 195 /// Thrown when the sent request is faulty (e.g. 4xx HTTP error types) 196 public class ClientException : Exception 197 { 198 /// Constructor 199 public this (string msg, 200 string file = __FILE__, int line = __LINE__, Exception next = null) 201 @safe pure nothrow 202 { 203 super(msg, file, line, next); 204 } 205 } 206 207 /// Simple exception unwind the stack when we need to terminate/restart, 208 /// used by `spawn` but kept here as it doesn't depend on template parameters. 209 private final class ExitException : Exception 210 { 211 public bool restart; 212 213 this () @safe pure nothrow @nogc 214 { 215 super("You should never see this exception - please report a bug"); 216 } 217 } 218 219 /// Wrapper for data inside of `spawn`, kept here as it doesn't 220 /// depend on template parameters 221 private struct Variant 222 { 223 pure nothrow @nogc: 224 225 public this (Response res) @trusted 226 { 227 this.res = res; 228 this.tag = 0; 229 } 230 public this (Command cmd) @trusted 231 { 232 this.cmd = cmd; 233 this.tag = 1; 234 } 235 236 union 237 { 238 Response res; 239 Command cmd; 240 } 241 242 public ubyte tag; 243 } 244 245 /// Used by `spawn` for recording current filtering / sleeping setting 246 private struct Control 247 { 248 public FilterAPI filter; // filter specific messages 249 public SysTime sleep_until; // sleep until this time 250 public bool drop; // drop messages if sleeping 251 252 public bool isSleeping() const @safe /* nothrow: Not on Windows (currTime) */ 253 { 254 return this.sleep_until != SysTime.init 255 && Clock.currTime < this.sleep_until; 256 } 257 } 258 259 /// Simple wrapper to deal with tuples 260 /// Vibe.d might emit a pragma(msg) when T.length == 0 261 private struct ArgWrapper (T...) 262 { 263 static if (T.length == 0) 264 size_t dummy; 265 T args; 266 } 267 268 /// Our own little scheduler 269 private final class LocalScheduler : C.FiberScheduler 270 { 271 import core.sync.condition; 272 import core.sync.mutex; 273 274 /// Just a FiberBinarySemaphore with a state 275 private struct Waiting { FiberBinarySemaphore s; bool busy; } 276 277 /// The 'Response' we are currently processing, if any 278 private Response pending; 279 280 /// Request IDs waiting for a response 281 private Waiting[ID] waiting; 282 283 /// scheduler ID 284 private size_t sched_id; 285 286 /// Initialize this scheduler and its unique ID 287 public this () @safe @nogc nothrow 288 { 289 static size_t last_idx; 290 this.sched_id = last_idx++; 291 } 292 293 /// Get the next available request ID 294 public ID getNextResponseId () 295 { 296 static size_t last_idx; 297 return ID(this.sched_id, last_idx++); 298 } 299 300 public Response waitResponse (ID id, Duration duration) nothrow 301 { 302 if (id !in this.waiting) 303 this.waiting[id] = Waiting(new FiberBinarySemaphore, false); 304 305 Waiting* ptr = &this.waiting[id]; 306 if (ptr.busy) 307 assert(0, "Trying to override a pending request"); 308 309 // We yield and wait for an answer 310 ptr.busy = true; 311 312 if (duration == Duration.init) 313 ptr.s.wait(); 314 else if (!ptr.s.wait(duration)) 315 this.pending = Response(Status.Timeout, this.pending.id); 316 317 ptr.busy = false; 318 // After control returns to us, `pending` has been filled 319 scope(exit) this.pending = Response.init; 320 return this.pending; 321 } 322 323 /// Called when a waiting condition was handled and can be safely removed 324 public void remove (ID id) 325 { 326 this.waiting.remove(id); 327 } 328 } 329 330 331 /// We need a scheduler to simulate an event loop and to be re-entrant 332 private LocalScheduler scheduler; 333 334 /// Whether this is the main thread 335 private bool is_main_thread; 336 337 338 /******************************************************************************* 339 340 Provide eventloop-like functionalities 341 342 Since nodes instantiated via this modules are Vibe.d server, 343 they expect the ability to run an asynchronous task , 344 usually provided by `vibe.core.core : runTask`. 345 346 In order for them to properly work, we need to integrate them to our event 347 loop by providing the ability to spawn a task, and wait on some condition, 348 optionally with a timeout. 349 350 The following functions do that. 351 Note that those facilities are not available from the main thread, 352 while is supposed to do tests and doesn't have a scheduler. 353 354 *******************************************************************************/ 355 356 public void runTask (void delegate() dg) nothrow 357 { 358 assert(scheduler !is null, "Cannot call this function from the main thread"); 359 scheduler.spawn(dg); 360 } 361 362 /// Ditto 363 public void sleep (Duration timeout) nothrow 364 { 365 assert(scheduler !is null, "Cannot call this function from the main thread"); 366 scope sem = scheduler..new FiberBinarySemaphore(); 367 sem.wait(timeout); 368 } 369 370 /******************************************************************************* 371 372 Run an asynchronous task after a given time. 373 374 The task will first run after the given `timeout`, and 375 can either repeat or run only once (the default). 376 Works similarly to Vibe.d's `setTimer`. 377 378 Params: 379 timeout = Determines the minimum amount of time that elapses before 380 the timer fires. 381 dg = If non-null, this delegate will be called when the timer fires 382 periodic = Speficies if the timer fires repeatedly or only once 383 384 Returns: 385 A `Timer` instance with the ability to control the timer 386 387 *******************************************************************************/ 388 389 public Timer setTimer (Duration timeout, void delegate() dg, 390 bool periodic = false) nothrow 391 { 392 assert(scheduler !is null, "Cannot call this delegate from the main thread"); 393 assert(dg !is null, "Cannot call this delegate if null"); 394 395 Timer timer = new Timer(timeout, dg, periodic); 396 scheduler.schedule(&timer.run); 397 return timer; 398 } 399 400 /// Simple timer 401 public final class Timer 402 { 403 private Duration timeout; 404 private void delegate () dg; 405 // Whether this timer is repeating 406 private bool periodic; 407 // Whether this timer was stopped 408 private bool stopped; 409 410 public this (Duration timeout, void delegate() dg, bool periodic) @safe nothrow 411 { 412 this.timeout = timeout; 413 this.dg = dg; 414 this.periodic = periodic; 415 this.stopped = false; 416 } 417 418 // Run a delegate after timeout, and until this.periodic is false 419 private void run () 420 { 421 do 422 { 423 sleep(timeout); 424 if (this.stopped) 425 return; 426 dg(); 427 } while (this.periodic); 428 } 429 430 /// Stop the timer. The next time this timer's fiber wakes up 431 /// it will exit the run() function. 432 public void stop () @safe nothrow 433 { 434 this.stopped = true; 435 this.periodic = false; 436 } 437 } 438 439 /******************************************************************************* 440 441 A reference to an alread-instantiated node 442 443 This class serves the same purpose as a `RestInterfaceClient`: 444 it is a client for an already instantiated rest `API` interface. 445 446 In order to instantiate a new server (in a remote thread), use the static 447 `spawn` function. 448 449 Serialization: 450 In order to support custom serialization policy, one can change the 451 `Serializer` parameter. This parameter is expected to be either a 452 template or an aggregate with two static methods, but no explicit 453 limitation is put on the type. 454 See `geod24.Serialization`'s documentation for more informations. 455 456 Params: 457 API = The interface defining the API to implement 458 S = An aggregate which follows the requirement explained above. 459 460 *******************************************************************************/ 461 462 public final class RemoteAPI (API, alias S = VibeJSONSerializer!()) : API 463 { 464 static assert (!serializerInvalidReason!(S).length, serializerInvalidReason!S); 465 466 /*************************************************************************** 467 468 Instantiate a node and start it 469 470 This is usually called from the main thread, which will start all the 471 nodes and then start to process request. 472 In order to have a connected network, no nodes in any thread should have 473 a different reference to the same node. 474 In practice, this means there should only be one `Tid` per "address". 475 476 Note: 477 When the `RemoteAPI` returned by this function is finalized, 478 the child thread will be shut down. 479 480 Params: 481 Impl = Type of the implementation to instantiate 482 args = Arguments to the object's constructor 483 timeout = (optional) timeout to use with requests 484 file = Path to the file that called this function (for diagnostic) 485 line = Line number tied to the `file` parameter 486 487 Returns: 488 A `RemoteAPI` owning the node reference 489 490 ***************************************************************************/ 491 492 public static RemoteAPI spawn (Impl) ( 493 CtorParams!Impl args, Duration timeout = Duration.init, 494 string file = __FILE__, int line = __LINE__) 495 { 496 auto childTid = C.spawn(&spawned!(Impl), file, line, args); 497 return new RemoteAPI(childTid, timeout); 498 } 499 500 /// Helper template to get the constructor's parameters 501 private static template CtorParams (Impl) 502 { 503 static if (is(typeof(Impl.__ctor))) 504 private alias CtorParams = Parameters!(Impl.__ctor); 505 else 506 private alias CtorParams = AliasSeq!(); 507 } 508 509 /*************************************************************************** 510 511 Handler function 512 513 Performs the dispatch from `cmd` to the proper `node` function, 514 provided the function is not filtered. 515 516 Params: 517 cmd = the command to run (contains the method name and the arguments) 518 node = the node to invoke the method on 519 filter = used for filtering API calls (returns default response) 520 521 ***************************************************************************/ 522 523 private static void handleCommand (Command cmd, API node, FilterAPI filter) 524 { 525 switch (cmd.method) 526 { 527 static foreach (member; __traits(allMembers, API)) 528 static foreach (ovrld; __traits(getOverloads, API, member)) 529 { 530 mixin( 531 q{ 532 case `%2$s`: 533 Response res = Response(Status.Failed, cmd.id); 534 535 // Provide informative message in case of filtered method 536 if (cmd.method == filter.func_mangleof) 537 res.data = SerializedData(format("Filtered method '%%s'", filter.pretty_func)); 538 else 539 { 540 auto args = S.deserialize!(ArgWrapper!(Parameters!ovrld))( 541 cmd.args.getS!S); 542 543 try 544 { 545 static if (!is(ReturnType!ovrld == void)) 546 res.data = SerializedData(S.serialize(node.%1$s(args.args))); 547 else 548 node.%1$s(args.args); 549 res.status = Status.Success; 550 } 551 catch (Exception e) 552 { 553 res.status = Status.ClientFailed; 554 res.data = SerializedData(e.toString()); 555 } 556 } 557 558 C.trySend(cmd.sender, res); 559 return; 560 }.format(member, ovrld.mangleof)); 561 } 562 default: 563 C.trySend(cmd.sender, 564 Response(Status.ClientFailed, cmd.id, 565 SerializedData("Method not found"))); 566 } 567 } 568 569 /*************************************************************************** 570 571 Main dispatch function 572 573 This function receive string-serialized messages from the calling thread, 574 which is a struct with the sender's Tid, the method's mangleof, 575 and the method's arguments as a tuple, serialized to a JSON string. 576 577 `geod24.concurrency.receive` is not `@safe`, so neither is this. 578 579 Params: 580 Implementation = Type of the implementation to instantiate 581 self = The channel on which to "listen" to receive new "connections" 582 file = Path to the file that spawned this node 583 line = Line number in the `file` that spawned this node 584 cargs = Arguments to `Implementation`'s constructor 585 586 ***************************************************************************/ 587 588 private static void spawned (Implementation) ( 589 C.Tid self, string file, int line, CtorParams!Implementation cargs) 590 nothrow 591 { 592 import std.algorithm : each; 593 import std.range; 594 595 scope exc = new ExitException(); 596 597 void runNode () 598 { 599 scope node = new Implementation(cargs); 600 scheduler = new LocalScheduler; 601 602 // Control the node behavior 603 Control control; 604 605 // we need to keep track of messages which were ignored when 606 // node.sleep() was used, and then handle each message in sequence. 607 Variant[] await_msgs; 608 609 void handle (T)(T arg) 610 { 611 static if (is(T == Command)) 612 { 613 scheduler.spawn(() => handleCommand(arg, node, control.filter)); 614 } 615 else static if (is(T == Response)) 616 { 617 // response for a previous LocalScheduler instance 618 if (arg.id.sched_id != scheduler.sched_id) 619 return; 620 621 scheduler.pending = arg; 622 scheduler.waiting[arg.id].s.notify(); 623 scheduler.remove(arg.id); 624 } 625 else static assert(0, "Unhandled type: " ~ T.stringof); 626 } 627 628 scheduler.start(() { 629 while (1) 630 { 631 C.receiveTimeout(self, 10.msecs, 632 (ShutdownCommand!API e) 633 { 634 if (e.callback !is null) 635 e.callback(node); 636 exc.restart = e.restart; 637 throw exc; 638 }, 639 (TimeCommand s) { 640 control.sleep_until = Clock.currTime + s.dur; 641 control.drop = s.drop; 642 }, 643 (FilterAPI filter_api) { 644 control.filter = filter_api; 645 }, 646 (Response res) { 647 if (!control.isSleeping()) 648 handle(res); 649 else if (!control.drop) 650 await_msgs ~= Variant(res); 651 }, 652 (Command cmd) 653 { 654 if (!control.isSleeping()) 655 handle(cmd); 656 else if (!control.drop) 657 await_msgs ~= Variant(cmd); 658 }); 659 660 // now handle any leftover messages after any sleep() call 661 if (!control.isSleeping()) 662 { 663 await_msgs.each!(msg => msg.tag == 0 ? handle(msg.res) : handle(msg.cmd)); 664 await_msgs.length = 0; 665 assumeSafeAppend(await_msgs); 666 } 667 } 668 }); 669 } 670 671 try 672 { 673 while (true) 674 { 675 try runNode(); 676 // We use this exception to exit the event loop 677 catch (ExitException e) 678 { 679 if (!e.restart) 680 break; 681 } 682 } 683 } 684 catch (Throwable t) 685 { 686 import core.stdc.stdio, std.stdio; 687 printf("#### FATAL ERROR: %.*s\n", cast(int) t.msg.length, t.msg.ptr); 688 printf("This node was started at %.*s:%d\n", 689 cast(int) file.length, file.ptr, line); 690 printf("This most likely means that the node crashed due to an uncaught exception\n"); 691 printf("If not, please file a bug at https://github.com/Geod24/localrest/\n"); 692 693 try writeln("Full error: ", t); 694 catch (Exception e) { /* Nothing more we can do at this point */ } 695 } 696 } 697 698 /// Where to send message to 699 private C.Tid childTid; 700 701 /// Timeout to use when issuing requests 702 private const Duration timeout; 703 704 /*************************************************************************** 705 706 Create an instante of a client 707 708 This connects to an already instantiated node. 709 In order to instantiate a node, see the static `spawn` function. 710 711 Params: 712 tid = `geod24.concurrency.Tid` of the node. 713 This can usually be obtained by `geod24.concurrency.locate`. 714 timeout = any timeout to use 715 716 ***************************************************************************/ 717 718 public this (C.Tid tid, Duration timeout = Duration.init) 719 @safe @nogc pure nothrow 720 { 721 this.childTid = tid; 722 this.timeout = timeout; 723 } 724 725 /*************************************************************************** 726 727 Introduce a namespace to avoid name clashes 728 729 The only way we have a name conflict is if someone exposes `ctrl`, 730 in which case they will be served an error along the following line: 731 LocalRest.d(...): Error: function `RemoteAPI!(...).ctrl` conflicts 732 with mixin RemoteAPI!(...).ControlInterface!() at LocalRest.d(...) 733 734 ***************************************************************************/ 735 736 public mixin ControlInterface!() ctrl; 737 738 /// Ditto 739 private mixin template ControlInterface () 740 { 741 /*********************************************************************** 742 743 Returns the `Tid` this `RemoteAPI` wraps 744 745 This can be useful for calling `geod24.concurrency.register` or similar. 746 Note that the `Tid` should not be used directly, as our event loop, 747 would error out on an unknown message. 748 749 ***********************************************************************/ 750 751 public C.Tid tid () @nogc pure nothrow 752 { 753 return this.childTid; 754 } 755 756 /*********************************************************************** 757 758 Send an async message to the thread to immediately shut down. 759 760 Params: 761 callback = if not null, the callback to call in the Node's 762 thread before the Node is destroyed. Can be used 763 for cleanup / logging routines. 764 765 ***********************************************************************/ 766 767 public void shutdown (void function (API) callback = null) 768 @trusted 769 { 770 C.send(this.childTid, ShutdownCommand!API(callback, false)); 771 } 772 773 /*********************************************************************** 774 775 Send an async message to the thread to immediately restart. 776 777 Note that further non-control messages to the node will block until 778 the node is back "online". 779 780 Params: 781 callback = if not null, the callback to call in the Node's 782 thread before the Node is destroyed, but before 783 it is restarted. Can be used for cleanup or logging. 784 785 ***********************************************************************/ 786 787 public void restart (void function (API) callback = null) 788 @trusted 789 { 790 C.send(this.childTid, ShutdownCommand!API(callback, true)); 791 } 792 793 /*********************************************************************** 794 795 Make the remote node sleep for `Duration` 796 797 The remote node will call `Thread.sleep`, becoming completely 798 unresponsive, potentially having multiple tasks hanging. 799 This is useful to simulate a delay or a network outage. 800 801 Params: 802 delay = Duration the node will sleep for 803 dropMessages = Whether to process the pending requests when the 804 node come back online (the default), or to drop 805 pending traffic 806 807 ***********************************************************************/ 808 809 public void sleep (Duration d, bool dropMessages = false) @trusted 810 { 811 C.send(this.childTid, TimeCommand(d, dropMessages)); 812 } 813 814 /*********************************************************************** 815 816 Filter any requests issued to the provided method. 817 818 Calling the API endpoint will throw an exception, 819 therefore the request will fail. 820 821 Use via: 822 823 ---- 824 interface API { void call(); } 825 class C : API { void call() { } } 826 auto obj = new RemoteAPI!API(...); 827 obj.filter!(API.call); 828 ---- 829 830 To match a specific overload of a method, specify the 831 parameters to match against in the call. For example: 832 833 ---- 834 interface API { void call(int); void call(int, float); } 835 class C : API { void call(int) {} void call(int, float) {} } 836 auto obj = new RemoteAPI!API(...); 837 obj.filter!(API.call, int, float); // only filters the second overload 838 ---- 839 840 Params: 841 method = the API method for which to filter out requests 842 OverloadParams = (optional) the parameters to match against 843 to select an overload. Note that if the method has no other 844 overloads, then even if that method takes parameters and 845 OverloadParams is empty, it will match that method 846 out of convenience. 847 848 ***********************************************************************/ 849 850 public void filter (alias method, OverloadParams...) () @trusted 851 { 852 enum method_name = __traits(identifier, method); 853 854 // return the mangled name of the matching overload 855 template getBestMatch (T...) 856 { 857 static if (is(Parameters!(T[0]) == OverloadParams)) 858 { 859 enum getBestMatch = T[0].mangleof; 860 } 861 else static if (T.length > 0) 862 { 863 enum getBestMatch = getBestMatch!(T[1 .. $]); 864 } 865 else 866 { 867 static assert(0, 868 format("Couldn't select best overload of '%s' for " ~ 869 "parameter types: %s", 870 method_name, OverloadParams.stringof)); 871 } 872 } 873 874 // ensure it's used with API.method, *not* RemoteAPI.method which 875 // is an override of API.method. Otherwise mangling won't match! 876 // special-case: no other overloads, and parameter list is empty: 877 // just select that one API method 878 alias Overloads = __traits(getOverloads, API, method_name); 879 static if (Overloads.length == 1 && OverloadParams.length == 0) 880 { 881 immutable pretty = method_name ~ Parameters!(Overloads[0]).stringof; 882 enum mangled = Overloads[0].mangleof; 883 } 884 else 885 { 886 immutable pretty = format("%s%s", method_name, OverloadParams.stringof); 887 enum mangled = getBestMatch!Overloads; 888 } 889 890 C.send(this.childTid, FilterAPI(mangled, pretty)); 891 } 892 893 894 /*********************************************************************** 895 896 Clear out any filtering set by a call to filter() 897 898 ***********************************************************************/ 899 900 public void clearFilter () @trusted 901 { 902 C.send(this.childTid, FilterAPI("")); 903 } 904 905 /*********************************************************************** 906 907 Call the provided delegate with a custom timeout 908 909 This allow to perform requests on a client with a different timeout, 910 usually to allow some requests (e.g. initialization calls) to have longer 911 timeout, or no timeout at all, or to put a timeout on an otherwise 912 timeout-less client (e.g. when calling the actual test which could fail). 913 914 To disable timeout, pass the special value `Duration.zero` 915 (or `0.seconds`, `0.msecs`, etc...). 916 917 Params: 918 timeout = the new timeout to use 919 dg = the delegate to call with the new scoped RemoteAPI copy 920 921 ***********************************************************************/ 922 923 public void withTimeout (Dg) (Duration timeout, scope Dg dg) 924 { 925 scope api = new RemoteAPI(this.childTid, timeout); 926 static assert(is(typeof({ dg(api); })), 927 "Provided argument of type `" ~ Dg.stringof ~ 928 "` is not callable with argument type `scope " ~ 929 fullyQualifiedName!API ~ "`"); 930 dg(api); 931 } 932 } 933 934 // Vibe.d mandates that method must be @safe 935 @safe: 936 937 /*************************************************************************** 938 939 Generate the API `override` which forward to the actual object 940 941 ***************************************************************************/ 942 943 static foreach (member; __traits(allMembers, API)) 944 static foreach (ovrld; __traits(getOverloads, API, member)) 945 { 946 mixin(q{ 947 override ReturnType!(ovrld) } ~ member ~ q{ (Parameters!ovrld params) 948 { 949 // we are in the main thread 950 if (scheduler is null) 951 { 952 scheduler = new LocalScheduler; 953 is_main_thread = true; 954 } 955 956 // `geod24.concurrency.send/receive[Only]` is not `@safe` but 957 // this overload needs to be 958 auto res = () @trusted { 959 auto serialized = S.serialize(ArgWrapper!(Parameters!ovrld)(params)); 960 961 auto command = Command(C.thisTid(), scheduler.getNextResponseId(), ovrld.mangleof, 962 SerializedData(serialized)); 963 964 // If the node already shut down, its MessageBox will be 965 // closed. Detect it and notify the user. 966 // Note that it might be expected that the remote died. 967 if (!C.trySend(this.childTid, command)) 968 throw new Exception("Connection with peer closed"); 969 970 // for the main thread, we run the "event loop" until 971 // the request we're interested in receives a response. 972 if (is_main_thread) 973 { 974 bool terminated = false; 975 runTask(() { 976 while (!terminated) 977 { 978 C.receiveTimeout(C.thisTid(), 10.msecs, 979 (Response res) { 980 scheduler.pending = res; 981 scheduler.waiting[res.id].s.notify(); 982 }); 983 984 scheduler.yield(); 985 } 986 }); 987 988 Response res; 989 scheduler.start(() { 990 res = scheduler.waitResponse(command.id, this.timeout); 991 terminated = true; 992 }); 993 return res; 994 } 995 else 996 { 997 return scheduler.waitResponse(command.id, this.timeout); 998 } 999 }(); 1000 1001 if (res.status == Status.Failed) 1002 throw new Exception(res.data.get!string); 1003 1004 if (res.status == Status.ClientFailed) 1005 throw new ClientException( 1006 format("Request to %s couldn't be processed : %s", 1007 __PRETTY_FUNCTION__, res.data.get!string)); 1008 1009 if (res.status == Status.Timeout) 1010 throw new Exception("Request timed-out"); 1011 1012 static if (!is(ReturnType!(ovrld) == void)) 1013 return S.deserialize!(typeof(return))(res.data.getS!S()); 1014 } 1015 }); 1016 } 1017 } 1018 1019 /// Simple usage example 1020 unittest 1021 { 1022 static interface API 1023 { 1024 @safe: 1025 public @property ulong pubkey (); 1026 public string getValue (ulong idx); 1027 public ubyte[32] getQuorumSet (); 1028 public string recv (string data); 1029 } 1030 1031 static class MockAPI : API 1032 { 1033 @safe: 1034 public override @property ulong pubkey () 1035 { return 42; } 1036 public override string getValue (ulong idx) 1037 { assert(0); } 1038 public override ubyte[32] getQuorumSet () 1039 { assert(0); } 1040 public override string recv (string data) 1041 { assert(0); } 1042 } 1043 1044 scope test = RemoteAPI!API.spawn!MockAPI(); 1045 assert(test.pubkey() == 42); 1046 test.ctrl.shutdown(); 1047 thread_joinAll(); 1048 } 1049 1050 /// Example where a shutdown() routine must be called on a node before 1051 /// its destructor is called 1052 unittest 1053 { 1054 __gshared bool dtor_called; 1055 __gshared bool shutdown_called; 1056 __gshared bool onDestroy_called; 1057 1058 static interface API 1059 { 1060 @safe: 1061 public @property ulong pubkey (); 1062 } 1063 1064 static class MockAPI : API 1065 { 1066 public override @property ulong pubkey () @safe 1067 { return 42; } 1068 public void shutdown () { shutdown_called = true; } 1069 ~this () { dtor_called = true; } 1070 } 1071 1072 static void onDestroy (API node) 1073 { 1074 assert(!dtor_called); 1075 auto mock = cast(MockAPI)node; 1076 assert(mock !is null); 1077 mock.shutdown(); 1078 onDestroy_called = true; 1079 } 1080 1081 scope test = RemoteAPI!API.spawn!MockAPI(); 1082 assert(test.pubkey() == 42); 1083 test.ctrl.shutdown(&onDestroy); 1084 thread_joinAll(); 1085 // ctr.shutdown call is asynchronous 1086 assert(dtor_called); 1087 assert(onDestroy_called); 1088 assert(shutdown_called); 1089 } 1090 1091 /// In a real world usage, users will most likely need to use the registry 1092 unittest 1093 { 1094 import std.conv; 1095 static import geod24.concurrency; 1096 import geod24.Registry; 1097 1098 __gshared Registry registry; 1099 registry.initialize(); 1100 1101 static interface API 1102 { 1103 @safe: 1104 public @property ulong pubkey (); 1105 public string getValue (ulong idx); 1106 public string recv (string data); 1107 public string recv (ulong index, string data); 1108 1109 public string last (); 1110 } 1111 1112 static class Node : API 1113 { 1114 @safe: 1115 public this (bool isByzantine) { this.isByzantine = isByzantine; } 1116 public override @property ulong pubkey () 1117 { lastCall = `pubkey`; return this.isByzantine ? 0 : 42; } 1118 public override string getValue (ulong idx) 1119 { lastCall = `getValue`; return null; } 1120 public override string recv (string data) 1121 { lastCall = `recv@1`; return null; } 1122 public override string recv (ulong index, string data) 1123 { lastCall = `recv@2`; return null; } 1124 1125 public override string last () { return this.lastCall; } 1126 1127 private bool isByzantine; 1128 private string lastCall; 1129 } 1130 1131 static RemoteAPI!API factory (string type, ulong hash) 1132 { 1133 const name = hash.to!string; 1134 auto tid = registry.locate(name); 1135 if (tid != tid.init) 1136 return new RemoteAPI!API(tid); 1137 1138 switch (type) 1139 { 1140 case "normal": 1141 auto ret = RemoteAPI!API.spawn!Node(false); 1142 registry.register(name, ret.tid()); 1143 return ret; 1144 case "byzantine": 1145 auto ret = RemoteAPI!API.spawn!Node(true); 1146 registry.register(name, ret.tid()); 1147 return ret; 1148 default: 1149 assert(0, type); 1150 } 1151 } 1152 1153 auto node1 = factory("normal", 1); 1154 auto node2 = factory("byzantine", 2); 1155 1156 static void testFunc() 1157 { 1158 auto node1 = factory("this does not matter", 1); 1159 auto node2 = factory("neither does this", 2); 1160 assert(node1.pubkey() == 42); 1161 assert(node1.last() == "pubkey"); 1162 assert(node2.pubkey() == 0); 1163 assert(node2.last() == "pubkey"); 1164 1165 node1.recv(42, null); 1166 assert(node1.last() == "recv@2"); 1167 node1.recv(null); 1168 assert(node1.last() == "recv@1"); 1169 assert(node2.last() == "pubkey"); 1170 node1.ctrl.shutdown(); 1171 node2.ctrl.shutdown(); 1172 } 1173 1174 scope thread = new Thread(&testFunc); 1175 thread.start(); 1176 // Make sure our main thread terminates after everyone else 1177 thread_joinAll(); 1178 } 1179 1180 /// This network have different types of nodes in it 1181 unittest 1182 { 1183 import geod24.concurrency; 1184 1185 static interface API 1186 { 1187 @safe: 1188 public @property ulong requests (); 1189 public @property ulong value (); 1190 } 1191 1192 static class MasterNode : API 1193 { 1194 @safe: 1195 public override @property ulong requests() 1196 { 1197 return this.requests_; 1198 } 1199 1200 public override @property ulong value() 1201 { 1202 this.requests_++; 1203 return 42; // Of course 1204 } 1205 1206 private ulong requests_; 1207 } 1208 1209 static class SlaveNode : API 1210 { 1211 @safe: 1212 this(Tid masterTid) 1213 { 1214 this.master = new RemoteAPI!API(masterTid); 1215 } 1216 1217 public override @property ulong requests() 1218 { 1219 return this.requests_; 1220 } 1221 1222 public override @property ulong value() 1223 { 1224 this.requests_++; 1225 return master.value(); 1226 } 1227 1228 private API master; 1229 private ulong requests_; 1230 } 1231 1232 RemoteAPI!API[4] nodes; 1233 auto master = RemoteAPI!API.spawn!MasterNode(); 1234 nodes[0] = master; 1235 nodes[1] = RemoteAPI!API.spawn!SlaveNode(master.tid()); 1236 nodes[2] = RemoteAPI!API.spawn!SlaveNode(master.tid()); 1237 nodes[3] = RemoteAPI!API.spawn!SlaveNode(master.tid()); 1238 1239 foreach (n; nodes) 1240 { 1241 assert(n.requests() == 0); 1242 assert(n.value() == 42); 1243 } 1244 1245 assert(nodes[0].requests() == 4); 1246 1247 foreach (n; nodes[1 .. $]) 1248 { 1249 assert(n.value() == 42); 1250 assert(n.requests() == 2); 1251 } 1252 1253 assert(nodes[0].requests() == 7); 1254 import std.algorithm; 1255 nodes.each!(node => node.ctrl.shutdown()); 1256 thread_joinAll(); 1257 } 1258 1259 /// Support for circular nodes call 1260 unittest 1261 { 1262 static import geod24.concurrency; 1263 1264 __gshared C.Tid[string] tbn; 1265 1266 static interface API 1267 { 1268 @safe: 1269 public ulong call (ulong count, ulong val); 1270 public void setNext (string name); 1271 } 1272 1273 static class Node : API 1274 { 1275 @safe: 1276 public override ulong call (ulong count, ulong val) 1277 { 1278 if (!count) 1279 return val; 1280 return this.next.call(count - 1, val + count); 1281 } 1282 1283 public override void setNext (string name) @trusted 1284 { 1285 this.next = new RemoteAPI!API(tbn[name]); 1286 } 1287 1288 private API next; 1289 } 1290 1291 RemoteAPI!(API)[3] nodes = [ 1292 RemoteAPI!API.spawn!Node(), 1293 RemoteAPI!API.spawn!Node(), 1294 RemoteAPI!API.spawn!Node(), 1295 ]; 1296 1297 foreach (idx, ref api; nodes) 1298 tbn[format("node%d", idx)] = api.tid(); 1299 nodes[0].setNext("node1"); 1300 nodes[1].setNext("node2"); 1301 nodes[2].setNext("node0"); 1302 1303 // 7 level of re-entrancy 1304 assert(210 == nodes[0].call(20, 0)); 1305 1306 import std.algorithm; 1307 nodes.each!(node => node.ctrl.shutdown()); 1308 thread_joinAll(); 1309 } 1310 1311 1312 /// Nodes can start tasks 1313 unittest 1314 { 1315 static import core.thread; 1316 import core.time; 1317 1318 static interface API 1319 { 1320 public void start (); 1321 public ulong getCounter (); 1322 } 1323 1324 static class Node : API 1325 { 1326 public override void start () 1327 { 1328 runTask(&this.task); 1329 } 1330 1331 public override ulong getCounter () 1332 { 1333 scope (exit) this.counter = 0; 1334 return this.counter; 1335 } 1336 1337 private void task () 1338 { 1339 while (true) 1340 { 1341 this.counter++; 1342 sleep(50.msecs); 1343 } 1344 } 1345 1346 private ulong counter; 1347 } 1348 1349 auto node = RemoteAPI!API.spawn!Node(); 1350 assert(node.getCounter() == 0); 1351 node.start(); 1352 assert(node.getCounter() == 1); 1353 assert(node.getCounter() == 0); 1354 core.thread.Thread.sleep(1.seconds); 1355 // It should be 19 but some machines are very slow 1356 // (e.g. Travis Mac testers) so be safe 1357 assert(node.getCounter() >= 9); 1358 assert(node.getCounter() == 0); 1359 node.ctrl.shutdown(); 1360 thread_joinAll(); 1361 } 1362 1363 // Sane name insurance policy 1364 unittest 1365 { 1366 import geod24.concurrency : Tid; 1367 1368 static interface API 1369 { 1370 public ulong tid (); 1371 } 1372 1373 static class Node : API 1374 { 1375 public override ulong tid () { return 42; } 1376 } 1377 1378 auto node = RemoteAPI!API.spawn!Node(); 1379 assert(node.tid == 42); 1380 assert(node.ctrl.tid != Tid.init); 1381 1382 static interface DoesntWork 1383 { 1384 public string ctrl (); 1385 } 1386 static assert(!is(typeof(RemoteAPI!DoesntWork))); 1387 node.ctrl.shutdown(); 1388 thread_joinAll(); 1389 } 1390 1391 // Simulate temporary outage 1392 unittest 1393 { 1394 __gshared C.Tid n1tid; 1395 1396 static interface API 1397 { 1398 public ulong call (); 1399 public void asyncCall (); 1400 } 1401 static class Node : API 1402 { 1403 public this() 1404 { 1405 if (n1tid != C.Tid.init) 1406 this.remote = new RemoteAPI!API(n1tid); 1407 } 1408 1409 public override ulong call () { return ++this.count; } 1410 public override void asyncCall () { runTask(() => cast(void)this.remote.call); } 1411 size_t count; 1412 RemoteAPI!API remote; 1413 } 1414 1415 auto n1 = RemoteAPI!API.spawn!Node(); 1416 n1tid = n1.tid(); 1417 auto n2 = RemoteAPI!API.spawn!Node(); 1418 1419 /// Make sure calls are *relatively* efficient 1420 auto current1 = MonoTime.currTime(); 1421 assert(1 == n1.call()); 1422 assert(1 == n2.call()); 1423 auto current2 = MonoTime.currTime(); 1424 assert(current2 - current1 < 200.msecs); 1425 1426 // Make one of the node sleep 1427 n1.sleep(1.seconds); 1428 // Make sure our main thread is not suspended, 1429 // nor is the second node 1430 assert(2 == n2.call()); 1431 auto current3 = MonoTime.currTime(); 1432 assert(current3 - current2 < 400.msecs); 1433 1434 // Wait for n1 to unblock 1435 assert(2 == n1.call()); 1436 // Check current time >= 1 second 1437 auto current4 = MonoTime.currTime(); 1438 assert(current4 - current2 >= 1.seconds); 1439 1440 // Now drop many messages 1441 n1.sleep(1.seconds, true); 1442 // Github Action runs out of memory with MessageCount == 500 1443 version (Windows) enum MessageCount = 100; 1444 else enum MessageCount = 500; 1445 for (size_t i = 0; i < MessageCount; i++) 1446 n2.asyncCall(); 1447 // Make sure we don't end up blocked forever 1448 n1.sleep(0.seconds, false); 1449 assert(3 == n1.call()); 1450 1451 // Debug output, uncomment if needed 1452 version (none) 1453 { 1454 import std.stdio; 1455 writeln("Two non-blocking calls: ", current2 - current1); 1456 writeln("Sleep + non-blocking call: ", current3 - current2); 1457 writeln("Delta since sleep: ", current4 - current2); 1458 } 1459 1460 n1.ctrl.shutdown(); 1461 n2.ctrl.shutdown(); 1462 thread_joinAll(); 1463 } 1464 1465 // Filter commands 1466 unittest 1467 { 1468 __gshared C.Tid node_tid; 1469 1470 static interface API 1471 { 1472 size_t fooCount(); 1473 size_t fooIntCount(); 1474 size_t barCount (); 1475 void foo (); 1476 void foo (int); 1477 void bar (int); // not in any overload set 1478 void callBar (int); 1479 void callFoo (); 1480 void callFoo (int); 1481 } 1482 1483 static class Node : API 1484 { 1485 size_t foo_count; 1486 size_t foo_int_count; 1487 size_t bar_count; 1488 RemoteAPI!API remote; 1489 1490 public this() 1491 { 1492 this.remote = new RemoteAPI!API(node_tid); 1493 } 1494 1495 override size_t fooCount() { return this.foo_count; } 1496 override size_t fooIntCount() { return this.foo_int_count; } 1497 override size_t barCount() { return this.bar_count; } 1498 override void foo () { ++this.foo_count; } 1499 override void foo (int) { ++this.foo_int_count; } 1500 override void bar (int) { ++this.bar_count; } // not in any overload set 1501 // This one is part of the overload set of the node, but not of the API 1502 // It can't be accessed via API and can't be filtered out 1503 void bar(string) { assert(0); } 1504 1505 override void callFoo() 1506 { 1507 try 1508 { 1509 this.remote.foo(); 1510 } 1511 catch (Exception ex) 1512 { 1513 assert(ex.msg == "Filtered method 'foo()'"); 1514 } 1515 } 1516 1517 override void callFoo(int arg) 1518 { 1519 try 1520 { 1521 this.remote.foo(arg); 1522 } 1523 catch (Exception ex) 1524 { 1525 assert(ex.msg == "Filtered method 'foo(int)'"); 1526 } 1527 } 1528 1529 override void callBar(int arg) 1530 { 1531 try 1532 { 1533 this.remote.bar(arg); 1534 } 1535 catch (Exception ex) 1536 { 1537 assert(ex.msg == "Filtered method 'bar(int)'"); 1538 } 1539 } 1540 } 1541 1542 auto filtered = RemoteAPI!API.spawn!Node(); 1543 node_tid = filtered.tid(); 1544 1545 // caller will call filtered 1546 auto caller = RemoteAPI!API.spawn!Node(); 1547 caller.callFoo(); 1548 assert(filtered.fooCount() == 1); 1549 1550 // both of these work 1551 static assert(is(typeof(filtered.filter!(API.foo)))); 1552 static assert(is(typeof(filtered.filter!(filtered.foo)))); 1553 1554 // only method in the overload set that takes a parameter, 1555 // should still match a call to filter with no parameters 1556 static assert(is(typeof(filtered.filter!(filtered.bar)))); 1557 1558 // wrong parameters => fail to compile 1559 static assert(!is(typeof(filtered.filter!(filtered.bar, float)))); 1560 // Only `API` overload sets are considered 1561 static assert(!is(typeof(filtered.filter!(filtered.bar, string)))); 1562 1563 filtered.filter!(API.foo); 1564 1565 caller.callFoo(); 1566 assert(filtered.fooCount() == 1); // it was not called! 1567 1568 filtered.clearFilter(); // clear the filter 1569 caller.callFoo(); 1570 assert(filtered.fooCount() == 2); // it was called! 1571 1572 // verify foo(int) works first 1573 caller.callFoo(1); 1574 assert(filtered.fooCount() == 2); 1575 assert(filtered.fooIntCount() == 1); // first time called 1576 1577 // now filter only the int overload 1578 filtered.filter!(API.foo, int); 1579 1580 // make sure the parameterless overload is still not filtered 1581 caller.callFoo(); 1582 assert(filtered.fooCount() == 3); // updated 1583 1584 caller.callFoo(1); 1585 assert(filtered.fooIntCount() == 1); // call filtered! 1586 1587 // not filtered yet 1588 caller.callBar(1); 1589 assert(filtered.barCount() == 1); 1590 1591 filtered.filter!(filtered.bar); 1592 caller.callBar(1); 1593 assert(filtered.barCount() == 1); // filtered! 1594 1595 // last blocking calls, to ensure the previous calls complete 1596 filtered.clearFilter(); 1597 caller.foo(); 1598 caller.bar(1); 1599 1600 filtered.ctrl.shutdown(); 1601 caller.ctrl.shutdown(); 1602 thread_joinAll(); 1603 } 1604 1605 // request timeouts (from main thread) 1606 unittest 1607 { 1608 import core.thread; 1609 import std.exception; 1610 1611 static interface API 1612 { 1613 size_t sleepFor (long dur); 1614 void ping (); 1615 } 1616 1617 static class Node : API 1618 { 1619 override size_t sleepFor (long dur) 1620 { 1621 Thread.sleep(msecs(dur)); 1622 return 42; 1623 } 1624 1625 override void ping () { } 1626 } 1627 1628 // node with no timeout 1629 auto node = RemoteAPI!API.spawn!Node(); 1630 assert(node.sleepFor(80) == 42); // no timeout 1631 1632 // custom timeout 1633 bool called; 1634 node.ctrl.withTimeout(100.msecs, 1635 (scope API api) { 1636 assertThrown!Exception(api.sleepFor(2000)); 1637 called = true; 1638 }); 1639 assert(called); 1640 1641 called = false; 1642 struct S 1643 { 1644 void opCall (scope API api) 1645 { 1646 assertThrown!Exception(api.sleepFor(2000)); 1647 called = true; 1648 } 1649 } 1650 S s; 1651 node.ctrl.withTimeout(100.msecs, s); 1652 assert(called); 1653 1654 // Test that attributes are inferred based on the delegate 1655 void doTest () @safe pure nothrow @nogc 1656 { 1657 called = false; 1658 node.ctrl.withTimeout(Duration.zero, 1659 (scope API api) { called = true; }); 1660 assert(called); 1661 } 1662 doTest(); 1663 1664 // node with a configured timeout 1665 auto to_node = RemoteAPI!API.spawn!Node(500.msecs); 1666 1667 /// none of these should time out 1668 assert(to_node.sleepFor(10) == 42); 1669 assert(to_node.sleepFor(20) == 42); 1670 assert(to_node.sleepFor(30) == 42); 1671 assert(to_node.sleepFor(40) == 42); 1672 1673 assertThrown!Exception(to_node.sleepFor(2000)); 1674 to_node.ctrl.withTimeout(3.seconds, // wait for the node to wake up 1675 (scope API api) { api.ping(); }); 1676 1677 to_node.ctrl.shutdown(); 1678 node.ctrl.shutdown(); 1679 thread_joinAll(); 1680 } 1681 1682 // test-case for responses to re-used requests (from main thread) 1683 unittest 1684 { 1685 import core.thread; 1686 import std.exception; 1687 1688 static interface API 1689 { 1690 float getFloat(); 1691 size_t sleepFor (long dur); 1692 } 1693 1694 static class Node : API 1695 { 1696 override float getFloat() { return 69.69; } 1697 override size_t sleepFor (long dur) 1698 { 1699 Thread.sleep(msecs(dur)); 1700 return 42; 1701 } 1702 } 1703 1704 // node with no timeout 1705 auto node = RemoteAPI!API.spawn!Node(); 1706 assert(node.sleepFor(80) == 42); // no timeout 1707 1708 // node with a configured timeout 1709 auto to_node = RemoteAPI!API.spawn!Node(500.msecs); 1710 1711 /// none of these should time out 1712 assert(to_node.sleepFor(10) == 42); 1713 assert(to_node.sleepFor(20) == 42); 1714 assert(to_node.sleepFor(30) == 42); 1715 assert(to_node.sleepFor(40) == 42); 1716 1717 assertThrown!Exception(to_node.sleepFor(2000)); 1718 to_node.ctrl.withTimeout(3.seconds, // wait for the node to wake up 1719 (scope API api) { assert(cast(int)api.getFloat() == 69); }); 1720 to_node.ctrl.shutdown(); 1721 node.ctrl.shutdown(); 1722 thread_joinAll(); 1723 } 1724 1725 // request timeouts (foreign node to another node) 1726 unittest 1727 { 1728 static import geod24.concurrency; 1729 import std.exception; 1730 1731 __gshared C.Tid node_tid; 1732 1733 static interface API 1734 { 1735 void check (); 1736 int ping (); 1737 } 1738 1739 static class Node : API 1740 { 1741 override int ping () { return 42; } 1742 1743 override void check () 1744 { 1745 auto node = new RemoteAPI!API(node_tid, 500.msecs); 1746 1747 // no time-out 1748 node.ctrl.sleep(10.msecs); 1749 assert(node.ping() == 42); 1750 1751 // time-out 1752 node.ctrl.sleep(2000.msecs); 1753 assertThrown!Exception(node.ping()); 1754 } 1755 } 1756 1757 auto node_1 = RemoteAPI!API.spawn!Node(); 1758 auto node_2 = RemoteAPI!API.spawn!Node(); 1759 node_tid = node_2.tid; 1760 node_1.check(); 1761 node_1.ctrl.shutdown(); 1762 node_2.ctrl.shutdown(); 1763 thread_joinAll(); 1764 } 1765 1766 // test-case for zombie responses 1767 unittest 1768 { 1769 static import geod24.concurrency; 1770 import std.exception; 1771 1772 __gshared C.Tid node_tid; 1773 1774 static interface API 1775 { 1776 void check (); 1777 int get42 (); 1778 int get69 (); 1779 } 1780 1781 static class Node : API 1782 { 1783 override int get42 () { return 42; } 1784 override int get69 () { return 69; } 1785 1786 override void check () 1787 { 1788 auto node = new RemoteAPI!API(node_tid, 500.msecs); 1789 1790 // time-out 1791 node.ctrl.sleep(2000.msecs); 1792 assertThrown!Exception(node.get42()); 1793 1794 // no time-out 1795 node.ctrl.sleep(10.msecs); 1796 assert(node.get69() == 69); 1797 } 1798 } 1799 1800 auto node_1 = RemoteAPI!API.spawn!Node(); 1801 auto node_2 = RemoteAPI!API.spawn!Node(); 1802 node_tid = node_2.tid; 1803 node_1.check(); 1804 node_1.ctrl.shutdown(); 1805 node_2.ctrl.shutdown(); 1806 thread_joinAll(); 1807 } 1808 1809 // request timeouts with dropped messages 1810 unittest 1811 { 1812 static import geod24.concurrency; 1813 import std.exception; 1814 1815 __gshared C.Tid node_tid; 1816 1817 static interface API 1818 { 1819 void check (); 1820 int ping (); 1821 } 1822 1823 static class Node : API 1824 { 1825 override int ping () { return 42; } 1826 1827 override void check () 1828 { 1829 auto node = new RemoteAPI!API(node_tid, 420.msecs); 1830 1831 // Requests are dropped, so it times out 1832 assert(node.ping() == 42); 1833 node.ctrl.sleep(10.msecs, true); 1834 assertThrown!Exception(node.ping()); 1835 } 1836 } 1837 1838 auto node_1 = RemoteAPI!API.spawn!Node(); 1839 auto node_2 = RemoteAPI!API.spawn!Node(); 1840 node_tid = node_2.tid; 1841 node_1.check(); 1842 node_1.ctrl.shutdown(); 1843 node_2.ctrl.shutdown(); 1844 thread_joinAll(); 1845 } 1846 1847 // Test a node that gets a replay while it's delayed 1848 unittest 1849 { 1850 static import geod24.concurrency; 1851 import std.exception; 1852 1853 __gshared C.Tid node_tid; 1854 1855 static interface API 1856 { 1857 void check (); 1858 int ping (); 1859 } 1860 1861 static class Node : API 1862 { 1863 override int ping () { return 42; } 1864 1865 override void check () 1866 { 1867 auto node = new RemoteAPI!API(node_tid, 5000.msecs); 1868 assert(node.ping() == 42); 1869 // We need to return immediately so that the main thread 1870 // puts us to sleep 1871 runTask(() { 1872 node.ctrl.sleep(200.msecs); 1873 assert(node.ping() == 42); 1874 }); 1875 } 1876 } 1877 1878 auto node_1 = RemoteAPI!API.spawn!Node(500.msecs); 1879 auto node_2 = RemoteAPI!API.spawn!Node(); 1880 node_tid = node_2.tid; 1881 node_1.check(); 1882 node_1.ctrl.sleep(300.msecs); 1883 assert(node_1.ping() == 42); 1884 node_1.ctrl.shutdown(); 1885 node_2.ctrl.shutdown(); 1886 thread_joinAll(); 1887 } 1888 1889 // Test explicit shutdown 1890 unittest 1891 { 1892 import std.exception; 1893 1894 static interface API 1895 { 1896 int myping (int value); 1897 } 1898 1899 static class Node : API 1900 { 1901 override int myping (int value) 1902 { 1903 return value; 1904 } 1905 } 1906 1907 auto node = RemoteAPI!API.spawn!Node(1.seconds); 1908 assert(node.myping(42) == 42); 1909 node.ctrl.shutdown(); 1910 1911 try 1912 { 1913 node.myping(69); 1914 assert(0); 1915 } 1916 catch (Exception ex) 1917 { 1918 assert(ex.msg == "Request timed-out"); 1919 } 1920 thread_joinAll(); 1921 } 1922 1923 unittest 1924 { 1925 import core.thread : thread_joinAll; 1926 static import geod24.concurrency; 1927 __gshared C.Tid node_tid; 1928 1929 static interface API 1930 { 1931 void segfault (); 1932 void check (); 1933 } 1934 1935 static class Node : API 1936 { 1937 override void segfault () 1938 { 1939 int* ptr; *ptr = 1; 1940 } 1941 1942 override void check () 1943 { 1944 auto node = new RemoteAPI!API(node_tid); 1945 1946 // We need to return immediately so that the main thread can continue testing 1947 runTask(() { 1948 node.ctrl.sleep(500.msecs); 1949 node.segfault(); 1950 }); 1951 } 1952 } 1953 1954 auto node_1 = RemoteAPI!API.spawn!Node(); 1955 auto node_2 = RemoteAPI!API.spawn!Node(); 1956 node_tid = node_2.tid; 1957 node_1.check(); 1958 node_2.ctrl.shutdown(); // shut it down before wake-up, segfault() command will be ignored 1959 node_1.ctrl.shutdown(); 1960 thread_joinAll(); 1961 } 1962 1963 /// Example of a custom (de)serialization policy 1964 unittest 1965 { 1966 static struct Serialize 1967 { 1968 static: 1969 public immutable(ubyte[]) serialize (T) (auto ref T value) @trusted 1970 { 1971 static assert(is(typeof({ T v = immutable(T).init; }))); 1972 static if (is(T : const(ubyte)[])) 1973 return value.idup; 1974 else 1975 return (cast(ubyte*)&value)[0 .. T.sizeof].idup; 1976 } 1977 1978 public QT deserialize (QT) (immutable(ubyte)[] data) @trusted 1979 { 1980 return *cast(QT*)(data.dup.ptr); 1981 } 1982 } 1983 1984 static struct ValueType 1985 { 1986 ulong v1; 1987 uint v2; 1988 uint v3; 1989 } 1990 1991 static interface API 1992 { 1993 @safe: 1994 public @property ulong pubkey (); 1995 public ValueType getValue (string val); 1996 // Note: Vibe.d's JSON serializer cannot serialize this 1997 public immutable(ubyte[32]) getHash (const ubyte[] val); 1998 } 1999 2000 static class MockAPI : API 2001 { 2002 @safe: 2003 public override @property ulong pubkey () { return 42; } 2004 public override ValueType getValue (string val) { return ValueType(val.length, 2, 3); } 2005 public override immutable(ubyte[32]) getHash (const ubyte[] val) 2006 { 2007 return val.length >= 32 ? val[0 .. 32] : typeof(return).init; 2008 } 2009 } 2010 2011 scope test = RemoteAPI!(API, Serialize).spawn!MockAPI(); 2012 assert(test.pubkey() == 42); 2013 assert(test.getValue("Hello world") == ValueType(11, 2, 3)); 2014 ubyte[64] val = 42; 2015 assert(test.getHash(val) == val[0 .. 32]); 2016 test.ctrl.shutdown(); 2017 thread_joinAll(); 2018 } 2019 2020 /// Test node2 responding to a dead node1 2021 /// See https://github.com/Geod24/localrest/issues/64 2022 unittest 2023 { 2024 static interface API 2025 { 2026 @safe: 2027 // Main thread calls this on the first node 2028 public void call0 (); 2029 // ... which then calls this on the second node 2030 public void call1 (); 2031 public void call2 (); 2032 } 2033 2034 __gshared C.Tid node1Addr; 2035 __gshared C.Tid node2Addr; 2036 2037 static class Node : API 2038 { 2039 private RemoteAPI!API self; 2040 2041 @trusted: 2042 // Main -> Node 1 2043 public override void call0 () 2044 { 2045 this.self = new RemoteAPI!API(node1Addr); 2046 scope node2 = new RemoteAPI!API(node2Addr); 2047 node2.call1(); 2048 assert(0, "This should never return as call2 shutdown this node"); 2049 } 2050 2051 // Node 1 -> Node 2 2052 public override void call1 () 2053 { 2054 assert(this.self is null); 2055 scope node1 = new RemoteAPI!API(node1Addr); 2056 node1.call2(); 2057 // Make really sure Node 1 is dead 2058 while (!node1Addr.mbox.isClosed()) 2059 sleep(100.msecs); 2060 } 2061 2062 // Node 2 -> Node 1 2063 public override void call2 () 2064 { 2065 assert(this.self !is null); 2066 this.self.ctrl.shutdown(); 2067 } 2068 } 2069 2070 // Long timeout to ensure we don't spuriously pass 2071 auto node1 = RemoteAPI!API.spawn!Node(500.msecs); 2072 auto node2 = RemoteAPI!API.spawn!Node(); 2073 node1Addr = node1.ctrl.tid(); 2074 node2Addr = node2.ctrl.tid(); 2075 2076 // This will timeout (because the node will be gone) 2077 // However if something is wrong, either `joinall` will never return, 2078 // or the `assert(0)` in `call0` will be triggered. 2079 try 2080 { 2081 node1.call0(); 2082 assert(0, "This should have timed out"); 2083 } 2084 catch (Exception e) {} 2085 2086 node2.ctrl.shutdown(); 2087 thread_joinAll(); 2088 } 2089 2090 /// Test Timer 2091 unittest 2092 { 2093 static import core.thread; 2094 import core.time; 2095 2096 static interface API 2097 { 2098 public void startTimer (bool periodic); 2099 public void stopTimer (); 2100 public ulong getCounter (); 2101 public void resetCounter (); 2102 } 2103 2104 static class Node : API 2105 { 2106 private ulong counter; 2107 private Timer timer; 2108 2109 public override void startTimer (bool periodic) 2110 { 2111 this.timer = setTimer(100.msecs, &callback, periodic); 2112 } 2113 2114 public override void stopTimer () 2115 { 2116 this.timer.stop(); 2117 } 2118 2119 public void callback () 2120 { 2121 this.counter++; 2122 if (this.counter == 3) 2123 this.timer.stop(); 2124 } 2125 2126 public override ulong getCounter () 2127 { 2128 scope (exit) this.counter = 0; 2129 return this.counter; 2130 } 2131 2132 public override void resetCounter () 2133 { 2134 this.counter = 0; 2135 } 2136 } 2137 2138 auto node = RemoteAPI!API.spawn!Node(); 2139 assert(node.getCounter() == 0); 2140 node.startTimer(true); 2141 core.thread.Thread.sleep(1.seconds); 2142 // The expected count is 3 2143 // Check means the timer repeated and the timer stoped 2144 assert(node.getCounter() == 3); 2145 node.resetCounter(); 2146 node.startTimer(false); 2147 node.stopTimer(); 2148 core.thread.Thread.sleep(500.msecs); 2149 assert(node.getCounter() == 0); 2150 node.ctrl.shutdown(); 2151 thread_joinAll(); 2152 } 2153 2154 /// Test restarting a node 2155 unittest 2156 { 2157 static interface API 2158 { 2159 public uint[2] getCount () @safe; 2160 } 2161 2162 static class Node : API 2163 { 2164 private static uint instantiationCount; 2165 private static uint destructionCount; 2166 2167 this () 2168 { 2169 Node.instantiationCount++; 2170 } 2171 2172 ~this () 2173 { 2174 Node.destructionCount++; 2175 } 2176 2177 public override uint[2] getCount () const @safe 2178 { 2179 return [ Node.instantiationCount, Node.destructionCount, ]; 2180 } 2181 } 2182 2183 auto node = RemoteAPI!API.spawn!Node(); 2184 assert(node.getCount == [1, 0]); 2185 node.ctrl.restart(); 2186 assert(node.getCount == [2, 1]); 2187 node.ctrl.restart(); 2188 assert(node.getCount == [3, 2]); 2189 node.ctrl.shutdown(); 2190 thread_joinAll(); 2191 } 2192 2193 /// Test restarting a node that has responses waiting for it 2194 unittest 2195 { 2196 import core.atomic : atomicLoad, atomicStore; 2197 static interface API 2198 { 2199 @safe: 2200 public void call0 (); 2201 public void call1 (); 2202 } 2203 2204 __gshared C.Tid node2Addr; 2205 static shared bool done; 2206 2207 static class Node : API 2208 { 2209 @trusted: 2210 2211 public override void call0 () 2212 { 2213 scope node2 = new RemoteAPI!API(node2Addr); 2214 node2.call1(); 2215 } 2216 2217 public override void call1 () 2218 { 2219 // when this event runs we know call1() has already returned 2220 scheduler.schedule({ atomicStore(done, true); }); 2221 } 2222 } 2223 2224 auto node1 = RemoteAPI!API.spawn!Node(500.msecs); 2225 auto node2 = RemoteAPI!API.spawn!Node(); 2226 node2Addr = node2.ctrl.tid(); 2227 node2.ctrl.sleep(2.seconds, false); 2228 2229 try 2230 { 2231 node1.call0(); 2232 assert(0, "This should have timed out"); 2233 } 2234 catch (Exception e) {} 2235 2236 node1.ctrl.restart(); 2237 2238 // after a while node 1 will receive a response to the timed-out request 2239 // to call1(), but the node restarted and is no longer interested in this 2240 // request (the request map / LocalScheduler is different), so it's filtered 2241 size_t count; 2242 while (!atomicLoad(done)) 2243 { 2244 assert(count < 300); // up to 3 seconds wait 2245 count++; 2246 Thread.sleep(10.msecs); 2247 } 2248 2249 node1.ctrl.shutdown(); 2250 node2.ctrl.shutdown(); 2251 thread_joinAll(); 2252 } 2253 2254 unittest 2255 { 2256 import geod24.concurrency; 2257 2258 static interface API 2259 { 2260 public void start (); 2261 public int getValue (); 2262 } 2263 2264 static class Node : API 2265 { 2266 int value; 2267 2268 public override void start () 2269 { 2270 // if this is a scoped delegate, it might not have a closure, 2271 // and when the task is resumed again it will segfault. 2272 // therefore runTask() must take a non-scope delegate. 2273 // note: once upstream issue #20868 is fixed, it would become 2274 // a compiler error to escape a scope delegate. 2275 runTask( 2276 { 2277 value = 1; 2278 FiberScheduler.yield(); 2279 value = 2; 2280 }); 2281 } 2282 2283 public override int getValue () { return this.value; } 2284 } 2285 2286 auto node = RemoteAPI!API.spawn!Node(); 2287 node.start(); 2288 assert(node.getValue() == 2); 2289 node.ctrl.shutdown(); 2290 thread_joinAll(); 2291 } 2292 2293 /// Situation: Calling a node with an interface that doesn't exists 2294 /// Expectation: The client throws an exception with a useful error message 2295 /// This can happen by mistake (API mixup) or when a method is optional. 2296 unittest 2297 { 2298 import std.exception : assertThrown; 2299 2300 static interface BaseAPI 2301 { 2302 public int required (); 2303 } 2304 2305 static interface APIExtended : BaseAPI 2306 { 2307 public int optional (); 2308 } 2309 2310 static class BaseNode : BaseAPI 2311 { 2312 public override int required () { return 42; } 2313 } 2314 2315 auto node = RemoteAPI!BaseAPI.spawn!BaseNode(); 2316 scope extnode = new RemoteAPI!APIExtended(node.ctrl.tid()); 2317 assert(extnode.required() == 42); 2318 assertThrown!ClientException(extnode.optional()); 2319 node.ctrl.shutdown(); 2320 thread_joinAll(); 2321 }