Create an instance of a client
Introduce a namespace to avoid name clashes
Instantiate a node and start it
The interface defining the API to implement
An aggregate which follows the requirement explained above.
Simple usage example
// Minimal round trip: spawn a node implementing `API` and query it through
// the `RemoteAPI` client.

/// Contract shared by the client and the node
static interface API
{
    @safe:
    public @property ulong pubkey ();
    public string getValue (ulong idx);
    public ubyte[32] getQuorumSet ();
    public string recv (string data);
}

/// Node implementation; every method except `pubkey` aborts when invoked
static class MockAPI : API
{
    @safe:
    public override @property ulong pubkey ()
    {
        return 42;
    }

    public override string getValue (ulong idx)
    {
        assert(0);
    }

    public override ubyte[32] getQuorumSet ()
    {
        assert(0);
    }

    public override string recv (string data)
    {
        assert(0);
    }
}

scope client = RemoteAPI!API.spawn!MockAPI();
// On scope exit: shut the node down, then join all threads
scope (exit)
{
    client.ctrl.shutdown();
    thread_joinAll();
}

assert(client.pubkey() == 42);
Example where a shutdown() routine must be called on a node before its destructor is called
// Flags recording which tear-down steps ran. They are `__gshared`, i.e. a
// single instance visible from every thread.
__gshared bool dtor_called;
__gshared bool shutdown_called;
__gshared bool onDestroy_called;

static interface API
{
    @safe:
    public @property ulong pubkey ();
}

/// Node exposing a `shutdown` routine that is not part of `API`
static class MockAPI : API
{
    public override @property ulong pubkey () @safe
    {
        return 42;
    }

    public void shutdown ()
    {
        shutdown_called = true;
    }

    ~this ()
    {
        dtor_called = true;
    }
}

/// Callback handed to `ctrl.shutdown`: checks the destructor has not run
/// yet, then calls the node's own `shutdown`
static void onDestroy (Object node)
{
    assert(!dtor_called);
    auto impl = cast(MockAPI)node;
    assert(impl !is null);
    impl.shutdown();
    onDestroy_called = true;
}

scope client = RemoteAPI!API.spawn!MockAPI();
// Only needed if one of the checks below fails before the explicit shutdown
scope (failure) client.ctrl.shutdown();

assert(client.pubkey() == 42);
client.ctrl.shutdown(&onDestroy);
thread_joinAll();

// All three tear-down steps must have happened by now
assert(dtor_called);
assert(onDestroy_called);
assert(shutdown_called);
In a real world usage, users will most likely need to use the registry
// Registry-based example: nodes are spawned once by `factory`, registered
// under a name derived from `hash`, and later looked up by that name from
// another thread.
import std.conv;
static import geod24.concurrency;
import geod24.Registry;

static interface API
{
    @safe:
    public @property ulong pubkey ();
    public string getValue (ulong idx);
    public string recv (string data);
    public string recv (ulong index, string data);

    /// Name of the last method invoked on the node (see `Node.lastCall`)
    public string last ();
}

__gshared Registry!API registry;
registry.initialize();

/// Node that records the name of the last `API` method called on it
static class Node : API
{
    @safe:
    public this (bool isByzantine) { this.isByzantine = isByzantine; }
    public override @property ulong pubkey ()
    { lastCall = `pubkey`; return this.isByzantine ? 0 : 42; }
    public override string getValue (ulong idx)
    { lastCall = `getValue`; return null; }
    public override string recv (string data)
    { lastCall = `recv@1`; return null; }
    public override string recv (ulong index, string data)
    { lastCall = `recv@2`; return null; }

    public override string last () { return this.lastCall; }

    private bool isByzantine;
    private string lastCall;
}

