1 /******************************************************************************* 2 3 Registry implementation for multi-threaded access 4 5 This registry allows to look up a connection based on a `string`. 6 Conceptually, it can be seen as an equivalent to a network router, 7 as it turns addresses into "concrete" routes (pointers). 8 9 It was originally part of the `std.concurrency` module, 10 but was extracted to make it reusable. 11 12 There are two kinds of registries: typed ones (`Registry`), which will 13 provide a bit more type safety for network without intersection, or with 14 a base type, and untyped one (`AnyRegistry). The latter matches real-world 15 heterogenous networks better, as it allows to store unrelated nodes in the 16 same data structure. 17 18 *******************************************************************************/ 19 20 module geod24.Registry; 21 22 import core.sync.mutex; 23 import geod24.concurrency; 24 import geod24.LocalRest; 25 26 /// A typed network router 27 public shared struct Registry (API) 28 { 29 /// Map from a name to a connection. 30 /// Multiple names may point to the same connection. 31 private Listener!API[string] connections; 32 /// Gives all the names associated with a specific connection. 33 private string[][Listener!API] names; 34 private Mutex registryLock; 35 36 /// Initialize this registry, creating the Mutex 37 public void initialize () @safe nothrow 38 { 39 this.registryLock = new shared Mutex; 40 } 41 42 /** 43 * Gets the binding channel associated with `name`. 44 * 45 * Params: 46 * name = The name to locate within the registry. 47 * 48 * Returns: 49 * The associated binding channel or an invalid state 50 * (such as its `init` value) if `name` is not registered. 51 */ 52 public Listener!API locate (string name) 53 { 54 synchronized (registryLock) 55 { 56 if (shared(Listener!API)* c = name in this.connections) 57 return *cast(Listener!API*)c; 58 return Listener!API.init; 59 } 60 } 61 62 /** 63 * Register a new name for a connection. 64 * 65 * Associates `name` with `conn` in a process-local map. When the thread 66 * represented by `conn` terminates, any names associated with it will be 67 * automatically unregistered. 68 * 69 * Params: 70 * name = The name to associate with `conn`. 71 * conn = The connection to register. 72 * 73 * Returns: 74 * `true` if the name is available and `conn` is not known to represent a 75 * defunct thread. 76 */ 77 public bool register (string name, Listener!API conn) 78 { 79 synchronized (registryLock) 80 { 81 if (name in this.connections) 82 return false; 83 if (conn.data.isClosed) 84 return false; 85 this.names[conn] ~= name; 86 this.connections[name] = cast(shared)conn; 87 return true; 88 } 89 } 90 91 /** 92 * Removes the registered name associated with a connection. 93 * 94 * Params: 95 * name = The name to unregister. 96 * 97 * Returns: 98 * true if the name is registered, false if not. 99 */ 100 public bool unregister (string name) 101 { 102 import std.algorithm.mutation : remove, SwapStrategy; 103 import std.algorithm.searching : countUntil; 104 105 synchronized (registryLock) 106 { 107 if (shared(Listener!API)* tid = name in this.connections) 108 { 109 auto allNames = *cast(Listener!API*)tid in this.names; 110 auto pos = countUntil(*allNames, name); 111 remove!(SwapStrategy.unstable)(*allNames, pos); 112 this.connections.remove(name); 113 return true; 114 } 115 return false; 116 } 117 } 118 119 /** 120 * Clear out the content of the registry 121 * 122 * This routine is useful when performing shutdown, 123 * as some nodes might try to access others in parallel, 124 * leading to error on closed channels. 125 * Calling `clear` before starting to shut down node essentially 126 * ensures that nodes are not reachable anymore. 127 */ 128 public void clear () 129 { 130 synchronized (registryLock) 131 { 132 (cast() this.connections).clear(); 133 (cast() this.names).clear(); 134 } 135 } 136 } 137 138 /// An untyped network router 139 public shared struct AnyRegistry 140 { 141 /// This struct just presents a different API, but forwards to 142 /// an instance of `Registry!(void*)` under the hood. 143 private Registry!(void*) impl; 144 145 /// See `Registry.initialize` 146 public void initialize () @safe nothrow 147 { 148 this.impl.initialize(); 149 } 150 151 /// See `Registry.locate` 152 public Listener!API locate (API = void*) (string name) 153 { 154 return cast(Listener!API) this.impl.locate(name); 155 } 156 157 /// See `Registry.register` 158 public bool register (ListenerT : Listener!APIT, APIT) (string name, ListenerT conn) 159 { 160 return this.impl.register(name, cast(Listener!(void*)) conn); 161 } 162 163 /// See `Registry.unregister` 164 public bool unregister (string name) 165 { 166 return this.impl.unregister(name); 167 } 168 169 /// See `Registry.clear` 170 public void clear () 171 { 172 this.impl.clear(); 173 } 174 }