1 /*******************************************************************************
2 
    Provides utilities to mock an async REST node on the network
4 
    Using `vibe.web.rest` allows business code to be cleanly separated
    from network code, as implementing an interface is all that's
    needed to create a server.
8 
9     However, in order for tests to simulate an asynchronous system
10     accurately, multiple nodes need to be able to run concurrently.
11 
12     There are two common solutions to this, to use either fibers or threads.
13     Fibers have the advantage of being simpler to implement and predictable.
14     Threads have the advantage of more accurately describing an asynchronous
15     system and thus have the ability to uncover more issues.
16     Fibers also need to cooperate (by yielding), which means the code must
17     be more cautiously instrumented to allow it to be used for tests,
18     while Threads will just work.
19 
    The latter reason is why this module went with Thread.
21     When spawning a node, a thread is spawned, a node is instantiated with
22     the provided arguments, and an event loop waits for messages sent
23     to the Tid. Messages consist of the sender's Tid, the mangled name
24     of the function to call (to support overloading) and the arguments,
25     serialized as a JSON string.
26 
    While this module's main motivation was to test REST nodes,
    its only dependency on Vibe.d is actually on its JSON module,
    as Vibe.d is the only available JSON module known to the author
    to provide an interface to deserialize composite types.
31 
32     Author:         Mathias 'Geod24' Lang
33     License:        MIT (See LICENSE.txt)
34     Copyright:      Copyright (c) 2018-2019 Mathias Lang. All rights reserved.
35 
36 *******************************************************************************/
37 
38 module geod24.LocalRest;
39 
40 import vibe.data.json;
41 
42 import std.concurrency;
43 import std.meta : AliasSeq;
44 import std.traits : Parameters, ReturnType;
45 
/// Request message: what a caller sends to the thread hosting a node
private struct Command
{
    /// Thread to which the `Response` must be sent back.
    /// Carried next to the payload because a `Tid` cannot be JSON serialized.
    Tid sender;

    /// Selects the method to invoke (callers pass the overload's `mangleof`)
    string method;

    /// The method's arguments, formatted as a JSON string
    string args;
}
56 
/// Reply message: what the node's thread sends back to the caller
private struct Response
{
    /// `true` when the method returned normally,
    /// `false` when it threw
    bool success;

    /// When `success == true`, the JSON-serialized return value
    /// (left at its default for methods returning `void`).
    /// Otherwise, the `toString()` of what was thrown.
    string data;
}
67 
/// Bundles a method's argument list into a single aggregate, so that the
/// whole list can be (de)serialized in one go.
/// Vibe.d might emit a pragma(msg) when Args.length == 0
private struct ArgWrapper (Args...)
{
    /// The wrapped arguments, in declaration order
    Args args;
}
74 
/*******************************************************************************

    A proxy to an `Implementation` instance running in its own thread

    This class implements `API`. Each method of `API` is overridden to:
    - wrap its arguments in an `ArgWrapper` and serialize them to JSON;
    - send a `Command` to the thread hosting the `Implementation` instance;
    - block until a `Response` comes back;
    - throw an `Exception` if the call failed, or deserialize and return
      the result otherwise.

    Params:
      API = The interface to expose
      Implementation = The class implementing `API`, which gets instantiated
                       in the child thread

*******************************************************************************/

public final class RemoteAPI (API, Implementation : API) : API
{
    /// Parameters of `Implementation`'s constructor (empty if it has none).
    /// Note that the templated `spawned` and constructor below declare
    /// a template parameter of the same name, which shadows this alias.
    static if (is(typeof(Implementation.__ctor)))
        private alias CtorParams = Parameters!(Implementation.__ctor);
    else
        private alias CtorParams = AliasSeq!();

