45template<
typename Derived,
typename UserRegistry,
typename OutputTypesTuple>
49 Derived&
derived() {
return static_cast<Derived&
>(*this); }
50 const Derived&
derived()
const {
return static_cast<const Derived&
>(*this); }
73 constexpr std::size_t num_outputs = std::tuple_size_v<OutputTypesTuple>;
89 constexpr std::size_t num_outputs = std::tuple_size_v<OutputTypesTuple>;
91 if (output_idx >= num_outputs) {
92 std::cerr <<
"[" <<
derived().config_.name <<
"] ERROR: Output index " << output_idx
93 <<
" out of range (num_outputs=" << num_outputs <<
")\n";
103 if (std::find(subs.begin(), subs.end(), sub_info) == subs.end()) {
104 subs.push_back(sub_info);
105 std::cout <<
"[" <<
derived().config_.name <<
"] Added subscriber " << subscriber_base_addr
106 <<
" (input_idx=" <<
static_cast<int>(input_index) <<
") to output[" << output_idx
107 <<
"] (total: " << subs.size() <<
")\n";
140 auto it = std::remove_if(output_subs.begin(), output_subs.end(),
142 return sub.base_addr == subscriber_base_addr;
144 output_subs.erase(it, output_subs.end());
157 template<std::size_t... Is>
158 std::size_t find_output_index_by_type_id_impl(uint16_t type_id_low, std::index_sequence<Is...>)
const {
159 constexpr std::size_t num_outputs = std::tuple_size_v<OutputTypesTuple>;
160 std::size_t result = num_outputs;
163 ((check_output_type_match<Is>(type_id_low, result)) || ...);
176 template<std::
size_t Index>
177 bool check_output_type_match(uint16_t type_id_low, std::size_t& result)
const {
178 using OutputType = std::tuple_element_t<Index, OutputTypesTuple>;
179 constexpr uint32_t output_msg_id = UserRegistry::template get_message_id<OutputType>();
180 constexpr uint16_t output_type_id_low =
static_cast<uint16_t
>(output_msg_id & 0xFFFF);
182 if (output_type_id_low == type_id_low) {
194 constexpr std::size_t num_outputs = std::tuple_size_v<OutputTypesTuple>;
195 return find_output_index_by_type_id_impl(type_id_low, std::make_index_sequence<num_outputs>{});
216 template<std::size_t... Is>
222 (void)std::initializer_list<int>{
223 (spawn_output_work_thread<Is>(), 0)...
232 template<std::
size_t Index>
235 derived().template output_work_loop<Index>();
246 <<
" output work threads...\n";
248 if (thread.joinable()) {
269 template<std::
size_t Index>
271 using OutputType = std::tuple_element_t<Index, OutputTypesTuple>;
273 uint8_t sys_id, inst_id;
274 if (
derived().config_.has_multi_output_config()) {
275 sys_id =
derived().config_.system_id(Index);
276 inst_id =
derived().config_.instance_id(Index);
278 sys_id =
derived().config_.system_id();
279 inst_id =
derived().config_.instance_id();
281 uint32_t work_mailbox_addr = commrat::get_mailbox_address<OutputType, OutputTypesTuple, UserRegistry>(
284 std::cout <<
"[" <<
derived().config_.name <<
"] output_work_loop[" << Index <<
"] started for "
285 <<
typeid(OutputType).name() <<
", listening on WORK mailbox "
286 << work_mailbox_addr <<
"\n" << std::flush;
288 auto& work_mbx =
derived().template get_work_mailbox<Index>();
291 std::cout <<
"[" <<
derived().config_.name <<
"] output_work_loop[" << Index <<
"]: waiting for message...\n" << std::flush;
292 auto visitor = [
this](
auto&& tims_msg) {
293 auto& msg = tims_msg.payload;
294 using MsgType = std::decay_t<
decltype(msg)>;
296 if constexpr (std::is_same_v<MsgType, SubscribeRequestType>) {
297 std::cout <<
"[" <<
derived().config_.name <<
"] output_work_loop[" << Index
298 <<
"] Handling SubscribeRequest\n";
299 derived().handle_subscribe_request(msg, Index);
300 }
else if constexpr (std::is_same_v<MsgType, SubscribeReplyType>) {
301 std::cout <<
"[" <<
derived().config_.name <<
"] output_work_loop[" << Index
302 <<
"] Handling SubscribeReply\n";
303 derived().handle_subscribe_reply(msg);
304 }
else if constexpr (std::is_same_v<MsgType, UnsubscribeRequestType>) {
305 std::cout <<
"[" <<
derived().config_.name <<
"] output_work_loop[" << Index
306 <<
"] Handling UnsubscribeRequest\n";
307 derived().handle_unsubscribe_request(msg);
311 work_mbx.receive_any(visitor);
314 std::cout <<
"[" <<
derived().config_.name <<
"] output_work_loop[" << Index <<
"] ended\n";
326 template<std::size_t... Is>
328 (void)std::initializer_list<int>{
329 (start_mailbox_set<Is>(), 0)...
338 template<std::
size_t Index>
340 auto& cmd =
derived().template get_cmd_mailbox<Index>();
341 auto cmd_result = cmd.start();
343 throw std::runtime_error(
"[Module] Failed to start CMD mailbox for output " + std::to_string(Index));
346 auto& work =
derived().template get_work_mailbox<Index>();
347 auto work_result = work.start();
349 throw std::runtime_error(
"[Module] Failed to start WORK mailbox for output " + std::to_string(Index));
352 auto& publish =
derived().template get_publish_mailbox_public<Index>();
353 auto publish_result = publish.start();
354 if (!publish_result) {
355 throw std::runtime_error(
"[Module] Failed to start PUBLISH mailbox for output " + std::to_string(Index));
364 template<std::size_t... Is>
366 (void)std::initializer_list<int>{
367 (stop_mailbox_set<Is>(), 0)...
376 template<std::
size_t Index>
378 derived().template get_cmd_mailbox<Index>().stop();
379 derived().template get_work_mailbox<Index>().stop();
380 derived().template get_publish_mailbox_public<Index>().stop();
393 template<std::
size_t Index>
395 return std::get<Index>(
derived().mailbox_infrastructure_);
404 template<std::
size_t Index>
406 return *std::get<Index>(
derived().mailbox_infrastructure_).cmd;
415 template<std::
size_t Index>
417 return *std::get<Index>(
derived().mailbox_infrastructure_).work;
426 template<std::
size_t Index>
428 return *std::get<Index>(
derived().mailbox_infrastructure_).publish;
440 template<std::
size_t Index>
442 return *std::get<Index>(
derived().mailbox_infrastructure_).publish;
CRTP mixin for multi-output mailbox and subscriber management.
void remove_subscriber(uint32_t subscriber_base_addr)
Remove subscriber from all output lists.
void join_output_work_threads()
Join all output work threads.
std::size_t find_output_index_by_type_id(uint16_t type_id_low) const
Public wrapper for find_output_index_by_type_id_impl.
void stop_all_mailbox_sets(std::index_sequence< Is... >)
Stop all MailboxSets.
Mutex output_subscribers_mutex_
void spawn_output_work_thread()
Spawn work thread for a specific output index.
void initialize_output_subscribers()
Initialize per-output subscriber lists.
std::vector< SubscriberInfo > get_output_subscribers(std::size_t output_idx) const
Get subscribers for specific output index.
auto & get_work_mailbox()
Get WORK mailbox for specific output index.
std::vector< std::vector< SubscriberInfo > > output_subscribers_
Per-output subscriber lists: output_subscribers_[output_index][subscriber_id].
const Derived & derived() const
auto & get_cmd_mailbox()
Get CMD mailbox for specific output index.
void spawn_all_output_work_threads(std::index_sequence< Is... >)
Spawn all output work threads.
auto & get_mailbox_set()
Get specific MailboxSet by index.
void start_mailbox_set()
Start a single MailboxSet (CMD/WORK/PUBLISH)
void start_all_mailbox_sets(std::index_sequence< Is... >)
Start all MailboxSets (CMD/WORK/PUBLISH mailboxes)
void add_subscriber_to_output(uint32_t subscriber_base_addr, uint8_t input_index=0, std::size_t output_idx=0)
Add subscriber to correct output-specific list.
auto & get_publish_mailbox()
Get PUBLISH mailbox for specific output index (internal)
std::vector< std::thread > output_work_threads_
Per-output work threads: one thread per MailboxSet.
void stop_mailbox_set()
Stop a single MailboxSet (CMD/WORK/PUBLISH)
auto & get_publish_mailbox_public()
Get PUBLISH mailbox for specific output index (public accessor)
void output_work_loop()
Work loop for a specific output index.
Mutex wrapper (future: realtime mutex support)
CommRaT - Modern C++ Real-Time Communication Framework.
std::lock_guard< Mutex > Lock
Scoped lock guard (RAII)
Subscriber routing information.
uint8_t input_index
Which input DATA mailbox to send to (0 = default)
uint32_t base_addr
Subscriber's base mailbox address (OUTPUT type)
bool operator==(const SubscriberInfo &other) const
Unified threading and synchronization abstractions for CommRaT.