CommRaT 2.0.0
C++20 Real-Time Messaging Framework
Loading...
Searching...
No Matches
multi_output_manager.hpp
Go to the documentation of this file.
1#pragma once
2
6#include <vector>
7#include <mutex>
8#include <algorithm>
9#include <iostream>
10#include <typeinfo>
11
12namespace commrat {
13
21 uint32_t base_addr{0};
22 uint8_t input_index{0};
23
24 bool operator==(const SubscriberInfo& other) const {
25 return base_addr == other.base_addr && input_index == other.input_index;
26 }
27};
28
45template<typename Derived, typename UserRegistry, typename OutputTypesTuple>
47protected:
48 // CRTP: Get reference to derived Module class
49 Derived& derived() { return static_cast<Derived&>(*this); }
50 const Derived& derived() const { return static_cast<const Derived&>(*this); }
51
52 // ========================================================================
53 // Multi-Output Subscriber Management
54 // ========================================================================
55
64 std::vector<std::vector<SubscriberInfo>> output_subscribers_;
65 mutable Mutex output_subscribers_mutex_; // Protects output_subscribers_
66
73 constexpr std::size_t num_outputs = std::tuple_size_v<OutputTypesTuple>;
74 output_subscribers_.resize(num_outputs);
75 }
76
88 void add_subscriber_to_output(uint32_t subscriber_base_addr, uint8_t input_index = 0, std::size_t output_idx = 0) {
89 constexpr std::size_t num_outputs = std::tuple_size_v<OutputTypesTuple>;
90
91 if (output_idx >= num_outputs) {
92 std::cerr << "[" << derived().config_.name << "] ERROR: Output index " << output_idx
93 << " out of range (num_outputs=" << num_outputs << ")\n";
94 return;
95 }
96
98 auto& subs = output_subscribers_[output_idx];
99
100 SubscriberInfo sub_info{subscriber_base_addr, input_index};
101
102 // Check if already subscribed
103 if (std::find(subs.begin(), subs.end(), sub_info) == subs.end()) {
104 subs.push_back(sub_info);
105 std::cout << "[" << derived().config_.name << "] Added subscriber " << subscriber_base_addr
106 << " (input_idx=" << static_cast<int>(input_index) << ") to output[" << output_idx
107 << "] (total: " << subs.size() << ")\n";
108 }
109 }
110
111public:
121 std::vector<SubscriberInfo> get_output_subscribers(std::size_t output_idx) const {
123 if (output_idx < output_subscribers_.size()) {
124 return output_subscribers_[output_idx];
125 }
126 return {};
127 }
128
137 void remove_subscriber(uint32_t subscriber_base_addr) {
139 for (auto& output_subs : output_subscribers_) {
140 auto it = std::remove_if(output_subs.begin(), output_subs.end(),
141 [subscriber_base_addr](const SubscriberInfo& sub) {
142 return sub.base_addr == subscriber_base_addr;
143 });
144 output_subs.erase(it, output_subs.end());
145 }
146 }
147
148private:
157 template<std::size_t... Is>
158 std::size_t find_output_index_by_type_id_impl(uint16_t type_id_low, std::index_sequence<Is...>) const {
159 constexpr std::size_t num_outputs = std::tuple_size_v<OutputTypesTuple>;
160 std::size_t result = num_outputs; // Invalid index by default
161
162 // Check each output type's message ID using fold expression
163 ((check_output_type_match<Is>(type_id_low, result)) || ...);
164
165 return result;
166 }
167
176 template<std::size_t Index>
177 bool check_output_type_match(uint16_t type_id_low, std::size_t& result) const {
178 using OutputType = std::tuple_element_t<Index, OutputTypesTuple>;
179 constexpr uint32_t output_msg_id = UserRegistry::template get_message_id<OutputType>();
180 constexpr uint16_t output_type_id_low = static_cast<uint16_t>(output_msg_id & 0xFFFF);
181
182 if (output_type_id_low == type_id_low) {
183 result = Index;
184 return true; // Found match, stop searching
185 }
186 return false; // Continue searching
187 }
188
189protected:
193 std::size_t find_output_index_by_type_id(uint16_t type_id_low) const {
194 constexpr std::size_t num_outputs = std::tuple_size_v<OutputTypesTuple>;
195 return find_output_index_by_type_id_impl(type_id_low, std::make_index_sequence<num_outputs>{});
196 }
197
198 // ========================================================================
199 // Multi-Output Work Threads
200 // ========================================================================
201
208 std::vector<std::thread> output_work_threads_;
209
216 template<std::size_t... Is>
217 void spawn_all_output_work_threads(std::index_sequence<Is...>) {
218 // Reserve space for all threads
219 output_work_threads_.reserve(sizeof...(Is));
220
221 // Spawn one thread per output type using fold expression
222 (void)std::initializer_list<int>{
223 (spawn_output_work_thread<Is>(), 0)...
224 };
225 }
226
232 template<std::size_t Index>
234 output_work_threads_.emplace_back([this]() {
235 derived().template output_work_loop<Index>();
236 });
237 }
238
245 std::cout << "[" << derived().config_.name << "] Joining " << output_work_threads_.size()
246 << " output work threads...\n";
247 for (auto& thread : output_work_threads_) {
248 if (thread.joinable()) {
249 thread.join();
250 }
251 }
252 output_work_threads_.clear();
253 }
254
269 template<std::size_t Index>
271 using OutputType = std::tuple_element_t<Index, OutputTypesTuple>;
272 // For multi-output config, use indexed access
273 uint8_t sys_id, inst_id;
274 if (derived().config_.has_multi_output_config()) {
275 sys_id = derived().config_.system_id(Index);
276 inst_id = derived().config_.instance_id(Index);
277 } else {
278 sys_id = derived().config_.system_id();
279 inst_id = derived().config_.instance_id();
280 }
281 uint32_t work_mailbox_addr = commrat::get_mailbox_address<OutputType, OutputTypesTuple, UserRegistry>(
282 sys_id, inst_id, static_cast<uint8_t>(MailboxType::WORK));
283
284 std::cout << "[" << derived().config_.name << "] output_work_loop[" << Index << "] started for "
285 << typeid(OutputType).name() << ", listening on WORK mailbox "
286 << work_mailbox_addr << "\n" << std::flush;
287
288 auto& work_mbx = derived().template get_work_mailbox<Index>();
289
290 while (derived().running_) {
291 std::cout << "[" << derived().config_.name << "] output_work_loop[" << Index << "]: waiting for message...\n" << std::flush;
292 auto visitor = [this](auto&& tims_msg) {
293 auto& msg = tims_msg.payload;
294 using MsgType = std::decay_t<decltype(msg)>;
295
296 if constexpr (std::is_same_v<MsgType, SubscribeRequestType>) {
297 std::cout << "[" << derived().config_.name << "] output_work_loop[" << Index
298 << "] Handling SubscribeRequest\n";
299 derived().handle_subscribe_request(msg, Index);
300 } else if constexpr (std::is_same_v<MsgType, SubscribeReplyType>) {
301 std::cout << "[" << derived().config_.name << "] output_work_loop[" << Index
302 << "] Handling SubscribeReply\n";
303 derived().handle_subscribe_reply(msg);
304 } else if constexpr (std::is_same_v<MsgType, UnsubscribeRequestType>) {
305 std::cout << "[" << derived().config_.name << "] output_work_loop[" << Index
306 << "] Handling UnsubscribeRequest\n";
307 derived().handle_unsubscribe_request(msg);
308 }
309 };
310
311 work_mbx.receive_any(visitor);
312 }
313
314 std::cout << "[" << derived().config_.name << "] output_work_loop[" << Index << "] ended\n";
315 }
316
317 // ========================================================================
318 // Multi-Output Mailbox Set Lifecycle
319 // ========================================================================
320
326 template<std::size_t... Is>
327 void start_all_mailbox_sets(std::index_sequence<Is...>) {
328 (void)std::initializer_list<int>{
329 (start_mailbox_set<Is>(), 0)...
330 };
331 }
332
338 template<std::size_t Index>
340 auto& cmd = derived().template get_cmd_mailbox<Index>();
341 auto cmd_result = cmd.start();
342 if (!cmd_result) {
343 throw std::runtime_error("[Module] Failed to start CMD mailbox for output " + std::to_string(Index));
344 }
345
346 auto& work = derived().template get_work_mailbox<Index>();
347 auto work_result = work.start();
348 if (!work_result) {
349 throw std::runtime_error("[Module] Failed to start WORK mailbox for output " + std::to_string(Index));
350 }
351
352 auto& publish = derived().template get_publish_mailbox_public<Index>();
353 auto publish_result = publish.start();
354 if (!publish_result) {
355 throw std::runtime_error("[Module] Failed to start PUBLISH mailbox for output " + std::to_string(Index));
356 }
357 }
358
364 template<std::size_t... Is>
365 void stop_all_mailbox_sets(std::index_sequence<Is...>) {
366 (void)std::initializer_list<int>{
367 (stop_mailbox_set<Is>(), 0)...
368 };
369 }
370
376 template<std::size_t Index>
378 derived().template get_cmd_mailbox<Index>().stop();
379 derived().template get_work_mailbox<Index>().stop();
380 derived().template get_publish_mailbox_public<Index>().stop();
381 }
382
383 // ========================================================================
384 // Multi-Output Mailbox Accessors
385 // ========================================================================
386
393 template<std::size_t Index>
395 return std::get<Index>(derived().mailbox_infrastructure_);
396 }
397
404 template<std::size_t Index>
406 return *std::get<Index>(derived().mailbox_infrastructure_).cmd;
407 }
408
415 template<std::size_t Index>
417 return *std::get<Index>(derived().mailbox_infrastructure_).work;
418 }
419
426 template<std::size_t Index>
428 return *std::get<Index>(derived().mailbox_infrastructure_).publish;
429 }
430
431public:
440 template<std::size_t Index>
442 return *std::get<Index>(derived().mailbox_infrastructure_).publish;
443 }
444};
445
446} // namespace commrat
CRTP mixin for multi-output mailbox and subscriber management.
void remove_subscriber(uint32_t subscriber_base_addr)
Remove subscriber from all output lists.
void join_output_work_threads()
Join all output work threads.
std::size_t find_output_index_by_type_id(uint16_t type_id_low) const
Public wrapper for find_output_index_by_type_id_impl.
void stop_all_mailbox_sets(std::index_sequence< Is... >)
Stop all MailboxSets.
void spawn_output_work_thread()
Spawn work thread for a specific output index.
void initialize_output_subscribers()
Initialize per-output subscriber lists.
std::vector< SubscriberInfo > get_output_subscribers(std::size_t output_idx) const
Get subscribers for specific output index.
auto & get_work_mailbox()
Get WORK mailbox for specific output index.
std::vector< std::vector< SubscriberInfo > > output_subscribers_
Per-output subscriber lists: output_subscribers_[output_index][subscriber_id].
const Derived & derived() const
auto & get_cmd_mailbox()
Get CMD mailbox for specific output index.
void spawn_all_output_work_threads(std::index_sequence< Is... >)
Spawn all output work threads.
auto & get_mailbox_set()
Get specific MailboxSet by index.
void start_mailbox_set()
Start a single MailboxSet (CMD/WORK/PUBLISH)
void start_all_mailbox_sets(std::index_sequence< Is... >)
Start all MailboxSets (CMD/WORK/PUBLISH mailboxes)
void add_subscriber_to_output(uint32_t subscriber_base_addr, uint8_t input_index=0, std::size_t output_idx=0)
Add subscriber to correct output-specific list.
auto & get_publish_mailbox()
Get PUBLISH mailbox for specific output index (internal)
std::vector< std::thread > output_work_threads_
Per-output work threads: one thread per MailboxSet.
void stop_mailbox_set()
Stop a single MailboxSet (CMD/WORK/PUBLISH)
auto & get_publish_mailbox_public()
Get PUBLISH mailbox for specific output index (public accessor)
void output_work_loop()
Work loop for a specific output index.
Mutex wrapper (future: realtime mutex support)
CommRaT - Modern C++ Real-Time Communication Framework.
std::lock_guard< Mutex > Lock
Scoped lock guard (RAII)
Subscriber routing information.
uint8_t input_index
Which input DATA mailbox to send to (0 = default)
uint32_t base_addr
Subscriber's base mailbox address (OUTPUT type)
bool operator==(const SubscriberInfo &other) const
Unified threading and synchronization abstractions for CommRaT.