RemoteAPI

A reference to an already-instantiated node

This class serves the same purpose as a RestInterfaceClient: it is a client for an already instantiated rest API interface.

In order to instantiate a new server (in a remote thread), use the static spawn function.

Serialization: In order to support a custom serialization policy, one can change the Serializer parameter. This parameter is expected to be either a template or an aggregate with two static methods, but no explicit limitation is put on the type. See geod24.Serialization's documentation for more information.

Constructors

this
this(Listener!API listener, Duration timeout)

Create an instance of a client

Members

Mixins

ctrl
mixin ControlInterface!() ctrl

Introduce a namespace to avoid name clashes

Static functions

spawn
RemoteAPI spawn(CtorParams!Impl args, Duration timeout, string file, int line)

Instantiate a node and start it

Parameters

API

The interface defining the API to implement

S

An aggregate which follows the requirement explained above.

Examples

Simple usage example

/// Interface of the node used in the basic usage example.
/// All methods are declared `@safe`.
static interface API
{
    @safe:
    /// The only method the example actually calls.
    public @property ulong pubkey ();
    public string getValue (ulong idx);
    public ubyte[32] getQuorumSet ();
    public string recv (string data);
}

/// Stub implementation of `API` for the basic usage example.
///
/// Only `pubkey` returns a value; every other method hits `assert(0)`.
static class MockAPI : API
{
    @safe:

    /// Returns the constant 42.
    public override @property ulong pubkey ()
    {
        return 42;
    }

    public override string getValue (ulong idx)
    {
        assert(0);
    }

    public override ubyte[32] getQuorumSet ()
    {
        assert(0);
    }

    public override string recv (string data)
    {
        assert(0);
    }
}

// Spawn a `MockAPI` node; `test` is the client used to talk to it.
scope test = RemoteAPI!API.spawn!MockAPI();
// On scope exit, shut the node down and then join all threads.
scope (exit) {
    test.ctrl.shutdown();
    thread_joinAll();
}
// The call goes through the client and yields what `MockAPI.pubkey` returns.
assert(test.pubkey() == 42);

Example where a shutdown() routine must be called on a node before its destructor is called

// Flags set by the node's destructor, by its `shutdown` method and by the
// `onDestroy` callback respectively; asserted on at the end of the example.
__gshared bool dtor_called;
__gshared bool shutdown_called;
__gshared bool onDestroy_called;

/// Single-method interface used by the shutdown-callback example.
static interface API
{
    @safe:
    public @property ulong pubkey ();
}

/// Node whose `shutdown` routine and destructor each record that they ran.
static class MockAPI : API
{
    /// Returns the constant 42.
    public override @property ulong pubkey () @safe
    {
        return 42;
    }

    /// Not part of `API`: it is invoked through the `onDestroy` callback.
    public void shutdown ()
    {
        shutdown_called = true;
    }

    /// Records that the destructor ran.
    ~this ()
    {
        dtor_called = true;
    }
}

/// Callback handed to `ctrl.shutdown`: calls the node's own `shutdown`
/// routine and records that the callback itself ran.
///
/// Params:
///   node = the node object, expected to be a `MockAPI`
static void onDestroy (Object node)
{
    // The callback must run before the node's destructor.
    assert(!dtor_called);

    MockAPI instance = cast(MockAPI) node;
    assert(instance !is null);

    instance.shutdown();
    onDestroy_called = true;
}

scope test = RemoteAPI!API.spawn!MockAPI();
// If an assertion below fails, still shut the node down.
scope (failure) test.ctrl.shutdown();
assert(test.pubkey() == 42);
// Shut down, passing `onDestroy` as the callback.
test.ctrl.shutdown(&onDestroy);
thread_joinAll();
// The destructor, the callback and the node's `shutdown` must all have run.
assert(dtor_called);
assert(onDestroy_called);
assert(shutdown_called);

In real-world usage, users will most likely need to use the registry