/// Returns a client for the node registered under `hash`, spawning and
/// registering a new node of the requested `type` when none is found.
/// `type` is only inspected when a node has to be spawned.
static RemoteAPI!API factory (string type, ulong hash)
{
    const name = hash.to!string;
    auto listener = registry.locate(name);
    if (listener !is Listener!API.init)
        return new RemoteAPI!API(listener);

    switch (type)
    {
    case "normal":
        auto ret = RemoteAPI!API.spawn!Node(false);
        registry.register(name, ret.ctrl.listener());
        return ret;
    case "byzantine":
        auto ret = RemoteAPI!API.spawn!Node(true);
        registry.register(name, ret.ctrl.listener());
        return ret;
    default:
        assert(0, type);
    }
}

auto node1 = factory("normal", 1);
auto node2 = factory("byzantine", 2);

/// Runs in a separate thread and reaches the nodes through the registry:
/// the `type` argument is irrelevant because both hashes are registered.
static void testFunc()
{
    auto node1 = factory("this does not matter", 1);
    auto node2 = factory("neither does this", 2);
    scope (exit) {
        node1.ctrl.shutdown();
        node2.ctrl.shutdown();
    }
    assert(node1.pubkey() == 42);
    assert(node1.last() == "pubkey");
    assert(node2.pubkey() == 0);
    assert(node2.last() == "pubkey");

    // Both `recv` overloads are dispatched to the matching implementation
    node1.recv(42, null);
    assert(node1.last() == "recv@2");
    node1.recv(null);
    assert(node1.last() == "recv@1");
    assert(node2.last() == "pubkey");
}

scope thread = new Thread(&testFunc);
thread.start();
// Make sure our main thread terminates after everyone else
thread_joinAll();
This network has different types of nodes in it
// Master/slave topology: one `MasterNode` answers `value` itself, while
// each `SlaveNode` forwards `value` to the master. Every node counts the
// `value` requests it received.
import geod24.concurrency;

static interface API
{
    @safe:
    public @property ulong requests ();
    public @property ulong value ();
}

static class MasterNode : API
{
    @safe:
    public override @property ulong requests()
    {
        return this.requests_;
    }

    public override @property ulong value()
    {
        this.requests_++;
        return 42; // Of course
    }

    private ulong requests_;
}

static class SlaveNode : API
{
    @safe:
    this(Listener!API masterConn)
    {
        this.master = new RemoteAPI!API(masterConn);
    }

    public override @property ulong requests()
    {
        return this.requests_;
    }

    public override @property ulong value()
    {
        this.requests_++;
        return master.value();
    }

    private API master;
    private ulong requests_;
}

RemoteAPI!API[4] nodes;
auto master = RemoteAPI!API.spawn!MasterNode();
nodes[0] = master;
nodes[1] = RemoteAPI!API.spawn!SlaveNode(master.ctrl.listener());
nodes[2] = RemoteAPI!API.spawn!SlaveNode(master.ctrl.listener());
nodes[3] = RemoteAPI!API.spawn!SlaveNode(master.ctrl.listener());
scope (exit) {
    import std.algorithm;
    nodes.each!(node => node.ctrl.shutdown());
    thread_joinAll();
}

foreach (n; nodes)
{
    assert(n.requests() == 0);
    assert(n.value() == 42);
}

// One direct call plus one forwarded call per slave
assert(nodes[0].requests() == 4);

foreach (n; nodes[1 .. $])
{
    assert(n.value() == 42);
    assert(n.requests() == 2);
}

// Three more forwarded calls reached the master
assert(nodes[0].requests() == 7);
Support for circular node calls
// Three nodes are wired in a ring (node0 -> node1 -> node2 -> node0) and a
// call is passed around it, so nodes are re-entered while an earlier call
// of theirs is still pending.
static import geod24.concurrency;

static interface API
{
    @safe:
    public ulong call (ulong count, ulong val);
    public void setNext (string name);
}

// Name -> listener table, filled by the main thread before `setNext` is used
__gshared Listener!API[string] tbn;

static class Node : API
{
    @safe:
    /// Adds `count` to `val` and forwards to the next node with `count - 1`;
    /// returns `val` unchanged once `count` reaches zero.
    public override ulong call (ulong count, ulong val)
    {
        if (!count)
            return val;
        return this.next.call(count - 1, val + count);
    }

