1 /*******************************************************************************
2 
3     Provides utilities to mock a network in unittests
4 
5     This module is based on the idea that D `interface`s can be used
6     to represent a server's API, and that D `class` inheriting this `interface`
7     are used to define the server's business code,
8     abstracting away the communication layer.
9 
10     For example, a server that exposes an API to concatenate two strings would
11     define the following code:
12     ---
13     interface API { public string concat (string a, string b); }
14     class Server : API
15     {
16         public override string concat (string a, string b)
17         {
18             return a ~ b;
19         }
20     }
21     ---
22 
    Then one can use "generators" to define how multiple processes communicate
    together. One such generator, which pioneered this design, is `vibe.web.rest`,
    which allows exposing such servers as REST APIs.
26 
27     `localrest` is another generator, which uses message passing and threads
28     to create a local "network".
29     The motivation was to create a testing library that could be used to
30     model a network at a much cheaper cost than spawning processes
31     (and containers) would be, when doing integration tests.
32 
33     Control_Interface:
34     When instantiating a `RemoteAPI`, one has the ability to call foreign
35     implementations through auto-generated `override`s of the `interface`.
36     In addition to that, as this library is intended for testing,
37     a few extra functionalities are offered under a control interface,
38     accessible under the `ctrl` namespace in the instance.
39     The control interface allows to make the node unresponsive to one or all
40     methods, for some defined time or until unblocked, as well as trigger
41     shutdowns or restart. See the methods for more details.
42     The `withTimeout` control method can be used to spawn a scoped copy
43     of the RemoteAPI with a custom configured timeout. The user-provided
44     delegate will be called with this scoped copy that uses the new timeout.
45 
46     Shutdown:
47     The control interface has a shutdown method that can be used to terminate
48     a node gracefully. When the shutdown request is handled by the node,
    the event loop will exit and the thread will terminate. While the destructor
    of the node will be called, it might not be usable for some actions, for example
    because D destructors may not allocate GC memory, or one may want
    to perform some test-specific operations, such as logging some data in case of failure.
53     Therefore, you may provide a shutdown routine in the call to `shutdown`.
54     It must accept a single argument of the interface type, and will be called
55     with the implementation object just before the node is destroyed.
56     If this routine throws, LocalRest will log the error in the console and
57     proceed with destroying the stack-allocated node.
    Note that control requests are asynchronous, hence requests from the node
    might be processed / sent by the node until the request is actually processed.
60     There is also a `restart` method which accepts the same callback argument.
61 
62     Event_Loop:
    Server processes usually need to perform some actions in an asynchronous way.
    Additionally, some actions need to be completed at a semi-regular interval,
65     for example based on a timer.
66     For those use cases, a node should call `runTask` or `sleep`, respectively.
67     Note that this has the same name (and purpose) as Vibe.d's core primitives.
68     Users should only ever call Vibe's `runTask` / `sleep` with `vibe.web.rest`,
69     or only call LocalRest's `runTask` / `sleep` with `RemoteAPI`.
70 
71     Implementation:
72     In order for tests to simulate an asynchronous system accurately,
73     multiple nodes need to be able to run concurrently and asynchronously.
74 
75     There are two common solutions to this, to use either fibers or threads.
76     Fibers have the advantage of being simpler to implement and predictable.
77     Threads have the advantage of more accurately describing an asynchronous
78     system and thus have the ability to uncover more issues.
79 
80     When spawning a node, a thread is spawned, a node is instantiated with
81     the provided arguments, and an event loop waits for messages sent
82     to the Tid. Messages consist of the sender's Tid, the mangled name
83     of the function to call (to support overloading) and the arguments,
84     serialized as a JSON string.
85 
86     Note:
    While this module's original motivation was to test REST nodes,
    the only dependency on Vibe.d is actually on its JSON module,
89     as Vibe.d is the only available JSON module known to the author
90     to provide an interface to deserialize composite types.
91     This dependency is however not enforced by the dub project file,
92     as users can provide their own serializer (see `geod24.Serialization`).
93     If the default parameter for serialization is never used,
94     one's project need not depend on `vibe-d:data`.
95 
96     Author:         Mathias 'Geod24' Lang
97     License:        MIT (See LICENSE.txt)
98     Copyright:      Copyright (c) 2018-2019 Mathias Lang. All rights reserved.
99 
100 *******************************************************************************/
101 
102 module geod24.LocalRest;
103 
104 static import C = geod24.concurrency;
105 import geod24.Serialization;
106 import std.datetime.systime : Clock, SysTime;
107 import std.format;
108 import std.meta : AliasSeq;
109 import std.traits : fullyQualifiedName, Parameters, ReturnType;
110 
111 import core.thread;
112 import core.time;
113 
/// Request / Response ID
///
/// Pairs the ID of the scheduler that issued a request with a per-request
/// counter, so that a `Response` can be matched to the request it answers.
private struct ID
{
    /// A node may restart, in which case it will spawn a new request scheduler
    size_t sched_id;
    /// In order to support re-entrancy, every request contains an id
    /// which should be copied in the `Response`
    /// Initialized to `size_t.max` so not setting it crashes the program
    size_t id = size_t.max;
}
124 
/// Data sent by the caller
///
/// One instance describes a single method call: who is asking, which request
/// this is, which method to invoke, and the serialized arguments for it.
private struct Command
{
    /// Tid of the sender thread (cannot be JSON serialized)
    C.Tid sender;
    /// ID used for request re-entrancy (See ID definition)
    ID id;
    /// Method to call
    string method;
    /// Serialized arguments to the method
    SerializedData args;
}
137 
/// Control message: make the node exhibit a given behavior for some time
private struct TimeCommand
{
    /// How long the remote node should keep applying this behavior
    Duration dur;
    /// When `true`, affected messages are dropped (defaults to `false`)
    bool drop;
}
146 
/// Ask the node to shut down
///
/// Params:
///   API = The interface type the node implements; it is the parameter type
///         of `callback`
private struct ShutdownCommand (API)
{
    /// Any callback to call before the Node's destructor is called
    /// (may be `null`, in which case nothing is called)
    void function (API) callback;

    /// Whether we're restarting or really shutting down
    bool restart;
}
156 
/// Filter out requests before they reach a node
///
/// A request whose `Command.method` equals `func_mangleof` is answered with
/// a failure response instead of being dispatched to the node.
private struct FilterAPI
{
    /// the mangled symbol name of the function to filter
    string func_mangleof;

    /// used for debugging
    string pretty_func;
}
166 
/// Status of a request
private enum Status
{
    /// Request failed
    Failed,

    /// The request failed due to a client error (4xx style error code)
    ClientFailed,

    /// Request timed-out
    Timeout,

    /// Request succeeded
    Success
}
182 
/// Data sent by the callee back to the caller
///
/// The `id` field is a copy of the `Command.id` being answered, which is how
/// the caller matches a response to its pending request.
private struct Response
{
    /// Final status of a request (failed, timeout, success, etc)
    Status status;
    /// ID used for request re-entrancy (See ID definition)
    ID id;
    /// If `status == Status.Success`, the serialized return value.
    /// Otherwise, it contains `Exception.toString()`.
    SerializedData data;
}
194 
/// Exception signalling that the request itself was faulty
/// (the counterpart of 4xx HTTP error types)
public class ClientException : Exception
{
    /***************************************************************************

        Constructor, forwarding everything to `Exception`

        Params:
          msg  = Message describing the error
          file = File in which the exception originated
          line = Line at which the exception originated
          next = Optional exception to chain to this one

    ***************************************************************************/

    public this (string msg, string file = __FILE__, int line = __LINE__,
        Exception next = null) @safe pure nothrow
    {
        super(msg, file, line, next);
    }
}
206 
/// Simple exception to unwind the stack when we need to terminate/restart,
/// used by `spawn` but kept here as it doesn't depend on template parameters.
private final class ExitException : Exception
{
    /// `true` when the node should be started again after unwinding,
    /// `false` when it should stop for good
    public bool restart;

    /// Constructor; the message is only ever visible if the exception
    /// escapes, which is not supposed to happen
    this () @safe pure nothrow @nogc
    {
        super("You should never see this exception - please report a bug");
    }
}
218 
/// Minimal tagged union holding either a `Response` or a `Command`.
/// Used for data inside of `spawn`, kept here as it doesn't
/// depend on template parameters
private struct Variant
{
    // Payload: only the member selected by `tag` is meaningful
    union
    {
        Response res;
        Command cmd;
    }

    /// Discriminator: `0` when `res` is set, `1` when `cmd` is set
    public ubyte tag;

    pure nothrow @nogc:

    /// Wrap a `Response` (sets `tag` to `0`)
    public this (Response res) @trusted
    {
        this.tag = 0;
        this.res = res;
    }

    /// Wrap a `Command` (sets `tag` to `1`)
    public this (Command cmd) @trusted
    {
        this.tag = 1;
        this.cmd = cmd;
    }
}
244 
/// Current filtering / sleeping settings of a node, as recorded by `spawn`
private struct Control
{
    /// Filter applied to specific messages
    public FilterAPI filter;
    /// Point in time until which the node sleeps
    public SysTime sleep_until;
    /// Whether messages are dropped while sleeping
    public bool drop;