1 import std.conv;
2 static import geod24.concurrency;
3 import geod24.Registry;
4 
5 static interface API
6 {
7     @safe:
8     public @property ulong pubkey ();
9     public string getValue (ulong idx);
10     public string recv (string data);
11     public string recv (ulong index, string data);
12 
13     public string last ();
14 }
15 
16 __gshared Registry!API registry;
17 registry.initialize();
18 
19 static class Node : API
20 {
21     @safe:
22     public this (bool isByzantine) { this.isByzantine = isByzantine; }
23     public override @property ulong pubkey ()
24     { lastCall = `pubkey`; return this.isByzantine ? 0 : 42; }
25     public override string getValue (ulong idx)
26     { lastCall = `getValue`; return null; }
27     public override string recv (string data)
28     { lastCall = `recv@1`; return null; }
29     public override string recv (ulong index, string data)
30     { lastCall = `recv@2`; return null; }
31 
32     public override string last () { return this.lastCall; }
33 
34     private bool isByzantine;
35     private string lastCall;
36 }
37 
38 static RemoteAPI!API factory (string type, ulong hash)
39 {
40     const name = hash.to!string;
41     auto listener = registry.locate(name);
42     if (listener !is Listener!API.init)
43         return new RemoteAPI!API(listener);
44 
45     switch (type)
46     {
47     case "normal":
48         auto ret =  RemoteAPI!API.spawn!Node(false);
49         registry.register(name, ret.ctrl.listener());
50         return ret;
51     case "byzantine":
52         auto ret =  RemoteAPI!API.spawn!Node(true);
53         registry.register(name, ret.ctrl.listener());
54         return ret;
55     default:
56         assert(0, type);
57     }
58 }
59 
60 auto node1 = factory("normal", 1);
61 auto node2 = factory("byzantine", 2);
62 
63 static void testFunc()
64 {
65     auto node1 = factory("this does not matter", 1);
66     auto node2 = factory("neither does this", 2);
67     scope (exit) {
68         node1.ctrl.shutdown();
69         node2.ctrl.shutdown();
70     }
71     assert(node1.pubkey() == 42);
72     assert(node1.last() == "pubkey");
73     assert(node2.pubkey() == 0);
74     assert(node2.last() == "pubkey");
75 
76     node1.recv(42, null);
77     assert(node1.last() == "recv@2");
78     node1.recv(null);
79     assert(node1.last() == "recv@1");
80     assert(node2.last() == "pubkey");
81 }
82 
83 scope thread = new Thread(&testFunc);
84 thread.start();
85 // Make sure our main thread terminates after everyone else
86 thread_joinAll();

This network has different types of nodes in it

1 import geod24.concurrency;
2 
3 static interface API
4 {
5     @safe:
6     public @property ulong requests ();
7     public @property ulong value ();
8 }
9 
10 static class MasterNode : API
11 {
12     @safe:
13     public override @property ulong requests()
14     {
15         return this.requests_;
16     }
17 
18     public override @property ulong value()
19     {
20         this.requests_++;
21         return 42; // Of course
22     }
23 
24     private ulong requests_;
25 }
26 
27 static class SlaveNode : API
28 {
29     @safe:
30     this(Listener!API masterConn)
31     {
32         this.master = new RemoteAPI!API(masterConn);
33     }
34 
35     public override @property ulong requests()
36     {
37         return this.requests_;
38     }
39 
40     public override @property ulong value()
41     {
42         this.requests_++;
43         return master.value();
44     }
45 
46     private API master;
47     private ulong requests_;
48 }
49 
50 RemoteAPI!API[4] nodes;
51 auto master = RemoteAPI!API.spawn!MasterNode();
52 nodes[0] = master;
53 nodes[1] = RemoteAPI!API.spawn!SlaveNode(master.ctrl.listener());
54 nodes[2] = RemoteAPI!API.spawn!SlaveNode(master.ctrl.listener());
55 nodes[3] = RemoteAPI!API.spawn!SlaveNode(master.ctrl.listener());
56 scope (exit) {
57     import std.algorithm;
58     nodes.each!(node => node.ctrl.shutdown());
59     thread_joinAll();
60 }
61 
62 foreach (n; nodes)
63 {
64     assert(n.requests() == 0);
65     assert(n.value() == 42);
66 }
67 
68 assert(nodes[0].requests() == 4);
69 
70 foreach (n; nodes[1 .. $])
71 {
72     assert(n.value() == 42);
73     assert(n.requests() == 2);
74 }
75 
76 assert(nodes[0].requests() == 7);

Support for circular nodes call

static import geod24.concurrency;

/// Interface for nodes that forward a call to a configurable successor.
static interface API
{
    @safe:
    /// `Node` implements this as: return `val` when `count` is 0,
    /// otherwise forward `(count - 1, val + count)` to the next node.
    public ulong call (ulong count, ulong val);
    /// Selects the successor by the name it was registered under.
    public void setNext (string name);
}

