1 /*******************************************************************************
2 
3     Provides utilities to mock a network in unittests
4 
5     This module is based on the idea that D `interface`s can be used
6     to represent a server's API, and that D `class` inheriting this `interface`
7     are used to define the server's business code,
8     abstracting away the communication layer.
9 
10     For example, a server that exposes an API to concatenate two strings would
11     define the following code:
12     ---
13     interface API { public string concat (string a, string b); }
14     class Server : API
15     {
16         public override string concat (string a, string b)
17         {
18             return a ~ b;
19         }
20     }
21     ---
22 
    Then one can use "generators" to define how multiple processes communicate
24     together. One such generator, that pioneered this design is `vibe.web.rest`,
25     which allows to expose such servers as REST APIs.
26 
27     `localrest` is another generator, which uses message passing and threads
28     to create a local "network".
29     The motivation was to create a testing library that could be used to
30     model a network at a much cheaper cost than spawning processes
31     (and containers) would be, when doing integration tests.
32 
33     Control_Interface:
34     When instantiating a `RemoteAPI`, one has the ability to call foreign
35     implementations through auto-generated `override`s of the `interface`.
36     In addition to that, as this library is intended for testing,
37     a few extra functionalities are offered under a control interface,
38     accessible under the `ctrl` namespace in the instance.
39     The control interface allows to make the node unresponsive to one or all
40     methods, for some defined time or until unblocked, as well as trigger
41     shutdowns or restart. See the methods for more details.
42     The `withTimeout` control method can be used to spawn a scoped copy
43     of the RemoteAPI with a custom configured timeout. The user-provided
44     delegate will be called with this scoped copy that uses the new timeout.
45 
46     Shutdown:
47     The control interface has a shutdown method that can be used to terminate
48     a node gracefully. When the shutdown request is handled by the node,
49     the event loop will exit and the thread will terminate. While the destructor
    of the node will be called, it might not be usable for some actions, for example
    because D destructors may not allocate GC memory, or one may want
    to perform some test-specific operations, such as logging some data in case of failure.
53     Therefore, you may provide a shutdown routine in the call to `shutdown`.
54     It must accept a single argument of the interface type, and will be called
55     with the implementation object just before the node is destroyed.
56     If this routine throws, LocalRest will log the error in the console and
57     proceed with destroying the stack-allocated node.
    Note that control requests are asynchronous, hence requests from the node
    might be processed / sent by the node until the request is actually processed.
60     There is also a `restart` method which accepts the same callback argument.
61 
62     Event_Loop:
    Server processes usually need to perform some actions in an asynchronous way.
    Additionally, some actions need to be completed at a semi-regular interval,
65     for example based on a timer.
66     For those use cases, a node should call `runTask` or `sleep`, respectively.
67     Note that this has the same name (and purpose) as Vibe.d's core primitives.
68     Users should only ever call Vibe's `runTask` / `sleep` with `vibe.web.rest`,
69     or only call LocalRest's `runTask` / `sleep` with `RemoteAPI`.
70 
71     Implementation:
72     In order for tests to simulate an asynchronous system accurately,
73     multiple nodes need to be able to run concurrently and asynchronously.
74 
75     There are two common solutions to this, to use either fibers or threads.
76     Fibers have the advantage of being simpler to implement and predictable.
77     Threads have the advantage of more accurately describing an asynchronous
78     system and thus have the ability to uncover more issues.
79 
80     When spawning a node, a thread is spawned, a node is instantiated with
81     the provided arguments, and an event loop waits for messages sent
82     to the Tid. Messages consist of the sender's Tid, the mangled name
83     of the function to call (to support overloading) and the arguments,
84     serialized as a JSON string.
85 
86     Note:
87     While this module's original motivation was to test REST nodes,
    the only dependency to Vibe.d is actually to its JSON module,
89     as Vibe.d is the only available JSON module known to the author
90     to provide an interface to deserialize composite types.
91     This dependency is however not enforced by the dub project file,
92     as users can provide their own serializer (see `geod24.Serialization`).
93     If the default parameter for serialization is never used,
94     one's project need not depend on `vibe-d:data`.
95 
96     Author:         Mathias 'Geod24' Lang
97     License:        MIT (See LICENSE.txt)
98     Copyright:      Copyright (c) 2018-2019 Mathias Lang. All rights reserved.
99 
100 *******************************************************************************/
101 
102 module geod24.LocalRest;
103 
104 static import C = geod24.concurrency;
105 import geod24.Serialization;
106 
107 import std.datetime.systime : Clock, SysTime;
108 import std.format;
109 import std.meta : AliasSeq;
110 import std.traits : fullyQualifiedName, Parameters, ReturnType;
111 
112 import core.thread;
113 import core.time;
114 
/// Data sent by the caller to request a method invocation on a node
private struct Command
{
    /// Sequence id of Command; the `Response` built for it carries the same id
    size_t id;
    /// Method to call, matched against the mangled names of the API's overloads
    string method;
    /// Serialized arguments to the method
    SerializedData args;
}
125 
/// Ask the node to exhibit a certain behavior for a given time
private struct TimeCommand
{
    /// For how long the remote node applies this behavior
    Duration dur;
    /// Whether or not affected messages should be dropped
    /// (when `false`, the node queues them and handles them once awake)
    bool drop = false;
}
134 
/// Ask the node to shut down (or to restart)
private struct ShutdownCommand
{
    /// Any callback to call before the Node's destructor is called.
    /// May be `null`, in which case nothing is called.
    void function (Object) callback;

    /// Whether we're restarting (`true`) or really shutting down (`false`)
    bool restart;
}
144 
/// Filter out requests before they reach a node
private struct FilterAPI
{
    /// The mangled symbol name of the function to filter
    string func_mangleof;

    /// Human-readable name of the filtered function; used for debugging and
    /// included in the "Filtered method" error sent back to the caller
    string pretty_func;
}
154 
/// Status of a request
private enum Status
{
    /// Request failed
    Failed,

    /// The request failed due to a client error (4xx style error code)
    ClientFailed,

    /// Request timed-out
    Timeout,

    /// Request succeeded
    Success
}
170 
/// Data sent by the callee back to the caller
private struct Response
{
    /// Final status of a request (failed, timeout, success, etc)
    Status status;
    /// Response id, equal to the id of the `Command` being answered
    size_t id;
    /// If `status == Status.Success`, the serialized return value.
    /// Otherwise, it contains a description of the failure
    /// (e.g. `Exception.toString()`).
    SerializedData data;
}
182 
/// Thrown when the sent request is faulty (e.g. 4xx HTTP error types)
public class ClientException : Exception
{
    /// Constructor; every argument is forwarded to `Exception`'s constructor
    public this (string msg,
        string file = __FILE__, int line = __LINE__, Exception next = null)
        @safe pure nothrow
    {
        super(msg, file, line, next);
    }
}
194 
/// Simple exception to unwind the stack when we need to terminate/restart
private final class ExitException : Exception
{
    /// Set by the event loop before throwing: `true` asks for the node to be
    /// run again, `false` for the node's thread to stop
    public bool restart;

    /// The message is only meant to be seen if the exception escapes by mistake
    this () @safe pure nothrow @nogc
    {
        super("You should never see this exception - please report a bug");
    }
}
205 
/// Simple wrapper to deal with tuples: the arguments of a call are
/// (de)serialized as a single `ArgWrapper` value.
/// Vibe.d might emit a pragma(msg) when T.length == 0
private struct ArgWrapper (T...)
{
    // Placeholder field so the struct is not empty when there are no arguments
    // (presumably what avoids the pragma(msg) mentioned above - TODO confirm)
    static if (T.length == 0)
        size_t dummy;
    /// The wrapped arguments, in declaration order
    T args;
}
214 
// Minimal tagged union over the message kinds a node can receive.
// Deliberately simple & limited, to keep it performant.
// Should be replaced by a real Variant later.
private struct Variant
{
    /// Type of a request: tells which member of the union is meaningful
    enum Type : ubyte
    {
        command,
        response,
        filter,
        timeCommand,
        shutdownCommand,
    }

    /// Wrap a `Command`
    this (Command msg)
    {
        this.cmd = msg;
        this.tag = Variant.Type.command;
    }

    /// Wrap a `Response`
    this (Response msg)
    {
        this.res = msg;
        this.tag = Variant.Type.response;
    }

    /// Wrap a `FilterAPI`
    this (FilterAPI msg)
    {
        this.filter = msg;
        this.tag = Variant.Type.filter;
    }

    /// Wrap a `TimeCommand`
    this (TimeCommand msg)
    {
        this.time = msg;
        this.tag = Variant.Type.timeCommand;
    }

    /// Wrap a `ShutdownCommand`
    this (ShutdownCommand msg)
    {
        this.shutdown = msg;
        this.tag = Variant.Type.shutdownCommand;
    }

    // Payload storage: only the member selected by `tag` should be read
    union
    {
        Command cmd;
        Response res;
        FilterAPI filter;
        TimeCommand time;
        ShutdownCommand shutdown;
    }

    /// Discriminator set by the constructors
    Type tag;
}
246 
/// Channel carrying `Variant`-wrapped messages (commands and control requests) to a node
private alias CommandChn = C.Channel!Variant;
/// Channel carrying `Response`s back to the caller
private alias RespChn = C.Channel!Response;
249 
/// Represents a connection between a server and a client
private class Connection
{
    /// Create a connection with a fresh command channel and response channel
    this () nothrow
    {
        this.command_chn = new CommandChn();
        this.resp_chn = new RespChn();
    }

    /*******************************************************************************

        Send a message over the Connection

        When called outside of any fiber, the write is performed from within
        the thread's scheduler; otherwise it is performed directly.

        Params:
            T = Type of the message, should be supported by the `Variant` type
            msg = Message to be sent

        Returns:
            Success/failure, as reported by the channel's `write`

    *******************************************************************************/

    bool sendCommand (T) (T msg) @trusted
    {
        bool ret;

        if (isMainThread())
            scheduler.start({ ret = this.command_chn.write(Variant(msg)); });
        else
            ret = this.command_chn.write(Variant(msg));

        return ret;
    }

