/*******************************************************************************

    Provides utilities to mock an async REST node on the network

    Using `vibe.web.rest` allows one to cleanly separate business code
    from network code, as implementing an interface is all that's
    needed to create a server.

    However, in order for tests to simulate an asynchronous system
    accurately, multiple nodes need to be able to run concurrently.

    There are two common solutions to this, to use either fibers or threads.
    Fibers have the advantage of being simpler to implement and predictable.
    Threads have the advantage of more accurately describing an asynchronous
    system and thus have the ability to uncover more issues.
    Fibers also need to cooperate (by yielding), which means the code must
    be more cautiously instrumented to allow it to be used for tests,
    while Threads will just work.

    The latter reason is why this module went with Thread.
    When spawning a node, a thread is spawned, a node is instantiated with
    the provided arguments, and an event loop waits for messages sent
    to the Tid. Messages consist of the sender's Tid, the mangled name
    of the function to call (to support overloading) and the arguments,
    serialized as a JSON string.

    While this module's main motivation was to test REST nodes,
    the only dependency on Vibe.d is actually on its JSON module,
    as Vibe.d is the only available JSON module known to the author
    to provide an interface to deserialize composite types.

    Author:         Mathias 'Geod24' Lang
    License:        MIT (See LICENSE.txt)
    Copyright:      Copyright (c) 2018-2019 Mathias Lang. All rights reserved.

*******************************************************************************/

module geod24.LocalRest;

import vibe.data.json;

import std.concurrency;
import std.meta : AliasSeq;
import std.traits : Parameters, ReturnType;

/// Data sent by the caller
private struct Command
{
    /// Tid of the sender thread (cannot be JSON serialized)
    Tid sender;
    /// Method to call
    string method;
    /// Arguments to the method, JSON formatted
    string args;
}

/// Data sent by the callee back to the caller
private struct Response
{
    /// `true` if the method returned successfully,
    /// `false` if a `Throwable` was caught while calling it
    bool success;
    /// If `success == true`, the JSON-serialized return value
    /// (left empty for `void` methods).
    /// Otherwise, it contains the result of `Throwable.toString()`.
    string data;
}

/// Simple wrapper to deal with tuples
/// Vibe.d might emit a pragma(msg) when T.length == 0
private struct ArgWrapper (T...)
{
    T args;
}

/*******************************************************************************

    A thread-backed proxy for an implementation of `API`

    Constructing an instance with the arguments of `Implementation`'s
    constructor spawns a thread, which instantiates `Implementation` and
    then serves the calls it receives as messages.
    Every method of `API` is overridden to serialize its arguments to JSON,
    send them to that thread, and block until a `Response` is received.

    Params:
      API = The interface to expose
      Implementation = The class implementing `API` to instantiate
                       in the spawned thread

*******************************************************************************/

public final class RemoteAPI (API, Implementation : API) : API
{
    static if (is(typeof(Implementation.__ctor)))
        private alias CtorParams = Parameters!(Implementation.__ctor);
    else
        private alias CtorParams = AliasSeq!();

    /***************************************************************************

        Main dispatch function

        This function receives string-serialized messages from the calling
        thread, which is a struct with the sender's Tid, the method's mangleof,
        and the method's arguments as a tuple, serialized to a JSON string.

        A `Response` is sent back to the sender for every matched method,
        whether the call succeeded or threw. The loop only stops once
        an `OwnerTerminated` message is received.

        `std.concurrency.receive` is not `@safe`, so neither is this.

        Params:
            args = Arguments to `Implementation`'s constructor

    ***************************************************************************/

    private static void spawned (CtorParams...) (CtorParams args)
    {
        import std.format;

        bool terminated = false;
        scope node = new Implementation(args);

        scope handler = (Command cmd) {
            SWITCH:
                switch (cmd.method)
                {
                    // One `case` per overload, keyed by its mangled name
                    // (`%2$s`) so that overloads can be told apart,
                    // calling the member by its plain name (`%1$s`)
                    static foreach (member; __traits(allMembers, API))
                    static foreach (ovrld; __traits(getOverloads, API, member))
                    {
                        mixin(q{
                                case `%2$s`:
                                    try {
                                        auto args = cmd.args.deserializeJson!(
                                            ArgWrapper!(Parameters!ovrld));
                                        static if (!is(ReturnType!ovrld == void)) {
                                            cmd.sender.send(
                                                Response(
                                                    true,
                                                    node.%1$s(args.args).serializeToJsonString()));
                                        } else {
                                            node.%1$s(args.args);
                                            cmd.sender.send(Response(true));
                                        }
                                    } catch (Throwable t) {
                                        // Our sender expects a response
                                        cmd.sender.send(Response(false, t.toString()));
                                    }
                                    break SWITCH;
                            }.format(member, ovrld.mangleof));
                    }
                default:
                    assert(0, "Unmatched method name: " ~ cmd.method);
                }
            };

        while (!terminated)
        {
            receive((OwnerTerminated e) { terminated = true; },
                    handler);
        }
    }

    /// Where to send message to
    private Tid childTid;

    /// `true` if this instance spawned the thread behind `childTid`,
    /// `false` if it only references an already existing one.
    /// This is only recorded for now: this class defines no destructor.
    private bool owner;