// Maps a node name to its listener; filled after spawning, read by `Node.setNext`.
__gshared Listener!API[string] tbn;

/// Node that adds `count` to `val` and passes the call on to its successor
/// until `count` reaches zero.
static class Node : API
{
    @safe:

    /// Successor node, assigned by `setNext`.
    private API next;

    /// Returns `val` once `count` is exhausted; otherwise delegates to `next`
    /// with `count - 1` and `val + count`.
    public override ulong call (ulong count, ulong val)
    {
        return count == 0
            ? val
            : this.next.call(count - 1, val + count);
    }

    /// Creates a client for the listener stored in `tbn` under `name`.
    public override void setNext (string name) @trusted
    {
        auto successor = new RemoteAPI!API(tbn[name]);
        this.next = successor;
    }
}

// Three identical nodes.
RemoteAPI!(API)[3] nodes = [
    RemoteAPI!API.spawn!Node(),
    RemoteAPI!API.spawn!Node(),
    RemoteAPI!API.spawn!Node(),
];
scope (exit) {
    import std.algorithm;
    nodes.each!(node => node.ctrl.shutdown());
    thread_joinAll();
}

// Register every node's listener as "node0", "node1", "node2" ...
foreach (idx, ref api; nodes)
    tbn[format("node%d", idx)] = api.ctrl.listener();
// ... and link them into a ring: 0 -> 1 -> 2 -> 0.
nodes[0].setNext("node1");
nodes[1].setNext("node2");
nodes[2].setNext("node0");

// 7 levels of re-entrancy
// 20 hops accumulate 20 + 19 + ... + 1 == 210.
assert(210 == nodes[0].call(20, 0));

Nodes can start tasks

static import core.thread;
import core.time;

/// Interface for a node that runs a background task and exposes a counter.
static interface API
{
    /// Starts the background task.
    public void start ();
    /// Returns the current counter; `Node` also resets it to 0 on every call.
    public ulong getCounter ();
}

/// Node whose background task increments a counter every 50 ms.
static class Node : API
{
    /// Number of increments since the last `getCounter` call.
    private ulong counter;

    /// Launches `task` through `runTask`.
    public override void start ()
    {
        runTask(&this.task);
    }

    /// Returns the counter and resets it to zero.
    public override ulong getCounter ()
    {
        const current = this.counter;
        this.counter = 0;
        return current;
    }

    /// Endless loop: increment, then sleep 50 ms.
    private void task ()
    {
        for (;;)
        {
            this.counter++;
            sleep(50.msecs);
        }
    }
}

auto node = RemoteAPI!API.spawn!Node();
scope (exit) {
    node.ctrl.shutdown();
    thread_joinAll();
}
// No task has been started yet.
assert(node.getCounter() == 0);
node.start();
// Expects exactly one increment to have happened by the time this call is served.
assert(node.getCounter() == 1);
// `getCounter` reset the counter on the previous read.
assert(node.getCounter() == 0);
// Let the task run for a while (main thread sleep, not the node's).
core.thread.Thread.sleep(1.seconds);
// It should be 19 but some machines are very slow
// (e.g. Travis Mac testers) so be safe
assert(node.getCounter() >= 9);
// The read above just reset the counter, so it is small again.
assert(node.getCounter() < 9);

none of these should time out

import core.thread;
import std.exception;

/// Interface used by the timeout example.
static interface API
{
    float getFloat();
    /// `dur` is interpreted by `Node` as a number of milliseconds.
    size_t sleepFor (long dur);
}

/// Node that can block its thread for a caller-chosen time.
static class Node : API
{
    /// Returns the constant 69.69.
    override float getFloat()
    {
        return 69.69;
    }

    /// Blocks the node's thread for `dur` milliseconds, then returns 42.
    override size_t sleepFor (long dur)
    {
        const pause = msecs(dur);
        Thread.sleep(pause);
        return 42;
    }
}

// node with no timeout
auto node = RemoteAPI!API.spawn!Node();
scope (exit) {
    node.ctrl.shutdown();
    thread_joinAll();
}
assert(node.sleepFor(80) == 42);  // no timeout

// node with a configured timeout
auto to_node = RemoteAPI!API.spawn!Node(500.msecs);
scope (exit) to_node.ctrl.shutdown();
/// none of these should time out
assert(to_node.sleepFor(10) == 42);
assert(to_node.sleepFor(20) == 42);
assert(to_node.sleepFor(30) == 42);
assert(to_node.sleepFor(40) == 42);