    /// Connects this node to the node registered under `name` in `tbn`
    public override void setNext (string name) @trusted
    {
        this.next = new RemoteAPI!API(tbn[name]);
    }

    private API next;
}

RemoteAPI!(API)[3] nodes = [
    RemoteAPI!API.spawn!Node(),
    RemoteAPI!API.spawn!Node(),
    RemoteAPI!API.spawn!Node(),
];
scope (exit)
{
    import std.algorithm;
    nodes.each!(node => node.ctrl.shutdown());
    thread_joinAll();
}

foreach (idx, ref api; nodes)
    tbn[format("node%d", idx)] = api.ctrl.listener();
nodes[0].setNext("node1");
nodes[1].setNext("node2");
nodes[2].setNext("node0");

// 7 levels of re-entrancy: 20 hops spread over 3 nodes.
// Expected result is 20 + 19 + ... + 1 == 210.
assert(210 == nodes[0].call(20, 0));
Nodes can start tasks
// A node starts a background task with `runTask`; the task increments a
// counter every 50 ms while the node keeps answering `getCounter`.
static import core.thread;
import core.time;

static interface API
{
    public void start ();
    public ulong getCounter ();
}

static class Node : API
{
    public override void start ()
    {
        runTask(&this.task);
    }

    /// Returns the current counter and resets it to zero
    public override ulong getCounter ()
    {
        scope (exit) this.counter = 0;
        return this.counter;
    }

    private void task ()
    {
        while (true)
        {
            this.counter++;
            sleep(50.msecs);
        }
    }

    private ulong counter;
}

auto node = RemoteAPI!API.spawn!Node();
scope (exit)
{
    node.ctrl.shutdown();
    thread_joinAll();
}

assert(node.getCounter() == 0);
node.start();
// The task already ran once; reading the counter also reset it
assert(node.getCounter() == 1);
assert(node.getCounter() == 0);
core.thread.Thread.sleep(1.seconds);
// It should be 19 but some machines are very slow
// (e.g. Travis Mac testers) so be safe
assert(node.getCounter() >= 9);
// The previous read reset the counter, so it is small again
assert(node.getCounter() < 9);
none of these should time out
// Request timeouts: a client spawned with a timeout throws when the node
// takes longer than that to answer; a client without one simply waits.
import core.thread;
import std.exception;

static interface API
{
    float getFloat();
    size_t sleepFor (long dur);
}

static class Node : API
{
    override float getFloat() { return 69.69; }

    /// Blocks the node for `dur` milliseconds, then returns 42
    override size_t sleepFor (long dur)
    {
        Thread.sleep(msecs(dur));
        return 42;
    }
}

// node with no timeout
auto node = RemoteAPI!API.spawn!Node();
scope (exit)
{
    node.ctrl.shutdown();
    thread_joinAll();
}
assert(node.sleepFor(80) == 42);  // no timeout

// node with a configured timeout
auto to_node = RemoteAPI!API.spawn!Node(500.msecs);
scope (exit) to_node.ctrl.shutdown();

/// none of these should time out
assert(to_node.sleepFor(10) == 42);
assert(to_node.sleepFor(20) == 42);
assert(to_node.sleepFor(30) == 42);
assert(to_node.sleepFor(40) == 42);

// 2000 ms exceeds the 500 ms timeout configured above
assertThrown!Exception(to_node.sleepFor(2000));
to_node.ctrl.withTimeout(3.seconds,  // wait for the node to wake up
    (scope API api) { assert(cast(int)api.getFloat() == 69); });
none of these should time out
// Timeouts between nodes: `node_1` creates its own client to `node_2`
// with a 500 ms timeout and checks both the passing and the failing case.
static import geod24.concurrency;
import std.exception;

static interface API
{
    void check ();
    int ping ();
}

// Listener of the node that `check` connects to; set by the main thread
__gshared Listener!API node_listener;