    /***************************************************************************

        Main dispatch function

        This is the entry point of the child thread. It instantiates
        `Implementation`, then loops on `receive` until the owner thread
        terminates. Each `Command` received holds the sender's Tid,
        the `mangleof` of the overload to call, and the arguments wrapped
        in an `ArgWrapper` and serialized to a JSON string.

        A `Response` is always sent back to the sender, whether the call
        succeeded or threw, as the sender is blocked waiting for it.

        `std.concurrency.receive` is not `@safe`, so neither is this.

        Params:
            args = Arguments to `Implementation`'s constructor

    ***************************************************************************/

    private static void spawned (CtorParams...) (CtorParams args)
    {
        import std.format;

        bool terminated = false;
        scope node = new Implementation(args);

        // One `case` is generated per overload of each member of `API`,
        // keyed on the overload's mangled name so overloads are told apart.
        scope handler = (Command cmd) {
            SWITCH:
                switch (cmd.method)
                {
                    static foreach (member; __traits(allMembers, API))
                        static foreach (ovrld; __traits(getOverloads, API, member))
                        {
                            mixin(q{
                                    case `%2$s`:
                                    try {
                                        auto args = cmd.args.deserializeJson!(
                                            ArgWrapper!(Parameters!ovrld));
                                        static if (!is(ReturnType!ovrld == void)) {
                                            cmd.sender.send(
                                                Response(
                                                    true,
                                                    node.%1$s(args.args).serializeToJsonString()));
                                        } else {
                                            node.%1$s(args.args);
                                            cmd.sender.send(Response(true));
                                        }
                                    } catch (Throwable t) {
                                        // Our sender expects a response
                                        cmd.sender.send(Response(false, t.toString()));
                                    }
                                    break SWITCH;
                                }.format(member, ovrld.mangleof));
                        }
                default:
                    assert(0, "Unmatched method name: " ~ cmd.method);
                }
            };

        // Serve requests until the thread which spawned us terminates
        while (!terminated)
        {
            receive((OwnerTerminated e) { terminated = true; },
                    handler);
        }
    }

    /// Where to send message to
    private Tid childTid;

    /// Whether this instance spawned the child thread (`true`) or merely
    /// references an existing one (`false`).
    /// NOTE: this is currently only recorded, nothing in this class reads it.
    private bool owner;

    /***************************************************************************

        Instantiate a node and start it

        This is usually called from the main thread, which will start all the
        nodes and then start to process request.
        In order to have a connected network, no nodes in any thread should have
        a different reference to the same node.
        In practice, this means there should only be one `Tid` per `Hash`.

        The child thread is not shut down by this class (it defines no
        destructor); it exits its loop when it receives `OwnerTerminated`,
        i.e. once the thread which called this constructor has terminated.

        Params:
          args = Arguments to the object's constructor

    ***************************************************************************/

    public this (CtorParams...) (CtorParams args)
    {
        this.childTid = spawn(&spawned!(CtorParams), args);
        this.owner = true;
    }

    // Vibe.d mandates that method must be @safe
    @safe:

    /***************************************************************************

        Create a reference to an already existing Tid

        This overload should be used by non-main Threads to get a reference
        to an already instantiated Node.

        Params:
          tid = `Tid` of the thread hosting the node

    ***************************************************************************/

    public this (Tid tid) @nogc pure nothrow
    {
        this.childTid = tid;
        this.owner = false;
    }

    /// Returns: The `Tid` of the thread hosting the node
    public Tid tid () @nogc pure nothrow
    {
        return this.childTid;
    }

    /***************************************************************************

        Generate the API `override` which forward to the actual object

        Each generated method blocks until the node's thread answers.

        Throws:
          `Exception` if the node reported a failure, with the message
          being the remote `toString()` output.

    ***************************************************************************/