// A 2000 ms sleep exceeds the 500 ms timeout, so this call must throw.
assertThrown!Exception(to_node.sleepFor(2000));
// NOTE(review): presumably `withTimeout` runs the delegate with a 3 second
// timeout instead of the configured one - confirm against ControlInterface.
to_node.ctrl.withTimeout(3.seconds,  // wait for the node to wake up
  (scope API api) { assert(cast(int)api.getFloat() == 69); });

none of these should time out

static import geod24.concurrency;
import std.exception;

/// Interface used by the node-to-node timeout example.
static interface API
{
    /// Runs the timeout checks from inside a node.
    void check ();
    /// Liveness probe; `Node` returns 42.
    int ping ();
}

// Listener of the node that `Node.check` connects to; set from the main thread.
__gshared Listener!API node_listener;

/// Node that, from within `check`, calls another node through a client
/// configured with a 500 ms timeout.
static class Node : API
{
    /// Returns the constant 42.
    override int ping () { return 42; }

    /// Connects to `node_listener` and verifies that a short remote sleep
    /// does not time out while a long one does.
    override void check ()
    {
        auto node = new RemoteAPI!API(node_listener, 500.msecs);

        // NOTE(review): `ctrl.sleep` presumably makes the remote node sleep
        // for the given duration - confirm against ControlInterface.

        // no time-out
        node.ctrl.sleep(10.msecs);
        assert(node.ping() == 42);

        // time-out
        node.ctrl.sleep(2000.msecs);
        assertThrown!Exception(node.ping());
    }
}

// `node_1` is spawned with a 5 second timeout, `node_2` without one.
auto node_1 = RemoteAPI!API.spawn!Node(5.seconds);
auto node_2 = RemoteAPI!API.spawn!Node();
scope (exit) {
    node_1.ctrl.shutdown();
    node_2.ctrl.shutdown();
    thread_joinAll();
}
// Publish node_2's listener before node_1 reads it inside `check`.
node_listener = node_2.ctrl.listener();
node_1.check();

Example of a custom (de)serialization policy

/// Custom serialization policy that copies the raw bytes of a value.
static struct Serialize
{
static:
    /// Returns an immutable copy of `value`'s bytes.
    ///
    /// Values convertible to `const(ubyte)[]` are duplicated as-is; anything
    /// else is reinterpreted as `T.sizeof` raw bytes.
    public immutable(ubyte[]) serialize (T) (auto ref T value) @trusted
    {
        // Reject types that cannot be initialized from their immutable form.
        static assert(is(typeof({ T v = immutable(T).init; })));

        static if (!is(T : const(ubyte)[]))
        {
            auto raw = cast(ubyte*) &value;
            return raw[0 .. T.sizeof].idup;
        }
        else
            return value.idup;
    }

    /// Reinterprets a mutable copy of `data` as a `QT` and returns it by value.
    public QT deserialize (QT) (immutable(ubyte)[] data) @trusted
    {
        auto buffer = data.dup;
        return *cast(QT*) buffer.ptr;
    }
}

/// Plain value type without pointers, returned by `API.getValue` in this example.
static struct ValueType
{
    ulong v1;
    uint v2;
    uint v3;
}

/// Interface used by the custom-serializer example.
static interface API
{
    @safe:
    public @property ulong pubkey ();
    /// Returns a struct rather than a string.
    public ValueType getValue (string val);
    // Note: Vibe.d's JSON serializer cannot serialize this
    public immutable(ubyte[32]) getHash (const ubyte[] val);
}

/// Implementation of `API` for the custom-serializer example.
static class MockAPI : API
{
    @safe:

    /// Returns the constant 42.
    public override @property ulong pubkey ()
    {
        return 42;
    }

    /// Returns `ValueType(val.length, 2, 3)`.
    public override ValueType getValue (string val)
    {
        return ValueType(val.length, 2, 3);
    }

    /// Returns the first 32 bytes of `val`, or an all-default array when
    /// `val` is shorter than 32 bytes.
    public override immutable(ubyte[32]) getHash (const ubyte[] val)
    {
        return val.length >= 32
            ? val[0 .. 32]
            : typeof(return).init;
    }
}