    /*******************************************************************************

        Get a unique id for a `Command` to be sent from this `Connection`

        Returns:
            Unique Command id (a counter incremented on every call)

    *******************************************************************************/

    size_t getNextCommandId () @safe
    {
        return this.next_cmd_id++;
    }

    /*******************************************************************************

        Wait for a `Response` with specific id

        Outside of any fiber, responses are read directly from the response
        channel and those with a different id are discarded.
        Inside a fiber, the calling fiber is blocked until `notifyWaiter`
        delivers the matching `Response` or `timeout` elapses.
        In the fiber case, the `waiting_list` entry for `resp_id` is removed
        before returning, so the list does not grow with every request.

        Params:
            resp_id = Response id to wait for
            timeout = timeout duration for the operation

        Returns:
            The matching `Response`, or a `Response` with `Status.Timeout`
            if none arrived in time

    *******************************************************************************/

    Response waitResponse (size_t resp_id, Duration timeout) @trusted
    {
        if (isMainThread())
        {
            Response res;
            scheduler.start({
                // Loop until we get the Response we are looking for
                while (this.resp_chn.read(res, timeout))
                    if (res.id == resp_id)
                        return;
                res = Response(Status.Timeout, resp_id, SerializedData("Request timed-out"));
            });
            return res;
        }
        else
        {
            // Response may already be ready: consume the entry parked by `notifyWaiter`
            if (auto existing_res = (resp_id in this.waiting_list))
            {
                Response ready = existing_res.res;
                this.waiting_list.remove(resp_id);
                return ready;
            }

            // Block while waiting the Response
            auto blocker = C.thisScheduler().new FiberBlocker();
            this.waiting_list[resp_id] = Waiting(blocker);
            // Ids are never reused, so nobody will look this entry up again once
            // we return. Note that a Response arriving after a timeout will be
            // parked by `notifyWaiter` as a new, unclaimed entry.
            scope (exit) this.waiting_list.remove(resp_id);

            if (blocker.wait(timeout))
            {
                // Copy the Response out before the entry is removed
                Response delivered = this.waiting_list[resp_id].res;
                return delivered;
            }
            else
                return Response(Status.Timeout, resp_id, SerializedData("Request timed-out"));
        }
    }

    /*******************************************************************************

        Notify the task waiting for a Response

        If a fiber is already registered for `res.id`, the `Response` is stored
        in its entry and its blocker is notified. Otherwise an entry holding
        the `Response` is created for `waitResponse` to pick up later.

        Params:
            res = Newly arrived Response

    *******************************************************************************/

    void notifyWaiter (Response res)
    {
        // Is the waiting Fiber already blocked?
        if (auto waiter = (res.id in this.waiting_list))
        {
            waiter.res = res;
            waiter.blocker.notify();
        }
        else // Fiber is not yet blocked, create an entry for the related Fiber to use
            this.waiting_list[res.id] = Waiting(null, res);
    }

    /*******************************************************************************

        Close the `Channel`s associated with this `Connection`. Blocked `waitResponse`
        calls will timeout and blocked `sendCommand` calls will fail

    *******************************************************************************/

    void close ()
    {
        this.command_chn.close();
        this.resp_chn.close();
    }

    /// Entry of `waiting_list`: a (possibly null) blocker and the `Response`
    private struct Waiting
    {
        /// Blocker of the waiting fiber, `null` if the Response arrived first
        C.FiberScheduler.FiberBlocker blocker;
        /// The Response, once delivered
        Response res;
    }

    /// List of Fibers waiting for a Response from this Connection
    private Waiting[size_t] waiting_list;

    /// Next Command ID
    private size_t next_cmd_id;

    /// Channel to send `Command`s to
    private CommandChn command_chn;

    /// Channel to read `Response`s from
    private RespChn resp_chn;
}
395 
/// A message received while the node was sleeping, kept to be handled later
private struct AwaitingMessage
{
    /// Message
    public Variant var;
    /// Originating `Connection`, whose response channel receives the answer
    public Connection conn;
}
403 
// State driving the filtering / sleeping behavior of a node's event loop
private struct Control
{
    /// Filter specific messages
    public FilterAPI filter;
    /// Sleep until this time (`SysTime.init` means "not sleeping")
    public SysTime sleep_until;
    /// Drop messages if sleeping
    public bool drop;

    /// Returns: `true` if a wake-up time was set and has not been reached yet
    bool isSleeping () const @safe /* nothrow: Not `nothrow` on Windows */
    {
        // No wake-up time was ever set
        if (this.sleep_until == SysTime.init)
            return false;

        return Clock.currTime < this.sleep_until;
    }
}
417 
418 
/// `Channel` type on which the nodes will listen for new `Connection`s
public alias BindChn = C.Channel!Connection;

/// Thread local outgoing `Connection` list, indexed by response channel.
/// A node's event loop reads `Response`s from these connections.
private Connection[RespChn] outgoing_conns;

/// We need a scheduler to simulate an event loop and to be re-entrant.
/// Assigned when a node starts running, or lazily by `isMainThread`.
private C.FiberScheduler scheduler;
427 
/***********************************************************************

    Check if the current context is running inside the main thread and
    initialize the thread local fiber scheduler if it is not initialized

    The check itself is "is this code running outside of any `Fiber`".

    Returns:
        If the context is the main thread or not

***********************************************************************/

private bool isMainThread () @trusted nothrow
{
    // Side effect: lazily create this thread's scheduler if there is none yet
    if (scheduler is null)
        scheduler = new C.FiberScheduler;
    // Not running inside a fiber is what is treated as "main thread"
    return Fiber.getThis() is null;
}
445 
/*******************************************************************************

    Provide eventloop-like functionalities

    Since nodes instantiated via this module are Vibe.d servers,
    they expect the ability to run an asynchronous task,
    usually provided by `vibe.core.core : runTask`.

    In order for them to properly work, we need to integrate them to our event
    loop by providing the ability to spawn a task, and wait on some condition,
    optionally with a timeout.

    The following functions do that.
    Note that those facilities are not available from the main thread,
    which is supposed to do tests and doesn't have a scheduler.

    Params:
        dg = The task to run, handed over to the thread's scheduler

*******************************************************************************/

public void runTask (void delegate() dg) nothrow
{
    // Only checks that this thread has a scheduler.
    // NOTE(review): `isMainThread` lazily creates one, so this assert may not
    // fire in the main thread once that function has been called - confirm
    assert(scheduler !is null, "Cannot call this delegate from the main thread");
    scheduler.spawn(dg);
}
469 
/*******************************************************************************

    Block the calling fiber for `timeout`

    Counterpart of Vibe.d's `sleep` for nodes running under this module's
    event loop. Must be called from within a fiber (i.e. not from the
    main thread).

    Params:
        timeout = How long to block. `Duration.init` returns immediately.

*******************************************************************************/

public void sleep (Duration timeout) nothrow
{
    assert(!isMainThread(), "Cannot call this function from the main thread");

    // Duration.init (0.seconds) is infinite timeout, ignore
    if (timeout == Duration.init)
        return;

    // `FiberBlocker` is nested in the scheduler, hence the `outer.new Inner` form
    scope blocker = scheduler.new FiberBlocker();
    blocker.wait(timeout);
}
482 
/*******************************************************************************

    Schedule a delegate to be called after a delay

    The delegate is first called once `timeout` has elapsed, then either
    never again (the default) or repeatedly, every `timeout`.
    Works similarly to Vibe.d's `setTimer`.

    Params:
        timeout = Determines the minimum amount of time that elapses before
            the timer fires.
        dg = Delegate called when the timer fires. Must not be `null`.
        periodic = Specifies if the timer fires repeatedly or only once

    Returns:
        A `Timer` instance with the ability to control the timer

*******************************************************************************/

public Timer setTimer (Duration timeout, void delegate() dg,
    bool periodic = false) nothrow
{
    assert(scheduler !is null, "Cannot call this delegate from the main thread");
    assert(dg !is null, "Cannot call this delegate if null");

    // The timer's `run` method is what gets handed to the scheduler
    auto handle = new Timer(timeout, dg, periodic);
    scheduler.schedule(&handle.run);
    return handle;
}
512 
/// Simple timer: calls a delegate after a delay, once or repeatedly
public final class Timer
{
    /// Delay before each call to `dg`
    private Duration timeout;
    /// What to call when the timer fires
    private void delegate () dg;
    /// Whether this timer is repeating
    private bool periodic;
    /// Whether this timer was stopped
    private bool stopped;

    /// Store the settings; the timer only starts once `run` is executed
    public this (Duration timeout, void delegate() dg, bool periodic) @safe nothrow
    {
        this.timeout = timeout;
        this.dg = dg;
        this.periodic = periodic;
        this.stopped = false;
    }

    // Sleep then call the delegate, at least once and for as long as the
    // timer is periodic. A stopped timer exits as soon as it wakes up.
    private void run ()
    {
        while (true)
        {
            sleep(this.timeout);

            if (this.stopped)
                return;

            this.dg();

            if (!this.periodic)
                return;
        }
    }

    /// Stop the timer. The next time this timer's fiber wakes up
    /// it will exit the run() function.
    public void stop () @safe nothrow
    {
        this.stopped = true;
        this.periodic = false;
    }
}
551 
/*******************************************************************************

    A reference to the "listening" connection of a remote thread

    When a remote thread starts, it initially listens for new connections
    (similarly to `bind` in C). When a new connection is started, it creates
    a separate channel for communication (similar to `accept` in C).
    This newly-created channel is what `RemoteAPI` wraps.
    The channel / link / connection to the original listener, which is the only
    one able to establish a connection, is what this data structure wraps.

    Params:
        API = The interface exposed by the node behind this listener

*******************************************************************************/