    /***************************************************************************

        Instantiate a node and start it

        This is usually called from the main thread, which will start all the
        nodes and then start to process request.
        In order to have a connected network, no nodes in any thread should have
        a different reference to the same node.
        In practice, this means there should only be one `Tid` per `Hash`.

        The spawned thread keeps running until the thread which created it
        terminates (its loop exits upon receiving `OwnerTerminated`).
        Nothing is done to shut it down when this object is finalized.

        Params:
          args = Arguments to the object's constructor

    ***************************************************************************/

    public this (CtorParams...) (CtorParams args)
    {
        this.childTid = spawn(&spawned!(CtorParams), args);
        this.owner = true;
    }

    // Vibe.d mandates that method must be @safe
    @safe:

    /***************************************************************************

        Create a reference to an already existing Tid

        This overload should be used by non-main Threads to get a reference
        to an already instantiated Node.

        Params:
          tid = Tid of the thread running the node to reference

    ***************************************************************************/

    public this (Tid tid) @nogc pure nothrow
    {
        this.childTid = tid;
        this.owner = false;
    }

    /// Returns: The `Tid` of the thread to which calls are forwarded
    public Tid tid () @nogc pure nothrow
    {
        return this.childTid;
    }

    /***************************************************************************

        Generate the API `override` which forward to the actual object

        Each override serializes its arguments, sends a `Command` to the
        node's thread and blocks until a `Response` comes back.
        A failed call is rethrown as an `Exception` whose message
        is the `data` of the `Response`.

    ***************************************************************************/

    static foreach (member; __traits(allMembers, API))
        static foreach (ovrld; __traits(getOverloads, API, member))
        {
            mixin(q{
                override ReturnType!(ovrld) } ~ member ~ q{ (Parameters!ovrld params)
                {
                    auto serialized = ArgWrapper!(Parameters!ovrld)(params)
                        .serializeToJsonString();
                    auto command = Command(thisTid(), ovrld.mangleof, serialized);
                    // `std.concurrency.send/receive[Only]` is not `@safe` but
                    // this overload needs to be
                    auto res = () @trusted {
                        this.childTid.send(command);
                        return receiveOnly!(Response);
                    }();
                    if (!res.success)
                        throw new Exception(res.data);
                    static if (!is(ReturnType!(ovrld) == void))
                        return res.data.deserializeJson!(typeof(return));
                }
                });
        }
}

/// Simple usage example
unittest
{
    static interface API
    {
        @safe:
        public @property ulong pubkey ();
        public Json getValue (ulong idx);
        public Json getQuorumSet ();
        public string recv (Json data);
    }

    static class MockAPI : API
    {
        @safe:
        public override @property ulong pubkey ()
        { return 42; }
        public override Json getValue (ulong idx)
        { assert(0); }
        public override Json getQuorumSet ()
        { assert(0); }
        public override string recv (Json data)
        { assert(0); }
    }

    scope test = new RemoteAPI!(API, MockAPI)();
    assert(test.pubkey() == 42);
}

/// In a real world usage, users will most likely need to use the registry
unittest
{
    import std.conv;
    static import std.concurrency;

    static interface API
    {
        @safe:
        public @property ulong pubkey ();
        public Json getValue (ulong idx);
        public string recv (Json data);
        public string recv (ulong index, Json data);

        public string last ();
    }

    static class Node : API
    {
        @safe:
        public this (bool isByzantine) { this.isByzantine = isByzantine; }
        public override @property ulong pubkey ()
        { lastCall = `pubkey`; return this.isByzantine ? 0 : 42; }
        public override Json getValue (ulong idx)
        { lastCall = `getValue`; return Json.init; }
        public override string recv (Json data)
        { lastCall = `recv@1`; return null; }
        public override string recv (ulong index, Json data)
        { lastCall = `recv@2`; return null; }

        public override string last () { return this.lastCall; }

        private bool isByzantine;
        private string lastCall;
    }

    // Returns a reference to the node registered under `hash`,
    // spawning and registering it first if there is none yet
    static API factory (string type, ulong hash)
    {
        const name = hash.to!string;
        auto tid = std.concurrency.locate(name);
        if (tid != tid.init)
            return new RemoteAPI!(API, Node)(tid);

        switch (type)
        {
        case "normal":
            auto ret = new RemoteAPI!(API, Node)(false);
            std.concurrency.register(name, ret.tid());
            return ret;
        case "byzantine":
            auto ret = new RemoteAPI!(API, Node)(true);
            std.concurrency.register(name, ret.tid());
            return ret;
        default:
            assert(0);
        }
    }

    auto node1 = factory("normal", 1);
    auto node2 = factory("byzantine", 2);

    static void testFunc (std.concurrency.Tid parent)
    {
        // The outcome is always reported to the parent: an assertion failing
        // in this thread would otherwise go unnoticed, as nothing joins it
        string failure;
        try
        {
            auto node1 = factory("this does not matter", 1);
            auto node2 = factory("neither does this", 2);
            assert(node1.pubkey() == 42);
            assert(node1.last() == "pubkey");
            assert(node2.pubkey() == 0);
            assert(node2.last() == "pubkey");

            node1.recv(42, Json.init);
            assert(node1.last() == "recv@2");
            node1.recv(Json.init);
            assert(node1.last() == "recv@1");
            assert(node2.last() == "pubkey");
        }
        catch (Throwable t)
            failure = t.toString();

        std.concurrency.send(parent, failure);
    }

    spawn(&testFunc, thisTid());
    // Wait for the tester: without this, the unittest could return
    // before any of the checks above had a chance to run
    const failure = receiveOnly!string();
    assert(failure.length == 0, failure);
}