// `Serialize` is passed as the second template argument of `RemoteAPI`.
scope test = RemoteAPI!(API, Serialize).spawn!MockAPI();
scope (exit) {
    test.ctrl.shutdown();
    thread_joinAll();
}
assert(test.pubkey() == 42);
// "Hello world" is 11 characters long.
assert(test.getValue("Hello world") == ValueType(11, 2, 3));
// All 64 bytes are set to 42; `getHash` returns the first 32 of them.
ubyte[64] val = 42;
assert(test.getHash(val) == val[0 .. 32]);

Test node2 responding to a dead node1. See https://github.com/Geod24/localrest/issues/64

1 static interface API
2 {
3     @safe:
4     // Main thread calls this on the first node
5     public void call0 ();
6     // ... which then calls this on the second node
7     public void call1 ();
8     public void call2 ();
9 }
10 
11 __gshared Listener!API node1Addr;
12 __gshared Listener!API node2Addr;
13 
14 static class Node : API
15 {
16     private RemoteAPI!API self;
17 
18     @trusted:
19     // Main -> Node 1
20     public override void call0 ()
21     {
22         this.self = new RemoteAPI!API(node1Addr);
23         scope node2 = new RemoteAPI!API(node2Addr);
24         node2.call1();
25         assert(0, "This should never return as call2 shutdown this node");
26     }
27 
28     // Node 1 -> Node 2
29     public override void call1 ()
30     {
31         assert(this.self is null);
32         scope node1 = new RemoteAPI!API(node1Addr);
33         node1.call2();
34         // Make really sure Node 1 is dead
35         while (!node1Addr.data.isClosed())
36             sleep(100.msecs);
37     }
38 
39     // Node 2 -> Node 1
40     public override void call2 ()
41     {
42         assert(this.self !is null);
43         this.self.ctrl.shutdown();
44     }
45 }
46 
47 // Long timeout to ensure we don't spuriously pass
48 auto node1 = RemoteAPI!API.spawn!Node(500.msecs);
49 auto node2 = RemoteAPI!API.spawn!Node();
50 scope (exit) {
51     node1.ctrl.shutdown();
52     node2.ctrl.shutdown();
53     thread_joinAll();
54 }
55 node1Addr = node1.ctrl.listener();
56 node2Addr = node2.ctrl.listener();
57 
58 // This will timeout (because the node will be gone)
59 // However if something is wrong, either `joinall` will never return,
60 // or the `assert(0)` in `call0` will be triggered.
61 try
62 {
63     node1.call0();
64     assert(0, "This should have timed out");
65 }
66 catch (Exception e) {}

Test Timer

1 static import core.thread;
2 import core.time;
3 
4 static interface API
5 {
6     public void startTimer (bool periodic);
7     public void stopTimer ();
8     public ulong getCounter ();
9     public void resetCounter ();
10 }
11 
12 static class Node : API
13 {
14     private ulong counter;
15     private Timer timer;
16 
17     public override void startTimer (bool periodic)
18     {
19         this.timer = setTimer(100.msecs, &callback, periodic);
20     }
21 
22     public override void stopTimer ()
23     {
24         this.timer.stop();
25     }
26 
27     public void callback ()
28     {
29         this.counter++;
30         if (this.counter == 3)
31             this.timer.stop();
32     }
33 
34     public override ulong getCounter ()
35     {
36         scope (exit) this.counter = 0;
37         return this.counter;
38     }
39 
40     public override void resetCounter ()
41     {
42         this.counter = 0;
43     }
44 }
45 
46 auto node = RemoteAPI!API.spawn!Node();
47 scope (exit) {
48     node.ctrl.shutdown();
49     thread_joinAll();
50 }
51 assert(node.getCounter() == 0);
52 node.startTimer(true);
53 core.thread.Thread.sleep(1.seconds);
54 // The expected count is 3
55 // Check means the timer repeated and the timer stopped
56 assert(node.getCounter() == 3);
57 node.resetCounter();
58 node.startTimer(false);
59 node.stopTimer();
60 core.thread.Thread.sleep(500.msecs);
61 assert(node.getCounter() == 0);

Test restarting a node

/// Interface used by the restart example.
static interface API
{
    /// `Node` returns `[instantiationCount, destructionCount]`.
    public uint[2] getCount () @safe;
}

/// Node that counts how many times it has been constructed and destroyed.
static class Node : API
{
    /// Number of constructor runs.
    private static uint instantiationCount;
    /// Number of destructor runs.
    private static uint destructionCount;

    this ()
    {
        ++Node.instantiationCount;
    }

    ~this ()
    {
        ++Node.destructionCount;
    }

