1 /******************************************************************************* 2 3 Provides utilities to mock a network in unittests 4 5 This module is based on the idea that D `interface`s can be used 6 to represent a server's API, and that D `class` inheriting this `interface` 7 are used to define the server's business code, 8 abstracting away the communication layer. 9 10 For example, a server that exposes an API to concatenate two strings would 11 define the following code: 12 --- 13 interface API { public string concat (string a, string b); } 14 class Server : API 15 { 16 public override string concat (string a, string b) 17 { 18 return a ~ b; 19 } 20 } 21 --- 22 23 Then one can use "generators" to define how multiple process communicate 24 together. One such generator, that pioneered this design is `vibe.web.rest`, 25 which allows to expose such servers as REST APIs. 26 27 `localrest` is another generator, which uses message passing and threads 28 to create a local "network". 29 The motivation was to create a testing library that could be used to 30 model a network at a much cheaper cost than spawning processes 31 (and containers) would be, when doing integration tests. 32 33 Control_Interface: 34 When instantiating a `RemoteAPI`, one has the ability to call foreign 35 implementations through auto-generated `override`s of the `interface`. 36 In addition to that, as this library is intended for testing, 37 a few extra functionalities are offered under a control interface, 38 accessible under the `ctrl` namespace in the instance. 39 The control interface allows to make the node unresponsive to one or all 40 methods, for some defined time or until unblocked, as well as trigger 41 shutdowns or restart. See the methods for more details. 42 The `withTimeout` control method can be used to spawn a scoped copy 43 of the RemoteAPI with a custom configured timeout. The user-provided 44 delegate will be called with this scoped copy that uses the new timeout. 45 46 Shutdown: 47 The control interface has a shutdown method that can be used to terminate 48 a node gracefully. When the shutdown request is handled by the node, 49 the event loop will exit and the thread will terminate. While the destructor 50 of the node will be called, it might not usable for some actions, for example 51 because D destructors may not allocate GC memory, or one may want 52 to perform some test-specific operations, such a logging some data in case of failure. 53 Therefore, you may provide a shutdown routine in the call to `shutdown`. 54 It must accept a single argument of the interface type, and will be called 55 with the implementation object just before the node is destroyed. 56 If this routine throws, LocalRest will log the error in the console and 57 proceed with destroying the stack-allocated node. 58 Note that control requests are asynchronous, hence requests from the node 59 might be processed / send by the node until the request is actually processed. 60 There is also a `restart` method which accepts the same callback argument. 61 62 Event_Loop: 63 Server process usually needs to perform some action in an asynchronous way. 64 Additionally, some actions needs to be completed at a semi-regular interval, 65 for example based on a timer. 66 For those use cases, a node should call `runTask` or `sleep`, respectively. 67 Note that this has the same name (and purpose) as Vibe.d's core primitives. 68 Users should only ever call Vibe's `runTask` / `sleep` with `vibe.web.rest`, 69 or only call LocalRest's `runTask` / `sleep` with `RemoteAPI`. 70 71 Implementation: 72 In order for tests to simulate an asynchronous system accurately, 73 multiple nodes need to be able to run concurrently and asynchronously. 74 75 There are two common solutions to this, to use either fibers or threads. 76 Fibers have the advantage of being simpler to implement and predictable. 77 Threads have the advantage of more accurately describing an asynchronous 78 system and thus have the ability to uncover more issues. 79 80 When spawning a node, a thread is spawned, a node is instantiated with 81 the provided arguments, and an event loop waits for messages sent 82 to the Tid. Messages consist of the sender's Tid, the mangled name 83 of the function to call (to support overloading) and the arguments, 84 serialized as a JSON string. 85 86 Note: 87 While this module's original motivation was to test REST nodes, 88 the only dependency to Vibe.d is actually to it's JSON module, 89 as Vibe.d is the only available JSON module known to the author 90 to provide an interface to deserialize composite types. 91 This dependency is however not enforced by the dub project file, 92 as users can provide their own serializer (see `geod24.Serialization`). 93 If the default parameter for serialization is never used, 94 one's project need not depend on `vibe-d:data`. 95 96 Author: Mathias 'Geod24' Lang 97 License: MIT (See LICENSE.txt) 98 Copyright: Copyright (c) 2018-2019 Mathias Lang. All rights reserved. 99 100 *******************************************************************************/ 101 102 module geod24.LocalRest; 103 104 static import C = geod24.concurrency; 105 import geod24.Serialization; 106 107 import std.datetime.systime : Clock, SysTime; 108 import std.format; 109 import std.meta : AliasSeq; 110 import std.traits : fullyQualifiedName, Parameters, ReturnType; 111 112 import core.thread; 113 import core.time; 114 115 /// Data sent by the caller 116 private struct Command 117 { 118 /// Sequence id of Command 119 size_t id; 120 /// Method to call 121 string method; 122 /// Serialized arguments to the method 123 SerializedData args; 124 } 125 126 /// Ask the node to exhibit a certain behavior for a given time 127 private struct TimeCommand 128 { 129 /// For how long our remote node apply this behavior 130 Duration dur; 131 /// Whether or not affected messages should be dropped 132 bool drop = false; 133 } 134 135 /// Ask the node to shut down 136 private struct ShutdownCommand 137 { 138 /// Any callback to call before the Node's destructor is called 139 void function (Object) callback; 140 141 /// Whether we're restarting or really shutting down 142 bool restart; 143 } 144 145 /// Filter out requests before they reach a node 146 private struct FilterAPI 147 { 148 /// the mangled symbol name of the function to filter 149 string func_mangleof; 150 151 /// used for debugging 152 string pretty_func; 153 } 154 155 /// Status of a request 156 private enum Status 157 { 158 /// Request failed 159 Failed, 160 161 /// The request failed to to a client error (4xx style error code) 162 ClientFailed, 163 164 /// Request timed-out 165 Timeout, 166 167 /// Request succeeded 168 Success 169 } 170 171 /// Data sent by the callee back to the caller 172 private struct Response 173 { 174 /// Final status of a request (failed, timeout, success, etc) 175 Status status; 176 /// Response id 177 size_t id; 178 /// If `status == Status.Success`, the serialized return value. 179 /// Otherwise, it contains `Exception.toString()`. 180 SerializedData data; 181 } 182 183 /// Thrown when the sent request is faulty (e.g. 4xx HTTP error types) 184 public class ClientException : Exception 185 { 186 /// Constructor 187 public this (string msg, 188 string file = __FILE__, int line = __LINE__, Exception next = null) 189 @safe pure nothrow 190 { 191 super(msg, file, line, next); 192 } 193 } 194 195 /// Simple exception to unwind the stack when we need to terminate/restart 196 private final class ExitException : Exception 197 { 198 public bool restart; 199 200 this () @safe pure nothrow @nogc 201 { 202 super("You should never see this exception - please report a bug"); 203 } 204 } 205 206 /// Simple wrapper to deal with tuples 207 /// Vibe.d might emit a pragma(msg) when T.length == 0 208 private struct ArgWrapper (T...) 209 { 210 static if (T.length == 0) 211 size_t dummy; 212 T args; 213 } 214 215 // very simple & limited variant, to keep it performant. 216 // should be replaced by a real Variant later 217 private struct Variant 218 { 219 this (Command msg) { this.cmd = msg; this.tag = Variant.Type.command; } 220 this (Response msg) { this.res = msg; this.tag = Variant.Type.response; } 221 this (FilterAPI msg) { this.filter = msg; this.tag = Variant.Type.filter; } 222 this (TimeCommand msg) { this.time = msg; this.tag = Variant.Type.timeCommand; } 223 this (ShutdownCommand msg) { this.shutdown = msg; this.tag = Variant.Type.shutdownCommand; } 224 225 union 226 { 227 Command cmd; 228 Response res; 229 FilterAPI filter; 230 TimeCommand time; 231 ShutdownCommand shutdown; 232 } 233 234 Type tag; 235 236 /// Type of a request 237 enum Type : ubyte 238 { 239 command, 240 response, 241 filter, 242 timeCommand, 243 shutdownCommand, 244 } 245 } 246 247 private alias CommandChn = C.Channel!Variant; 248 private alias RespChn = C.Channel!Response; 249 250 /// Represents a connection between a server and a client 251 private class Connection 252 { 253 /// 254 this () nothrow 255 { 256 this.command_chn = new CommandChn(); 257 this.resp_chn = new RespChn(); 258 } 259 260 /******************************************************************************* 261 262 Send a message over the Connection 263 264 Params: 265 T = Type of the message, should be support by the `Variant` type 266 msg = Message to be sent 267 268 Returns: 269 Success/failure 270 271 *******************************************************************************/ 272 273 bool sendCommand (T) (T msg) @trusted 274 { 275 bool ret; 276 277 if (isMainThread()) 278 scheduler.start({ ret = this.command_chn.write(Variant(msg)); }); 279 else 280 ret = this.command_chn.write(Variant(msg)); 281 282 return ret; 283 } 284 285 /******************************************************************************* 286 287 Get a unique id for a `Command` to be sent from this `Connection` 288 289 Returns: 290 Unique Command id 291 292 *******************************************************************************/ 293 294 size_t getNextCommandId () @safe 295 { 296 return this.next_cmd_id++; 297 } 298 299 /******************************************************************************* 300 301 Wait for a `Response` with specific id 302 303 Params: 304 resp_id = Response id to wait for 305 timeout = timeout duration for the operation 306 307 Returns: 308 Response 309 310 *******************************************************************************/ 311 312 Response waitResponse (size_t resp_id, Duration timeout) @trusted 313 { 314 if (isMainThread()) 315 { 316 Response res; 317 scheduler.start({ 318 // Loop until we get the Response we are looking for 319 while (this.resp_chn.read(res, timeout)) 320 if (res.id == resp_id) 321 return; 322 res = Response(Status.Timeout, resp_id, SerializedData("Request timed-out")); 323 }); 324 return res; 325 } 326 else 327 { 328 // Response may already be ready 329 if (auto existing_res = (resp_id in this.waiting_list)) 330 return existing_res.res; 331 332 // Block while waiting the Response 333 auto blocker = C.thisScheduler().new FiberBlocker(); 334 this.waiting_list[resp_id] = Waiting(blocker); 335 if (blocker.wait(timeout)) 336 return this.waiting_list[resp_id].res; 337 else 338 return Response(Status.Timeout, resp_id, SerializedData("Request timed-out")); 339 } 340 } 341 342 /******************************************************************************* 343 344 Notify the task waiting for a Response 345 346 Params: 347 res = Newly arrived Response 348 349 *******************************************************************************/ 350 351 void notifyWaiter (Response res) 352 { 353 // Is the waiting Fiber already blocked? 354 if (auto waiter = (res.id in this.waiting_list)) 355 { 356 waiter.res = res; 357 waiter.blocker.notify(); 358 } 359 else // Fiber is not yet blocked, create an entry for the related Fiber to use 360 this.waiting_list[res.id] = Waiting(null, res); 361 } 362 363 /******************************************************************************* 364 365 Close the `Channel`s associated with this `Connection`. Blocked `waitResponse` 366 calls will timeout and blocked `sendCommand` calls will fail 367 368 *******************************************************************************/ 369 370 void close () 371 { 372 this.command_chn.close(); 373 this.resp_chn.close(); 374 } 375 376 /// 377 private struct Waiting 378 { 379 C.FiberScheduler.FiberBlocker blocker; 380 Response res; 381 } 382 383 /// List of Fibers waiting for a Response from this Connection 384 private Waiting[size_t] waiting_list; 385 386 /// Next Command ID 387 private size_t next_cmd_id; 388 389 /// Channel to send `Command`s to 390 private CommandChn command_chn; 391 392 /// Channel to read `Response`s from 393 private RespChn resp_chn; 394 } 395 396 private struct AwaitingMessage 397 { 398 /// Message 399 public Variant var; 400 /// Originating `Connection` 401 public Connection conn; 402 } 403 404 // Used for controling filtering / sleep within the server implementation 405 private struct Control 406 { 407 public FilterAPI filter; // filter specific messages 408 public SysTime sleep_until; // sleep until this time 409 public bool drop; // drop messages if sleeping 410 411 bool isSleeping () const @safe /* nothrow: Not `nothrow` on Windows */ 412 { 413 return this.sleep_until != SysTime.init 414 && Clock.currTime < this.sleep_until; 415 } 416 } 417 418 419 /// `Channel` type that the nodes will listen for new `Connection`s 420 public alias BindChn = C.Channel!Connection; 421 422 /// Thread local outgoing `Connection` list 423 private Connection[RespChn] outgoing_conns; 424 425 /// We need a scheduler to simulate an event loop and to be re-entrant 426 private C.FiberScheduler scheduler; 427 428 /*********************************************************************** 429 430 Check if the current context is running inside the main thread and 431 intialize the thread local fiber scheduler if it is not initialized 432 433 Returns: 434 If the context is the main thread or not 435 436 ***********************************************************************/ 437 438 private bool isMainThread () @trusted nothrow 439 { 440 // we are in the main thread 441 if (scheduler is null) 442 scheduler = new C.FiberScheduler; 443 return Fiber.getThis() is null; 444 } 445 446 /******************************************************************************* 447 448 Provide eventloop-like functionalities 449 450 Since nodes instantiated via this modules are Vibe.d server, 451 they expect the ability to run an asynchronous task , 452 usually provided by `vibe.core.core : runTask`. 453 454 In order for them to properly work, we need to integrate them to our event 455 loop by providing the ability to spawn a task, and wait on some condition, 456 optionally with a timeout. 457 458 The following functions do that. 459 Note that those facilities are not available from the main thread, 460 while is supposed to do tests and doesn't have a scheduler. 461 462 *******************************************************************************/ 463 464 public void runTask (void delegate() dg) nothrow 465 { 466 assert(scheduler !is null, "Cannot call this delegate from the main thread"); 467 scheduler.spawn(dg); 468 } 469 470 /// Ditto 471 public void sleep (Duration timeout) nothrow 472 { 473 assert(!isMainThread(), "Cannot call this function from the main thread"); 474 475 // Duration.init (0.seconds) is infinite timeout, ignore 476 if (timeout == Duration.init) 477 return; 478 479 scope blocker = scheduler..new FiberBlocker(); 480 blocker.wait(timeout); 481 } 482 483 /******************************************************************************* 484 485 Run an asynchronous task after a given time. 486 487 The task will first run after the given `timeout`, and 488 can either repeat or run only once (the default). 489 Works similarly to Vibe.d's `setTimer`. 490 491 Params: 492 timeout = Determines the minimum amount of time that elapses before 493 the timer fires. 494 dg = If non-null, this delegate will be called when the timer fires 495 periodic = Speficies if the timer fires repeatedly or only once 496 497 Returns: 498 A `Timer` instance with the ability to control the timer 499 500 *******************************************************************************/ 501 502 public Timer setTimer (Duration timeout, void delegate() dg, 503 bool periodic = false) nothrow 504 { 505 assert(scheduler !is null, "Cannot call this delegate from the main thread"); 506 assert(dg !is null, "Cannot call this delegate if null"); 507 508 Timer timer = new Timer(timeout, dg, periodic); 509 scheduler.schedule(&timer.run); 510 return timer; 511 } 512 513 /// Simple timer 514 public final class Timer 515 { 516 private Duration timeout; 517 private void delegate () dg; 518 // Whether this timer is repeating 519 private bool periodic; 520 // Whether this timer was stopped 521 private bool stopped; 522 523 public this (Duration timeout, void delegate() dg, bool periodic) @safe nothrow 524 { 525 this.timeout = timeout; 526 this.dg = dg; 527 this.periodic = periodic; 528 this.stopped = false; 529 } 530 531 // Run a delegate after timeout, and until this.periodic is false 532 private void run () 533 { 534 do 535 { 536 sleep(timeout); 537 if (this.stopped) 538 return; 539 dg(); 540 } while (this.periodic); 541 } 542 543 /// Stop the timer. The next time this timer's fiber wakes up 544 /// it will exit the run() function. 545 public void stop () @safe nothrow 546 { 547 this.stopped = true; 548 this.periodic = false; 549 } 550 } 551 552 /******************************************************************************* 553 554 A reference to the "listening" connection of a remote thread 555 556 When a remote thread starts, it initially listens for new connection 557 (similarly to `bind` in C). When a new connection is started, it creates 558 a separate channel for communication (similar to `accept` in C). 559 This newly-created channel is what `RemoteAPI` wraps. 560 The channel / link / connection to the original listener, which is the only 561 one able to establish a connection, is what this data structure wraps. 562 563 *******************************************************************************/ 564 565 public struct Listener (API) 566 { 567 /// Internal data, do not use 568 package BindChn data; 569 } 570 571 /******************************************************************************* 572 573 A reference to an alread-instantiated node 574 575 This class serves the same purpose as a `RestInterfaceClient`: 576 it is a client for an already instantiated rest `API` interface. 577 578 In order to instantiate a new server (in a remote thread), use the static 579 `spawn` function. 580 581 Serialization: 582 In order to support custom serialization policy, one can change the 583 `Serializer` parameter. This parameter is expected to be either a 584 template or an aggregate with two static methods, but no explicit 585 limitation is put on the type. 586 See `geod24.Serialization`'s documentation for more informations. 587 588 Params: 589 API = The interface defining the API to implement 590 S = An aggregate which follows the requirement explained above. 591 592 *******************************************************************************/ 593 594 public final class RemoteAPI (API, alias S = VibeJSONSerializer!()) : API 595 { 596 static assert (!serializerInvalidReason!(S).length, serializerInvalidReason!S); 597 598 /*************************************************************************** 599 600 Instantiate a node and start it 601 602 This is usually called from the main thread, which will start all the 603 nodes and then start to process request. 604 In order to have a connected network, no nodes in any thread should have 605 a different reference to the same node. 606 In practice, this means there should only be one `Tid` per "address". 607 608 Note: 609 When the `RemoteAPI` returned by this function is finalized, 610 the child thread will be shut down. 611 612 Params: 613 Impl = Type of the implementation to instantiate 614 args = Arguments to the object's constructor 615 timeout = (optional) timeout to use with requests 616 file = Path to the file that called this function (for diagnostic) 617 line = Line number tied to the `file` parameter 618 619 Returns: 620 A `RemoteAPI` owning the node reference 621 622 ***************************************************************************/ 623 624 public static RemoteAPI spawn (Impl) ( 625 CtorParams!Impl args, Duration timeout = 5.seconds, 626 string file = __FILE__, int line = __LINE__) 627 { 628 auto chn = new BindChn(); 629 new Thread( 630 { 631 spawned!(Impl)(chn, file, line, args); 632 }).start(); 633 return new RemoteAPI(Listener!API(chn), timeout); 634 } 635 636 /// Helper template to get the constructor's parameters 637 private static template CtorParams (Impl) 638 { 639 static if (is(typeof(Impl.__ctor))) 640 private alias CtorParams = Parameters!(Impl.__ctor); 641 else 642 private alias CtorParams = AliasSeq!(); 643 } 644 645 /*************************************************************************** 646 647 Handler function 648 649 Performs the dispatch from `cmd` to the proper `node` function, 650 provided the function is not filtered. 651 652 Params: 653 cmd = the command to run (contains the method name and the arguments) 654 node = the node to invoke the method on 655 filter = used for filtering API calls (returns default response) 656 resp_chn = `Channel` to send the `Response` to 657 658 ***************************************************************************/ 659 660 private static void handleCommand (Command cmd, API node, FilterAPI filter, RespChn resp_chn) 661 { 662 switch (cmd.method) 663 { 664 static foreach (member; __traits(allMembers, API)) 665 static foreach (ovrld; __traits(getOverloads, API, member)) 666 { 667 mixin( 668 q{ 669 case `%2$s`: 670 Response res = Response(Status.Failed, cmd.id); 671 672 // Provide informative message in case of filtered method 673 if (cmd.method == filter.func_mangleof) 674 res.data = SerializedData(format("Filtered method '%%s'", filter.pretty_func)); 675 else 676 { 677 auto args = S.deserialize!(ArgWrapper!(Parameters!ovrld))( 678 cmd.args.getS!S); 679 680 try 681 { 682 static if (!is(ReturnType!ovrld == void)) 683 res.data = SerializedData(S.serialize(node.%1$s(args.args))); 684 else 685 node.%1$s(args.args); 686 res.status = Status.Success; 687 } 688 catch (Exception e) 689 { 690 res.status = Status.ClientFailed; 691 res.data = SerializedData(e.toString()); 692 } 693 } 694 695 resp_chn.write(res); 696 return; 697 }.format(member, ovrld.mangleof)); 698 } 699 default: 700 resp_chn.write(Response(Status.ClientFailed, cmd.id, SerializedData("Method not found"))); 701 break; 702 } 703 } 704 705 /*************************************************************************** 706 707 Main dispatch function 708 709 This function receive string-serialized messages from the calling thread, 710 which is a struct with the method's mangleof, and the method's arguments 711 as a tuple, serialized to a JSON string. 712 713 Params: 714 Implementation = Type of the implementation to instantiate 715 bind_chn = The channel on which to "listen" to receive new "connections" 716 file = Path to the file that spawned this node 717 line = Line number in the `file` that spawned this node 718 cargs = Arguments to `Implementation`'s constructor 719 720 ***************************************************************************/ 721 722 private static void spawned (Implementation) ( 723 BindChn bind_chn, string file, int line, CtorParams!Implementation cargs) 724 nothrow 725 { 726 import std.algorithm : each; 727 import std.range; 728 729 scope exc = new ExitException(); 730 731 // The list of `Connection` we are listening to, 732 // equivalent to the list of fd in a select / epoll. 733 Connection[CommandChn] incoming_conns; 734 735 void runNode () 736 { 737 scheduler = new C.FiberScheduler; 738 C.thisScheduler(scheduler); 739 scope node = new Implementation(cargs); 740 741 // Control the node behavior 742 Control control; 743 744 // we need to keep track of messages which were ignored when 745 // node.sleep() was used, and then handle each message in sequence. 746 AwaitingMessage[] await_msgs; 747 748 scheduler.start(() { 749 C.SelectEntry[] read_list; 750 C.SelectEntry[] write_list; 751 752 while (true) 753 { 754 Connection new_conn; 755 Response res; 756 Variant msg; 757 758 read_list.length = 0; 759 assumeSafeAppend(read_list); 760 write_list.length = 0; 761 assumeSafeAppend(write_list); 762 763 foreach (ref conn; incoming_conns) 764 if (!conn.command_chn.isClosed()) 765 read_list ~= C.SelectEntry(conn.command_chn, &msg); 766 foreach (ref conn; outgoing_conns) 767 if (!conn.resp_chn.isClosed()) 768 read_list ~= C.SelectEntry(conn.resp_chn, &res); 769 read_list ~= C.SelectEntry(bind_chn, &new_conn); 770 771 auto sel_ret = C.select(read_list, write_list, 10.msecs); 772 773 if (await_msgs.length > 0 && !control.isSleeping()) 774 { 775 scheduler.spawn({ 776 auto prev_await_msgs = await_msgs; 777 foreach (ref await_msg; prev_await_msgs) 778 handleCommand(await_msg.var.cmd, node, control.filter, await_msg.conn.resp_chn); 779 }); 780 await_msgs.length = 0; 781 } 782 783 if (!sel_ret.success) 784 continue; 785 786 // Bind chn 787 if (cast(BindChn) read_list[sel_ret.id].selectable) 788 { 789 incoming_conns[new_conn.command_chn] = new_conn; 790 } 791 // Command 792 else if (auto comm_chn = cast(CommandChn) read_list[sel_ret.id].selectable) 793 { 794 auto curr_conn = incoming_conns[comm_chn]; 795 switch (msg.tag) 796 { 797 case Variant.Type.command: 798 Command cmd = msg.cmd; 799 if (!control.isSleeping()) 800 scheduler.spawn({ 801 handleCommand(cmd, node, control.filter, curr_conn.resp_chn); 802 }); 803 else if (!control.drop) 804 await_msgs ~= AwaitingMessage(msg, curr_conn); 805 break; 806 case Variant.Type.shutdownCommand: 807 ShutdownCommand e = msg.shutdown; 808 if (!e.restart) 809 { 810 bind_chn.close(); 811 incoming_conns.each!((conn) => conn.close()); 812 } 813 outgoing_conns.each!((conn) => conn.close()); 814 outgoing_conns.clear(); 815 if (e.callback !is null) 816 e.callback(node); 817 exc.restart = e.restart; 818 throw exc; 819 case Variant.Type.timeCommand: 820 TimeCommand s = msg.time; 821 control.sleep_until = Clock.currTime + s.dur; 822 control.drop = s.drop; 823 break; 824 case Variant.Type.filter: 825 FilterAPI filter_api = msg.filter; 826 control.filter = filter_api; 827 break; 828 default: 829 assert(0, "Got invalid Variant.Type: " ~ msg.tag); 830 } 831 } 832 else if (auto resp_chn = cast(RespChn) read_list[sel_ret.id].selectable) 833 { 834 // Response 835 outgoing_conns[resp_chn].notifyWaiter(res); 836 } 837 } 838 }); 839 } 840 841 try 842 { 843 while (true) 844 { 845 try runNode(); 846 // We use this exception to exit the event loop 847 catch (ExitException e) 848 { 849 if (!e.restart) 850 break; 851 } 852 } 853 } 854 catch (Throwable t) 855 { 856 import core.stdc.stdio, std.stdio; 857 printf("#### FATAL ERROR: %.*s\n", cast(int) t.msg.length, t.msg.ptr); 858 printf("This node was started at %.*s:%d\n", 859 cast(int) file.length, file.ptr, line); 860 printf("This most likely means that the node crashed due to an uncaught exception\n"); 861 printf("If not, please file a bug at https://github.com/Geod24/localrest/\n"); 862 863 try writeln("Full error: ", t); 864 catch (Exception e) { /* Nothing more we can do at this point */ } 865 } 866 } 867 868 /// Timeout to use when issuing requests 869 private const Duration timeout; 870 871 /// Main Channel that this Node will listen for incoming messages 872 private BindChn bind_chn; 873 874 /// Connection between this instance and the node main thread 875 private Connection conn; 876 877 /*************************************************************************** 878 879 Create an instante of a client 880 881 This connects to an already instantiated node. 882 In order to instantiate a node, see the static `spawn` function. 883 884 Params: 885 listener = The listener used to connect to the node (most frequently 886 obtained by calling `geod24.concurrency.locate`) 887 timeout = any timeout to use 888 889 ***************************************************************************/ 890 891 public this (Listener!API listener, Duration timeout = 5.seconds) 892 @trusted nothrow 893 { 894 import std.exception : assumeWontThrow; 895 896 this.bind_chn = listener.data; 897 this.timeout = timeout; 898 assert(bind_chn); 899 900 // Create a new Connection, register it and send it to the peer 901 // TODO: What to do when bind_chn is closed? 902 this.conn = new Connection(); 903 outgoing_conns[this.conn.resp_chn] = this.conn; 904 if (isMainThread()) 905 assumeWontThrow(scheduler.start({ bind_chn.write(this.conn); })); 906 else 907 bind_chn.write(this.conn); 908 } 909 910 /*************************************************************************** 911 912 Introduce a namespace to avoid name clashes 913 914 The only way we have a name conflict is if someone exposes `ctrl`, 915 in which case they will be served an error along the following line: 916 LocalRest.d(...): Error: function `RemoteAPI!(...).ctrl` conflicts 917 with mixin RemoteAPI!(...).ControlInterface!() at LocalRest.d(...) 918 919 ***************************************************************************/ 920 921 public mixin ControlInterface!() ctrl; 922 923 /// Ditto 924 private mixin template ControlInterface () 925 { 926 /*********************************************************************** 927 928 Returns the listener this `RemoteAPI` used 929 930 The connection represents the link to the "bind address" of the 931 remote thread. It can be used with `geod24.concurrency.register` 932 to allow other piece of code to establish a connection. 933 It *cannot* be used to send commands directly to the remote thread. 934 Instead, a new `RemoteAPI` should be instantiated from it. 935 936 ***********************************************************************/ 937 938 public Listener!API listener () @nogc pure nothrow 939 { 940 return Listener!API(this.bind_chn); 941 } 942 943 /*********************************************************************** 944 945 Send an async message to the thread to immediately shut down. 946 947 Params: 948 callback = if not null, the callback to call in the Node's 949 thread before the Node is destroyed. Can be used 950 for cleanup / logging routines. 951 952 ***********************************************************************/ 953 954 public void shutdown (void function (Object) callback = null) 955 @trusted 956 { 957 this.conn.sendCommand(ShutdownCommand(callback, false)); 958 } 959 960 /*********************************************************************** 961 962 Send an async message to the thread to immediately restart. 963 964 Note that further non-control messages to the node will block until 965 the node is back "online". 966 967 Params: 968 callback = if not null, the callback to call in the Node's 969 thread before the Node is destroyed, but before 970 it is restarted. Can be used for cleanup or logging. 971 972 ***********************************************************************/ 973 974 public void restart (void function (Object) callback = null) 975 @trusted 976 { 977 this.conn.sendCommand(ShutdownCommand(callback, true)); 978 } 979 980 /*********************************************************************** 981 982 Make the remote node sleep for `Duration` 983 984 Params: 985 delay = Duration the node will sleep for 986 dropMessages = Whether to process the pending requests when the 987 node come back online (the default), or to drop 988 pending traffic 989 990 ***********************************************************************/ 991 992 public void sleep (Duration d, bool dropMessages = false) @trusted 993 { 994 this.conn.sendCommand(TimeCommand(d, dropMessages)); 995 } 996 997 /*********************************************************************** 998 999 Filter any requests issued to the provided method. 1000 1001 Calling the API endpoint will throw an exception, 1002 therefore the request will fail. 1003 1004 Use via: 1005 1006 ---- 1007 interface API { void call(); } 1008 class C : API { void call() { } } 1009 auto obj = new RemoteAPI!API(...); 1010 obj.filter!(API.call); 1011 ---- 1012 1013 To match a specific overload of a method, specify the 1014 parameters to match against in the call. For example: 1015 1016 ---- 1017 interface API { void call(int); void call(int, float); } 1018 class C : API { void call(int) {} void call(int, float) {} } 1019 auto obj = new RemoteAPI!API(...); 1020 obj.filter!(API.call, int, float); // only filters the second overload 1021 ---- 1022 1023 Params: 1024 method = the API method for which to filter out requests 1025 OverloadParams = (optional) the parameters to match against 1026 to select an overload. Note that if the method has no other 1027 overloads, then even if that method takes parameters and 1028 OverloadParams is empty, it will match that method 1029 out of convenience. 1030 1031 ***********************************************************************/ 1032 1033 public void filter (alias method, OverloadParams...) () @trusted 1034 { 1035 enum method_name = __traits(identifier, method); 1036 1037 // return the mangled name of the matching overload 1038 template getBestMatch (T...) 1039 { 1040 static if (is(Parameters!(T[0]) == OverloadParams)) 1041 { 1042 enum getBestMatch = T[0].mangleof; 1043 } 1044 else static if (T.length > 0) 1045 { 1046 enum getBestMatch = getBestMatch!(T[1 .. $]); 1047 } 1048 else 1049 { 1050 static assert(0, 1051 format("Couldn't select best overload of '%s' for " ~ 1052 "parameter types: %s", 1053 method_name, OverloadParams.stringof)); 1054 } 1055 } 1056 1057 // ensure it's used with API.method, *not* RemoteAPI.method which 1058 // is an override of API.method. Otherwise mangling won't match! 1059 // special-case: no other overloads, and parameter list is empty: 1060 // just select that one API method 1061 alias Overloads = __traits(getOverloads, API, method_name); 1062 static if (Overloads.length == 1 && OverloadParams.length == 0) 1063 { 1064 immutable pretty = method_name ~ Parameters!(Overloads[0]).stringof; 1065 enum mangled = Overloads[0].mangleof; 1066 } 1067 else 1068 { 1069 immutable pretty = format("%s%s", method_name, OverloadParams.stringof); 1070 enum mangled = getBestMatch!Overloads; 1071 } 1072 1073 this.conn.sendCommand(FilterAPI(mangled, pretty)); 1074 } 1075 1076 1077 /*********************************************************************** 1078 1079 Clear out any filtering set by a call to filter() 1080 1081 ***********************************************************************/ 1082 1083 public void clearFilter () @trusted 1084 { 1085 this.conn.sendCommand(FilterAPI("")); 1086 } 1087 1088 /*********************************************************************** 1089 1090 Call the provided delegate with a custom timeout 1091 1092 This allow to perform requests on a client with a different timeout, 1093 usually to allow some requests (e.g. initialization calls) to have longer 1094 timeout, or no timeout at all, or to put a timeout on an otherwise 1095 timeout-less client (e.g. when calling the actual test which could fail). 1096 1097 To disable timeout, pass the special value `Duration.zero` 1098 (or `0.seconds`, `0.msecs`, etc...). 1099 1100 Params: 1101 timeout = the new timeout to use 1102 dg = the delegate to call with the new scoped RemoteAPI copy 1103 1104 ***********************************************************************/ 1105 1106 public void withTimeout (Dg) (Duration timeout, scope Dg dg) 1107 { 1108 scope api = new RemoteAPI(this.ctrl.listener(), timeout); 1109 static assert(is(typeof({ dg(api); })), 1110 "Provided argument of type `" ~ Dg.stringof ~ 1111 "` is not callable with argument type `scope " ~ 1112 fullyQualifiedName!API ~ "`"); 1113 dg(api); 1114 } 1115 } 1116 1117 // Vibe.d mandates that method must be @safe 1118 @safe: 1119 1120 /*************************************************************************** 1121 1122 Generate the API `override` which forward to the actual object 1123 1124 ***************************************************************************/ 1125 1126 static foreach (member; __traits(allMembers, API)) 1127 static foreach (ovrld; __traits(getOverloads, API, member)) 1128 { 1129 mixin(q{ 1130 override ReturnType!(ovrld) } ~ member ~ q{ (Parameters!ovrld params) 1131 { 1132 1133 // `geod24.concurrency.send/receive[Only]` is not `@safe` but 1134 // this overload needs to be 1135 auto res = () @trusted { 1136 auto serialized = S.serialize(ArgWrapper!(Parameters!ovrld)(params)); 1137 auto command = Command(this.conn.getNextCommandId(), ovrld.mangleof, SerializedData(serialized)); 1138 1139 if(!this.conn.sendCommand(command)) 1140 throw new Exception("Connection with peer closed"); 1141 return this.conn.waitResponse(command.id, this.timeout); 1142 }(); 1143 1144 if (res.status == Status.Failed) 1145 throw new Exception(res.data.get!string); 1146 1147 if (res.status == Status.ClientFailed) 1148 throw new ClientException( 1149 format("Request to %s couldn't be processed : %s", 1150 __PRETTY_FUNCTION__, res.data.get!string)); 1151 1152 if (res.status == Status.Timeout) 1153 throw new Exception("Request timed-out"); 1154 1155 static if (!is(ReturnType!(ovrld) == void)) 1156 return S.deserialize!(typeof(return))(res.data.getS!S()); 1157 } 1158 }); 1159 } 1160 } 1161 1162 /// Simple usage example 1163 unittest 1164 { 1165 static interface API 1166 { 1167 @safe: 1168 public @property ulong pubkey (); 1169 public string getValue (ulong idx); 1170 public ubyte[32] getQuorumSet (); 1171 public string recv (string data); 1172 } 1173 1174 static class MockAPI : API 1175 { 1176 @safe: 1177 public override @property ulong pubkey () 1178 { return 42; } 1179 public override string getValue (ulong idx) 1180 { assert(0); } 1181 public override ubyte[32] getQuorumSet () 1182 { assert(0); } 1183 public override string recv (string data) 1184 { assert(0); } 1185 } 1186 1187 scope test = RemoteAPI!API.spawn!MockAPI(); 1188 scope (exit) { 1189 test.ctrl.shutdown(); 1190 thread_joinAll(); 1191 } 1192 assert(test.pubkey() == 42); 1193 } 1194 1195 /// Example where a shutdown() routine must be called on a node before 1196 /// its destructor is called 1197 unittest 1198 { 1199 __gshared bool dtor_called; 1200 __gshared bool shutdown_called; 1201 __gshared bool onDestroy_called; 1202 1203 static interface API 1204 { 1205 @safe: 1206 public @property ulong pubkey (); 1207 } 1208 1209 static class MockAPI : API 1210 { 1211 public override @property ulong pubkey () @safe 1212 { return 42; } 1213 public void shutdown () { shutdown_called = true; } 1214 ~this () { dtor_called = true; } 1215 } 1216 1217 static void onDestroy (Object node) 1218 { 1219 assert(!dtor_called); 1220 auto mock = cast(MockAPI)node; 1221 assert(mock !is null); 1222 mock.shutdown(); 1223 onDestroy_called = true; 1224 } 1225 1226 scope test = RemoteAPI!API.spawn!MockAPI(); 1227 scope (failure) test.ctrl.shutdown(); 1228 assert(test.pubkey() == 42); 1229 test.ctrl.shutdown(&onDestroy); 1230 thread_joinAll(); 1231 assert(dtor_called); 1232 assert(onDestroy_called); 1233 assert(shutdown_called); 1234 } 1235 1236 /// In a real world usage, users will most likely need to use the registry 1237 unittest 1238 { 1239 import std.conv; 1240 static import geod24.concurrency; 1241 import geod24.Registry; 1242 1243 static interface API 1244 { 1245 @safe: 1246 public @property ulong pubkey (); 1247 public string getValue (ulong idx); 1248 public string recv (string data); 1249 public string recv (ulong index, string data); 1250 1251 public string last (); 1252 } 1253 1254 __gshared Registry!API registry; 1255 registry.initialize(); 1256 1257 static class Node : API 1258 { 1259 @safe: 1260 public this (bool isByzantine) { this.isByzantine = isByzantine; } 1261 public override @property ulong pubkey () 1262 { lastCall = `pubkey`; return this.isByzantine ? 0 : 42; } 1263 public override string getValue (ulong idx) 1264 { lastCall = `getValue`; return null; } 1265 public override string recv (string data) 1266 { lastCall = `recv@1`; return null; } 1267 public override string recv (ulong index, string data) 1268 { lastCall = `recv@2`; return null; } 1269 1270 public override string last () { return this.lastCall; } 1271 1272 private bool isByzantine; 1273 private string lastCall; 1274 } 1275 1276 static RemoteAPI!API factory (string type, ulong hash) 1277 { 1278 const name = hash.to!string; 1279 auto listener = registry.locate(name); 1280 if (listener !is Listener!API.init) 1281 return new RemoteAPI!API(listener); 1282 1283 switch (type) 1284 { 1285 case "normal": 1286 auto ret = RemoteAPI!API.spawn!Node(false); 1287 registry.register(name, ret.ctrl.listener()); 1288 return ret; 1289 case "byzantine": 1290 auto ret = RemoteAPI!API.spawn!Node(true); 1291 registry.register(name, ret.ctrl.listener()); 1292 return ret; 1293 default: 1294 assert(0, type); 1295 } 1296 } 1297 1298 auto node1 = factory("normal", 1); 1299 auto node2 = factory("byzantine", 2); 1300 1301 static void testFunc() 1302 { 1303 auto node1 = factory("this does not matter", 1); 1304 auto node2 = factory("neither does this", 2); 1305 scope (exit) { 1306 node1.ctrl.shutdown(); 1307 node2.ctrl.shutdown(); 1308 } 1309 assert(node1.pubkey() == 42); 1310 assert(node1.last() == "pubkey"); 1311 assert(node2.pubkey() == 0); 1312 assert(node2.last() == "pubkey"); 1313 1314 node1.recv(42, null); 1315 assert(node1.last() == "recv@2"); 1316 node1.recv(null); 1317 assert(node1.last() == "recv@1"); 1318 assert(node2.last() == "pubkey"); 1319 } 1320 1321 scope thread = new Thread(&testFunc); 1322 thread.start(); 1323 // Make sure our main thread terminates after everyone else 1324 thread_joinAll(); 1325 } 1326 1327 /// This network have different types of nodes in it 1328 unittest 1329 { 1330 import geod24.concurrency; 1331 1332 static interface API 1333 { 1334 @safe: 1335 public @property ulong requests (); 1336 public @property ulong value (); 1337 } 1338 1339 static class MasterNode : API 1340 { 1341 @safe: 1342 public override @property ulong requests() 1343 { 1344 return this.requests_; 1345 } 1346 1347 public override @property ulong value() 1348 { 1349 this.requests_++; 1350 return 42; // Of course 1351 } 1352 1353 private ulong requests_; 1354 } 1355 1356 static class SlaveNode : API 1357 { 1358 @safe: 1359 this(Listener!API masterConn) 1360 { 1361 this.master = new RemoteAPI!API(masterConn); 1362 } 1363 1364 public override @property ulong requests() 1365 { 1366 return this.requests_; 1367 } 1368 1369 public override @property ulong value() 1370 { 1371 this.requests_++; 1372 return master.value(); 1373 } 1374 1375 private API master; 1376 private ulong requests_; 1377 } 1378 1379 RemoteAPI!API[4] nodes; 1380 auto master = RemoteAPI!API.spawn!MasterNode(); 1381 nodes[0] = master; 1382 nodes[1] = RemoteAPI!API.spawn!SlaveNode(master.ctrl.listener()); 1383 nodes[2] = RemoteAPI!API.spawn!SlaveNode(master.ctrl.listener()); 1384 nodes[3] = RemoteAPI!API.spawn!SlaveNode(master.ctrl.listener()); 1385 scope (exit) { 1386 import std.algorithm; 1387 nodes.each!(node => node.ctrl.shutdown()); 1388 thread_joinAll(); 1389 } 1390 1391 foreach (n; nodes) 1392 { 1393 assert(n.requests() == 0); 1394 assert(n.value() == 42); 1395 } 1396 1397 assert(nodes[0].requests() == 4); 1398 1399 foreach (n; nodes[1 .. $]) 1400 { 1401 assert(n.value() == 42); 1402 assert(n.requests() == 2); 1403 } 1404 1405 assert(nodes[0].requests() == 7); 1406 } 1407 1408 /// Support for circular nodes call 1409 unittest 1410 { 1411 static import geod24.concurrency; 1412 1413 static interface API 1414 { 1415 @safe: 1416 public ulong call (ulong count, ulong val); 1417 public void setNext (string name); 1418 } 1419 1420 __gshared Listener!API[string] tbn; 1421 1422 static class Node : API 1423 { 1424 @safe: 1425 public override ulong call (ulong count, ulong val) 1426 { 1427 if (!count) 1428 return val; 1429 return this.next.call(count - 1, val + count); 1430 } 1431 1432 public override void setNext (string name) @trusted 1433 { 1434 this.next = new RemoteAPI!API(tbn[name]); 1435 } 1436 1437 private API next; 1438 } 1439 1440 RemoteAPI!(API)[3] nodes = [ 1441 RemoteAPI!API.spawn!Node(), 1442 RemoteAPI!API.spawn!Node(), 1443 RemoteAPI!API.spawn!Node(), 1444 ]; 1445 scope (exit) { 1446 import std.algorithm; 1447 nodes.each!(node => node.ctrl.shutdown()); 1448 thread_joinAll(); 1449 } 1450 1451 foreach (idx, ref api; nodes) 1452 tbn[format("node%d", idx)] = api.ctrl.listener(); 1453 nodes[0].setNext("node1"); 1454 nodes[1].setNext("node2"); 1455 nodes[2].setNext("node0"); 1456 1457 // 7 level of re-entrancy 1458 assert(210 == nodes[0].call(20, 0)); 1459 } 1460 1461 1462 /// Nodes can start tasks 1463 unittest 1464 { 1465 static import core.thread; 1466 import core.time; 1467 1468 static interface API 1469 { 1470 public void start (); 1471 public ulong getCounter (); 1472 } 1473 1474 static class Node : API 1475 { 1476 public override void start () 1477 { 1478 runTask(&this.task); 1479 } 1480 1481 public override ulong getCounter () 1482 { 1483 scope (exit) this.counter = 0; 1484 return this.counter; 1485 } 1486 1487 private void task () 1488 { 1489 while (true) 1490 { 1491 this.counter++; 1492 sleep(50.msecs); 1493 } 1494 } 1495 1496 private ulong counter; 1497 } 1498 1499 auto node = RemoteAPI!API.spawn!Node(); 1500 scope (exit) { 1501 node.ctrl.shutdown(); 1502 thread_joinAll(); 1503 } 1504 assert(node.getCounter() == 0); 1505 node.start(); 1506 assert(node.getCounter() == 1); 1507 assert(node.getCounter() == 0); 1508 core.thread.Thread.sleep(1.seconds); 1509 // It should be 19 but some machines are very slow 1510 // (e.g. Travis Mac testers) so be safe 1511 assert(node.getCounter() >= 9); 1512 assert(node.getCounter() < 9); 1513 } 1514 1515 // Sane name insurance policy 1516 unittest 1517 { 1518 static interface API 1519 { 1520 public ulong tid (); 1521 } 1522 1523 static class Node : API 1524 { 1525 public override ulong tid () { return 42; } 1526 } 1527 1528 auto node = RemoteAPI!API.spawn!Node(); 1529 scope (exit) { 1530 node.ctrl.shutdown(); 1531 thread_joinAll(); 1532 } 1533 assert(node.tid == 42); 1534 assert(node.ctrl.listener() !is Listener!API.init); 1535 1536 static interface DoesntWork 1537 { 1538 public string ctrl (); 1539 } 1540 static assert(!is(typeof(RemoteAPI!DoesntWork))); 1541 } 1542 1543 // Simulate temporary outage 1544 unittest 1545 { 1546 static interface API 1547 { 1548 public ulong call (); 1549 public void asyncCall (); 1550 } 1551 1552 __gshared Listener!API n1conn; 1553 1554 static class Node : API 1555 { 1556 public this() 1557 { 1558 if (n1conn !is Listener!API.init) 1559 this.remote = new RemoteAPI!API(n1conn); 1560 } 1561 1562 public override ulong call () { return ++this.count; } 1563 public override void asyncCall () { runTask(() => cast(void)this.remote.call); } 1564 size_t count; 1565 RemoteAPI!API remote; 1566 } 1567 1568 auto n1 = RemoteAPI!API.spawn!Node(); 1569 n1conn = n1.ctrl.listener(); 1570 auto n2 = RemoteAPI!API.spawn!Node(); 1571 scope (exit) { 1572 n1.ctrl.shutdown(); 1573 n2.ctrl.shutdown(); 1574 thread_joinAll(); 1575 } 1576 1577 /// Make sure calls are *relatively* efficient 1578 auto current1 = MonoTime.currTime(); 1579 assert(1 == n1.call()); 1580 assert(1 == n2.call()); 1581 auto current2 = MonoTime.currTime(); 1582 assert(current2 - current1 < 200.msecs); 1583 1584 // Make one of the node sleep 1585 n1.sleep(1.seconds); 1586 // Make sure our main thread is not suspended, 1587 // nor is the second node 1588 assert(2 == n2.call()); 1589 auto current3 = MonoTime.currTime(); 1590 assert(current3 - current2 < 400.msecs); 1591 1592 // Wait for n1 to unblock 1593 assert(2 == n1.call()); 1594 // Check current time >= 1 second 1595 auto current4 = MonoTime.currTime(); 1596 assert(current4 - current2 >= 1.seconds); 1597 1598 // Now drop many messages 1599 n1.sleep(1.seconds, true); 1600 auto start = MonoTime.currTime; 1601 while (MonoTime.currTime - start < 900.msecs) 1602 n2.asyncCall(); 1603 Thread.sleep(200.msecs); 1604 assert(3 == n1.call()); 1605 1606 // Debug output, uncomment if needed 1607 version (none) 1608 { 1609 import std.stdio; 1610 writeln("Two non-blocking calls: ", current2 - current1); 1611 writeln("Sleep + non-blocking call: ", current3 - current2); 1612 writeln("Delta since sleep: ", current4 - current2); 1613 } 1614 } 1615 1616 // Filter commands 1617 unittest 1618 { 1619 static interface API 1620 { 1621 size_t fooCount(); 1622 size_t fooIntCount(); 1623 size_t barCount (); 1624 void foo (); 1625 void foo (int); 1626 void bar (int); // not in any overload set 1627 void callBar (int); 1628 void callFoo (); 1629 void callFoo (int); 1630 } 1631 1632 __gshared Listener!API node_listener; 1633 1634 static class Node : API 1635 { 1636 size_t foo_count; 1637 size_t foo_int_count; 1638 size_t bar_count; 1639 RemoteAPI!API remote; 1640 1641 override size_t fooCount() { return this.foo_count; } 1642 override size_t fooIntCount() { return this.foo_int_count; } 1643 override size_t barCount() { return this.bar_count; } 1644 override void foo () { ++this.foo_count; } 1645 override void foo (int) { ++this.foo_int_count; } 1646 override void bar (int) { ++this.bar_count; } // not in any overload set 1647 // This one is part of the overload set of the node, but not of the API 1648 // It can't be accessed via API and can't be filtered out 1649 void bar(string) { assert(0); } 1650 1651 override void callFoo() 1652 { 1653 if (!this.remote) 1654 this.remote = new RemoteAPI!API(node_listener); 1655 1656 try 1657 { 1658 this.remote.foo(); 1659 } 1660 catch (Exception ex) 1661 { 1662 assert(ex.msg == "Filtered method 'foo()'"); 1663 } 1664 } 1665 1666 override void callFoo(int arg) 1667 { 1668 if (!this.remote) 1669 this.remote = new RemoteAPI!API(node_listener); 1670 1671 try 1672 { 1673 this.remote.foo(arg); 1674 } 1675 catch (Exception ex) 1676 { 1677 assert(ex.msg == "Filtered method 'foo(int)'"); 1678 } 1679 } 1680 1681 override void callBar(int arg) 1682 { 1683 if (!this.remote) 1684 this.remote = new RemoteAPI!API(node_listener); 1685 1686 try 1687 { 1688 this.remote.bar(arg); 1689 } 1690 catch (Exception ex) 1691 { 1692 assert(ex.msg == "Filtered method 'bar(int)'"); 1693 } 1694 } 1695 } 1696 1697 auto filtered = RemoteAPI!API.spawn!Node(); 1698 node_listener = filtered.ctrl.listener(); 1699 1700 // caller will call filtered 1701 auto caller = RemoteAPI!API.spawn!Node(); 1702 scope (exit) { 1703 filtered.ctrl.shutdown(); 1704 caller.ctrl.shutdown(); 1705 thread_joinAll(); 1706 } 1707 caller.callFoo(); 1708 assert(filtered.fooCount() == 1); 1709 1710 // both of these work 1711 static assert(is(typeof(filtered.filter!(API.foo)))); 1712 static assert(is(typeof(filtered.filter!(filtered.foo)))); 1713 1714 // only method in the overload set that takes a parameter, 1715 // should still match a call to filter with no parameters 1716 static assert(is(typeof(filtered.filter!(filtered.bar)))); 1717 1718 // wrong parameters => fail to compile 1719 static assert(!is(typeof(filtered.filter!(filtered.bar, float)))); 1720 // Only `API` overload sets are considered 1721 static assert(!is(typeof(filtered.filter!(filtered.bar, string)))); 1722 1723 filtered.filter!(API.foo); 1724 1725 caller.callFoo(); 1726 assert(filtered.fooCount() == 1); // it was not called! 1727 1728 filtered.clearFilter(); // clear the filter 1729 caller.callFoo(); 1730 assert(filtered.fooCount() == 2); // it was called! 1731 1732 // verify foo(int) works first 1733 caller.callFoo(1); 1734 assert(filtered.fooCount() == 2); 1735 assert(filtered.fooIntCount() == 1); // first time called 1736 1737 // now filter only the int overload 1738 filtered.filter!(API.foo, int); 1739 1740 // make sure the parameterless overload is still not filtered 1741 caller.callFoo(); 1742 assert(filtered.fooCount() == 3); // updated 1743 1744 caller.callFoo(1); 1745 assert(filtered.fooIntCount() == 1); // call filtered! 1746 1747 // not filtered yet 1748 caller.callBar(1); 1749 assert(filtered.barCount() == 1); 1750 1751 filtered.filter!(filtered.bar); 1752 caller.callBar(1); 1753 assert(filtered.barCount() == 1); // filtered! 1754 1755 // last blocking calls, to ensure the previous calls complete 1756 filtered.clearFilter(); 1757 caller.foo(); 1758 caller.bar(1); 1759 } 1760 1761 // request timeouts (from main thread) 1762 unittest 1763 { 1764 import core.thread; 1765 import std.exception; 1766 1767 static interface API 1768 { 1769 size_t sleepFor (long dur); 1770 void ping (); 1771 } 1772 1773 static class Node : API 1774 { 1775 override size_t sleepFor (long dur) 1776 { 1777 Thread.sleep(msecs(dur)); 1778 return 42; 1779 } 1780 1781 override void ping () { } 1782 } 1783 1784 // node with no timeout 1785 auto node = RemoteAPI!API.spawn!Node(); 1786 scope (exit) { 1787 node.ctrl.shutdown(); 1788 thread_joinAll(); 1789 } 1790 assert(node.sleepFor(80) == 42); // no timeout 1791 1792 // custom timeout 1793 bool called; 1794 node.ctrl.withTimeout(100.msecs, 1795 (scope API api) { 1796 assertThrown!Exception(api.sleepFor(2000)); 1797 called = true; 1798 }); 1799 assert(called); 1800 1801 called = false; 1802 struct S 1803 { 1804 void opCall (scope API api) 1805 { 1806 assertThrown!Exception(api.sleepFor(2000)); 1807 called = true; 1808 } 1809 } 1810 S s; 1811 node.ctrl.withTimeout(100.msecs, s); 1812 assert(called); 1813 1814 // Test that attributes are inferred based on the delegate 1815 void doTest () @safe nothrow 1816 { 1817 called = false; 1818 node.ctrl.withTimeout(Duration.zero, 1819 (scope API api) { called = true; }); 1820 assert(called); 1821 } 1822 doTest(); 1823 1824 // node with a configured timeout 1825 auto to_node = RemoteAPI!API.spawn!Node(500.msecs); 1826 scope (exit) to_node.ctrl.shutdown(); 1827 1828 /// none of these should time out 1829 assert(to_node.sleepFor(10) == 42); 1830 assert(to_node.sleepFor(20) == 42); 1831 assert(to_node.sleepFor(30) == 42); 1832 assert(to_node.sleepFor(40) == 42); 1833 1834 assertThrown!Exception(to_node.sleepFor(2000)); 1835 to_node.ctrl.withTimeout(3.seconds, // wait for the node to wake up 1836 (scope API api) { api.ping(); }); 1837 } 1838 1839 // test-case for responses to re-used requests (from main thread) 1840 unittest 1841 { 1842 import core.thread; 1843 import std.exception; 1844 1845 static interface API 1846 { 1847 float getFloat(); 1848 size_t sleepFor (long dur); 1849 } 1850 1851 static class Node : API 1852 { 1853 override float getFloat() { return 69.69; } 1854 override size_t sleepFor (long dur) 1855 { 1856 Thread.sleep(msecs(dur)); 1857 return 42; 1858 } 1859 } 1860 1861 // node with no timeout 1862 auto node = RemoteAPI!API.spawn!Node(); 1863 scope (exit) { 1864 node.ctrl.shutdown(); 1865 thread_joinAll(); 1866 } 1867 assert(node.sleepFor(80) == 42); // no timeout 1868 1869 // node with a configured timeout 1870 auto to_node = RemoteAPI!API.spawn!Node(500.msecs); 1871 scope (exit) to_node.ctrl.shutdown(); 1872 /// none of these should time out 1873 assert(to_node.sleepFor(10) == 42); 1874 assert(to_node.sleepFor(20) == 42); 1875 assert(to_node.sleepFor(30) == 42); 1876 assert(to_node.sleepFor(40) == 42); 1877 1878 assertThrown!Exception(to_node.sleepFor(2000)); 1879 to_node.ctrl.withTimeout(3.seconds, // wait for the node to wake up 1880 (scope API api) { assert(cast(int)api.getFloat() == 69); }); 1881 } 1882 1883 // request timeouts (foreign node to another node) 1884 unittest 1885 { 1886 static import geod24.concurrency; 1887 import std.exception; 1888 1889 static interface API 1890 { 1891 void check (); 1892 int ping (); 1893 } 1894 1895 __gshared Listener!API node_listener; 1896 1897 static class Node : API 1898 { 1899 override int ping () { return 42; } 1900 1901 override void check () 1902 { 1903 auto node = new RemoteAPI!API(node_listener, 500.msecs); 1904 1905 // no time-out 1906 node.ctrl.sleep(10.msecs); 1907 assert(node.ping() == 42); 1908 1909 // time-out 1910 node.ctrl.sleep(2000.msecs); 1911 assertThrown!Exception(node.ping()); 1912 } 1913 } 1914 1915 auto node_1 = RemoteAPI!API.spawn!Node(5.seconds); 1916 auto node_2 = RemoteAPI!API.spawn!Node(); 1917 scope (exit) { 1918 node_1.ctrl.shutdown(); 1919 node_2.ctrl.shutdown(); 1920 thread_joinAll(); 1921 } 1922 node_listener = node_2.ctrl.listener(); 1923 node_1.check(); 1924 } 1925 1926 // test-case for zombie responses 1927 unittest 1928 { 1929 static import geod24.concurrency; 1930 import std.exception; 1931 1932 static interface API 1933 { 1934 void check (); 1935 int get42 (); 1936 int get69 (); 1937 } 1938 1939 __gshared Listener!API node_listener; 1940 1941 static class Node : API 1942 { 1943 override int get42 () { return 42; } 1944 override int get69 () { return 69; } 1945 1946 override void check () 1947 { 1948 auto node = new RemoteAPI!API(node_listener, 500.msecs); 1949 1950 // time-out 1951 node.ctrl.sleep(2000.msecs); 1952 assertThrown!Exception(node.get42()); 1953 1954 // no time-out 1955 node.ctrl.sleep(10.msecs); 1956 assert(node.get69() == 69); 1957 } 1958 } 1959 1960 auto node_1 = RemoteAPI!API.spawn!Node(5.seconds); 1961 auto node_2 = RemoteAPI!API.spawn!Node(); 1962 scope (exit) { 1963 node_1.ctrl.shutdown(); 1964 node_2.ctrl.shutdown(); 1965 thread_joinAll(); 1966 } 1967 node_listener = node_2.ctrl.listener(); 1968 node_1.check(); 1969 } 1970 1971 // request timeouts with dropped messages 1972 unittest 1973 { 1974 static import geod24.concurrency; 1975 import std.exception; 1976 1977 static interface API 1978 { 1979 void check (); 1980 int ping (); 1981 } 1982 1983 __gshared Listener!API node_listener; 1984 1985 static class Node : API 1986 { 1987 override int ping () { return 42; } 1988 1989 override void check () 1990 { 1991 auto node = new RemoteAPI!API(node_listener, 420.msecs); 1992 1993 // Requests are dropped, so it times out 1994 assert(node.ping() == 42); 1995 node.ctrl.sleep(10.msecs, true); 1996 assertThrown!Exception(node.ping()); 1997 } 1998 } 1999 2000 auto node_1 = RemoteAPI!API.spawn!Node(5.seconds); 2001 auto node_2 = RemoteAPI!API.spawn!Node(); 2002 scope (exit) { 2003 node_1.ctrl.shutdown(); 2004 node_2.ctrl.shutdown(); 2005 thread_joinAll(); 2006 } 2007 node_listener = node_2.ctrl.listener(); 2008 node_1.check(); 2009 } 2010 2011 2012 // Test a node that gets a replay while it's delayed 2013 unittest 2014 { 2015 static import geod24.concurrency; 2016 import std.exception; 2017 2018 static interface API 2019 { 2020 void check (); 2021 int ping (); 2022 } 2023 2024 __gshared Listener!API node_listener; 2025 2026 static class Node : API 2027 { 2028 override int ping () { return 42; } 2029 2030 override void check () 2031 { 2032 auto node = new RemoteAPI!API(node_listener, 5000.msecs); 2033 assert(node.ping() == 42); 2034 // We need to return immediately so that the main thread 2035 // puts us to sleep 2036 runTask(() { 2037 node.ctrl.sleep(200.msecs); 2038 assert(node.ping() == 42); 2039 }); 2040 } 2041 } 2042 2043 auto node_1 = RemoteAPI!API.spawn!Node(500.msecs); 2044 auto node_2 = RemoteAPI!API.spawn!Node(); 2045 scope (exit) { 2046 node_1.ctrl.shutdown(); 2047 node_2.ctrl.shutdown(); 2048 thread_joinAll(); 2049 } 2050 node_listener = node_2.ctrl.listener(); 2051 node_1.check(); 2052 node_1.ctrl.sleep(300.msecs); 2053 assert(node_1.ping() == 42); 2054 } 2055 2056 // Test explicit shutdown 2057 unittest 2058 { 2059 import std.exception; 2060 2061 static interface API 2062 { 2063 int myping (int value); 2064 } 2065 2066 static class Node : API 2067 { 2068 override int myping (int value) 2069 { 2070 return value; 2071 } 2072 } 2073 2074 auto node = RemoteAPI!API.spawn!Node(1.seconds); 2075 scope (failure) node.ctrl.shutdown(); 2076 assert(node.myping(42) == 42); 2077 node.ctrl.shutdown(); 2078 thread_joinAll(); 2079 2080 try 2081 { 2082 node.myping(69); 2083 assert(0); 2084 } 2085 catch (Exception ex) 2086 { 2087 assert(ex.msg == "Connection with peer closed"); 2088 } 2089 } 2090 2091 unittest 2092 { 2093 import core.thread : thread_joinAll; 2094 static import geod24.concurrency; 2095 2096 static interface API 2097 { 2098 void segfault (); 2099 void check (); 2100 } 2101 2102 __gshared Listener!API node_listener; 2103 2104 static class Node : API 2105 { 2106 override void segfault () 2107 { 2108 int* ptr; *ptr = 1; 2109 } 2110 2111 override void check () 2112 { 2113 auto node = new RemoteAPI!API(node_listener); 2114 2115 // We need to return immediately so that the main thread can continue testing 2116 runTask(() { 2117 node.ctrl.sleep(500.msecs); 2118 // node may have shutdown at this point 2119 try { 2120 node.segfault(); 2121 } catch (Exception e) {} 2122 }); 2123 } 2124 } 2125 2126 auto node_1 = RemoteAPI!API.spawn!Node(5.seconds); 2127 auto node_2 = RemoteAPI!API.spawn!Node(); 2128 scope (exit) { 2129 node_2.ctrl.shutdown(); // shut it down before wake-up, segfault() command will be ignored 2130 node_1.ctrl.shutdown(); 2131 thread_joinAll(); 2132 } 2133 node_listener = node_2.ctrl.listener(); 2134 node_1.check(); 2135 } 2136 2137 /// Example of a custom (de)serialization policy 2138 unittest 2139 { 2140 static struct Serialize 2141 { 2142 static: 2143 public immutable(ubyte[]) serialize (T) (auto ref T value) @trusted 2144 { 2145 static assert(is(typeof({ T v = immutable(T).init; }))); 2146 static if (is(T : const(ubyte)[])) 2147 return value.idup; 2148 else 2149 return (cast(ubyte*)&value)[0 .. T.sizeof].idup; 2150 } 2151 2152 public QT deserialize (QT) (immutable(ubyte)[] data) @trusted 2153 { 2154 return *cast(QT*)(data.dup.ptr); 2155 } 2156 } 2157 2158 static struct ValueType 2159 { 2160 ulong v1; 2161 uint v2; 2162 uint v3; 2163 } 2164 2165 static interface API 2166 { 2167 @safe: 2168 public @property ulong pubkey (); 2169 public ValueType getValue (string val); 2170 // Note: Vibe.d's JSON serializer cannot serialize this 2171 public immutable(ubyte[32]) getHash (const ubyte[] val); 2172 } 2173 2174 static class MockAPI : API 2175 { 2176 @safe: 2177 public override @property ulong pubkey () { return 42; } 2178 public override ValueType getValue (string val) { return ValueType(val.length, 2, 3); } 2179 public override immutable(ubyte[32]) getHash (const ubyte[] val) 2180 { 2181 return val.length >= 32 ? val[0 .. 32] : typeof(return).init; 2182 } 2183 } 2184 2185 scope test = RemoteAPI!(API, Serialize).spawn!MockAPI(); 2186 scope (exit) { 2187 test.ctrl.shutdown(); 2188 thread_joinAll(); 2189 } 2190 assert(test.pubkey() == 42); 2191 assert(test.getValue("Hello world") == ValueType(11, 2, 3)); 2192 ubyte[64] val = 42; 2193 assert(test.getHash(val) == val[0 .. 32]); 2194 } 2195 2196 /// Test node2 responding to a dead node1 2197 /// See https://github.com/Geod24/localrest/issues/64 2198 unittest 2199 { 2200 static interface API 2201 { 2202 @safe: 2203 // Main thread calls this on the first node 2204 public void call0 (); 2205 // ... which then calls this on the second node 2206 public void call1 (); 2207 public void call2 (); 2208 } 2209 2210 __gshared Listener!API node1Addr; 2211 __gshared Listener!API node2Addr; 2212 2213 static class Node : API 2214 { 2215 private RemoteAPI!API self; 2216 2217 @trusted: 2218 // Main -> Node 1 2219 public override void call0 () 2220 { 2221 this.self = new RemoteAPI!API(node1Addr); 2222 scope node2 = new RemoteAPI!API(node2Addr); 2223 node2.call1(); 2224 assert(0, "This should never return as call2 shutdown this node"); 2225 } 2226 2227 // Node 1 -> Node 2 2228 public override void call1 () 2229 { 2230 assert(this.self is null); 2231 scope node1 = new RemoteAPI!API(node1Addr); 2232 node1.call2(); 2233 // Make really sure Node 1 is dead 2234 while (!node1Addr.data.isClosed()) 2235 sleep(100.msecs); 2236 } 2237 2238 // Node 2 -> Node 1 2239 public override void call2 () 2240 { 2241 assert(this.self !is null); 2242 this.self.ctrl.shutdown(); 2243 } 2244 } 2245 2246 // Long timeout to ensure we don't spuriously pass 2247 auto node1 = RemoteAPI!API.spawn!Node(500.msecs); 2248 auto node2 = RemoteAPI!API.spawn!Node(); 2249 scope (exit) { 2250 node1.ctrl.shutdown(); 2251 node2.ctrl.shutdown(); 2252 thread_joinAll(); 2253 } 2254 node1Addr = node1.ctrl.listener(); 2255 node2Addr = node2.ctrl.listener(); 2256 2257 // This will timeout (because the node will be gone) 2258 // However if something is wrong, either `joinall` will never return, 2259 // or the `assert(0)` in `call0` will be triggered. 2260 try 2261 { 2262 node1.call0(); 2263 assert(0, "This should have timed out"); 2264 } 2265 catch (Exception e) {} 2266 } 2267 2268 /// Test Timer 2269 unittest 2270 { 2271 static import core.thread; 2272 import core.time; 2273 2274 static interface API 2275 { 2276 public void startTimer (bool periodic); 2277 public void stopTimer (); 2278 public ulong getCounter (); 2279 public void resetCounter (); 2280 } 2281 2282 static class Node : API 2283 { 2284 private ulong counter; 2285 private Timer timer; 2286 2287 public override void startTimer (bool periodic) 2288 { 2289 this.timer = setTimer(100.msecs, &callback, periodic); 2290 } 2291 2292 public override void stopTimer () 2293 { 2294 this.timer.stop(); 2295 } 2296 2297 public void callback () 2298 { 2299 this.counter++; 2300 if (this.counter == 3) 2301 this.timer.stop(); 2302 } 2303 2304 public override ulong getCounter () 2305 { 2306 scope (exit) this.counter = 0; 2307 return this.counter; 2308 } 2309 2310 public override void resetCounter () 2311 { 2312 this.counter = 0; 2313 } 2314 } 2315 2316 auto node = RemoteAPI!API.spawn!Node(); 2317 scope (exit) { 2318 node.ctrl.shutdown(); 2319 thread_joinAll(); 2320 } 2321 assert(node.getCounter() == 0); 2322 node.startTimer(true); 2323 core.thread.Thread.sleep(1.seconds); 2324 // The expected count is 3 2325 // Check means the timer repeated and the timer stoped 2326 assert(node.getCounter() == 3); 2327 node.resetCounter(); 2328 node.startTimer(false); 2329 node.stopTimer(); 2330 core.thread.Thread.sleep(500.msecs); 2331 assert(node.getCounter() == 0); 2332 } 2333 2334 /// Test restarting a node 2335 unittest 2336 { 2337 static interface API 2338 { 2339 public uint[2] getCount () @safe; 2340 } 2341 2342 static class Node : API 2343 { 2344 private static uint instantiationCount; 2345 private static uint destructionCount; 2346 2347 this () 2348 { 2349 Node.instantiationCount++; 2350 } 2351 2352 ~this () 2353 { 2354 Node.destructionCount++; 2355 } 2356 2357 public override uint[2] getCount () const @safe 2358 { 2359 return [ Node.instantiationCount, Node.destructionCount, ]; 2360 } 2361 } 2362 2363 auto node = RemoteAPI!API.spawn!Node(); 2364 scope (exit) { 2365 node.ctrl.shutdown(); 2366 thread_joinAll(); 2367 } 2368 assert(node.getCount == [1, 0]); 2369 node.ctrl.restart(); 2370 assert(node.getCount == [2, 1]); 2371 node.ctrl.restart(); 2372 assert(node.getCount == [3, 2]); 2373 } 2374 2375 /// Test restarting a node that has responses waiting for it 2376 unittest 2377 { 2378 import core.atomic : atomicLoad, atomicStore; 2379 static interface API 2380 { 2381 @safe: 2382 public void call0 (); 2383 public void call1 (); 2384 } 2385 2386 __gshared Listener!API node2Addr; 2387 static shared bool done; 2388 2389 static class Node : API 2390 { 2391 @trusted: 2392 2393 public override void call0 () 2394 { 2395 scope node2 = new RemoteAPI!API(node2Addr); 2396 node2.call1(); 2397 } 2398 2399 public override void call1 () 2400 { 2401 // when this event runs we know call1() has already returned 2402 scheduler.schedule({ atomicStore(done, true); }); 2403 } 2404 } 2405 2406 auto node1 = RemoteAPI!API.spawn!Node(500.msecs); 2407 auto node2 = RemoteAPI!API.spawn!Node(); 2408 scope (exit) { 2409 node1.ctrl.shutdown(); 2410 node2.ctrl.shutdown(); 2411 thread_joinAll(); 2412 } 2413 node2Addr = node2.ctrl.listener(); 2414 node2.ctrl.sleep(2.seconds, false); 2415 2416 try 2417 { 2418 node1.call0(); 2419 assert(0, "This should have timed out"); 2420 } 2421 catch (Exception e) {} 2422 2423 node1.ctrl.restart(); 2424 2425 // after a while node 1 will receive a response to the timed-out request 2426 // to call1(), but the node restarted and is no longer interested in this 2427 // request (the request map / LocalScheduler is different), so it's filtered 2428 size_t count; 2429 while (!atomicLoad(done)) 2430 { 2431 assert(count < 300); // up to 3 seconds wait 2432 count++; 2433 Thread.sleep(10.msecs); 2434 } 2435 } 2436 2437 unittest 2438 { 2439 import geod24.concurrency; 2440 2441 static interface API 2442 { 2443 public void start (); 2444 public int getValue (); 2445 } 2446 2447 static class Node : API 2448 { 2449 int value; 2450 2451 public override void start () 2452 { 2453 // if this is a scoped delegate, it might not have a closure, 2454 // and when the task is resumed again it will segfault. 2455 // therefore runTask() must take a non-scope delegate. 2456 // note: once upstream issue #20868 is fixed, it would become 2457 // a compiler error to escape a scope delegate. 2458 runTask( 2459 { 2460 value = 1; 2461 FiberScheduler.yield(); 2462 value = 2; 2463 }); 2464 } 2465 2466 public override int getValue () { return this.value; } 2467 } 2468 2469 auto node = RemoteAPI!API.spawn!Node(); 2470 scope (exit) { 2471 node.ctrl.shutdown(); 2472 thread_joinAll(); 2473 } 2474 node.start(); 2475 assert(node.getValue() == 2); 2476 } 2477 2478 /// Situation: Calling a node with an interface that doesn't exists 2479 /// Expectation: The client throws an exception with a useful error message 2480 /// This can happen by mistake (API mixup) or when a method is optional. 2481 unittest 2482 { 2483 import std.exception : assertThrown; 2484 2485 static interface BaseAPI 2486 { 2487 public int required (); 2488 } 2489 2490 static interface APIExtended : BaseAPI 2491 { 2492 public int optional (); 2493 } 2494 2495 static class BaseNode : BaseAPI 2496 { 2497 public override int required () { return 42; } 2498 } 2499 2500 auto node = RemoteAPI!BaseAPI.spawn!BaseNode(); 2501 scope (exit) { 2502 node.ctrl.shutdown(); 2503 thread_joinAll(); 2504 } 2505 // Note: Now that the `Listener` is typed, this kind of error is harder 2506 // to make. However, it might still happen in the wild 2507 // (e.g. true client/server interfacing where to sources get out of date) 2508 scope extnode = new RemoteAPI!APIExtended( 2509 Listener!APIExtended(node.ctrl.listener().data)); 2510 assert(extnode.required() == 42); 2511 assertThrown!ClientException(extnode.optional()); 2512 } 2513 2514 /// test that runTask works in the constructor 2515 unittest 2516 { 2517 __gshared bool called; 2518 static interface API 2519 { 2520 @safe: 2521 public @property ulong pubkey (); 2522 } 2523 2524 static class MockAPI : API 2525 { 2526 @safe: 2527 this () @trusted 2528 { 2529 runTask(&callMe); 2530 } 2531 2532 void callMe () @trusted 2533 { 2534 called = true; 2535 } 2536 2537 public override @property ulong pubkey () { return 42; } 2538 } 2539 2540 scope test = RemoteAPI!API.spawn!MockAPI(); 2541 scope (exit) 2542 { 2543 test.ctrl.shutdown(); 2544 thread_joinAll(); 2545 } 2546 assert(test.pubkey() == 42); 2547 assert(called); 2548 } 2549 2550 // Self connection deadlock scenario 2551 // without the proper fix, this test should hang 2552 unittest 2553 { 2554 import core.atomic; 2555 2556 static interface API 2557 { 2558 @safe: 2559 public void call0 (); 2560 public int get44 (); 2561 } 2562 2563 __gshared Listener!API node1Addr; 2564 2565 static class Node : API 2566 { 2567 private RemoteAPI!API self; 2568 2569 @trusted: 2570 public override void call0 () 2571 { 2572 this.self = new RemoteAPI!API(atomicLoad(node1Addr)); 2573 this.self.ctrl.sleep(1.seconds); 2574 assert(this.self.get44() == 44); 2575 } 2576 2577 public override int get44 () 2578 { 2579 return 44; 2580 } 2581 } 2582 2583 auto node1 = RemoteAPI!API.spawn!Node(); 2584 scope (exit) 2585 { 2586 node1.ctrl.shutdown(); 2587 thread_joinAll(); 2588 } 2589 2590 atomicStore(node1Addr, node1.ctrl.listener()); 2591 node1.call0(); 2592 }