static class Node : API
{
    override int ping () { return 42; }

    override void check ()
    {
        auto node = new RemoteAPI!API(node_listener, 500.msecs);

        // no time-out
        node.ctrl.sleep(10.msecs);
        assert(node.ping() == 42);

        // time-out
        node.ctrl.sleep(2000.msecs);
        assertThrown!Exception(node.ping());
    }
}

// node_1's own timeout (5 s) is longer than the time `check` needs
auto node_1 = RemoteAPI!API.spawn!Node(5.seconds);
auto node_2 = RemoteAPI!API.spawn!Node();
scope (exit)
{
    node_1.ctrl.shutdown();
    node_2.ctrl.shutdown();
    thread_joinAll();
}

node_listener = node_2.ctrl.listener();
node_1.check();
Example of a custom (de)serialization policy
// Custom (de)serialization policy: `Serialize` is passed as the second
// template argument of `RemoteAPI` and copies values as raw bytes.
static struct Serialize
{
static:
    /// Returns an immutable copy of `value`: the bytes themselves for
    /// `ubyte` slices, the in-memory representation for any other type.
    public immutable(ubyte[]) serialize (T) (auto ref T value) @trusted
    {
        // Reject types that cannot be initialized from an immutable value
        static assert(is(typeof({ T v = immutable(T).init; })));
        static if (is(T : const(ubyte)[]))
            return value.idup;
        else
            return (cast(ubyte*)&value)[0 .. T.sizeof].idup;
    }

    /// Reinterprets a mutable copy of `data` as a `QT`
    public QT deserialize (QT) (immutable(ubyte)[] data) @trusted
    {
        return *cast(QT*)(data.dup.ptr);
    }
}

static struct ValueType
{
    ulong v1;
    uint v2;
    uint v3;
}

static interface API
{
    @safe:
    public @property ulong pubkey ();
    public ValueType getValue (string val);
    // Note: Vibe.d's JSON serializer cannot serialize this
    public immutable(ubyte[32]) getHash (const ubyte[] val);
}

static class MockAPI : API
{
    @safe:
    public override @property ulong pubkey () { return 42; }

    public override ValueType getValue (string val)
    {
        return ValueType(val.length, 2, 3);
    }

    /// First 32 bytes of `val`, or all zeroes when `val` is shorter
    public override immutable(ubyte[32]) getHash (const ubyte[] val)
    {
        return val.length >= 32 ? val[0 .. 32] : typeof(return).init;
    }
}

scope test = RemoteAPI!(API, Serialize).spawn!MockAPI();
scope (exit)
{
    test.ctrl.shutdown();
    thread_joinAll();
}

assert(test.pubkey() == 42);
assert(test.getValue("Hello world") == ValueType(11, 2, 3));
ubyte[64] val = 42;
assert(test.getHash(val) == val[0 .. 32]);
Test node2 responding to a dead node1. See https://github.com/Geod24/localrest/issues/64
// Call chain: main -> node1.call0 -> node2.call1 -> node1.call2, where
// `call2` shuts node1 down. node2 then has to answer a node that is gone.
static interface API
{
    @safe:
    // Main thread calls this on the first node
    public void call0 ();
    // ... which then calls this on the second node
    public void call1 ();
    public void call2 ();
}

__gshared Listener!API node1Addr;
__gshared Listener!API node2Addr;

static class Node : API
{
    // Client pointing at node1 itself; only set on node1 (by `call0`)
    private RemoteAPI!API self;

    @trusted:
    // Main -> Node 1
    public override void call0 ()
    {
        this.self = new RemoteAPI!API(node1Addr);
        scope node2 = new RemoteAPI!API(node2Addr);
        node2.call1();
        assert(0, "This should never return as call2 shutdown this node");
    }

    // Node 1 -> Node 2
    public override void call1 ()
    {
        assert(this.self is null);
        scope node1 = new RemoteAPI!API(node1Addr);
        node1.call2();
        // Make really sure Node 1 is dead
        while (!node1Addr.data.isClosed())
            sleep(100.msecs);
    }