    static foreach (member; __traits(allMembers, API))
        static foreach (ovrld; __traits(getOverloads, API, member))
        {
            mixin(q{
                override ReturnType!(ovrld) } ~ member ~ q{ (Parameters!ovrld params)
                {
                    auto serialized = ArgWrapper!(Parameters!ovrld)(params)
                        .serializeToJsonString();
                    auto command = Command(thisTid(), ovrld.mangleof, serialized);
                    // `std.concurrency.send/receive[Only]` is not `@safe` but
                    // this overload needs to be
                    auto res = () @trusted {
                        this.childTid.send(command);
                        return receiveOnly!(Response);
                    }();
                    if (!res.success)
                        throw new Exception(res.data);
                    static if (!is(ReturnType!(ovrld) == void))
                        return res.data.deserializeJson!(typeof(return));
                }
                });
        }
}
227 
/// Minimal example: a single node, queried from the thread that created it
unittest
{
    static interface API
    {
    @safe:
        @property ulong pubkey ();
        Json getValue (ulong idx);
        Json getQuorumSet ();
        string recv (Json data);
    }

    // Only `pubkey` is expected to be called, anything else is a failure
    static class MockAPI : API
    {
    @safe:
        override @property ulong pubkey ()
        {
            return 42;
        }

        override Json getValue (ulong idx)
        {
            assert(0);
        }

        override Json getQuorumSet ()
        {
            assert(0);
        }

        override string recv (Json data)
        {
            assert(0);
        }
    }

    // Spawns the thread hosting `MockAPI` and queries it through the proxy
    scope remote = new RemoteAPI!(API, MockAPI)();
    const key = remote.pubkey();
    assert(key == 42);
}
256 
/// In a real world usage, users will most likely need to use the registry
unittest
{
    import core.thread : Thread;
    import std.conv;
    static import std.concurrency;

    static interface API
    {
        @safe:
        public @property ulong pubkey ();
        public Json getValue (ulong idx);
        public string recv (Json data);
        public string recv (ulong index, Json data);

        public string last ();
    }

    // Records the last method called, so the test can check that the
    // right overload was dispatched
    static class Node : API
    {
        @safe:
        public this (bool isByzantine) { this.isByzantine = isByzantine; }
        public override @property ulong pubkey ()
        { lastCall = `pubkey`; return this.isByzantine ? 0 : 42; }
        public override Json getValue (ulong idx)
        { lastCall = `getValue`; return Json.init; }
        public override string recv (Json data)
        { lastCall = `recv@1`; return null; }
        public override string recv (ulong index, Json data)
        { lastCall = `recv@2`; return null; }

        public override string last () { return this.lastCall; }

        private bool isByzantine;
        private string lastCall;
    }

    // Returns a reference to the node registered under `hash`,
    // spawning and registering it first if it does not exist yet
    static API factory (string type, ulong hash)
    {
        const name = hash.to!string;
        auto tid = std.concurrency.locate(name);
        if (tid != tid.init)
            return new RemoteAPI!(API, Node)(tid);

        switch (type)
        {
        case "normal":
            auto ret =  new RemoteAPI!(API, Node)(false);
            std.concurrency.register(name, ret.tid());
            return ret;
        case "byzantine":
            auto ret =  new RemoteAPI!(API, Node)(true);
            std.concurrency.register(name, ret.tid());
            return ret;
        default:
            assert(0);
        }
    }

    // The nodes are owned by this thread: they keep running until it terminates
    auto node1 = factory("normal", 1);
    auto node2 = factory("byzantine", 2);

    static void testFunc()
    {
        // Both nodes are already registered, so `type` is never looked at
        auto node1 = factory("this does not matter", 1);
        auto node2 = factory("neither does this", 2);
        assert(node1.pubkey() == 42);
        assert(node1.last() == "pubkey");
        assert(node2.pubkey() == 0);
        assert(node2.last() == "pubkey");

        node1.recv(42, Json.init);
        assert(node1.last() == "recv@2");
        node1.recv(Json.init);
        assert(node1.last() == "recv@1");
        assert(node2.last() == "pubkey");
    }

    // Run the checks from another thread and wait for them to complete.
    // `join` rethrows anything the thread threw, so a failed assertion in
    // `testFunc` fails this unittest instead of being lost with the thread.
    auto tester = new Thread(&testFunc);
    tester.start();
    tester.join();
}