    /// Returns `[instantiationCount, destructionCount]`.
    public override uint[2] getCount () const @safe
    {
        return [
            Node.instantiationCount,
            Node.destructionCount,
        ];
    }
}

auto node = RemoteAPI!API.spawn!Node();
scope (exit) {
    node.ctrl.shutdown();
    thread_joinAll();
}
// One instance created, none destroyed.
assert(node.getCount == [1, 0]);
// Each restart is expected to destroy the current instance and create a new one.
node.ctrl.restart();
assert(node.getCount == [2, 1]);
node.ctrl.restart();
assert(node.getCount == [3, 2]);

Test restarting a node that has responses waiting for it

1 import core.atomic : atomicLoad, atomicStore;
2 static interface API
3 {
4     @safe:
5     public void call0 ();
6     public void call1 ();
7 }
8 
9 __gshared Listener!API node2Addr;
10 static shared bool done;
11 
12 static class Node : API
13 {
14     @trusted:
15 
16     public override void call0 ()
17     {
18         scope node2 = new RemoteAPI!API(node2Addr);
19         node2.call1();
20     }
21 
22     public override void call1 ()
23     {
24         // when this event runs we know call1() has already returned
25         scheduler.schedule({ atomicStore(done, true); });
26     }
27 }
28 
29 auto node1 = RemoteAPI!API.spawn!Node(500.msecs);
30 auto node2 = RemoteAPI!API.spawn!Node();
31 scope (exit) {
32     node1.ctrl.shutdown();
33     node2.ctrl.shutdown();
34     thread_joinAll();
35 }
36 node2Addr = node2.ctrl.listener();
37 node2.ctrl.sleep(2.seconds, false);
38 
39 try
40 {
41     node1.call0();
42     assert(0, "This should have timed out");
43 }
44 catch (Exception e) {}
45 
46 node1.ctrl.restart();
47 
48 // after a while node 1 will receive a response to the timed-out request
49 // to call1(), but the node restarted and is no longer interested in this
50 // request (the request map / LocalScheduler is different), so it's filtered
51 size_t count;
52 while (!atomicLoad(done))
53 {
54     assert(count < 300);  // up to 3 seconds wait
55     count++;
56     Thread.sleep(10.msecs);
57 }

Situation: Calling a node with an interface that doesn't exist. Expectation: The client throws an exception with a useful error message. This can happen by mistake (API mixup) or when a method is optional.

import std.exception : assertThrown;

/// The interface the node actually implements.
static interface BaseAPI
{
    public int required ();
}

/// Extends `BaseAPI` with a method that `BaseNode` does not implement.
static interface APIExtended : BaseAPI
{
    public int optional ();
}

/// Node implementing only `BaseAPI`.
static class BaseNode : BaseAPI
{
    /// Returns the constant 42.
    public override int required ()
    {
        return 42;
    }
}

auto node = RemoteAPI!BaseAPI.spawn!BaseNode();
scope (exit) {
    node.ctrl.shutdown();
    thread_joinAll();
}
// Note: Now that the `Listener` is typed, this kind of error is harder
// to make. However, it might still happen in the wild
// (e.g. true client/server interfacing where two sources get out of date)
// Build an `APIExtended` client on top of the `BaseAPI` node's listener data.
scope extnode = new RemoteAPI!APIExtended(
    Listener!APIExtended(node.ctrl.listener().data));
// The method shared by both interfaces works ...
assert(extnode.required() == 42);
// ... while the one the node lacks raises a `ClientException`.
assertThrown!ClientException(extnode.optional());

test that runTask works in the constructor

// Set by `MockAPI.callMe`; checked from the main thread at the end of the example.
__gshared bool called;
/// Single-method interface used by the constructor-task example.
static interface API
{
@safe:
    public @property ulong pubkey ();
}

/// Node that schedules a task from its constructor.
static class MockAPI : API
{
@safe:
    /// Hands `callMe` to `runTask` while the node is being constructed.
    this () @trusted
    {
        runTask(&this.callMe);
    }

    /// Task body: records that it ran.
    void callMe () @trusted
    {
        called = true;
    }

    /// Returns the constant 42.
    public override @property ulong pubkey ()
    {
        return 42;
    }
}

scope test = RemoteAPI!API.spawn!MockAPI();
scope (exit)
{
    test.ctrl.shutdown();
    thread_joinAll();
}
assert(test.pubkey() == 42);
// The task scheduled in the constructor is expected to have run by now.
assert(called);

Meta