    // Node 2 -> Node 1
    public override void call2 ()
    {
        assert(this.self !is null);
        this.self.ctrl.shutdown();
    }
}

// Long timeout to ensure we don't spuriously pass
auto node1 = RemoteAPI!API.spawn!Node(500.msecs);
auto node2 = RemoteAPI!API.spawn!Node();
scope (exit) {
    node1.ctrl.shutdown();
    node2.ctrl.shutdown();
    thread_joinAll();
}
node1Addr = node1.ctrl.listener();
node2Addr = node2.ctrl.listener();

// This will timeout (because the node will be gone)
// However if something is wrong, either `joinall` will never return,
// or the `assert(0)` in `call0` will be triggered.
try
{
    node1.call0();
    assert(0, "This should have timed out");
}
catch (Exception e) {}
Test Timer
// Timer test: a node owns a timer whose callback increments a counter and
// stops the timer once the counter reaches 3.
static import core.thread;
import core.time;

static interface API
{
    public void startTimer (bool periodic);
    public void stopTimer ();
    public ulong getCounter ();
    public void resetCounter ();
    public void fireTimer ();
}

static class Node : API
{
    private ulong counter;
    private Timer timer;

    /// Arms a 100 ms timer, periodic or one-shot depending on `periodic`
    public override void startTimer (bool periodic)
    {
        this.timer = setTimer(100.msecs, &callback, periodic);
    }

    public override void stopTimer ()
    {
        this.timer.stop();
    }

    /// Re-arms the existing timer as a one-shot with a zero delay
    public override void fireTimer ()
    {
        this.timer.rearm(0.msecs, false);
    }

    public void callback ()
    {
        this.counter++;
        if (this.counter == 3)
            this.timer.stop();
    }

    /// Returns the current counter and resets it to zero
    public override ulong getCounter ()
    {
        scope (exit) this.counter = 0;
        return this.counter;
    }

    public override void resetCounter ()
    {
        this.counter = 0;
    }
}

auto node = RemoteAPI!API.spawn!Node();
scope (exit) {
    node.ctrl.shutdown();
    thread_joinAll();
}
assert(node.getCounter() == 0);
node.startTimer(true);
core.thread.Thread.sleep(1.seconds);
// The expected count is 3
// Check means the timer repeated and the timer stopped
assert(node.getCounter() == 3);
node.resetCounter();
node.startTimer(false);
node.stopTimer();
core.thread.Thread.sleep(500.msecs);
assert(node.getCounter() == 0);
node.fireTimer();
core.thread.Thread.sleep(500.msecs);
assert(node.getCounter() == 1);

node.resetCounter();
node.startTimer(true);
node.ctrl.deafen(1.seconds);
// timer will continue ticking for that duration
assert(node.getCounter() == 3);
node.stopTimer();

node.resetCounter();
node.startTimer(true);
node.ctrl.sleep(1.seconds);
// timer will not fire for that duration, maybe only once
assert(node.getCounter() < 3);
Test restarting a node
// Restarting a node: every restart must destroy the current instance and
// construct a fresh one, which the class-level counters make observable.
static interface API
{
    public uint[2] getCount () @safe;
}

static class Node : API
{
    // Class-level counters, so they outlive any single instance
    private static uint instantiationCount;
    private static uint destructionCount;

    this ()
    {
        Node.instantiationCount++;
    }

    ~this ()
    {
        Node.destructionCount++;
    }

    /// Returns `[instantiations, destructions]` seen so far
    public override uint[2] getCount () const @safe
    {
        return [
            Node.instantiationCount,
            Node.destructionCount,
        ];
    }
}

auto remote = RemoteAPI!API.spawn!Node();
scope (exit)
{
    remote.ctrl.shutdown();
    thread_joinAll();
}

// Initial spawn: one construction, no destruction
assert(remote.getCount == [1, 0]);