    /// Returns: `true` if a wake-up time was set and lies in the future
    public bool isSleeping() const @safe /* nothrow: Not on Windows (currTime) */
    {
        // No wake-up time recorded: the node is not sleeping
        if (this.sleep_until == SysTime.init)
            return false;

        return Clock.currTime < this.sleep_until;
    }
}
258 
/// Simple wrapper to deal with tuples
/// Vibe.d might emit a pragma(msg) when T.length == 0
///
/// Params:
///   T = Types of the wrapped values (typically a method's parameter types)
private struct ArgWrapper (T...)
{
    // Gives the struct one field when there are no arguments to wrap
    static if (T.length == 0)
        size_t dummy;
    /// The wrapped values, one per type in `T`
    T args;
}
267 
/// Our own little scheduler
///
/// Extends `C.FiberScheduler` with the bookkeeping needed to pair every
/// request awaiting an answer with the `Response` that eventually arrives.
private final class LocalScheduler : C.FiberScheduler
{
    import core.sync.condition;
    import core.sync.mutex;

    /// Just a FiberBinarySemaphore with a state
    private struct Waiting { FiberBinarySemaphore s; bool busy; }

    /// The 'Response' we are currently processing, if any
    private Response pending;

    /// Request IDs waiting for a response
    private Waiting[ID] waiting;

    /// scheduler ID
    private size_t sched_id;

    /// Initialize this scheduler and its unique ID
    public this () @safe @nogc nothrow
    {
        // Function-local `static`: the counter persists across calls, so each
        // new scheduler gets the next value.
        // NOTE(review): `static` locals are thread-local in D, so the ID is
        // presumably only unique among schedulers created by the same thread.
        static size_t last_idx;
        this.sched_id = last_idx++;
    }

    /// Get the next available request ID
    ///
    /// Returns: An `ID` made of this scheduler's ID and a fresh counter value
    public ID getNextResponseId ()
    {
        static size_t last_idx;
        return ID(this.sched_id, last_idx++);
    }

    /// Block the calling fiber until the response to `id` is available
    ///
    /// Params:
    ///   id = ID of the request whose response is awaited
    ///   duration = How long to wait at most; `Duration.init` waits
    ///              without a timeout
    ///
    /// Returns:
    ///   The content of `pending` once the wait is over. If the timed wait
    ///   failed, this is a `Response` with `Status.Timeout`.
    public Response waitResponse (ID id, Duration duration) nothrow
    {
        // Lazily register the semaphore this request will wait on
        if (id !in this.waiting)
            this.waiting[id] = Waiting(new FiberBinarySemaphore, false);

        Waiting* ptr = &this.waiting[id];
        if (ptr.busy)
            assert(0, "Trying to override a pending request");

        // We yield and wait for an answer
        ptr.busy = true;

        if (duration == Duration.init)
            ptr.s.wait();
        // NOTE(review): the timeout response re-uses `this.pending.id` rather
        // than `id`; confirm that callers do not rely on the ID in that case.
        else if (!ptr.s.wait(duration))
            this.pending = Response(Status.Timeout, this.pending.id);

        ptr.busy = false;
        // After control returns to us, `pending` has been filled
        // (it is reset once the return value has been copied out)
        scope(exit) this.pending = Response.init;
        return this.pending;
    }

    /// Called when a waiting condition was handled and can be safely removed
    ///
    /// Params:
    ///   id = ID of the request to forget about
    public void remove (ID id)
    {
        this.waiting.remove(id);
    }
}
329 
330 
/// We need a scheduler to simulate an event loop and to be re-entrant
/// (module-level variables are thread-local in D; the functions below
/// assert that it is non-null before using it)
private LocalScheduler scheduler;

/// Whether this is the main thread
private bool is_main_thread;
336 
337 
338 /*******************************************************************************
339 
340     Provide eventloop-like functionalities
341 
    Since nodes instantiated via this module are Vibe.d servers,
    they expect the ability to run an asynchronous task,
344     usually provided by `vibe.core.core : runTask`.
345 
346     In order for them to properly work, we need to integrate them to our event
347     loop by providing the ability to spawn a task, and wait on some condition,
348     optionally with a timeout.
349 
350     The following functions do that.
    Note that those facilities are not available from the main thread,
    which is supposed to do tests and doesn't have a scheduler.
353 
354 *******************************************************************************/
355 
public void runTask (void delegate() dg) nothrow
{
    // Only threads that have set up a scheduler can run tasks
    assert(scheduler !is null, "Cannot call this function from the main thread");
    // Hand the delegate over to this thread's scheduler
    scheduler.spawn(dg);
}
361 
/// Ditto
public void sleep (Duration timeout) nothrow
{
    assert(scheduler !is null, "Cannot call this function from the main thread");
    // The semaphore type is reached through the scheduler instance, hence
    // the `scheduler.new` form. This semaphore is local to the call and is
    // never handed to anyone else, so only the timed `wait` can end the sleep.
    scope sem = scheduler.new FiberBinarySemaphore();
    sem.wait(timeout);
}
369 
/*******************************************************************************

    Run an asynchronous task after a given time.

    The task will first run after the given `timeout`, and
    can either repeat or run only once (the default).
    Works similarly to Vibe.d's `setTimer`.

    Params:
        timeout = Determines the minimum amount of time that elapses before
            the timer fires.
        dg = The delegate to call when the timer fires. Must not be `null`.
        periodic = Specifies if the timer fires repeatedly or only once

    Returns:
        A `Timer` instance with the ability to control the timer

*******************************************************************************/

public Timer setTimer (Duration timeout, void delegate() dg,
    bool periodic = false) nothrow
{
    // Same precondition (and wording) as `runTask` and `sleep`
    assert(scheduler !is null, "Cannot call this function from the main thread");
    assert(dg !is null, "Cannot call this delegate if null");

    Timer timer = new Timer(timeout, dg, periodic);
    // The timer's loop is handed to the scheduler rather than run inline
    scheduler.schedule(&timer.run);
    return timer;
}
399 
/// Simple timer: calls a delegate after a delay, once or repeatedly
public final class Timer
{
    /// Delay before each call to `dg`
    private Duration timeout;
    /// Delegate called every time the timer fires
    private void delegate () dg;
    /// Whether this timer is repeating
    private bool periodic;
    /// Whether this timer was stopped
    private bool stopped;

    /// Constructor
    ///
    /// Params:
    ///   timeout = Delay before each call to `dg`
    ///   dg = Delegate to call when the timer fires
    ///   periodic = Whether the timer fires repeatedly or only once
    public this (Duration timeout, void delegate() dg, bool periodic) @safe nothrow
    {
        this.stopped = false;
        this.periodic = periodic;
        this.dg = dg;
        this.timeout = timeout;
    }

    // Sleep for `timeout`, then call the delegate; keep doing so for as long
    // as the timer is periodic and has not been stopped
    private void run ()
    {
        while (true)
        {
            sleep(this.timeout);

            // Stopped while we were sleeping: do not fire
            if (this.stopped)
                return;

            this.dg();

            // One-shot timer (or stopped from within the delegate)
            if (!this.periodic)
                return;
        }
    }

