Embedded Template Library 1.0
message_broker.h
1/******************************************************************************
2The MIT License(MIT)
3
4Embedded Template Library.
5https://github.com/ETLCPP/etl
6https://www.etlcpp.com
7
8Copyright(c) 2017 John Wellbelove
9
10Permission is hereby granted, free of charge, to any person obtaining a copy
11of this software and associated documentation files(the "Software"), to deal
12in the Software without restriction, including without limitation the rights
13to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
14copies of the Software, and to permit persons to whom the Software is
15furnished to do so, subject to the following conditions :
16
17The above copyright notice and this permission notice shall be included in all
18copies or substantial portions of the Software.
19
20THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
23AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
26SOFTWARE.
27******************************************************************************/
28
29#ifndef ETL_MESSAGE_BROKER_INCLUDED
30#define ETL_MESSAGE_BROKER_INCLUDED
31
32#include "platform.h"
33#include "nullptr.h"
34#include "message_types.h"
35#include "message.h"
36#include "message_router.h"
37#include "span.h"
38
39#include <stdint.h>
40
41namespace etl
42{
43 //***************************************************************************
45 //***************************************************************************
47 {
48 private:
49
50 //*******************************************
51 class subscription_node
52 {
53 friend class message_broker;
54
55 protected:
56
57 //*******************************
58 subscription_node()
59 : p_next(ETL_NULLPTR)
60 {
61 }
62
63 //*******************************
64 void set_next(subscription_node* sub)
65 {
66 p_next = sub;
67 }
68
69 //*******************************
70 subscription_node* get_next() const
71 {
72 return p_next;
73 }
74
75 //*******************************
76 void terminate()
77 {
78 set_next(ETL_NULLPTR);
79 }
80
81 //*******************************
82 void append(subscription_node* sub)
83 {
84 if (sub != ETL_NULLPTR)
85 {
86 sub->set_next(get_next());
87 }
88 set_next(sub);
89 }
90
91 subscription_node* p_next;
92 };
93
94 public:
95
97
98 //*******************************************
99 class subscription : public subscription_node
100 {
101 public:
102
103 friend class message_broker;
104
105 //*******************************
107 : p_router(&router_)
108 {
109 }
110
111 private:
112
113 //*******************************
114 virtual message_id_span_t message_id_list() const = 0;
115
116 //*******************************
117 etl::imessage_router* get_router() const
118 {
119 return p_router;
120 }
121
122 //*******************************
123 subscription* next_subscription() const
124 {
125 return static_cast<subscription*>(get_next());
126 }
127
128 etl::imessage_router* const p_router;
129 };
130
131 using etl::imessage_router::receive;
132
133 //*******************************************
135 //*******************************************
137 : imessage_router(etl::imessage_router::MESSAGE_BROKER)
138 , head()
139 {
140 }
141
142 //*******************************************
144 //*******************************************
146 : imessage_router(etl::imessage_router::MESSAGE_BROKER, successor_)
147 , head()
148 {
149 }
150
151 //*******************************************
153 //*******************************************
154 message_broker(etl::message_router_id_t id_)
155 : imessage_router(id_)
156 , head()
157 {
158 ETL_ASSERT((id_ <= etl::imessage_router::MAX_MESSAGE_ROUTER) || (id_ == etl::imessage_router::MESSAGE_BROKER), ETL_ERROR(etl::message_router_illegal_id));
159 }
160
161 //*******************************************
163 //*******************************************
164 message_broker(etl::message_router_id_t id_, etl::imessage_router& successor_)
165 : imessage_router(id_, successor_)
166 , head()
167 {
168 ETL_ASSERT((id_ <= etl::imessage_router::MAX_MESSAGE_ROUTER) || (id_ == etl::imessage_router::MESSAGE_BROKER), ETL_ERROR(etl::message_router_illegal_id));
169 }
170
171 //*******************************************
173 //*******************************************
175 {
176 initialise_insertion_point(new_sub.get_router(), &new_sub);
177 }
178
179 //*******************************************
180 void unsubscribe(etl::imessage_router& router)
181 {
182 initialise_insertion_point(&router, ETL_NULLPTR);
183 }
184
185 //*******************************************
186 virtual void receive(const etl::imessage& msg) ETL_OVERRIDE
187 {
188 receive(etl::imessage_router::ALL_MESSAGE_ROUTERS, msg);
189 }
190
191 virtual void receive(etl::shared_message shared_msg) ETL_OVERRIDE
192 {
193 receive(etl::imessage_router::ALL_MESSAGE_ROUTERS, shared_msg);
194 }
195
196 //*******************************************
197 virtual void receive(etl::message_router_id_t destination_router_id,
198 const etl::imessage& msg) ETL_OVERRIDE
199 {
200 const etl::message_id_t id = msg.get_message_id();
201
202 if (!empty())
203 {
204 // Scan the subscription lists.
205 subscription* sub = static_cast<subscription*>(head.get_next());
206
207 while (sub != ETL_NULLPTR)
208 {
209 message_id_span_t message_ids = sub->message_id_list();
210
211 message_id_span_t::iterator itr = etl::find(message_ids.begin(), message_ids.end(), id);
212
213 if (itr != message_ids.end())
214 {
215 etl::imessage_router* router = sub->get_router();
216
217 if (destination_router_id == etl::imessage_router::ALL_MESSAGE_ROUTERS ||
218 destination_router_id == router->get_message_router_id())
219 {
220 router->receive(msg);
221 }
222 }
223
224 sub = sub->next_subscription();
225 }
226 }
227
228 // Always pass the message on to the successor.
229 if (has_successor())
230 {
232
233 successor.receive(destination_router_id, msg);
234 }
235 }
236
237 //*******************************************
238 virtual void receive(etl::message_router_id_t destination_router_id,
239 etl::shared_message shared_msg) ETL_OVERRIDE
240 {
241 const etl::message_id_t id = shared_msg.get_message().get_message_id();
242
243 if (!empty())
244 {
245 // Scan the subscription lists.
246 subscription* sub = static_cast<subscription*>(head.get_next());
247
248 while (sub != ETL_NULLPTR)
249 {
250 message_id_span_t message_ids = sub->message_id_list();
251
252 message_id_span_t::iterator itr = etl::find(message_ids.begin(), message_ids.end(), id);
253
254 if (itr != message_ids.end())
255 {
256 etl::imessage_router* router = sub->get_router();
257
258 if (destination_router_id == etl::imessage_router::ALL_MESSAGE_ROUTERS ||
259 destination_router_id == router->get_message_router_id())
260 {
261 router->receive(shared_msg);
262 }
263 }
264
265 sub = sub->next_subscription();
266 }
267 }
268
269 // Always pass the message on to a successor.
270 if (has_successor())
271 {
272 get_successor().receive(destination_router_id, shared_msg);
273 }
274 }
275
276 using imessage_router::accepts;
277
278 //*******************************************
280 //*******************************************
281 virtual bool accepts(etl::message_id_t) const ETL_OVERRIDE
282 {
283 return true;
284 }
285
286 //*******************************************
287 void clear()
288 {
289 head.terminate();
290 }
291
292 //********************************************
293 ETL_DEPRECATED virtual bool is_null_router() const ETL_OVERRIDE
294 {
295 return false;
296 }
297
298 //********************************************
299 virtual bool is_producer() const ETL_OVERRIDE
300 {
301 return true;
302 }
303
304 //********************************************
305 virtual bool is_consumer() const ETL_OVERRIDE
306 {
307 return true;
308 }
309
310 //********************************************
311 bool empty() const
312 {
313 return head.get_next() == ETL_NULLPTR;
314 }
315
316 private:
317
318 //*******************************************
319 void initialise_insertion_point(const etl::imessage_router* p_router, etl::message_broker::subscription* p_new_sub)
320 {
321 const etl::imessage_router* p_target_router = p_router;
322
323 subscription_node* p_sub = head.get_next();
324 subscription_node* p_sub_previous = &head;
325
326 while (p_sub != ETL_NULLPTR)
327 {
328 // Do we already have a subscription for the router?
329 if (static_cast<subscription*>(p_sub)->get_router() == p_target_router)
330 {
331 // Then unlink it.
332 p_sub_previous->set_next(p_sub->get_next()); // Jump over the subscription.
333 p_sub->terminate(); // Terminate the unlinked subscription.
334
335 // We're done now.
336 break;
337 }
338
339 // Move on up the list.
340 p_sub = p_sub->get_next();
341 p_sub_previous = p_sub_previous->get_next();
342 }
343
344 if (p_new_sub != ETL_NULLPTR)
345 {
346 // Link in the new subscription.
347 p_sub_previous->append(p_new_sub);
348 }
349 }
350
351 subscription_node head;
352 };
353}
354
355#endif
This is the base of all message routers.
Definition: message_router_generator.h:121
Definition: message.h:69
Definition: message_broker.h:100
Message broker.
Definition: message_broker.h:47
void subscribe(etl::message_broker::subscription &new_sub)
Subscribe to the broker.
Definition: message_broker.h:174
message_broker()
Constructor.
Definition: message_broker.h:136
message_broker(etl::message_router_id_t id_)
Constructor.
Definition: message_broker.h:154
message_broker(etl::message_router_id_t id_, etl::imessage_router &successor_)
Constructor.
Definition: message_broker.h:164
message_broker(etl::imessage_router &successor_)
Constructor.
Definition: message_broker.h:145
virtual bool accepts(etl::message_id_t) const ETL_OVERRIDE
Message brokers accept all messages.
Definition: message_broker.h:281
Router id is out of the legal range.
Definition: message_router_generator.h:101
Definition: shared_message.h:49
Span - Fixed Extent.
Definition: span.h:62
bool has_successor() const
Does this have a successor?
Definition: successor.h:184
successor_type & get_successor() const
Definition: successor.h:174
successor()
Default constructor.
Definition: successor.h:81
#define ETL_ASSERT(b, e)
Definition: error_handler.h:316
bitset_ext
Definition: absolute.h:38
uint_least8_t message_id_t
Allow alternative type for message id.
Definition: message_types.h:40