// Each restart adds exactly one destruction and one construction
remote.ctrl.restart();
assert(remote.getCount == [2, 1]);
remote.ctrl.restart();
assert(remote.getCount == [3, 2]);
Test restarting a node that has responses waiting for it
// node1 times out waiting for node2 (which is asleep), then node1 is
// restarted. The late response from node2 must not disturb the new node1.
import core.atomic : atomicLoad, atomicStore;
static interface API
{
    @safe:
    public void call0 ();
    public void call1 ();
}

__gshared Listener!API node2Addr;
// Set once node2 has finished handling `call1`
static shared bool done;

static class Node : API
{
    @trusted:

    public override void call0 ()
    {
        scope node2 = new RemoteAPI!API(node2Addr);
        node2.call1();
    }

    public override void call1 ()
    {
        // when this event runs we know call1() has already returned
        scheduler.schedule({ atomicStore(done, true); });
    }
}

auto node1 = RemoteAPI!API.spawn!Node(500.msecs);
auto node2 = RemoteAPI!API.spawn!Node();
scope (exit) {
    node1.ctrl.shutdown();
    node2.ctrl.shutdown();
    thread_joinAll();
}
node2Addr = node2.ctrl.listener();
node2.ctrl.sleep(2.seconds, false);

// node2 sleeps for 2 s, longer than node1's 500 ms timeout
try
{
    node1.call0();
    assert(0, "This should have timed out");
}
catch (Exception e) {}

node1.ctrl.restart();

// after a while node 1 will receive a response to the timed-out request
// to call1(), but the node restarted and is no longer interested in this
// request (the request map / LocalScheduler is different), so it's filtered
size_t count;
while (!atomicLoad(done))
{
    assert(count < 300);  // up to 3 seconds wait
    count++;
    Thread.sleep(10.msecs);
}
Situation: Calling a node with an interface that doesn't exist. Expectation: The client throws an exception with a useful error message. This can happen by mistake (API mixup) or when a method is optional.
// A client typed with `APIExtended` talks to a node that only implements
// `BaseAPI`: inherited methods work, the extra method must throw.
import std.exception : assertThrown;

static interface BaseAPI
{
    public int required ();
}

static interface APIExtended : BaseAPI
{
    public int optional ();
}

static class BaseNode : BaseAPI
{
    public override int required () { return 42; }
}

auto node = RemoteAPI!BaseAPI.spawn!BaseNode();
scope (exit)
{
    node.ctrl.shutdown();
    thread_joinAll();
}

// Note: Now that the `Listener` is typed, this kind of error is harder
// to make. However, it might still happen in the wild
// (e.g. true client/server interfacing where two sources get out of date)
scope extnode = new RemoteAPI!APIExtended(
    Listener!APIExtended(node.ctrl.listener().data));
assert(extnode.required() == 42);
assertThrown!ClientException(extnode.optional());
test that runTask works in the constructor
// Checks that a task started with `runTask` from the node's constructor
// does get executed.

// Set by the task; `__gshared` so every thread sees the same variable
__gshared bool called;

static interface API
{
    @safe:
    public @property ulong pubkey ();
}

static class MockAPI : API
{
    @safe:
    /// Schedules `callMe` as soon as the node is constructed
    this () @trusted
    {
        runTask(&callMe);
    }

    void callMe () @trusted
    {
        called = true;
    }

    public override @property ulong pubkey ()
    {
        return 42;
    }
}

scope client = RemoteAPI!API.spawn!MockAPI();
scope (exit)
{
    client.ctrl.shutdown();
    thread_joinAll();
}

assert(client.pubkey() == 42);
assert(called);
A reference to an already-instantiated node
This class serves the same purpose as a RestInterfaceClient: it is a client for an already-instantiated REST API interface.
In order to instantiate a new server (in a remote thread), use the static spawn function.
Serialization: In order to support a custom serialization policy, one can change the Serializer parameter. This parameter is expected to be either a template or an aggregate with two static methods, but no explicit limitation is put on the type. See geod24.Serialization's documentation for more information.