1 /*******************************************************************************
2 
3     Registry implementation for multi-threaded access
4 
    This registry allows looking up a connection based on a `string`.
6     Conceptually, it can be seen as an equivalent to a network router,
7     as it turns addresses into "concrete" routes (pointers).
8 
9     It was originally part of the `std.concurrency` module,
10     but was extracted to make it reusable.
11 
    There are two kinds of registries: typed ones (`Registry`), which
    provide a bit more type safety for networks without intersection, or with
    a base type, and untyped ones (`AnyRegistry`). The latter match real-world
    heterogeneous networks better, as they allow storing unrelated nodes in
    the same data structure.
17 
18 *******************************************************************************/
19 
20 module geod24.Registry;
21 
22 import core.sync.mutex;
23 import geod24.concurrency;
24 import geod24.LocalRest;
25 
/// A typed network router
///
/// All accesses to the internal maps are serialized through `registryLock`.
/// `initialize` must be called before any other method, as it is the only
/// place where the lock is created.
public shared struct Registry (API)
{
    /// Map from a name to a connection.
    /// Multiple names may point to the same connection.
    private Listener!API[string] connections;
    /// Gives all the names associated with a specific connection.
    private string[][Listener!API] names;
    /// Protects `connections` and `names`; created by `initialize`
    private Mutex registryLock;

    /// Initialize this registry, creating the Mutex
    ///
    /// Must be called once before `locate`, `register`, `unregister`
    /// or `clear`, since those all synchronize on the mutex created here.
    public void initialize () @safe nothrow
    {
        this.registryLock = new shared Mutex;
    }

    /**
     * Gets the listener associated with `name`.
     *
     * Params:
     *   name = The name to locate within the registry.
     *
     * Returns:
     *   The associated listener, or `Listener!API.init`
     *   if `name` is not registered.
     */
    public Listener!API locate (string name)
    {
        synchronized (registryLock)
        {
            if (shared(Listener!API)* c = name in this.connections)
                return *cast(Listener!API*)c;
            return Listener!API.init;
        }
    }

    /**
     * Register a new name for a connection.
     *
     * Associates `name` with `conn` in a process-local map.
     * The same connection may be registered under several names.
     *
     * NOTE(review): nothing in this struct removes names automatically when
     * the node behind `conn` terminates; callers presumably need to call
     * `unregister` or `clear` themselves - confirm against the call sites.
     *
     * Params:
     *   name = The name to associate with `conn`.
     *   conn = The connection to register.
     *
     * Returns:
     *  `true` if the name was available and `conn` is not closed,
     *  `false` otherwise (in which case the registry is left untouched).
     */
    public bool register (string name, Listener!API conn)
    {
        synchronized (registryLock)
        {
            if (name in this.connections)
                return false;
            if (conn.data.isClosed)
                return false;
            this.names[conn] ~= name;
            this.connections[name] = cast(shared)conn;
            return true;
        }
    }

    /**
     * Removes the registered name associated with a connection.
     *
     * Other names registered for the same connection are left in place.
     *
     * Params:
     *  name = The name to unregister.
     *
     * Returns:
     *  true if the name is registered, false if not.
     */
    public bool unregister (string name)
    {
        import std.algorithm.mutation : remove, SwapStrategy;
        import std.algorithm.searching : countUntil;

        synchronized (registryLock)
        {
            if (shared(Listener!API)* conn = name in this.connections)
            {
                // Guard the reverse lookup: dereferencing a missing entry
                // would be a null pointer access.
                if (auto allNames = *cast(Listener!API*)conn in this.names)
                {
                    auto pos = countUntil(*allNames, name);
                    // `remove` does not shrink the array in place, it returns
                    // the shortened range: the result must be stored back,
                    // otherwise the list keeps its length and stale entries.
                    if (pos >= 0)
                        *allNames = remove!(SwapStrategy.unstable)(*allNames, pos);
                }
                this.connections.remove(name);
                return true;
            }
            return false;
        }
    }

    /**
     * Clear out the content of the registry
     *
     * This routine is useful when performing shutdown,
     * as some nodes might try to access others in parallel,
     * leading to errors on closed channels.
     * Calling `clear` before starting to shut down nodes essentially
     * ensures that nodes are not reachable anymore.
     */
    public void clear ()
    {
        synchronized (registryLock)
        {
            (cast() this.connections).clear();
            (cast() this.names).clear();
        }
    }
}
137 
/// An untyped network router, able to hold listeners of unrelated APIs
public shared struct AnyRegistry
{
    /// Backing storage: every listener is kept as a `Listener!(void*)`,
    /// and this struct only converts to and from that type-erased form.
    private Registry!(void*) impl;

    /// Forwards to `Registry.initialize` on the backing registry
    public void initialize () @safe nothrow
    {
        this.impl.initialize();
    }

    /// Forwards to `Registry.locate`, then casts the type-erased result
    /// to `Listener!API`. The cast is unchecked: `API` is whatever the
    /// caller asks for, not necessarily the type used at registration.
    public Listener!API locate (API = void*) (string name)
    {
        alias Typed = Listener!API;
        return cast(Typed) this.impl.locate(name);
    }

    /// Casts `conn` to the type-erased `Listener!(void*)` and forwards
    /// to `Registry.register`, returning its result unchanged.
    public bool register (ListenerT : Listener!APIT, APIT) (string name, ListenerT conn)
    {
        alias Erased = Listener!(void*);
        return this.impl.register(name, cast(Erased) conn);
    }

    /// Forwards to `Registry.unregister`, returning its result unchanged
    public bool unregister (string name)
    {
        return this.impl.unregister(name);
    }

    /// Forwards to `Registry.clear` on the backing registry
    public void clear ()
    {
        this.impl.clear();
    }
}