public struct Listener (API)
{
    /// Internal data, do not use
    /// (the channel on which the node accepts new `Connection`s)
    package BindChn data;
}
570 
571 /*******************************************************************************
572 
    A reference to an already-instantiated node
574 
575     This class serves the same purpose as a `RestInterfaceClient`:
576     it is a client for an already instantiated rest `API` interface.
577 
578     In order to instantiate a new server (in a remote thread), use the static
579     `spawn` function.
580 
581     Serialization:
582       In order to support custom serialization policy, one can change the
583       `Serializer` parameter. This parameter is expected to be either a
584       template or an aggregate with two static methods, but no explicit
585       limitation is put on the type.
      See `geod24.Serialization`'s documentation for more information.
587 
588     Params:
589       API = The interface defining the API to implement
590       S = An aggregate which follows the requirement explained above.
591 
592 *******************************************************************************/
593 
594 public final class RemoteAPI (API, alias S = VibeJSONSerializer!()) : API
595 {
596     static assert (!serializerInvalidReason!(S).length, serializerInvalidReason!S);
597 
598     /***************************************************************************
599 
600         Instantiate a node and start it
601 
602         This is usually called from the main thread, which will start all the
603         nodes and then start to process request.
604         In order to have a connected network, no nodes in any thread should have
605         a different reference to the same node.
606         In practice, this means there should only be one `Tid` per "address".
607 
608         Note:
609           When the `RemoteAPI` returned by this function is finalized,
610           the child thread will be shut down.
611 
612         Params:
613           Impl = Type of the implementation to instantiate
614           args = Arguments to the object's constructor
615           timeout = (optional) timeout to use with requests
616           file = Path to the file that called this function (for diagnostic)
617           line = Line number tied to the `file` parameter
618 
619         Returns:
620           A `RemoteAPI` owning the node reference
621 
622     ***************************************************************************/
623 
624     public static RemoteAPI spawn (Impl) (
625         CtorParams!Impl args, Duration timeout = 5.seconds,
626         string file = __FILE__, int line = __LINE__)
627     {
628         auto chn = new BindChn();
629         new Thread(
630         {
631             spawned!(Impl)(chn, file, line, args);
632         }).start();
633         return new RemoteAPI(Listener!API(chn), timeout);
634     }
635 
636     /// Helper template to get the constructor's parameters
637     private static template CtorParams (Impl)
638     {
639         static if (is(typeof(Impl.__ctor)))
640             private alias CtorParams = Parameters!(Impl.__ctor);
641         else
642             private alias CtorParams = AliasSeq!();
643     }
644 
    /***************************************************************************

        Handler function

        Performs the dispatch from `cmd` to the proper `node` function,
        provided the function is not filtered.
        One `case` is generated per overload of every member of `API`,
        keyed by the overload's mangled name. A `Response` is always written
        to `resp_chn` unless an exception escapes this function.

        Params:
            cmd    = the command to run (contains the method name and the arguments)
            node   = the node to invoke the method on
            filter = used for filtering API calls (returns default response)
            resp_chn = `Channel` to send the `Response` to

    ***************************************************************************/

    private static void handleCommand (Command cmd, API node, FilterAPI filter, RespChn resp_chn)
    {
        switch (cmd.method)
        {
            static foreach (member; __traits(allMembers, API))
            static foreach (ovrld; __traits(getOverloads, API, member))
            {
                // In the token string below, `%1$s` is replaced by the member
                // name and `%2$s` by the overload's mangled name; `%%s` is an
                // escaped `%s` which survives for the inner `format` call.
                // NOTE(review): arguments are deserialized outside of the
                // `try`, so a deserialization error is not turned into a
                // `Response` - confirm this is intended
                mixin(
                q{
                    case `%2$s`:
                    Response res = Response(Status.Failed, cmd.id);

                    // Provide informative message in case of filtered method
                    if (cmd.method == filter.func_mangleof)
                        res.data = SerializedData(format("Filtered method '%%s'", filter.pretty_func));
                    else
                    {
                        auto args = S.deserialize!(ArgWrapper!(Parameters!ovrld))(
                            cmd.args.getS!S);

                        try
                        {
                            static if (!is(ReturnType!ovrld == void))
                                res.data = SerializedData(S.serialize(node.%1$s(args.args)));
                            else
                                node.%1$s(args.args);
                            res.status = Status.Success;
                        }
                        catch (Exception e)
                        {
                            res.status = Status.ClientFailed;
                            res.data = SerializedData(e.toString());
                        }
                    }

                    resp_chn.write(res);
                    return;
                }.format(member, ovrld.mangleof));
            }
        // No overload of `API` has this mangled name
        default:
            resp_chn.write(Response(Status.ClientFailed, cmd.id, SerializedData("Method not found")));
            break;
        }
    }
704 
705     /***************************************************************************
706 
707         Main dispatch function
708 
709        This function receive string-serialized messages from the calling thread,
710        which is a struct with the method's mangleof, and the method's arguments
711        as a tuple, serialized to a JSON string.
712 
713        Params:
714            Implementation = Type of the implementation to instantiate
715            bind_chn = The channel on which to "listen" to receive new "connections"
716            file = Path to the file that spawned this node
717            line = Line number in the `file` that spawned this node
718            cargs = Arguments to `Implementation`'s constructor
719 
720     ***************************************************************************/
721 
722     private static void spawned (Implementation) (
723         BindChn bind_chn, string file, int line, CtorParams!Implementation cargs)
724         nothrow
725     {
726         import std.algorithm : each;
727         import std.range;
728 
729         scope exc = new ExitException();
730 
731         // The list of `Connection` we are listening to,
732         // equivalent to the list of fd in a select / epoll.
733         Connection[CommandChn] incoming_conns;
734 
735         void runNode ()
736         {
737             scheduler = new C.FiberScheduler;
738             C.thisScheduler(scheduler);
739             scope node = new Implementation(cargs);
740 
741             // Control the node behavior
742             Control control;
743 
744             // we need to keep track of messages which were ignored when
745             // node.sleep() was used, and then handle each message in sequence.
746             AwaitingMessage[] await_msgs;
747 
748             scheduler.start(() {
749                 C.SelectEntry[] read_list;
750                 C.SelectEntry[] write_list;
751 
752                 while (true)
753                 {
754                     Connection new_conn;
755                     Response res;
756                     Variant msg;
757 
758                     read_list.length = 0;
759                     assumeSafeAppend(read_list);
760                     write_list.length = 0;
761                     assumeSafeAppend(write_list);
762 
763                     foreach (ref conn; incoming_conns)
764                         if (!conn.command_chn.isClosed())
765                             read_list ~= C.SelectEntry(conn.command_chn, &msg);
766                     foreach (ref conn; outgoing_conns)
767                         if (!conn.resp_chn.isClosed())
768                             read_list ~= C.SelectEntry(conn.resp_chn, &res);
769                     read_list ~= C.SelectEntry(bind_chn, &new_conn);
770 
771                     auto sel_ret = C.select(read_list, write_list, 10.msecs);
772 
773                     if (await_msgs.length > 0 && !control.isSleeping())
774                     {
775                         scheduler.spawn({
776                             auto prev_await_msgs = await_msgs;
777                             foreach (ref await_msg; prev_await_msgs)
778                                 handleCommand(await_msg.var.cmd, node, control.filter, await_msg.conn.resp_chn);
779                         });
780                         await_msgs.length = 0;
781                     }
782 
783                     if (!sel_ret.success)
784                         continue;
785 
786                     // Bind chn
787                     if (cast(BindChn) read_list[sel_ret.id].selectable)
788                     {
789                         incoming_conns[new_conn.command_chn] = new_conn;
790                     }
791                     // Command
792                     else if (auto comm_chn = cast(CommandChn) read_list[sel_ret.id].selectable)
793                     {
794                         auto curr_conn = incoming_conns[comm_chn];
795                         switch (msg.tag)
796                         {
797                             case Variant.Type.command:
798                                 Command cmd = msg.cmd;
799                                 if (!control.isSleeping())
800                                     scheduler.spawn({
801                                         handleCommand(cmd, node, control.filter, curr_conn.resp_chn);
802                                     });
803                                 else if (!control.drop)
804                                     await_msgs ~= AwaitingMessage(msg, curr_conn);
805                                 break;
806                             case Variant.Type.shutdownCommand:
807                                 ShutdownCommand e = msg.shutdown;
808                                 if (!e.restart)
809                                 {
810                                     bind_chn.close();
811                                     incoming_conns.each!((conn) => conn.close());
812                                 }
813                                 outgoing_conns.each!((conn) => conn.close());
814                                 outgoing_conns.clear();
815                                 if (e.callback !is null)
816                                     e.callback(node);
817                                 exc.restart = e.restart;
818                                 throw exc;
819                             case Variant.Type.timeCommand:
820                                 TimeCommand s = msg.time;
821                                 control.sleep_until = Clock.currTime + s.dur;
822                                 control.drop = s.drop;
823                                 break;
824                             case Variant.Type.filter:
825                                 FilterAPI filter_api = msg.filter;
826                                 control.filter = filter_api;
827                                 break;
828                             default:
829                                 assert(0, "Got invalid Variant.Type: " ~ msg.tag);
830                         }
831                     }
832                     else if (auto resp_chn = cast(RespChn) read_list[sel_ret.id].selectable)
833                     {
834                         // Response
835                         outgoing_conns[resp_chn].notifyWaiter(res);
836                     }
837                 }
838             });
839         }
840 
841         try
842         {
843             while (true)
844             {
845                 try runNode();
846                 // We use this exception to exit the event loop
847                 catch (ExitException e)
848                 {
849                     if (!e.restart)
850                         break;
851                 }
852             }
853         }
854         catch (Throwable t)
855         {
856             import core.stdc.stdio, std.stdio;
857             printf("#### FATAL ERROR: %.*s\n", cast(int) t.msg.length, t.msg.ptr);
858             printf("This node was started at %.*s:%d\n",
859                    cast(int) file.length, file.ptr, line);
860             printf("This most likely means that the node crashed due to an uncaught exception\n");
861             printf("If not, please file a bug at https://github.com/Geod24/localrest/\n");
862 
863             try writeln("Full error: ", t);
864             catch (Exception e) { /* Nothing more we can do at this point */ }
865         }
866     }
867 
    /// Timeout to use when issuing requests
    /// (set once by the constructor, hence `const`)
    private const Duration timeout;

    /// Main Channel that this Node will listen for incoming messages
    /// (new connections are announced by writing them to this channel)
    private BindChn bind_chn;

    /// Connection between this instance and the node main thread
    /// (commands are sent, and responses awaited, through it)
    private Connection conn;
876 
877     /***************************************************************************
878 
879         Create an instante of a client
880 
881         This connects to an already instantiated node.
882         In order to instantiate a node, see the static `spawn` function.
883 
884         Params:
885           listener = The listener used to connect to the node (most frequently
886                      obtained by calling `geod24.concurrency.locate`)
887           timeout = any timeout to use
888 
889     ***************************************************************************/
890 
891     public this (Listener!API listener, Duration timeout = 5.seconds)
892         @trusted nothrow
893     {
894         import std.exception : assumeWontThrow;
895 
896         this.bind_chn = listener.data;
897         this.timeout = timeout;
898         assert(bind_chn);
899 
900         // Create a new Connection, register it and send it to the peer
901         // TODO: What to do when bind_chn is closed?
902         this.conn = new Connection();
903         outgoing_conns[this.conn.resp_chn] = this.conn;
904         if (isMainThread())
905             assumeWontThrow(scheduler.start({ bind_chn.write(this.conn); }));
906         else
907             bind_chn.write(this.conn);
908     }
909 
    /***************************************************************************

        Introduce a namespace to avoid name clashes

        The control methods are mixed in under the `ctrl` identifier,
        so they can be reached as `obj.ctrl.method(...)`.

        The only way to get a name conflict is if someone exposes `ctrl`,
        in which case they will be served an error along the following lines:
        LocalRest.d(...): Error: function `RemoteAPI!(...).ctrl` conflicts
        with mixin RemoteAPI!(...).ControlInterface!() at LocalRest.d(...)

    ***************************************************************************/

    public mixin ControlInterface!() ctrl;
922 
923     /// Ditto
924     private mixin template ControlInterface ()
925     {
        /***********************************************************************

            Returns the listener this `RemoteAPI` used

            The listener wraps the bind channel (`bind_chn`) of this instance,
            which represents the link to the "bind address" of the
            remote thread. It can be used with `geod24.concurrency.register`
            to allow other pieces of code to establish a connection.
            It *cannot* be used to send commands directly to the remote thread.
            Instead, a new `RemoteAPI` should be instantiated from it.

            Returns:
                A `Listener!API` built from this instance's bind channel

        ***********************************************************************/

        public Listener!API listener () @nogc pure nothrow
        {
            return Listener!API(this.bind_chn);
        }
942 
943         /***********************************************************************
944 
945             Send an async message to the thread to immediately shut down.
946 
947             Params:
948                 callback = if not null, the callback to call in the Node's
949                            thread before the Node is destroyed. Can be used
950                            for cleanup / logging routines.
951 
952         ***********************************************************************/
953 
954         public void shutdown (void function (Object) callback = null)
955             @trusted
956         {
957             this.conn.sendCommand(ShutdownCommand(callback, false));
958         }
959 
960         /***********************************************************************
961 
962             Send an async message to the thread to immediately restart.
963 
964             Note that further non-control messages to the node will block until
965             the node is back "online".
966 
967             Params:
968                 callback = if not null, the callback to call in the Node's
969                            thread before the Node is destroyed, but before
970                            it is restarted. Can be used for cleanup or logging.
971 
972         ***********************************************************************/
973 
974         public void restart (void function (Object) callback = null)
975             @trusted
976         {
977             this.conn.sendCommand(ShutdownCommand(callback, true));
978         }
979 
980         /***********************************************************************
981 
982             Make the remote node sleep for `Duration`
983 
984             Params:
985               delay = Duration the node will sleep for
986               dropMessages = Whether to process the pending requests when the
987                              node come back online (the default), or to drop
988                              pending traffic
989 
990         ***********************************************************************/
991 
992         public void sleep (Duration d, bool dropMessages = false) @trusted
993         {
994             this.conn.sendCommand(TimeCommand(d, dropMessages));
995         }
996 
997         /***********************************************************************
998 
999             Filter any requests issued to the provided method.
1000 
1001             Calling the API endpoint will throw an exception,
1002             therefore the request will fail.
1003 
1004             Use via:
1005 
1006             ----
1007             interface API { void call(); }
1008             class C : API { void call() { } }
1009             auto obj = new RemoteAPI!API(...);
1010             obj.filter!(API.call);
1011             ----
1012 
1013             To match a specific overload of a method, specify the
1014             parameters to match against in the call. For example:
1015 
1016             ----
1017             interface API { void call(int); void call(int, float); }
1018             class C : API { void call(int) {} void call(int, float) {} }
1019             auto obj = new RemoteAPI!API(...);
1020             obj.filter!(API.call, int, float);  // only filters the second overload
1021             ----
1022 
1023             Params:
1024               method = the API method for which to filter out requests
1025               OverloadParams = (optional) the parameters to match against
1026                   to select an overload. Note that if the method has no other
1027                   overloads, then even if that method takes parameters and
1028                   OverloadParams is empty, it will match that method
1029                   out of convenience.
1030 
1031         ***********************************************************************/
1032 
1033         public void filter (alias method, OverloadParams...) () @trusted
1034         {
1035             enum method_name = __traits(identifier, method);
1036 
1037             // return the mangled name of the matching overload
1038             template getBestMatch (T...)
1039             {
1040                 static if (is(Parameters!(T[0]) == OverloadParams))
1041                 {
1042                     enum getBestMatch = T[0].mangleof;
1043                 }
1044                 else static if (T.length > 0)
1045                 {
1046                     enum getBestMatch = getBestMatch!(T[1 .. $]);
1047                 }
1048                 else
1049                 {
1050                     static assert(0,
1051                         format("Couldn't select best overload of '%s' for " ~
1052                         "parameter types: %s",
1053                         method_name, OverloadParams.stringof));
1054                 }
1055             }
1056 
1057             // ensure it's used with API.method, *not* RemoteAPI.method which
1058             // is an override of API.method. Otherwise mangling won't match!
1059             // special-case: no other overloads, and parameter list is empty:
1060             // just select that one API method
1061             alias Overloads = __traits(getOverloads, API, method_name);
1062             static if (Overloads.length == 1 && OverloadParams.length == 0)
1063             {
1064                 immutable pretty = method_name ~ Parameters!(Overloads[0]).stringof;
1065                 enum mangled = Overloads[0].mangleof;
1066             }
1067             else
1068             {
1069                 immutable pretty = format("%s%s", method_name, OverloadParams.stringof);
1070                 enum mangled = getBestMatch!Overloads;
1071             }
1072 
1073             this.conn.sendCommand(FilterAPI(mangled, pretty));
1074         }
1075 
1076 
1077         /***********************************************************************
1078 
1079             Clear out any filtering set by a call to filter()
1080 
1081         ***********************************************************************/
1082 
1083         public void clearFilter () @trusted
1084         {
1085             this.conn.sendCommand(FilterAPI(""));
1086         }
1087 
        /***********************************************************************

            Call the provided delegate with a custom timeout

            This allows performing requests on a client with a different
            timeout, usually to allow some requests (e.g. initialization calls)
            to have a longer timeout, or no timeout at all, or to put a timeout
            on an otherwise timeout-less client (e.g. when calling the actual
            test which could fail).

            To disable timeout, pass the special value `Duration.zero`
            (or `0.seconds`, `0.msecs`, etc...).

            The delegate receives a new `scope` `RemoteAPI` instance, created
            from this instance's listener with the requested timeout.

            Params:
                Dg = type of the delegate; it must be callable with the
                     scoped `RemoteAPI`, which is checked at compile time
                timeout = the new timeout to use
                dg = the delegate to call with the new scoped RemoteAPI copy

        ***********************************************************************/

        public void withTimeout (Dg) (Duration timeout, scope Dg dg)
        {
            // A separate client connected through the same listener as this one
            scope api = new RemoteAPI(this.ctrl.listener(), timeout);
            // Reject, with a readable message, a `dg` that cannot take `api`
            static assert(is(typeof({ dg(api); })),
                          "Provided argument of type `" ~ Dg.stringof ~
                          "` is not callable with argument type `scope " ~
                          fullyQualifiedName!API ~ "`");
            dg(api);
        }
1115     }
1116 
1117     // Vibe.d mandates that method must be @safe
1118     @safe:
1119 
    /***************************************************************************

        Generate the API `override` which forward to the actual object

        One `override` is generated for every overload of every member
        of `API`. Each generated method:
        - serializes its arguments (wrapped in an `ArgWrapper`) using `S`;
        - sends them in a `Command` identified by the mangled name of the
          overload, throwing an `Exception` if the command cannot be sent;
        - waits for the matching response, using `this.timeout`;
        - throws an `Exception` if the response status is `Status.Failed`
          or `Status.Timeout`, and a `ClientException` if it is
          `Status.ClientFailed`;
        - otherwise deserializes and returns the result
          (unless the return type is `void`).

    ***************************************************************************/

    static foreach (member; __traits(allMembers, API))
        static foreach (ovrld; __traits(getOverloads, API, member))
        {
            // The method name is spliced in between the two token strings,
            // as `member` is a string while the rest is regular code
            mixin(q{
                override ReturnType!(ovrld) } ~ member ~ q{ (Parameters!ovrld params)
                {

                    // `geod24.concurrency.send/receive[Only]` is not `@safe` but
                    // this overload needs to be
                    auto res = () @trusted {
                        auto serialized = S.serialize(ArgWrapper!(Parameters!ovrld)(params));
                        auto command = Command(this.conn.getNextCommandId(), ovrld.mangleof, SerializedData(serialized));

                        if(!this.conn.sendCommand(command))
                            throw new Exception("Connection with peer closed");
                        return this.conn.waitResponse(command.id, this.timeout);
                    }();

                    if (res.status == Status.Failed)
                        throw new Exception(res.data.get!string);

                    if (res.status == Status.ClientFailed)
                        throw new ClientException(
                            format("Request to %s couldn't be processed : %s",
                                   __PRETTY_FUNCTION__, res.data.get!string));

                    if (res.status == Status.Timeout)
                        throw new Exception("Request timed-out");

                    static if (!is(ReturnType!(ovrld) == void))
                        return S.deserialize!(typeof(return))(res.data.getS!S());
                }
                });
        }
1160 }
1161 
/// Simple usage example
unittest
{
    // The API exposed by the node
    static interface API
    {
        @safe:
        public @property ulong pubkey ();
        public string getValue (ulong idx);
        public ubyte[32] getQuorumSet ();
        public string recv (string data);
    }

    // The node's implementation: only `pubkey` is expected to be called
    static class MockAPI : API
    {
        @safe:
        public override @property ulong pubkey ()
        { return 42; }
        public override string getValue (ulong idx)
        { assert(0); }
        public override ubyte[32] getQuorumSet ()
        { assert(0); }
        public override string recv (string data)
        { assert(0); }
    }

    // Spawn a node running `MockAPI` and get a client to it
    scope test = RemoteAPI!API.spawn!MockAPI();
    // Whatever happens, ask the node to stop and wait for the threads
    scope (exit) {
        test.ctrl.shutdown();
        thread_joinAll();
    }
    // The call goes through the client and is answered by `MockAPI`
    assert(test.pubkey() == 42);
}
1194 
/// Example where a shutdown() routine must be called on a node before
/// its destructor is called
unittest
{
    // Flags recording what happened; `__gshared` because they are
    // declared as shared between all threads
    __gshared bool dtor_called;
    __gshared bool shutdown_called;
    __gshared bool onDestroy_called;

    static interface API
    {
        @safe:
        public @property ulong pubkey ();
    }

    static class MockAPI : API
    {
        public override @property ulong pubkey () @safe
        { return 42; }
        // Not part of `API`: only reachable through the shutdown callback
        public void shutdown () { shutdown_called = true; }
        ~this () { dtor_called = true; }
    }

    // Callback given to `ctrl.shutdown`: it receives the node as an `Object`
    static void onDestroy (Object node)
    {
        // The node must not have been destroyed yet
        assert(!dtor_called);
        auto mock = cast(MockAPI)node;
        assert(mock !is null);
        mock.shutdown();
        onDestroy_called = true;
    }

    scope test = RemoteAPI!API.spawn!MockAPI();
    // Only needed if an assert fails before the regular shutdown below
    scope (failure) test.ctrl.shutdown();
    assert(test.pubkey() == 42);
    test.ctrl.shutdown(&onDestroy);
    // Wait for the node to terminate before checking the flags
    thread_joinAll();
    assert(dtor_called);
    assert(onDestroy_called);
    assert(shutdown_called);
}
1235 
/// In a real world usage, users will most likely need to use the registry
unittest
{
    import std.conv;
    static import geod24.concurrency;
    import geod24.Registry;

    static interface API
    {
        @safe:
        public @property ulong pubkey ();
        public string getValue (ulong idx);
        public string recv (string data);
        public string recv (ulong index, string data);

        public string last ();
    }

    // Maps a name to the listener of an already spawned node
    __gshared Registry!API registry;
    registry.initialize();

    // Every method records its name in `lastCall`, which `last` returns
    static class Node : API
    {
        @safe:
        public this (bool isByzantine) { this.isByzantine = isByzantine; }
        public override @property ulong pubkey ()
        { lastCall = `pubkey`; return this.isByzantine ? 0 : 42; }
        public override string getValue (ulong idx)
        { lastCall = `getValue`; return null; }
        public override string recv (string data)
        { lastCall = `recv@1`; return null; }
        public override string recv (ulong index, string data)
        { lastCall = `recv@2`; return null; }

        public override string last () { return this.lastCall; }

        private bool isByzantine;
        private string lastCall;
    }

    // Returns a client for the node registered under `hash`,
    // spawning (and registering) a node of the given `type` if there is none
    static RemoteAPI!API factory (string type, ulong hash)
    {
        const name = hash.to!string;
        auto listener = registry.locate(name);
        // Already registered: `type` is ignored, just connect to the node
        if (listener !is Listener!API.init)
            return new RemoteAPI!API(listener);

        switch (type)
        {
        case "normal":
            auto ret =  RemoteAPI!API.spawn!Node(false);
            registry.register(name, ret.ctrl.listener());
            return ret;
        case "byzantine":
            auto ret =  RemoteAPI!API.spawn!Node(true);
            registry.register(name, ret.ctrl.listener());
            return ret;
        default:
            assert(0, type);
        }
    }

    // Spawn and register the two nodes
    auto node1 = factory("normal", 1);
    auto node2 = factory("byzantine", 2);

    // Runs in a separate thread and reaches the nodes through the registry
    static void testFunc()
    {
        // Both hashes are already registered, so the type is not looked at
        auto node1 = factory("this does not matter", 1);
        auto node2 = factory("neither does this", 2);
        scope (exit) {
            node1.ctrl.shutdown();
            node2.ctrl.shutdown();
        }
        assert(node1.pubkey() == 42);
        assert(node1.last() == "pubkey");
        assert(node2.pubkey() == 0);
        assert(node2.last() == "pubkey");

        // Each overload of `recv` is dispatched to the matching method
        node1.recv(42, null);
        assert(node1.last() == "recv@2");
        node1.recv(null);
        assert(node1.last() == "recv@1");
        // Calls to node1 did not affect node2
        assert(node2.last() == "pubkey");
    }

    scope thread = new Thread(&testFunc);
    thread.start();
    // Make sure our main thread terminates after everyone else
    thread_joinAll();
}
1326 
/// This network have different types of nodes in it
unittest
{
    import geod24.concurrency;

    static interface API
    {
        @safe:
        public @property ulong requests ();
        public @property ulong value ();
    }

    // Answers `value` by itself, counting how many times it was asked
    static class MasterNode : API
    {
        @safe:
        private ulong requests_;

        public override @property ulong requests() { return this.requests_; }

        public override @property ulong value()
        {
            ++this.requests_;
            return 42; // Of course
        }
    }

    // Counts the requests it gets, but asks the master for the actual value
    static class SlaveNode : API
    {
        @safe:
        private ulong requests_;
        private API master;

        this(Listener!API masterConn)
        {
            this.master = new RemoteAPI!API(masterConn);
        }

        public override @property ulong requests() { return this.requests_; }

        public override @property ulong value()
        {
            ++this.requests_;
            return this.master.value();
        }
    }

    // One master in the first slot, every other slot is a slave of it
    auto master = RemoteAPI!API.spawn!MasterNode();
    RemoteAPI!API[4] nodes;
    nodes[0] = master;
    foreach (ref slave; nodes[1 .. $])
        slave = RemoteAPI!API.spawn!SlaveNode(master.ctrl.listener());
    scope (exit) {
        foreach (node; nodes)
            node.ctrl.shutdown();
        thread_joinAll();
    }

    // First round: nobody has been asked for a value yet
    foreach (idx; 0 .. nodes.length)
    {
        assert(nodes[idx].requests() == 0);
        assert(nodes[idx].value() == 42);
    }

    // One direct request, plus one forwarded by each of the three slaves
    assert(master.requests() == 4);

    // Second round, slaves only
    foreach (idx; 1 .. nodes.length)
    {
        assert(nodes[idx].value() == 42);
        assert(nodes[idx].requests() == 2);
    }

    // Three more requests were forwarded to the master
    assert(master.requests() == 7);
}
1407 
/// Support for circular nodes call
unittest
{
    static import geod24.concurrency;

    static interface API
    {
        @safe:
        public ulong call (ulong count, ulong val);
        public void setNext (string name);
    }

    // Listener of every node, by name
    __gshared Listener!API[string] tbn;

    static class Node : API
    {
        @safe:
        private API next;

        // Adds `count` to `val` and passes the rest of the work to the
        // next node, until `count` reaches zero
        public override ulong call (ulong count, ulong val)
        {
            return count ? this.next.call(count - 1, val + count) : val;
        }

        public override void setNext (string name) @trusted
        {
            this.next = new RemoteAPI!API(tbn[name]);
        }
    }

    RemoteAPI!(API)[3] nodes;
    foreach (ref slot; nodes)
        slot = RemoteAPI!API.spawn!Node();
    scope (exit) {
        foreach (node; nodes)
            node.ctrl.shutdown();
        thread_joinAll();
    }

    // Publish every listener before any node looks one up
    foreach (idx; 0 .. nodes.length)
        tbn[format("node%d", idx)] = nodes[idx].ctrl.listener();
    // Arrange the nodes in a ring: each one forwards to the following one,
    // the last one forwards to the first
    foreach (idx; 0 .. nodes.length)
        nodes[idx].setNext(format("node%d", (idx + 1) % nodes.length));

    // 7 level of re-entrancy
    const total = nodes[0].call(20, 0);
    assert(total == 210);
}
1460 
1461 
/// Nodes can start tasks
unittest
{
    static import core.thread;
    import core.time;

    static interface API
    {
        public void start ();
        public ulong getCounter ();
    }

    static class Node : API
    {
        // Start `task` through `runTask`
        public override void start ()
        {
            runTask(&this.task);
        }

        // Return the current value of the counter, then reset it to zero
        public override ulong getCounter ()
        {
            scope (exit) this.counter = 0;
            return this.counter;
        }

        // Never returns: increment the counter, sleep for 50 msecs, repeat
        private void task ()
        {
            while (true)
            {
                this.counter++;
                sleep(50.msecs);
            }
        }

        private ulong counter;
    }

    auto node = RemoteAPI!API.spawn!Node();
    scope (exit) {
        node.ctrl.shutdown();
        thread_joinAll();
    }
    // Nothing is running yet
    assert(node.getCounter() == 0);
    node.start();
    // The task is expected to have incremented the counter exactly once,
    // and reading the counter resets it
    assert(node.getCounter() == 1);
    assert(node.getCounter() == 0);
    // Let the task run for a while (this blocks the test thread only)
    core.thread.Thread.sleep(1.seconds);
    // It should be 19 but some machines are very slow
    // (e.g. Travis Mac testers) so be safe
    assert(node.getCounter() >= 9);
    // The previous read reset the counter
    assert(node.getCounter() < 9);
}
1514 
// Sane name insurance policy
unittest
{
    // An API is allowed to expose a method named `tid`
    static interface API
    {
        public ulong tid ();
    }

    static class Node : API
    {
        public override ulong tid () { return 42; }
    }

    auto node = RemoteAPI!API.spawn!Node();
    scope (exit) {
        node.ctrl.shutdown();
        thread_joinAll();
    }
    // `tid` resolves to the API method
    assert(node.tid == 42);
    // The control interface is reachable through `ctrl`
    assert(node.ctrl.listener() !is Listener!API.init);

    // `ctrl` is the one name an API cannot use,
    // as it conflicts with the control interface
    static interface DoesntWork
    {
        public string ctrl ();
    }
    static assert(!is(typeof(RemoteAPI!DoesntWork)));
}
1542 
// Simulate temporary outage
unittest
{
    static interface API
    {
        public ulong call ();
        public void asyncCall ();
    }

    // Listener of the first node, used by the second node to connect to it
    __gshared Listener!API n1conn;

    static class Node : API
    {
        public this()
        {
            // Only nodes created once `n1conn` is set get a `remote`
            if (n1conn !is Listener!API.init)
                this.remote = new RemoteAPI!API(n1conn);
        }

        // Return the number of times `call` was invoked on this node
        public override ulong call () { return ++this.count; }
        // Call `remote.call` from a task, ignoring the result
        public override void  asyncCall () { runTask(() => cast(void)this.remote.call); }
        size_t count;
        RemoteAPI!API remote;
    }

    // n1 is created before `n1conn` is set, so only n2 has a `remote` (n1)
    auto n1 = RemoteAPI!API.spawn!Node();
    n1conn = n1.ctrl.listener();
    auto n2 = RemoteAPI!API.spawn!Node();
    scope (exit) {
        n1.ctrl.shutdown();
        n2.ctrl.shutdown();
        thread_joinAll();
    }

    /// Make sure calls are *relatively* efficient
    auto current1 = MonoTime.currTime();
    assert(1 == n1.call());
    assert(1 == n2.call());
    auto current2 = MonoTime.currTime();
    assert(current2 - current1 < 200.msecs);

    // Make one of the node sleep
    n1.sleep(1.seconds);
    // Make sure our main thread is not suspended,
    // nor is the second node
    assert(2 == n2.call());
    auto current3 = MonoTime.currTime();
    assert(current3 - current2 < 400.msecs);

    // Wait for n1 to unblock
    assert(2 == n1.call());
    // Check current time >= 1 second
    auto current4 = MonoTime.currTime();
    assert(current4 - current2 >= 1.seconds);

    // Now drop many messages
    n1.sleep(1.seconds, true);
    // For 900 msecs, keep making n2 send calls to the sleeping n1
    auto start = MonoTime.currTime;
    while (MonoTime.currTime - start < 900.msecs)
        n2.asyncCall();
    // Wait until n1's sleep is over
    Thread.sleep(200.msecs);
    // n1 only counted this call: the ones sent while it slept were dropped
    assert(3 == n1.call());

    // Debug output, uncomment if needed
    version (none)
    {
        import std.stdio;
        writeln("Two non-blocking calls: ", current2 - current1);
        writeln("Sleep + non-blocking call: ", current3 - current2);
        writeln("Delta since sleep: ", current4 - current2);
    }
}
1615 
// Filter commands
unittest
{
    static interface API
    {
        // Number of times the matching method was called on the node
        size_t fooCount();
        size_t fooIntCount();
        size_t barCount ();
        // The methods which get filtered
        void foo ();
        void foo (int);
        void bar (int);  // not in any overload set
        // Make the node call the matching method on the filtered node
        void callBar (int);
        void callFoo ();
        void callFoo (int);
    }

    // Listener of the filtered node, used by the caller node to connect to it
    __gshared Listener!API node_listener;

    static class Node : API
    {
        size_t foo_count;
        size_t foo_int_count;
        size_t bar_count;
        // Client to the filtered node, created on first use
        RemoteAPI!API remote;

        override size_t fooCount() { return this.foo_count; }
        override size_t fooIntCount() { return this.foo_int_count; }
        override size_t barCount() { return this.bar_count; }
        override void foo () { ++this.foo_count; }
        override void foo (int) { ++this.foo_int_count; }
        override void bar (int) { ++this.bar_count; }  // not in any overload set
        // This one is part of the overload set of the node, but not of the API
        // It can't be accessed via API and can't be filtered out
        void bar(string) { assert(0); }

        // Call `foo()` on the filtered node; the only exception tolerated
        // is the one reporting that the method is filtered
        override void callFoo()
        {
            if (!this.remote)
                this.remote = new RemoteAPI!API(node_listener);

            try
            {
                this.remote.foo();
            }
            catch (Exception ex)
            {
                assert(ex.msg == "Filtered method 'foo()'");
            }
        }

        // Same as above, for `foo(int)`
        override void callFoo(int arg)
        {
            if (!this.remote)
                this.remote = new RemoteAPI!API(node_listener);

            try
            {
                this.remote.foo(arg);
            }
            catch (Exception ex)
            {
                assert(ex.msg == "Filtered method 'foo(int)'");
            }
        }

        // Same as above, for `bar(int)`
        override void callBar(int arg)
        {
            if (!this.remote)
                this.remote = new RemoteAPI!API(node_listener);

            try
            {
                this.remote.bar(arg);
            }
            catch (Exception ex)
            {
                assert(ex.msg == "Filtered method 'bar(int)'");
            }
        }
    }

    auto filtered = RemoteAPI!API.spawn!Node();
    node_listener = filtered.ctrl.listener();

    // caller will call filtered
    auto caller = RemoteAPI!API.spawn!Node();
    scope (exit) {
        filtered.ctrl.shutdown();
        caller.ctrl.shutdown();
        thread_joinAll();
    }
    // No filter is set yet: the call goes through
    caller.callFoo();
    assert(filtered.fooCount() == 1);

    // both of these work
    static assert(is(typeof(filtered.filter!(API.foo))));
    static assert(is(typeof(filtered.filter!(filtered.foo))));

    // only method in the overload set that takes a parameter,
    // should still match a call to filter with no parameters
    static assert(is(typeof(filtered.filter!(filtered.bar))));

    // wrong parameters => fail to compile
    static assert(!is(typeof(filtered.filter!(filtered.bar, float))));
    // Only `API` overload sets are considered
    static assert(!is(typeof(filtered.filter!(filtered.bar, string))));

    // No parameter types given: this filters the parameterless `foo()`
    filtered.filter!(API.foo);

    caller.callFoo();
    assert(filtered.fooCount() == 1);  // it was not called!

    filtered.clearFilter();  // clear the filter
    caller.callFoo();
    assert(filtered.fooCount() == 2);  // it was called!

    // verify foo(int) works first
    caller.callFoo(1);
    assert(filtered.fooCount() == 2);
    assert(filtered.fooIntCount() == 1);  // first time called

    // now filter only the int overload
    filtered.filter!(API.foo, int);

    // make sure the parameterless overload is still not filtered
    caller.callFoo();
    assert(filtered.fooCount() == 3);  // updated

    caller.callFoo(1);
    assert(filtered.fooIntCount() == 1);  // call filtered!

    // not filtered yet
    caller.callBar(1);
    assert(filtered.barCount() == 1);

    // Setting this filter makes the node handle `foo(int)` again (see the
    // last calls below)
    filtered.filter!(filtered.bar);
    caller.callBar(1);
    assert(filtered.barCount() == 1);  // filtered!

    // last blocking calls, to ensure the previous calls complete
    filtered.clearFilter();
    caller.foo();
    caller.bar(1);
}
1760 
// Request timeouts, as seen by a client living in the main thread
unittest
{
    import core.thread;
    import std.exception;

    static interface API
    {
        size_t sleepFor (long dur);
        void ping ();
    }

    static class Node : API
    {
        // Sleeps for `dur` milliseconds, then answers 42
        override size_t sleepFor (long dur)
        {
            const pause = msecs(dur);
            Thread.sleep(pause);
            return 42;
        }

        // Empty on purpose: only used to see whether the node answers
        override void ping () { }
    }

    // Client spawned without any timeout argument
    auto plain = RemoteAPI!API.spawn!Node();
    scope (exit)
    {
        plain.ctrl.shutdown();
        thread_joinAll();
    }
    assert(plain.sleepFor(80) == 42);  // completes normally

    // Scoped 100ms timeout, callback given as a delegate literal
    bool called;
    plain.ctrl.withTimeout(
        100.msecs,
        (scope API api)
        {
            assertThrown!Exception(api.sleepFor(2000));
            called = true;
        });
    assert(called);

    // Scoped 100ms timeout, callback given as a callable struct
    called = false;
    struct Functor
    {
        void opCall (scope API api)
        {
            assertThrown!Exception(api.sleepFor(2000));
            called = true;
        }
    }
    Functor functor;
    plain.ctrl.withTimeout(100.msecs, functor);
    assert(called);

    // `withTimeout` must be callable from `@safe nothrow` code when the
    // delegate itself allows it
    void checkInference () @safe nothrow
    {
        called = false;
        plain.ctrl.withTimeout(
            Duration.zero, (scope API api) { called = true; });
        assert(called);
    }
    checkInference();

    // Client spawned with a 500ms timeout
    auto timed = RemoteAPI!API.spawn!Node(500.msecs);
    scope (exit) timed.ctrl.shutdown();

    // Short sleeps stay well below the timeout
    foreach (delay; [10, 20, 30, 40])
        assert(timed.sleepFor(delay) == 42);

    // A 2s sleep exceeds the 500ms timeout
    assertThrown!Exception(timed.sleepFor(2000));
    // Use a 3s timeout so the still-sleeping node has time to wake up
    timed.ctrl.withTimeout(3.seconds, (scope API api) { api.ping(); });
}
1838 
// Responses to re-used requests (client in the main thread): after a call
// timed out, the next call must get its own answer
unittest
{
    import core.thread;
    import std.exception;

    static interface API
    {
        float getFloat();
        size_t sleepFor (long dur);
    }

    static class Node : API
    {
        override float getFloat()
        {
            return 69.69;
        }

        // Sleeps for `dur` milliseconds, then answers 42
        override size_t sleepFor (long dur)
        {
            const pause = msecs(dur);
            Thread.sleep(pause);
            return 42;
        }
    }

    // Client spawned without any timeout argument
    auto plain = RemoteAPI!API.spawn!Node();
    scope (exit)
    {
        plain.ctrl.shutdown();
        thread_joinAll();
    }
    assert(plain.sleepFor(80) == 42);  // completes normally

    // Client spawned with a 500ms timeout
    auto timed = RemoteAPI!API.spawn!Node(500.msecs);
    scope (exit) timed.ctrl.shutdown();

    // Short sleeps stay well below the timeout
    foreach (delay; [10, 20, 30, 40])
        assert(timed.sleepFor(delay) == 42);

    // A 2s sleep exceeds the 500ms timeout
    assertThrown!Exception(timed.sleepFor(2000));
    // Use a 3s timeout so the still-sleeping node has time to wake up,
    // then check that the answer belongs to `getFloat`
    timed.ctrl.withTimeout(
        3.seconds,
        (scope API api)
        {
            assert(cast(int)api.getFloat() == 69);
        });
}
1882 
// Request timeouts when one node is the client of another node
unittest
{
    static import geod24.concurrency;
    import std.exception;

    static interface API
    {
        void check ();
        int ping ();
    }

    // Set by the main thread to the listener of the second node
    __gshared Listener!API node_listener;

    static class Node : API
    {
        override int ping ()
        {
            return 42;
        }

        // Connects to `node_listener` with a 500ms timeout and checks both
        // the successful and the timed-out path
        override void check ()
        {
            auto peer = new RemoteAPI!API(node_listener, 500.msecs);

            // 10ms of sleep is shorter than the timeout: the call succeeds
            peer.ctrl.sleep(10.msecs);
            assert(peer.ping() == 42);

            // 2s of sleep is longer than the timeout: the call throws
            peer.ctrl.sleep(2000.msecs);
            assertThrown!Exception(peer.ping());
        }
    }

    auto checker = RemoteAPI!API.spawn!Node(5.seconds);
    auto target = RemoteAPI!API.spawn!Node();
    scope (exit)
    {
        checker.ctrl.shutdown();
        target.ctrl.shutdown();
        thread_joinAll();
    }
    node_listener = target.ctrl.listener();
    checker.check();
}
1925 
// Zombie responses: the answer to a timed-out request must not be mistaken
// for the answer to the following request
unittest
{
    static import geod24.concurrency;
    import std.exception;

    static interface API
    {
        void check ();
        int get42 ();
        int get69 ();
    }

    // Set by the main thread to the listener of the second node
    __gshared Listener!API node_listener;

    static class Node : API
    {
        override int get42 ()
        {
            return 42;
        }

        override int get69 ()
        {
            return 69;
        }

        // Lets `get42` time out first, then expects `get69` to return 69
        override void check ()
        {
            auto peer = new RemoteAPI!API(node_listener, 500.msecs);

            // 2s of sleep is longer than the 500ms timeout: the call throws
            peer.ctrl.sleep(2000.msecs);
            assertThrown!Exception(peer.get42());

            // 10ms of sleep is shorter than the timeout: the call succeeds
            peer.ctrl.sleep(10.msecs);
            assert(peer.get69() == 69);
        }
    }

    auto checker = RemoteAPI!API.spawn!Node(5.seconds);
    auto target = RemoteAPI!API.spawn!Node();
    scope (exit)
    {
        checker.ctrl.shutdown();
        target.ctrl.shutdown();
        thread_joinAll();
    }
    node_listener = target.ctrl.listener();
    checker.check();
}
1970 
// Request timeouts when the sleeping node drops incoming messages
unittest
{
    static import geod24.concurrency;
    import std.exception;

    static interface API
    {
        void check ();
        int ping ();
    }

    // Set by the main thread to the listener of the second node
    __gshared Listener!API node_listener;

    static class Node : API
    {
        override int ping ()
        {
            return 42;
        }

        // Pings once normally, then pings again after asking the peer to
        // sleep with the second argument of `sleep` set to `true`
        override void check ()
        {
            auto peer = new RemoteAPI!API(node_listener, 420.msecs);

            // Nothing is dropped yet: the call succeeds
            assert(peer.ping() == 42);

            // Requests are dropped, so this one times out
            peer.ctrl.sleep(10.msecs, true);
            assertThrown!Exception(peer.ping());
        }
    }

    auto checker = RemoteAPI!API.spawn!Node(5.seconds);
    auto target = RemoteAPI!API.spawn!Node();
    scope (exit)
    {
        checker.ctrl.shutdown();
        target.ctrl.shutdown();
        thread_joinAll();
    }
    node_listener = target.ctrl.listener();
    checker.check();
}
2010 
2011 
// Test a node that gets a reply while it is itself delayed (put to sleep)
unittest
{
    static import geod24.concurrency;
    import std.exception;

    static interface API
    {
        void check ();
        int ping ();
    }

    // Set by the main thread to the listener of the second node
    __gshared Listener!API node_listener;

    static class Node : API
    {
        override int ping ()
        {
            return 42;
        }

        // Pings the peer once, then schedules a task that delays the peer
        // by 200ms and pings it again
        override void check ()
        {
            auto peer = new RemoteAPI!API(node_listener, 5000.msecs);
            assert(peer.ping() == 42);

            // The second ping runs in a task so that this method returns
            // immediately and the main thread can put this node to sleep
            runTask(
                ()
                {
                    peer.ctrl.sleep(200.msecs);
                    assert(peer.ping() == 42);
                });
        }
    }

    auto checker = RemoteAPI!API.spawn!Node(500.msecs);
    auto target = RemoteAPI!API.spawn!Node();
    scope (exit)
    {
        checker.ctrl.shutdown();
        target.ctrl.shutdown();
        thread_joinAll();
    }
    node_listener = target.ctrl.listener();
    checker.check();
    // Delay the checking node while its task is waiting on the peer
    checker.ctrl.sleep(300.msecs);
    assert(checker.ping() == 42);
}
2055 
// Explicit shutdown: once the node is shut down, calls must fail with a
// "connection closed" error
unittest
{
    import std.exception;

    static interface API
    {
        int myping (int value);
    }

    static class Node : API
    {
        // Echoes its argument
        override int myping (int value) { return value; }
    }

    auto client = RemoteAPI!API.spawn!Node(1.seconds);
    scope (failure) client.ctrl.shutdown();

    // The node answers while it is alive
    assert(client.myping(42) == 42);

    client.ctrl.shutdown();
    thread_joinAll();

    // The node is gone: the call must throw with the expected message
    try
    {
        client.myping(69);
        assert(0);
    }
    catch (Exception error)
        assert(error.msg == "Connection with peer closed");
}
2090 
// A `segfault()` request is sent to a node that is shut down before it wakes
// up; the test relies on that request never being executed
unittest
{
    import core.thread : thread_joinAll;
    static import geod24.concurrency;

    static interface API
    {
        void segfault ();
        void check ();
    }

    // Set by the main thread to the listener of the second node
    __gshared Listener!API node_listener;

    static class Node : API
    {
        // Writes through a null pointer
        override void segfault ()
        {
            int* ptr;
            *ptr = 1;
        }

        // Schedules a task that delays the peer by 500ms and then asks it
        // to call `segfault`
        override void check ()
        {
            auto peer = new RemoteAPI!API(node_listener);

            // Run in a task so that this method returns immediately and the
            // main thread can continue testing
            runTask(
                ()
                {
                    peer.ctrl.sleep(500.msecs);
                    // The peer may already be shut down at this point,
                    // in which case the call throws and is ignored
                    try
                        peer.segfault();
                    catch (Exception e) {}
                });
        }
    }

    auto checker = RemoteAPI!API.spawn!Node(5.seconds);
    auto target = RemoteAPI!API.spawn!Node();
    scope (exit)
    {
        // Shut the target down before it wakes up,
        // so that the segfault() command is ignored
        target.ctrl.shutdown();
        checker.ctrl.shutdown();
        thread_joinAll();
    }
    node_listener = target.ctrl.listener();
    checker.check();
}
2136 
/// Example of a custom (de)serialization policy
unittest
{
    // Policy that copies the raw bytes of a value
    static struct Serialize
    {
    static:
        // Returns an immutable copy of the bytes of `value`.
        // Types convertible to `const(ubyte)[]` are duplicated directly,
        // anything else is viewed as `T.sizeof` raw bytes.
        public immutable(ubyte[]) serialize (T) (auto ref T value) @trusted
        {
            static assert(is(typeof({ T v = immutable(T).init; })));

            static if (is(T : const(ubyte)[]))
            {
                return value.idup;
            }
            else
            {
                auto raw = cast(ubyte*)&value;
                return raw[0 .. T.sizeof].idup;
            }
        }

        // Reinterprets a mutable copy of `data` as a `QT`
        public QT deserialize (QT) (immutable(ubyte)[] data) @trusted
        {
            auto copy = data.dup;
            return *cast(QT*)(copy.ptr);
        }
    }

    static struct ValueType
    {
        ulong v1;
        uint v2;
        uint v3;
    }

    static interface API
    {
        @safe:
        public @property ulong pubkey ();
        public ValueType getValue (string val);
        // Note: Vibe.d's JSON serializer cannot serialize this
        public immutable(ubyte[32]) getHash (const ubyte[] val);
    }

    static class MockAPI : API
    {
        @safe:
        public override @property ulong pubkey ()
        {
            return 42;
        }

        // The length of `val` goes in the first field, the rest is fixed
        public override ValueType getValue (string val)
        {
            return ValueType(val.length, 2, 3);
        }

        // First 32 bytes of `val`, or all zeroes when `val` is too short
        public override immutable(ubyte[32]) getHash (const ubyte[] val)
        {
            return val.length >= 32 ? val[0 .. 32] : typeof(return).init;
        }
    }

    scope client = RemoteAPI!(API, Serialize).spawn!MockAPI();
    scope (exit)
    {
        client.ctrl.shutdown();
        thread_joinAll();
    }

    assert(client.pubkey() == 42);
    assert(client.getValue("Hello world") == ValueType(11, 2, 3));

    ubyte[64] input = 42;
    assert(client.getHash(input) == input[0 .. 32]);
}
2195 
/// Test node2 responding to a dead node1
/// See https://github.com/Geod24/localrest/issues/64
unittest
{
    static interface API
    {
        @safe:
        // Main thread calls this on the first node
        public void call0 ();
        // ... which then calls this on the second node
        public void call1 ();
        // ... which then calls this back on the first node
        public void call2 ();
    }

    // Both are filled in by the main thread before `call0` is issued
    __gshared Listener!API node1Addr;
    __gshared Listener!API node2Addr;

    static class Node : API
    {
        // Client to node 1 itself; only ever set by `call0`
        private RemoteAPI!API self;

        @trusted:
        // Main -> Node 1
        public override void call0 ()
        {
            this.self = new RemoteAPI!API(node1Addr);
            scope second = new RemoteAPI!API(node2Addr);
            second.call1();
            assert(0, "This should never return as call2 shutdown this node");
        }

        // Node 1 -> Node 2
        public override void call1 ()
        {
            assert(this.self is null);
            scope first = new RemoteAPI!API(node1Addr);
            first.call2();
            // Poll every 100ms until node 1's connection is closed,
            // to make really sure node 1 is dead
            while (!node1Addr.data.isClosed())
            {
                sleep(100.msecs);
            }
        }

        // Node 2 -> Node 1
        public override void call2 ()
        {
            assert(this.self !is null);
            this.self.ctrl.shutdown();
        }
    }

    // The client to node 1 gets a 500ms timeout, node 2's client gets none
    auto node1 = RemoteAPI!API.spawn!Node(500.msecs);
    auto node2 = RemoteAPI!API.spawn!Node();
    scope (exit)
    {
        node1.ctrl.shutdown();
        node2.ctrl.shutdown();
        thread_joinAll();
    }
    node1Addr = node1.ctrl.listener();
    node2Addr = node2.ctrl.listener();

    // This call is expected to time out, because the node will be gone.
    // If something is wrong, either `thread_joinAll` never returns
    // or the `assert(0)` in `call0` is triggered.
    try
    {
        node1.call0();
        assert(0, "This should have timed out");
    }
    catch (Exception e)
    {
    }
}
2267 
/// Test Timer
unittest
{
    static import core.thread;
    import core.time;

    static interface API
    {
        public void startTimer (bool periodic);
        public void stopTimer ();
        public ulong getCounter ();
        public void resetCounter ();
    }

    static class Node : API
    {
        // Number of times `callback` ran since the last read or reset
        private ulong counter;
        private Timer timer;

        // Arms a 100ms timer that fires `callback`
        public override void startTimer (bool periodic)
        {
            this.timer = setTimer(100.msecs, &this.callback, periodic);
        }

        public override void stopTimer ()
        {
            this.timer.stop();
        }

        // Counts one tick and stops the timer on the third one
        public void callback ()
        {
            if (++this.counter == 3)
                this.timer.stop();
        }

        // Returns the current count and resets it to zero
        public override ulong getCounter ()
        {
            const current = this.counter;
            this.counter = 0;
            return current;
        }

        public override void resetCounter ()
        {
            this.counter = 0;
        }
    }

    auto client = RemoteAPI!API.spawn!Node();
    scope (exit)
    {
        client.ctrl.shutdown();
        thread_joinAll();
    }
    assert(client.getCounter() == 0);

    // Periodic timer: after one second the expected count is exactly 3,
    // which shows that the timer repeated and that it was then stopped
    client.startTimer(true);
    core.thread.Thread.sleep(1.seconds);
    assert(client.getCounter() == 3);

    // One-shot timer stopped right away: the callback must never run
    client.resetCounter();
    client.startTimer(false);
    client.stopTimer();
    core.thread.Thread.sleep(500.msecs);
    assert(client.getCounter() == 0);
}
2333 
/// Test restarting a node
unittest
{
    static interface API
    {
        public uint[2] getCount () @safe;
    }

    static class Node : API
    {
        // Counters are static so that they outlive each `Node` instance
        private static uint instantiationCount;
        private static uint destructionCount;

        this ()
        {
            ++Node.instantiationCount;
        }

        ~this ()
        {
            ++Node.destructionCount;
        }

        // Returns `[constructions, destructions]`
        public override uint[2] getCount () const @safe
        {
            return [ Node.instantiationCount, Node.destructionCount ];
        }
    }

    auto client = RemoteAPI!API.spawn!Node();
    scope (exit)
    {
        client.ctrl.shutdown();
        thread_joinAll();
    }

    // Every restart is expected to destroy one instance and create another
    assert(client.getCount() == [1, 0]);
    client.ctrl.restart();
    assert(client.getCount() == [2, 1]);
    client.ctrl.restart();
    assert(client.getCount() == [3, 2]);
}
2374 
/// Test restarting a node that has responses waiting for it
unittest
{
    import core.atomic : atomicLoad, atomicStore;

    static interface API
    {
        @safe:
        public void call0 ();
        public void call1 ();
    }

    // Set by the main thread to the listener of the second node
    __gshared Listener!API node2Addr;
    // Set once the event scheduled by `call1` has run
    static shared bool done;

    static class Node : API
    {
        @trusted:

        // Node 1: forwards the call to node 2
        public override void call0 ()
        {
            scope second = new RemoteAPI!API(node2Addr);
            second.call1();
        }

        // Node 2: schedules the event that sets `done`
        public override void call1 ()
        {
            // when this event runs we know call1() has already returned
            scheduler.schedule({ atomicStore(done, true); });
        }
    }

    auto node1 = RemoteAPI!API.spawn!Node(500.msecs);
    auto node2 = RemoteAPI!API.spawn!Node();
    scope (exit)
    {
        node1.ctrl.shutdown();
        node2.ctrl.shutdown();
        thread_joinAll();
    }
    node2Addr = node2.ctrl.listener();
    // Node 2 sleeps for 2s, longer than node 1's 500ms client timeout
    node2.ctrl.sleep(2.seconds, false);

    try
    {
        node1.call0();
        assert(0, "This should have timed out");
    }
    catch (Exception e)
    {
    }

    node1.ctrl.restart();

    // After a while node 1 will receive a response to the timed-out request
    // to call1(), but the node restarted and is no longer interested in this
    // request (the request map / LocalScheduler is different), so it's
    // filtered. Poll every 10ms, for up to 3 seconds, until `done` is set.
    for (size_t count = 0; !atomicLoad(done); count++)
    {
        assert(count < 300);
        Thread.sleep(10.msecs);
    }
}
2436 
// A task started with `runTask` must still be able to reach its captured
// context after it has yielded
unittest
{
    import geod24.concurrency;

    static interface API
    {
        public void start ();
        public int getValue ();
    }

    static class Node : API
    {
        int value;

        // Starts a task that writes 1, yields, then writes 2
        public override void start ()
        {
            // If this were a scoped delegate, it might not have a closure,
            // and the task would segfault when resumed. Therefore runTask()
            // must take a non-scope delegate.
            // Note: once upstream issue #20868 is fixed, escaping a scope
            // delegate would become a compiler error.
            runTask(
                {
                    value = 1;
                    FiberScheduler.yield();
                    value = 2;
                });
        }

        public override int getValue ()
        {
            return this.value;
        }
    }

    auto client = RemoteAPI!API.spawn!Node();
    scope (exit)
    {
        client.ctrl.shutdown();
        thread_joinAll();
    }
    client.start();
    // The task is expected to have resumed and finished by now
    assert(client.getValue() == 2);
}
2477 
/// Situation: Calling a method that the node's interface doesn't provide
/// Expectation: The client throws a `ClientException`
/// This can happen by mistake (API mixup) or when a method is optional.
unittest
{
    import std.exception : assertThrown;

    static interface BaseAPI
    {
        public int required ();
    }

    static interface APIExtended : BaseAPI
    {
        public int optional ();
    }

    // Only implements the base interface
    static class BaseNode : BaseAPI
    {
        public override int required ()
        {
            return 42;
        }
    }

    auto baseClient = RemoteAPI!BaseAPI.spawn!BaseNode();
    scope (exit)
    {
        baseClient.ctrl.shutdown();
        thread_joinAll();
    }

    // Note: Now that the `Listener` is typed, this kind of error is harder
    // to make. However, it might still happen in the wild
    // (e.g. true client/server interfacing where two sources get out of date)
    auto baseListener = baseClient.ctrl.listener();
    scope extClient = new RemoteAPI!APIExtended(
        Listener!APIExtended(baseListener.data));

    // The method shared with the base interface works
    assert(extClient.required() == 42);
    // The method that only exists in the extended interface fails
    assertThrown!ClientException(extClient.optional());
}
2513 
/// Test that `runTask` works when called from a node's constructor
unittest
{
    // Set by the task that the constructor starts
    __gshared bool called;

    static interface API
    {
    @safe:
        public @property ulong pubkey ();
    }

    static class MockAPI : API
    {
    @safe:
        // Starts `callMe` as a task while the node is being constructed
        this () @trusted
        {
            runTask(&this.callMe);
        }

        void callMe () @trusted
        {
            called = true;
        }

        public override @property ulong pubkey ()
        {
            return 42;
        }
    }

    scope client = RemoteAPI!API.spawn!MockAPI();
    scope (exit)
    {
        client.ctrl.shutdown();
        thread_joinAll();
    }
    assert(client.pubkey() == 42);
    // The task started in the constructor must have run by now
    assert(called);
}
2549 
// Self-connection deadlock scenario: a node connects to itself, delays
// itself, then calls itself.
// Without the proper fix, this test should hang.
unittest
{
    import core.atomic;

    static interface API
    {
        @safe:
        public void call0 ();
        public int get44 ();
    }

    // Listener of the only node, stored by the main thread
    __gshared Listener!API node1Addr;

    static class Node : API
    {
        private RemoteAPI!API self;

        @trusted:
        // Connects to this very node, asks it to sleep for one second,
        // then issues a request to it
        public override void call0 ()
        {
            auto ownAddr = atomicLoad(node1Addr);
            this.self = new RemoteAPI!API(ownAddr);
            this.self.ctrl.sleep(1.seconds);
            assert(this.self.get44() == 44);
        }

        public override int get44 () { return 44; }
    }

    auto node1 = RemoteAPI!API.spawn!Node();
    scope (exit)
    {
        node1.ctrl.shutdown();
        thread_joinAll();
    }

    atomicStore(node1Addr, node1.ctrl.listener());
    node1.call0();
}