    /// Stop the timer. The next time this timer's fiber wakes up
    /// it will exit the run() function.
    public void stop () @safe nothrow
    {
        this.periodic = false;
        this.stopped = true;
    }
}
438 
439 /*******************************************************************************
440 
    A reference to an already-instantiated node
442 
443     This class serves the same purpose as a `RestInterfaceClient`:
444     it is a client for an already instantiated rest `API` interface.
445 
446     In order to instantiate a new server (in a remote thread), use the static
447     `spawn` function.
448 
449     Serialization:
450       In order to support custom serialization policy, one can change the
451       `Serializer` parameter. This parameter is expected to be either a
452       template or an aggregate with two static methods, but no explicit
453       limitation is put on the type.
454       See `geod24.Serialization`'s documentation for more informations.
455 
456     Params:
457       API = The interface defining the API to implement
458       S = An aggregate which follows the requirement explained above.
459 
460 *******************************************************************************/
461 
462 public final class RemoteAPI (API, alias S = VibeJSONSerializer!()) : API
463 {
464     static assert (!serializerInvalidReason!(S).length, serializerInvalidReason!S);
465 
466     /***************************************************************************
467 
468         Instantiate a node and start it
469 
470         This is usually called from the main thread, which will start all the
471         nodes and then start to process request.
472         In order to have a connected network, no nodes in any thread should have
473         a different reference to the same node.
474         In practice, this means there should only be one `Tid` per "address".
475 
476         Note:
477           When the `RemoteAPI` returned by this function is finalized,
478           the child thread will be shut down.
479 
480         Params:
481           Impl = Type of the implementation to instantiate
482           args = Arguments to the object's constructor
483           timeout = (optional) timeout to use with requests
484           file = Path to the file that called this function (for diagnostic)
485           line = Line number tied to the `file` parameter
486 
487         Returns:
488           A `RemoteAPI` owning the node reference
489 
490     ***************************************************************************/
491 
492     public static RemoteAPI spawn (Impl) (
493         CtorParams!Impl args, Duration timeout = Duration.init,
494         string file = __FILE__, int line = __LINE__)
495     {
496         auto childTid = C.spawn(&spawned!(Impl), file, line, args);
497         return new RemoteAPI(childTid, timeout);
498     }
499 
500     /// Helper template to get the constructor's parameters
501     private static template CtorParams (Impl)
502     {
503         static if (is(typeof(Impl.__ctor)))
504             private alias CtorParams = Parameters!(Impl.__ctor);
505         else
506             private alias CtorParams = AliasSeq!();
507     }
508 
509     /***************************************************************************
510 
511         Handler function
512 
513         Performs the dispatch from `cmd` to the proper `node` function,
514         provided the function is not filtered.
515 
516         Params:
517             cmd    = the command to run (contains the method name and the arguments)
518             node   = the node to invoke the method on
519             filter = used for filtering API calls (returns default response)
520 
521     ***************************************************************************/
522 
    private static void handleCommand (Command cmd, API node, FilterAPI filter)
    {
        // `cmd.method` holds a mangled name, so each overload of each member
        // of `API` gets its own `case`, generated at compile time below.
        switch (cmd.method)
        {
            static foreach (member; __traits(allMembers, API))
            static foreach (ovrld; __traits(getOverloads, API, member))
            {
                // The token string is run through `format` before being mixed
                // in: `%1$s` becomes the member name, `%2$s` the overload's
                // mangled name, and `%%s` is left as `%s` for the inner
                // `format` call.
                mixin(
                q{
                    case `%2$s`:
                    Response res = Response(Status.Failed, cmd.id);

                    // Provide informative message in case of filtered method
                    if (cmd.method == filter.func_mangleof)
                        res.data = SerializedData(format("Filtered method '%%s'", filter.pretty_func));
                    else
                    {
                        auto args = S.deserialize!(ArgWrapper!(Parameters!ovrld))(
                            cmd.args.getS!S);

                        try
                        {
                            static if (!is(ReturnType!ovrld == void))
                                res.data = SerializedData(S.serialize(node.%1$s(args.args)));
                            else
                                node.%1$s(args.args);
                            res.status = Status.Success;
                        }
                        catch (Exception e)
                        {
                            res.status = Status.ClientFailed;
                            res.data = SerializedData(e.toString());
                        }
                    }

                    C.trySend(cmd.sender, res);
                    return;
                }.format(member, ovrld.mangleof));
            }
        // No overload of `API` matches the requested mangled name
        default:
            C.trySend(cmd.sender,
                      Response(Status.ClientFailed, cmd.id,
                               SerializedData("Method not found")));
        }
    }
568 
569     /***************************************************************************
570 
571         Main dispatch function
572 
       This function receives string-serialized messages from the calling thread,
574        which is a struct with the sender's Tid, the method's mangleof,
575        and the method's arguments as a tuple, serialized to a JSON string.
576 
577        `geod24.concurrency.receive` is not `@safe`, so neither is this.
578 
579        Params:
580            Implementation = Type of the implementation to instantiate
581            self = The channel on which to "listen" to receive new "connections"
582            file = Path to the file that spawned this node
583            line = Line number in the `file` that spawned this node
584            cargs = Arguments to `Implementation`'s constructor
585 
586     ***************************************************************************/
587 
    private static void spawned (Implementation) (
        C.Tid self, string file, int line, CtorParams!Implementation cargs)
        nothrow
    {
        import std.algorithm : each;
        import std.range;

        // Created once up-front and thrown by the `ShutdownCommand` handler
        // below, every time the node shuts down or restarts
        scope exc = new ExitException();

        // One "life" of the node: instantiate it with a fresh scheduler,
        // then process messages until `exc` is thrown
        void runNode ()
        {
            scope node = new Implementation(cargs);
            scheduler = new LocalScheduler;

            // Control the node behavior
            Control control;

            // we need to keep track of messages which were ignored when
            // node.sleep() was used, and then handle each message in sequence.
            Variant[] await_msgs;

            // Process a single message: a `Command` is dispatched in a new
            // task, a `Response` wakes up the task waiting for it
            void handle (T)(T arg)
            {
                static if (is(T == Command))
                {
                    scheduler.spawn(() => handleCommand(arg, node, control.filter));
                }
                else static if (is(T == Response))
                {
                    // response for a previous LocalScheduler instance
                    if (arg.id.sched_id != scheduler.sched_id)
                        return;

                    // Publish the response, then wake up the waiter
                    // and forget about the request
                    scheduler.pending = arg;
                    scheduler.waiting[arg.id].s.notify();
                    scheduler.remove(arg.id);
                }
                else static assert(0, "Unhandled type: " ~ T.stringof);
            }

            scheduler.start(() {
                while (1)
                {
                    // Wait at most 10 msecs for a message, so that the
                    // queued-messages check below runs regularly
                    C.receiveTimeout(self, 10.msecs,
                        (ShutdownCommand!API e)
                        {
                            // Run the user-provided routine (if any) while
                            // the node is still alive, then unwind
                            if (e.callback !is null)
                                e.callback(node);
                            exc.restart = e.restart;
                            throw exc;
                        },
                        (TimeCommand s) {
                            control.sleep_until = Clock.currTime + s.dur;
                            control.drop = s.drop;
                        },
                        (FilterAPI filter_api) {
                            control.filter = filter_api;
                        },
                        // While sleeping, messages are either queued
                        // for later or silently dropped
                        (Response res) {
                            if (!control.isSleeping())
                                handle(res);
                            else if (!control.drop)
                                await_msgs ~= Variant(res);
                        },
                        (Command cmd)
                        {
                            if (!control.isSleeping())
                                handle(cmd);
                            else if (!control.drop)
                                await_msgs ~= Variant(cmd);
                        });

                    // now handle any leftover messages after any sleep() call
                    if (!control.isSleeping())
                    {
                        // `tag == 0` means the entry holds a `Response`,
                        // otherwise it holds a `Command`
                        await_msgs.each!(msg => msg.tag == 0 ? handle(msg.res) : handle(msg.cmd));
                        // Empty the queue but allow its memory to be re-used
                        await_msgs.length = 0;
                        assumeSafeAppend(await_msgs);
                    }
                }
            });
        }

        try
        {
            // Keep starting the node anew for as long as restarts are requested
            while (true)
            {
                try runNode();
                // We use this exception to exit the event loop
                catch (ExitException e)
                {
                    if (!e.restart)
                        break;
                }
            }
        }
        // This function is `nothrow`: anything else that escapes
        // the node is reported on the console
        catch (Throwable t)
        {
            import core.stdc.stdio, std.stdio;
            printf("#### FATAL ERROR: %.*s\n", cast(int) t.msg.length, t.msg.ptr);
            printf("This node was started at %.*s:%d\n",
                   cast(int) file.length, file.ptr, line);
            printf("This most likely means that the node crashed due to an uncaught exception\n");
            printf("If not, please file a bug at https://github.com/Geod24/localrest/\n");

            try writeln("Full error: ", t);
            catch (Exception e) { /* Nothing more we can do at this point */ }
        }
    }
697 
    /// Where to send messages to (the `Tid` of the node's thread)
    private C.Tid childTid;

    /// Timeout to use when issuing requests
    /// (`const`: set once by the constructor)
    private const Duration timeout;
703 
704     /***************************************************************************
705 
        Create an instance of a client
707 
708         This connects to an already instantiated node.
709         In order to instantiate a node, see the static `spawn` function.
710 
711         Params:
712           tid = `geod24.concurrency.Tid` of the node.
713                 This can usually be obtained by `geod24.concurrency.locate`.
714           timeout = any timeout to use
715 
716     ***************************************************************************/
717 
    public this (C.Tid tid, Duration timeout = Duration.init)
        @safe @nogc pure nothrow
    {
        // Only records its arguments: no message is sent to the node here
        this.childTid = tid;
        this.timeout = timeout;
    }
724 
725     /***************************************************************************
726 
727         Introduce a namespace to avoid name clashes
728 
729         The only way we have a name conflict is if someone exposes `ctrl`,
730         in which case they will be served an error along the following line:
731         LocalRest.d(...): Error: function `RemoteAPI!(...).ctrl` conflicts
732         with mixin RemoteAPI!(...).ControlInterface!() at LocalRest.d(...)
733 
734     ***************************************************************************/
735 
736     public mixin ControlInterface!() ctrl;
737 
738     /// Ditto
739     private mixin template ControlInterface ()
740     {
741         /***********************************************************************
742 
743             Returns the `Tid` this `RemoteAPI` wraps
744 
745             This can be useful for calling `geod24.concurrency.register` or similar.
746             Note that the `Tid` should not be used directly, as our event loop,
747             would error out on an unknown message.
748 
749         ***********************************************************************/
750 
        public C.Tid tid () @nogc pure nothrow
        {
            // Plain accessor for the `Tid` given at construction
            return this.childTid;
        }
755 
756         /***********************************************************************
757 
758             Send an async message to the thread to immediately shut down.
759 
760             Params:
761                 callback = if not null, the callback to call in the Node's
762                            thread before the Node is destroyed. Can be used
763                            for cleanup / logging routines.
764 
765         ***********************************************************************/
766 
767         public void shutdown (void function (API) callback = null)
768             @trusted
769         {
770             C.send(this.childTid, ShutdownCommand!API(callback, false));
771         }
772 
773         /***********************************************************************
774 
775             Send an async message to the thread to immediately restart.
776 
777             Note that further non-control messages to the node will block until
778             the node is back "online".
779 
780             Params:
781                 callback = if not null, the callback to call in the Node's
782                            thread before the Node is destroyed, but before
783                            it is restarted. Can be used for cleanup or logging.
784 
785         ***********************************************************************/
786 
787         public void restart (void function (API) callback = null)
788             @trusted
789         {
790             C.send(this.childTid, ShutdownCommand!API(callback, true));
791         }
792 
793         /***********************************************************************
794 
795             Make the remote node sleep for `Duration`
796 
797             The remote node will call `Thread.sleep`, becoming completely
798             unresponsive, potentially having multiple tasks hanging.
799             This is useful to simulate a delay or a network outage.
800 
            Params:
              d = Duration the node will sleep for
              dropMessages = Whether to process the pending requests when the
                             node comes back online (the default), or to drop
                             pending traffic
806 
807         ***********************************************************************/
808 
809         public void sleep (Duration d, bool dropMessages = false) @trusted
810         {
811             C.send(this.childTid, TimeCommand(d, dropMessages));
812         }
813 
814         /***********************************************************************
815 
816             Filter any requests issued to the provided method.
817 
818             Calling the API endpoint will throw an exception,
819             therefore the request will fail.
820 
821             Use via:
822 
823             ----
824             interface API { void call(); }
825             class C : API { void call() { } }
826             auto obj = new RemoteAPI!API(...);
827             obj.filter!(API.call);
828             ----
829 
830             To match a specific overload of a method, specify the
831             parameters to match against in the call. For example:
832 
833             ----
834             interface API { void call(int); void call(int, float); }
835             class C : API { void call(int) {} void call(int, float) {} }
836             auto obj = new RemoteAPI!API(...);
837             obj.filter!(API.call, int, float);  // only filters the second overload
838             ----
839 
840             Params:
841               method = the API method for which to filter out requests
842               OverloadParams = (optional) the parameters to match against
843                   to select an overload. Note that if the method has no other
844                   overloads, then even if that method takes parameters and
845                   OverloadParams is empty, it will match that method
846                   out of convenience.
847 
848         ***********************************************************************/
849 
850         public void filter (alias method, OverloadParams...) () @trusted
851         {
852             enum method_name = __traits(identifier, method);
853 
854             // return the mangled name of the matching overload
855             template getBestMatch (T...)
856             {
857                 static if (is(Parameters!(T[0]) == OverloadParams))
858                 {
859                     enum getBestMatch = T[0].mangleof;
860                 }
861                 else static if (T.length > 0)
862                 {
863                     enum getBestMatch = getBestMatch!(T[1 .. $]);
864                 }
865                 else
866                 {
867                     static assert(0,
868                         format("Couldn't select best overload of '%s' for " ~
869                         "parameter types: %s",
870                         method_name, OverloadParams.stringof));
871                 }
872             }
873 
874             // ensure it's used with API.method, *not* RemoteAPI.method which
875             // is an override of API.method. Otherwise mangling won't match!
876             // special-case: no other overloads, and parameter list is empty:
877             // just select that one API method
878             alias Overloads = __traits(getOverloads, API, method_name);
879             static if (Overloads.length == 1 && OverloadParams.length == 0)
880             {
881                 immutable pretty = method_name ~ Parameters!(Overloads[0]).stringof;
882                 enum mangled = Overloads[0].mangleof;
883             }
884             else
885             {
886                 immutable pretty = format("%s%s", method_name, OverloadParams.stringof);
887                 enum mangled = getBestMatch!Overloads;
888             }
889 
890             C.send(this.childTid, FilterAPI(mangled, pretty));
891         }
892 
893 
894         /***********************************************************************
895 
896             Clear out any filtering set by a call to filter()
897 
898         ***********************************************************************/
899 
900         public void clearFilter () @trusted
901         {
902             C.send(this.childTid, FilterAPI(""));
903         }
904 
        /***********************************************************************

            Call the provided delegate with a custom timeout

            This allows performing requests on a client with a different
            timeout, usually to allow some requests (e.g. initialization calls)
            to have a longer timeout, or no timeout at all, or to put a timeout
            on an otherwise timeout-less client (e.g. when calling the actual
            test which could fail).

            To disable timeout, pass the special value `Duration.zero`
            (or `0.seconds`, `0.msecs`, etc...).

            The `RemoteAPI` handed to `dg` is a `scope` instance created for
            the duration of this call and bound to the same `childTid`:
            `dg` must not keep a reference to it.

            Params:
                Dg = type of the callable, which must accept the scoped
                     `RemoteAPI` as its single argument (checked at
                     compile time)
                timeout = the new timeout to use
                dg = the delegate to call with the new scoped RemoteAPI copy

        ***********************************************************************/

        public void withTimeout (Dg) (Duration timeout, scope Dg dg)
        {
            // `scope` instance: only valid until this function returns
            scope api = new RemoteAPI(this.childTid, timeout);
            // Provide a readable error message instead of a template failure
            static assert(is(typeof({ dg(api); })),
                          "Provided argument of type `" ~ Dg.stringof ~
                          "` is not callable with argument type `scope " ~
                          fullyQualifiedName!API ~ "`");
            dg(api);
        }
932     }
933 
    // Vibe.d mandates that methods must be @safe
935     @safe:
936 
    /***************************************************************************

        Generate the API `override` which forward to the actual object

        One `override` is mixed in for every overload of every member of
        `API`. Each generated method:
        $(UL
          $(LI creates a `LocalScheduler` and flags the caller as the main
               thread if no `scheduler` is set yet;)
          $(LI serializes its arguments (wrapped in an `ArgWrapper`) using
               `S.serialize`;)
          $(LI sends a `Command` holding the caller's `Tid`, a new response
               id, the mangled name of the overload, and the serialized
               arguments to `childTid`;)
          $(LI waits for the matching `Response` through
               `scheduler.waitResponse`, using this instance's `timeout`.
               On the main thread, a task polling the message box
               every 10 msecs is run alongside until the response is in;)
          $(LI deserializes and returns the response's data,
               unless the return type is `void`.)
        )

        Throws:
            `Exception` if the command could not be sent
            ("Connection with peer closed"), if the response's status is
            `Status.Failed` (the message is the response's data),
            or if it is `Status.Timeout` ("Request timed-out").
            `ClientException` if the response's status is
            `Status.ClientFailed`.

    ***************************************************************************/

    static foreach (member; __traits(allMembers, API))
        static foreach (ovrld; __traits(getOverloads, API, member))
        {
            mixin(q{
                override ReturnType!(ovrld) } ~ member ~ q{ (Parameters!ovrld params)
                {
                    // we are in the main thread
                    if (scheduler is null)
                    {
                        scheduler = new LocalScheduler;
                        is_main_thread = true;
                    }

                    // `geod24.concurrency.send/receive[Only]` is not `@safe` but
                    // this overload needs to be
                    auto res = () @trusted {
                        auto serialized = S.serialize(ArgWrapper!(Parameters!ovrld)(params));

                        auto command = Command(C.thisTid(), scheduler.getNextResponseId(), ovrld.mangleof,
                                               SerializedData(serialized));

                        // If the node already shut down, its MessageBox will be
                        // closed. Detect it and notify the user.
                        // Note that it might be expected that the remote died.
                        if (!C.trySend(this.childTid, command))
                            throw new Exception("Connection with peer closed");

                        // for the main thread, we run the "event loop" until
                        // the request we're interested in receives a response.
                        if (is_main_thread)
                        {
                            bool terminated = false;
                            runTask(() {
                                while (!terminated)
                                {
                                    C.receiveTimeout(C.thisTid(), 10.msecs,
                                        (Response res) {
                                            scheduler.pending = res;
                                            scheduler.waiting[res.id].s.notify();
                                        });

                                    scheduler.yield();
                                }
                            });

                            Response res;
                            scheduler.start(() {
                                res = scheduler.waitResponse(command.id, this.timeout);
                                terminated = true;
                            });
                            return res;
                        }
                        else
                        {
                            return scheduler.waitResponse(command.id, this.timeout);
                        }
                    }();

                    if (res.status == Status.Failed)
                        throw new Exception(res.data.get!string);

                    if (res.status == Status.ClientFailed)
                        throw new ClientException(
                            format("Request to %s couldn't be processed : %s",
                                   __PRETTY_FUNCTION__, res.data.get!string));

                    if (res.status == Status.Timeout)
                        throw new Exception("Request timed-out");

                    static if (!is(ReturnType!(ovrld) == void))
                        return S.deserialize!(typeof(return))(res.data.getS!S());
                }
                });
        }
1017 }
1018 
/// Simple usage example
unittest
{
    // The API exposed by the node
    static interface API
    {
        @safe:
        public @property ulong pubkey ();
        public string getValue (ulong idx);
        public ubyte[32] getQuorumSet ();
        public string recv (string data);
    }

    // Only `pubkey` is expected to be called in this example
    static class MockAPI : API
    {
        @safe:
        public override @property ulong pubkey ()
        {
            return 42;
        }

        public override string getValue (ulong idx)
        {
            assert(0);
        }

        public override ubyte[32] getQuorumSet ()
        {
            assert(0);
        }

        public override string recv (string data)
        {
            assert(0);
        }
    }

    scope node = RemoteAPI!API.spawn!MockAPI();
    const key = node.pubkey();
    assert(key == 42);
    node.ctrl.shutdown();
    thread_joinAll();
}
1049 
/// Example where a shutdown() routine must be called on a node before
/// its destructor is called
unittest
{
    // Set by the node / the callback, checked from this thread at the end
    __gshared bool dtor_called;
    __gshared bool shutdown_called;
    __gshared bool onDestroy_called;

    static interface API
    {
        @safe:
        public @property ulong pubkey ();
    }

    static class MockAPI : API
    {
        public override @property ulong pubkey () @safe
        {
            return 42;
        }

        // Not part of `API`: called from the `onDestroy` callback below
        public void shutdown ()
        {
            shutdown_called = true;
        }

        ~this ()
        {
            dtor_called = true;
        }
    }

    // Callback handed over to `ctrl.shutdown`
    static void onDestroy (API node)
    {
        // The node must not have been destroyed yet
        assert(!dtor_called);
        auto impl = cast(MockAPI) node;
        assert(impl !is null);
        impl.shutdown();
        onDestroy_called = true;
    }

    scope remote = RemoteAPI!API.spawn!MockAPI();
    assert(remote.pubkey() == 42);
    remote.ctrl.shutdown(&onDestroy);
    // The `ctrl.shutdown` call is asynchronous, so join before checking
    thread_joinAll();
    assert(dtor_called);
    assert(onDestroy_called);
    assert(shutdown_called);
}
1090 
/// In a real world usage, users will most likely need to use the registry
unittest
{
    import std.conv;
    static import geod24.concurrency;
    import geod24.Registry;

    __gshared Registry registry;
    registry.initialize();

    static interface API
    {
        @safe:
        public @property ulong pubkey ();
        public string getValue (ulong idx);
        public string recv (string data);
        public string recv (ulong index, string data);

        public string last ();
    }

    // Every endpoint records its name, which `last` reports
    static class Node : API
    {
        @safe:
        public this (bool isByzantine)
        {
            this.isByzantine = isByzantine;
        }

        public override @property ulong pubkey ()
        {
            this.lastCall = `pubkey`;
            return this.isByzantine ? 0 : 42;
        }

        public override string getValue (ulong idx)
        {
            this.lastCall = `getValue`;
            return null;
        }

        public override string recv (string data)
        {
            this.lastCall = `recv@1`;
            return null;
        }

        public override string recv (ulong index, string data)
        {
            this.lastCall = `recv@2`;
            return null;
        }

        public override string last ()
        {
            return this.lastCall;
        }

        private bool isByzantine;
        private string lastCall;
    }

    // Returns a client to the node registered under `hash`,
    // spawning (and registering) a node of kind `type` if there is none yet
    static RemoteAPI!API factory (string type, ulong hash)
    {
        const name = hash.to!string;

        auto known = registry.locate(name);
        if (known != known.init)
            return new RemoteAPI!API(known);

        bool byzantine;
        switch (type)
        {
        case "normal":
            byzantine = false;
            break;
        case "byzantine":
            byzantine = true;
            break;
        default:
            assert(0, type);
        }

        auto spawned = RemoteAPI!API.spawn!Node(byzantine);
        registry.register(name, spawned.tid());
        return spawned;
    }

    // Spawn and register both nodes from this thread
    auto node1 = factory("normal", 1);
    auto node2 = factory("byzantine", 2);

    static void testFunc ()
    {
        // Both nodes are already registered, hence `type` is not looked at
        auto normal = factory("this does not matter", 1);
        auto byzantine = factory("neither does this", 2);

        assert(normal.pubkey() == 42);
        assert(normal.last() == "pubkey");
        assert(byzantine.pubkey() == 0);
        assert(byzantine.last() == "pubkey");

        normal.recv(42, null);
        assert(normal.last() == "recv@2");
        normal.recv(null);
        assert(normal.last() == "recv@1");
        assert(byzantine.last() == "pubkey");

        normal.ctrl.shutdown();
        byzantine.ctrl.shutdown();
    }

    scope worker = new Thread(&testFunc);
    worker.start();
    // Make sure our main thread terminates after everyone else
    thread_joinAll();
}
1179 
/// This network has different types of nodes in it
unittest
{
    import geod24.concurrency;

    static interface API
    {
        @safe:
        public @property ulong requests ();
        public @property ulong value ();
    }

    // Answers `value` by itself
    static class MasterNode : API
    {
        @safe:
        public override @property ulong requests ()
        {
            return this.requests_;
        }

        public override @property ulong value ()
        {
            this.requests_++;
            return 42; // Of course
        }

        private ulong requests_;
    }

    // Forwards `value` to the master node
    static class SlaveNode : API
    {
        @safe:
        this (Tid masterTid)
        {
            this.master = new RemoteAPI!API(masterTid);
        }

        public override @property ulong requests ()
        {
            return this.requests_;
        }

        public override @property ulong value ()
        {
            this.requests_++;
            return master.value();
        }

        private API master;
        private ulong requests_;
    }

    // The master comes first, followed by its three slaves
    auto master = RemoteAPI!API.spawn!MasterNode();
    RemoteAPI!API[4] nodes;
    nodes[0] = master;
    foreach (ref slot; nodes[1 .. $])
        slot = RemoteAPI!API.spawn!SlaveNode(master.tid());

    foreach (node; nodes)
    {
        assert(node.requests() == 0);
        assert(node.value() == 42);
    }

    // One direct request, plus one forwarded by each slave
    assert(nodes[0].requests() == 4);

    foreach (slave; nodes[1 .. $])
    {
        assert(slave.value() == 42);
        assert(slave.requests() == 2);
    }

    // Three more requests were forwarded by the slaves
    assert(nodes[0].requests() == 7);

    foreach (node; nodes)
        node.ctrl.shutdown();
    thread_joinAll();
}
1258 
/// Support for circular nodes call
unittest
{
    static import geod24.concurrency;

    // Maps a node's name to its Tid
    __gshared C.Tid[string] tbn;

    static interface API
    {
        @safe:
        public ulong call (ulong count, ulong val);
        public void setNext (string name);
    }

    static class Node : API
    {
        @safe:
        // Adds `count` to `val` and passes it on to the next node,
        // until `count` reaches 0
        public override ulong call (ulong count, ulong val)
        {
            return count == 0
                ? val
                : this.next.call(count - 1, val + count);
        }

        public override void setNext (string name) @trusted
        {
            this.next = new RemoteAPI!API(tbn[name]);
        }

        private API next;
    }

    RemoteAPI!(API)[3] nodes = [
        RemoteAPI!API.spawn!Node(),
        RemoteAPI!API.spawn!Node(),
        RemoteAPI!API.spawn!Node(),
    ];

    foreach (idx, ref api; nodes)
        tbn[format("node%d", idx)] = api.tid();

    // Link the nodes in a ring: node0 -> node1 -> node2 -> node0
    foreach (idx, node; nodes)
        node.setNext(format("node%d", (idx + 1) % nodes.length));

    // 7 level of re-entrancy
    const total = nodes[0].call(20, 0);
    assert(210 == total);

    foreach (node; nodes)
        node.ctrl.shutdown();
    thread_joinAll();
}
1310 
1311 
/// Nodes can start tasks
unittest
{
    static import core.thread;
    import core.time;

    static interface API
    {
        public void start ();
        public ulong getCounter ();
    }

    static class Node : API
    {
        // Starts the never-ending `task` below
        public override void start ()
        {
            runTask(&this.task);
        }

        // Returns the current value of the counter, and resets it to 0
        public override ulong getCounter ()
        {
            scope (exit) this.counter = 0;
            return this.counter;
        }

        // Increments the counter, then sleeps for 50 msecs, forever
        private void task ()
        {
            while (true)
            {
                this.counter++;
                sleep(50.msecs);
            }
        }

        private ulong counter;
    }

    auto node = RemoteAPI!API.spawn!Node();
    // The task is not started yet
    assert(node.getCounter() == 0);
    node.start();
    // The task has run its first increment, and the read reset the counter
    assert(node.getCounter() == 1);
    assert(node.getCounter() == 0);
    core.thread.Thread.sleep(1.seconds);
    // It should be 19 but some machines are very slow
    // (e.g. Travis Mac testers) so be safe
    assert(node.getCounter() >= 9);
    assert(node.getCounter() == 0);
    node.ctrl.shutdown();
    thread_joinAll();
}
1362 
// Sane name insurance policy
unittest
{
    import geod24.concurrency : Tid;

    // An API method can share its name with a control method (`tid`)
    static interface API
    {
        public ulong tid ();
    }

    static class Node : API
    {
        public override ulong tid ()
        {
            return 42;
        }
    }

    // But an API method cannot be named `ctrl`
    static interface DoesntWork
    {
        public string ctrl ();
    }
    static assert(!is(typeof(RemoteAPI!DoesntWork)));

    auto remote = RemoteAPI!API.spawn!Node();
    // The API method is called, the control one is reached through `ctrl`
    assert(remote.tid == 42);
    assert(remote.ctrl.tid != Tid.init);

    remote.ctrl.shutdown();
    thread_joinAll();
}
1390 
// Simulate temporary outage
unittest
{
    // Tid of the first node, read by the constructor of the second one
    __gshared C.Tid n1tid;

    static interface API
    {
        public ulong call ();
        public void asyncCall ();
    }
    static class Node : API
    {
        // Only nodes created once `n1tid` is set get a `remote`
        public this()
        {
            if (n1tid != C.Tid.init)
                this.remote = new RemoteAPI!API(n1tid);
        }

        // Returns the number of times it was called, this call included
        public override ulong call () { return ++this.count; }
        // Calls `remote.call` from a new task, ignoring the result
        public override void  asyncCall () { runTask(() => cast(void)this.remote.call); }
        size_t count;
        RemoteAPI!API remote;
    }

    // `n1` has no remote, `n2`'s remote is `n1`
    auto n1 = RemoteAPI!API.spawn!Node();
    n1tid = n1.tid();
    auto n2 = RemoteAPI!API.spawn!Node();

    /// Make sure calls are *relatively* efficient
    auto current1 = MonoTime.currTime();
    assert(1 == n1.call());
    assert(1 == n2.call());
    auto current2 = MonoTime.currTime();
    assert(current2 - current1 < 200.msecs);

    // Make one of the node sleep
    n1.sleep(1.seconds);
    // Make sure our main thread is not suspended,
    // nor is the second node
    assert(2 == n2.call());
    auto current3 = MonoTime.currTime();
    assert(current3 - current2 < 400.msecs);

    // Wait for n1 to unblock
    assert(2 == n1.call());
    // Check current time >= 1 second
    auto current4 = MonoTime.currTime();
    assert(current4 - current2 >= 1.seconds);

    // Now drop many messages
    n1.sleep(1.seconds, true);
    // Github Action runs out of memory with MessageCount == 500
    version (Windows) enum MessageCount = 100;
    else              enum MessageCount = 500;
    for (size_t i = 0; i < MessageCount; i++)
        n2.asyncCall();
    // Make sure we don't end up blocked forever
    n1.sleep(0.seconds, false);
    // `n1.call` returned 2 earlier: getting 3 now means that none of
    // the calls issued through `n2.asyncCall` were processed by `n1`
    assert(3 == n1.call());

    // Debug output, uncomment if needed
    version (none)
    {
        import std.stdio;
        writeln("Two non-blocking calls: ", current2 - current1);
        writeln("Sleep + non-blocking call: ", current3 - current2);
        writeln("Delta since sleep: ", current4 - current2);
    }

    n1.ctrl.shutdown();
    n2.ctrl.shutdown();
    thread_joinAll();
}
1464 
// Filter commands
unittest
{
    // Tid of the node being filtered, read by the nodes' constructor
    __gshared C.Tid node_tid;

    static interface API
    {
        size_t fooCount();
        size_t fooIntCount();
        size_t barCount ();
        void foo ();
        void foo (int);
        void bar (int);  // not in any overload set
        void callBar (int);
        void callFoo ();
        void callFoo (int);
    }

    static class Node : API
    {
        // Number of calls received by `foo()`, `foo(int)` and `bar(int)`
        size_t foo_count;
        size_t foo_int_count;
        size_t bar_count;
        // Client to the node identified by `node_tid`
        RemoteAPI!API remote;

        public this()
        {
            this.remote = new RemoteAPI!API(node_tid);
        }

        override size_t fooCount() { return this.foo_count; }
        override size_t fooIntCount() { return this.foo_int_count; }
        override size_t barCount() { return this.bar_count; }
        override void foo () { ++this.foo_count; }
        override void foo (int) { ++this.foo_int_count; }
        override void bar (int) { ++this.bar_count; }  // not in any overload set
        // This one is part of the overload set of the node, but not of the API
        // It can't be accessed via API and can't be filtered out
        void bar(string) { assert(0); }

        // The `callXXX` methods forward to the matching method of `remote`.
        // If the call throws, the message must be the one of a filtered
        // method (note that any other `Exception` fails the assert).
        override void callFoo()
        {
            try
            {
                this.remote.foo();
            }
            catch (Exception ex)
            {
                assert(ex.msg == "Filtered method 'foo()'");
            }
        }

        override void callFoo(int arg)
        {
            try
            {
                this.remote.foo(arg);
            }
            catch (Exception ex)
            {
                assert(ex.msg == "Filtered method 'foo(int)'");
            }
        }

        override void callBar(int arg)
        {
            try
            {
                this.remote.bar(arg);
            }
            catch (Exception ex)
            {
                assert(ex.msg == "Filtered method 'bar(int)'");
            }
        }
    }

    // `node_tid` is still unset when `filtered` is constructed,
    // only `caller` (spawned below) gets a usable `remote`
    auto filtered = RemoteAPI!API.spawn!Node();
    node_tid = filtered.tid();

    // caller will call filtered
    auto caller = RemoteAPI!API.spawn!Node();
    caller.callFoo();
    assert(filtered.fooCount() == 1);

    // both of these work
    static assert(is(typeof(filtered.filter!(API.foo))));
    static assert(is(typeof(filtered.filter!(filtered.foo))));

    // only method in the overload set that takes a parameter,
    // should still match a call to filter with no parameters
    static assert(is(typeof(filtered.filter!(filtered.bar))));

    // wrong parameters => fail to compile
    static assert(!is(typeof(filtered.filter!(filtered.bar, float))));
    // Only `API` overload sets are considered
    static assert(!is(typeof(filtered.filter!(filtered.bar, string))));

    // no parameter types provided: this selects `foo()`
    filtered.filter!(API.foo);

    caller.callFoo();
    assert(filtered.fooCount() == 1);  // it was not called!

    filtered.clearFilter();  // clear the filter
    caller.callFoo();
    assert(filtered.fooCount() == 2);  // it was called!

    // verify foo(int) works first
    caller.callFoo(1);
    assert(filtered.fooCount() == 2);
    assert(filtered.fooIntCount() == 1);  // first time called

    // now filter only the int overload
    filtered.filter!(API.foo, int);

    // make sure the parameterless overload is still not filtered
    caller.callFoo();
    assert(filtered.fooCount() == 3);  // updated

    caller.callFoo(1);
    assert(filtered.fooIntCount() == 1);  // call filtered!

    // not filtered yet
    caller.callBar(1);
    assert(filtered.barCount() == 1);

    filtered.filter!(filtered.bar);
    caller.callBar(1);
    assert(filtered.barCount() == 1);  // filtered!

    // last blocking calls, to ensure the previous calls complete
    filtered.clearFilter();
    caller.foo();
    caller.bar(1);

    filtered.ctrl.shutdown();
    caller.ctrl.shutdown();
    thread_joinAll();
}
1604 
// request timeouts (from main thread)
unittest
{
    import core.thread;
    import std.exception;

    static interface API
    {
        size_t sleepFor (long dur);
        void ping ();
    }

    static class Node : API
    {
        // Blocks the node's thread for `dur` milliseconds, then returns 42
        override size_t sleepFor (long dur)
        {
            Thread.sleep(msecs(dur));
            return 42;
        }

        override void ping () { }
    }

    // node with no timeout
    auto node = RemoteAPI!API.spawn!Node();
    assert(node.sleepFor(80) == 42);  // no timeout

    // custom timeout
    // (the request takes 2 seconds while the timeout is 100 msecs)
    bool called;
    node.ctrl.withTimeout(100.msecs,
        (scope API api) {
            assertThrown!Exception(api.sleepFor(2000));
            called = true;
        });
    assert(called);

    // Same check, using a struct with `opCall` instead of a delegate
    called = false;
    struct S
    {
        void opCall (scope API api)
        {
            assertThrown!Exception(api.sleepFor(2000));
            called = true;
        }
    }
    S s;
    node.ctrl.withTimeout(100.msecs, s);
    assert(called);

    // Test that attributes are inferred based on the delegate
    void doTest () @safe pure nothrow @nogc
    {
        called = false;
        node.ctrl.withTimeout(Duration.zero,
                              (scope API api) { called = true; });
        assert(called);
    }
    doTest();

    // node with a configured timeout
    auto to_node = RemoteAPI!API.spawn!Node(500.msecs);

    /// none of these should time out
    assert(to_node.sleepFor(10) == 42);
    assert(to_node.sleepFor(20) == 42);
    assert(to_node.sleepFor(30) == 42);
    assert(to_node.sleepFor(40) == 42);

    // 2 seconds is more than the 500 msecs timeout of `to_node`
    assertThrown!Exception(to_node.sleepFor(2000));
    to_node.ctrl.withTimeout(3.seconds,  // wait for the node to wake up
          (scope API api) { api.ping(); });

    to_node.ctrl.shutdown();
    node.ctrl.shutdown();
    thread_joinAll();
}
1681 
// test-case for responses to re-used requests (from main thread)
unittest
{
    import core.thread;
    import std.exception;

    static interface API
    {
        float getFloat();
        size_t sleepFor (long dur);
    }

    static class Node : API
    {
        override float getFloat()
        {
            return 69.69;
        }

        // Blocks the node's thread for `dur` milliseconds, then returns 42
        override size_t sleepFor (long dur)
        {
            Thread.sleep(msecs(dur));
            return 42;
        }
    }

    // node with no timeout
    auto node = RemoteAPI!API.spawn!Node();
    assert(node.sleepFor(80) == 42);  // no timeout

    // node with a configured timeout
    auto to_node = RemoteAPI!API.spawn!Node(500.msecs);

    /// none of these should time out
    foreach (long dur; [10, 20, 30, 40])
        assert(to_node.sleepFor(dur) == 42);

    // This one takes longer than the configured timeout
    assertThrown!Exception(to_node.sleepFor(2000));

    // The next request must get its own response (a `float`),
    // not the late response to the timed-out `sleepFor` request
    to_node.ctrl.withTimeout(3.seconds,  // wait for the node to wake up
        (scope API api) {
            const value = api.getFloat();
            assert(cast(int) value == 69);
        });

    to_node.ctrl.shutdown();
    node.ctrl.shutdown();
    thread_joinAll();
}
1724 
// request timeouts (foreign node to another node)
unittest
{
    static import geod24.concurrency;
    import std.exception;

    // Tid of the node which is put to sleep
    __gshared C.Tid node_tid;

    static interface API
    {
        void check ();
        int ping ();
    }

    static class Node : API
    {
        override int ping ()
        {
            return 42;
        }

        override void check ()
        {
            // Requests made through this client expire after 500 msecs
            auto peer = new RemoteAPI!API(node_tid, 500.msecs);

            // Sleeping for less than the timeout: the request succeeds
            peer.ctrl.sleep(10.msecs);
            assert(peer.ping() == 42);

            // Sleeping for more than the timeout: the request fails
            peer.ctrl.sleep(2000.msecs);
            assertThrown!Exception(peer.ping());
        }
    }

    auto checker = RemoteAPI!API.spawn!Node();
    auto sleeper = RemoteAPI!API.spawn!Node();
    node_tid = sleeper.tid;
    checker.check();
    checker.ctrl.shutdown();
    sleeper.ctrl.shutdown();
    thread_joinAll();
}
1765 
// test-case for zombie responses
unittest
{
    static import geod24.concurrency;
    import std.exception;

    // Tid of the node which is put to sleep
    __gshared C.Tid node_tid;

    static interface API
    {
        void check ();
        int get42 ();
        int get69 ();
    }

    static class Node : API
    {
        override int get42 ()
        {
            return 42;
        }

        override int get69 ()
        {
            return 69;
        }

        override void check ()
        {
            // Requests made through this client expire after 500 msecs
            auto peer = new RemoteAPI!API(node_tid, 500.msecs);

            // Sleeping for more than the timeout: the request fails
            peer.ctrl.sleep(2000.msecs);
            assertThrown!Exception(peer.get42());

            // The next request must get its own answer (69),
            // and not the one to the request which timed out (42)
            peer.ctrl.sleep(10.msecs);
            assert(peer.get69() == 69);
        }
    }

    auto checker = RemoteAPI!API.spawn!Node();
    auto sleeper = RemoteAPI!API.spawn!Node();
    node_tid = sleeper.tid;
    checker.check();
    checker.ctrl.shutdown();
    sleeper.ctrl.shutdown();
    thread_joinAll();
}
1808 
// Request timeouts when the callee drops the messages it receives
unittest
{
    static import geod24.concurrency;
    import std.exception;

    // Address of the callee; written by the main thread before `check` runs
    __gshared C.Tid callee_tid;

    static interface API
    {
        void check ();
        int ping ();
    }

    static class Node : API
    {
        override int ping () { return 42; }

        override void check ()
        {
            auto remote = new RemoteAPI!API(callee_tid, 420.msecs);

            // Sanity check: before any sleep is requested, the call goes through
            assert(remote.ping() == 42);

            // Requests are dropped, so it times out
            // (the `true` argument presumably selects "drop messages" - TODO confirm)
            remote.ctrl.sleep(10.msecs, true);
            assertThrown!Exception(remote.ping());
        }
    }

    auto caller = RemoteAPI!API.spawn!Node();
    auto callee = RemoteAPI!API.spawn!Node();
    callee_tid = callee.tid;
    caller.check();
    caller.ctrl.shutdown();
    callee.ctrl.shutdown();
    thread_joinAll();
}
1846 
// Test a node that gets a reply while it's delayed:
// node_1 asks node_2 (itself delayed by 200 ms) for a ping from a background
// task, and the main thread puts node_1 to sleep for 300 ms in the meantime.
unittest
{
    static import geod24.concurrency;
    import std.exception;

    // Address of node_2; set by the main thread before `check` is called
    __gshared C.Tid node_tid;

    static interface API
    {
        void check ();
        int ping ();
    }

    static class Node : API
    {
        override int ping () { return 42; }

        override void check ()
        {
            auto node = new RemoteAPI!API(node_tid, 5000.msecs);
            // Sanity check: the other node answers before any sleep is requested
            assert(node.ping() == 42);
            // We need to return immediately so that the main thread
            // puts us to sleep
            runTask(() {
                    // Ask the other node to sleep for 200 ms, then ping it
                    node.ctrl.sleep(200.msecs);
                    assert(node.ping() == 42);
                });
        }
    }

    auto node_1 = RemoteAPI!API.spawn!Node(500.msecs);
    auto node_2 = RemoteAPI!API.spawn!Node();
    node_tid = node_2.tid;
    node_1.check();
    // Put node_1 to sleep for longer than the 200 ms requested of node_2
    node_1.ctrl.sleep(300.msecs);
    // node_1 must still answer this request
    assert(node_1.ping() == 42);
    node_1.ctrl.shutdown();
    node_2.ctrl.shutdown();
    thread_joinAll();
}
1888 
// Explicit shutdown: a request sent after `ctrl.shutdown()` must time out
unittest
{
    import std.exception;

    static interface API
    {
        int myping (int value);
    }

    static class Node : API
    {
        // Echoes back whatever it is given
        override int myping (int value)
        {
            return value;
        }
    }

    auto node = RemoteAPI!API.spawn!Node(1.seconds);
    assert(node.myping(42) == 42);
    node.ctrl.shutdown();

    // The node is gone: the call has to fail, and with this exact message
    auto failure = collectException!Exception(node.myping(69));
    assert(failure !is null);
    assert(failure.msg == "Request timed-out");
    thread_joinAll();
}
1922 
// Test that a request sent to a sleeping node which gets shut down
// before waking up is ignored rather than processed
unittest
{
    import core.thread : thread_joinAll;
    static import geod24.concurrency;
    // Address of node_2; set by the main thread before `check` is called
    __gshared C.Tid node_tid;

    static interface API
    {
        void segfault ();
        void check ();
    }

    static class Node : API
    {
        // Writes through a null pointer: this must never actually be executed
        override void segfault ()
        {
            int* ptr; *ptr = 1;
        }

        override void check ()
        {
            auto node = new RemoteAPI!API(node_tid);

            // We need to return immediately so that the main thread can continue testing
            runTask(() {
                // Ask the other node to sleep for 500 ms, then send it
                // the request that would crash it if it were handled
                node.ctrl.sleep(500.msecs);
                node.segfault();
            });
        }
    }

    auto node_1 = RemoteAPI!API.spawn!Node();
    auto node_2 = RemoteAPI!API.spawn!Node();
    node_tid = node_2.tid;
    node_1.check();
    node_2.ctrl.shutdown();  // shut it down before wake-up, segfault() command will be ignored
    node_1.ctrl.shutdown();
    thread_joinAll();
}
1962 
/// Example of a custom (de)serialization policy
unittest
{
    // Policy passed as second template argument to `RemoteAPI`:
    // it exposes a static `serialize` / `deserialize` pair working on raw bytes
    static struct Serialize
    {
    static:
        // Turns `value` into an immutable byte array
        public immutable(ubyte[]) serialize (T) (auto ref T value) @trusted
        {
            // Only accept types that can be initialized from an `immutable` instance
            static assert(is(typeof({ T v = immutable(T).init; })));
            // Byte arrays are simply duplicated ...
            static if (is(T : const(ubyte)[]))
                return value.idup;
            // ... anything else is copied out as its in-memory representation
            else
                return (cast(ubyte*)&value)[0 .. T.sizeof].idup;
        }

        // Reinterprets a mutable copy of `data` as a value of type `QT`
        public QT deserialize (QT) (immutable(ubyte)[] data) @trusted
        {
            return *cast(QT*)(data.dup.ptr);
        }
    }

    // Plain aggregate of integers, used as a return type below
    static struct ValueType
    {
        ulong v1;
        uint v2;
        uint v3;
    }

    static interface API
    {
        @safe:
        public @property ulong pubkey ();
        public ValueType getValue (string val);
        // Note: Vibe.d's JSON serializer cannot serialize this
        public immutable(ubyte[32]) getHash (const ubyte[] val);
    }

    static class MockAPI : API
    {
        @safe:
        public override @property ulong pubkey () { return 42; }
        // The first field carries the length of the input string
        public override ValueType getValue (string val) { return ValueType(val.length, 2, 3); }
        // Returns the first 32 bytes of `val`, or the default-initialized
        // array when `val` is shorter than that
        public override immutable(ubyte[32]) getHash (const ubyte[] val)
        {
            return val.length >= 32 ? val[0 .. 32] : typeof(return).init;
        }
    }

    scope test = RemoteAPI!(API, Serialize).spawn!MockAPI();
    assert(test.pubkey() == 42);
    // "Hello world" is 11 characters long
    assert(test.getValue("Hello world") == ValueType(11, 2, 3));
    // 64 bytes, all set to 42: the "hash" is the first half of it
    ubyte[64] val = 42;
    assert(test.getHash(val) == val[0 .. 32]);
    test.ctrl.shutdown();
    thread_joinAll();
}
2019 
/// Test node2 responding to a dead node1
/// See https://github.com/Geod24/localrest/issues/64
unittest
{
    static interface API
    {
        @safe:
        // Main thread calls this on the first node
        public void call0 ();
        // ... which then calls this on the second node
        public void call1 ();
        // ... which finally calls this back on the first node
        public void call2 ();
    }

    // Addresses of both nodes; set by the main thread before `call0` is issued
    __gshared C.Tid node1Addr;
    __gshared C.Tid node2Addr;

    static class Node : API
    {
        // Handle to this very node; only ever set on node 1 (in `call0`)
        private RemoteAPI!API self;

        @trusted:
        // Main -> Node 1
        public override void call0 ()
        {
            this.self = new RemoteAPI!API(node1Addr);
            scope node2 = new RemoteAPI!API(node2Addr);
            node2.call1();
            assert(0, "This should never return as call2 shutdown this node");
        }

        // Node 1 -> Node 2
        public override void call1 ()
        {
            // `self` is only set by `call0`, which never runs on node 2
            assert(this.self is null);
            scope node1 = new RemoteAPI!API(node1Addr);
            node1.call2();
            // Make really sure Node 1 is dead
            while (!node1Addr.mbox.isClosed())
                sleep(100.msecs);
        }

        // Node 2 -> Node 1
        public override void call2 ()
        {
            // `self` was set by `call0`, which is still in progress on node 1
            assert(this.self !is null);
            // Node 1 shuts itself down through its own handle
            this.self.ctrl.shutdown();
        }
    }

    // Long timeout to ensure we don't spuriously pass
    auto node1 = RemoteAPI!API.spawn!Node(500.msecs);
    auto node2 = RemoteAPI!API.spawn!Node();
    node1Addr = node1.ctrl.tid();
    node2Addr = node2.ctrl.tid();

    // This will timeout (because the node will be gone)
    // However if something is wrong, either `thread_joinAll` will never return,
    // or the `assert(0)` in `call0` will be triggered.
    try
    {
        node1.call0();
        assert(0, "This should have timed out");
    }
    catch (Exception e) {}

    // node1 already shut itself down in `call2`, only node2 is left
    node2.ctrl.shutdown();
    thread_joinAll();
}
2089 
/// Test Timer
unittest
{
    static import core.thread;
    import core.time;

    static interface API
    {
        public void startTimer (bool periodic);
        public void stopTimer ();
        public ulong getCounter ();
        public void resetCounter ();
    }

    static class Node : API
    {
        // Number of times `callback` ran since the last read / reset
        private ulong counter;
        // The timer most recently created by `startTimer`
        private Timer timer;

        // Arms a 100 ms timer that invokes `callback`
        public override void startTimer (bool periodic)
        {
            this.timer = setTimer(100.msecs, &callback, periodic);
        }

        public override void stopTimer ()
        {
            this.timer.stop();
        }

        // Timer handler: counts invocations and stops the timer at the third one
        public void callback ()
        {
            this.counter++;
            if (this.counter == 3)
                this.timer.stop();
        }

        // Returns the current count, and resets it to 0 as a side effect
        public override ulong getCounter ()
        {
            scope (exit) this.counter = 0;
            return this.counter;
        }

        public override void resetCounter ()
        {
            this.counter = 0;
        }
    }

    auto node = RemoteAPI!API.spawn!Node();
    assert(node.getCounter() == 0);
    node.startTimer(true);
    // Leave time for more than three 100 ms periods
    core.thread.Thread.sleep(1.seconds);
    // The expected count is 3
    // This check means the timer repeated, and that the timer stopped
    assert(node.getCounter() == 3);
    node.resetCounter();
    // A one-shot timer that is stopped right away must never fire
    node.startTimer(false);
    node.stopTimer();
    core.thread.Thread.sleep(500.msecs);
    assert(node.getCounter() == 0);
    node.ctrl.shutdown();
    thread_joinAll();
}
2153 
/// Test restarting a node
unittest
{
    static interface API
    {
        public uint[2] getCount () @safe;
    }

    static class Node : API
    {
        // Number of constructor / destructor runs (`static`, hence per-thread)
        private static uint created;
        private static uint destroyed;

        this ()
        {
            ++Node.created;
        }

        ~this ()
        {
            ++Node.destroyed;
        }

        // Returns `[constructions, destructions]` as seen so far
        public override uint[2] getCount () const @safe
        {
            uint[2] counts = [ Node.created, Node.destroyed ];
            return counts;
        }
    }

    auto node = RemoteAPI!API.spawn!Node();
    // Freshly spawned: one instance built, none destroyed
    assert(node.getCount() == [1, 0]);
    // Each restart destroys the current instance and builds a new one
    node.ctrl.restart();
    assert(node.getCount() == [2, 1]);
    node.ctrl.restart();
    assert(node.getCount() == [3, 2]);
    node.ctrl.shutdown();
    thread_joinAll();
}
2192 
/// Test restarting a node that has responses waiting for it
unittest
{
    import core.atomic : atomicLoad, atomicStore;
    static interface API
    {
        @safe:
        // Main thread calls this on the first node
        public void call0 ();
        // ... which then calls this on the second node
        public void call1 ();
    }

    // Address of node 2; set by the main thread before `call0` is issued
    __gshared C.Tid node2Addr;
    // Set from node 2 once it is done with `call1`; polled by the main thread
    static shared bool done;

    static class Node : API
    {
        @trusted:

        // Main -> Node 1: forwards to `call1` on node 2
        public override void call0 ()
        {
            scope node2 = new RemoteAPI!API(node2Addr);
            node2.call1();
        }

        // Node 1 -> Node 2
        public override void call1 ()
        {
            // when this event runs we know call1() has already returned
            scheduler.schedule({ atomicStore(done, true); });
        }
    }

    auto node1 = RemoteAPI!API.spawn!Node(500.msecs);
    auto node2 = RemoteAPI!API.spawn!Node();
    node2Addr = node2.ctrl.tid();
    // Delay node 2 for 2 seconds, so that the `call0` below fails first
    node2.ctrl.sleep(2.seconds, false);

    try
    {
        node1.call0();
        assert(0, "This should have timed out");
    }
    catch (Exception e) {}

    node1.ctrl.restart();

    // after a while node 1 will receive a response to the timed-out request
    // to call1(), but the node restarted and is no longer interested in this
    // request (the request map / LocalScheduler is different), so it's filtered
    size_t count;
    while (!atomicLoad(done))
    {
        assert(count < 300);  // up to 3 seconds wait
        count++;
        Thread.sleep(10.msecs);
    }

    node1.ctrl.shutdown();
    node2.ctrl.shutdown();
    thread_joinAll();
}
2253 
// Test that a delegate handed to `runTask` can be resumed after yielding,
// i.e. that the context it captured is still valid at that point
unittest
{
    import geod24.concurrency;

    static interface API
    {
        public void start ();
        public int getValue ();
    }

    static class Node : API
    {
        // Written twice by the task started in `start`: 1 before yielding, 2 after
        int value;

        public override void start ()
        {
            // if this is a scoped delegate, it might not have a closure,
            // and when the task is resumed again it will segfault.
            // therefore runTask() must take a non-scope delegate.
            // note: once upstream issue #20868 is fixed, it would become
            // a compiler error to escape a scope delegate.
            runTask(
            {
                value = 1;
                FiberScheduler.yield();
                value = 2;
            });
        }

        public override int getValue () { return this.value; }
    }

    auto node = RemoteAPI!API.spawn!Node();
    node.start();
    // The task is expected to have been resumed (and to have finished) by now
    assert(node.getValue() == 2);
    node.ctrl.shutdown();
    thread_joinAll();
}
2292 
/// Situation: Calling a method that the node's implementation doesn't provide
/// Expectation: The client throws a `ClientException`
/// This can happen by mistake (API mixup) or when a method is optional.
unittest
{
    import std.exception : assertThrown;

    static interface BaseAPI
    {
        public int required ();
    }

    static interface APIExtended : BaseAPI
    {
        public int optional ();
    }

    static class BaseNode : BaseAPI
    {
        public override int required () { return 42; }
    }

    // The node itself only implements `BaseAPI` ...
    auto server = RemoteAPI!BaseAPI.spawn!BaseNode();
    // ... while this client addresses it through the wider `APIExtended`
    scope client = new RemoteAPI!APIExtended(server.ctrl.tid());
    // Methods that exist on both sides work as usual
    assert(client.required() == 42);
    // The method unknown to the node is reported as a client-side exception
    assertThrown!ClientException(client.optional());
    server.ctrl.shutdown();
    thread_joinAll();
}