25template<
typename Registry>
52 bool has_continuous_input,
54 typename InputData = void,
55 typename InputTypesTuple = std::tuple<>,
56 size_t InputCount = 0,
57 typename OutputData = void,
58 typename OutputTypesTuple = std::tuple<>
89 static_assert(has_continuous_input || has_multi_input,
90 "subscribe_to_all_sources() only for continuous or multi-input modules");
92 if constexpr (has_multi_input) {
95 if (sources.empty()) {
96 std::cerr <<
"[" <<
module_name_ <<
"] ERROR: Multi-input module but input_sources is empty!\n";
103 for (
size_t i = 0; i < sources.size(); ++i) {
104 auto& source = sources[i];
105 source.input_index = i;
108 }
else if constexpr (has_continuous_input) {
121 static_assert(has_continuous_input,
"subscribe_to_source() only available for continuous input modules");
130 static_assert(has_continuous_input,
"unsubscribe_from_source() only available for continuous input modules");
132 uint32_t our_base_addr = commrat::calculate_base_address<OutputData, OutputTypesTuple, Registry>(
140 uint32_t source_base;
141 uint32_t source_work_mbx;
143 if constexpr (has_continuous_input && !has_multi_input) {
145 constexpr uint32_t source_data_type_id = Registry::template get_message_id<InputData>();
146 constexpr uint16_t source_data_type_id_low =
static_cast<uint16_t
>(source_data_type_id & 0xFFFF);
147 source_base = (
static_cast<uint32_t
>(source_data_type_id_low) << 16) | (source_system_id << 8) | source_instance_id;
164 static_assert(has_multi_input,
"unsubscribe_from_multi_input_source() only for multi-input modules");
166 uint32_t our_base_addr = commrat::calculate_base_address<OutputData, OutputTypesTuple, Registry>(
177 uint16_t source_data_type_id_low =
static_cast<uint16_t
>(source_data_type_id & 0xFFFF);
178 uint32_t source_base = (
static_cast<uint32_t
>(source_data_type_id_low) << 16) |
180 uint32_t source_work_mbx = source_base +
static_cast<uint8_t
>(
MailboxType::WORK);
189 template<
typename SubscriberMgr>
197 std::cout <<
"[" <<
module_name_ <<
"] Added subscriber to output-specific list, "
198 <<
"will send to DATA mailbox=0x" << std::hex << subscriber_data_mbx << std::dec <<
"\n";
233 if (sub.subscribed && !sub.reply_received) {
234 sub.reply_received =
true;
236 std::cout <<
"[" <<
module_name_ <<
"] SubscribeReply received: "
237 << (reply.
success ?
"SUCCESS" :
"FAILED")
247 template<
typename SubscriberMgr>
261 size_t source_index) {
263 uint32_t source_data_type_id;
267 uint32_t subscriber_base_addr = commrat::calculate_base_address<OutputData, OutputTypesTuple, Registry>(
271 constexpr uint8_t num_outputs = std::tuple_size_v<OutputTypesTuple>;
272 uint8_t data_mailbox_index;
274 if constexpr (has_multi_input) {
278 data_mailbox_index =
get_data_mbx_base(num_outputs) +
static_cast<uint8_t
>(source_index);
280 else if constexpr (has_continuous_input) {
282 source_data_type_id = Registry::template get_message_id<InputData>();
286 static_assert(has_continuous_input || has_multi_input,
"Invalid input configuration");
292 .mailbox_index = data_mailbox_index,
296 uint16_t source_data_type_id_low =
static_cast<uint16_t
>(source_data_type_id & 0xFFFF);
297 uint32_t source_base = (
static_cast<uint32_t
>(source_data_type_id_low) << 16) |
298 (source_system_id << 8) | source_instance_id;
299 uint32_t source_work_mbx = source_base +
static_cast<uint8_t
>(
MailboxType::WORK);
301 std::cout <<
"[" <<
module_name_ <<
"] Sending SubscribeRequest[" << source_index
302 <<
"] to source WORK mailbox " << source_work_mbx <<
"\n";
306 for (
int i = 0; i < max_retries; ++i) {
310 std::cout <<
"[" <<
module_name_ <<
"] SubscribeRequest[" << source_index
311 <<
"] sent successfully\n";
320 if (i < max_retries - 1) {
321 std::cout <<
"[" <<
module_name_ <<
"] Failed to send SubscribeRequest (attempt "
322 << (i + 1) <<
"/" << max_retries <<
"), retrying...\n";
323 std::this_thread::sleep_for(std::chrono::milliseconds(100));
327 std::cout <<
"[" <<
module_name_ <<
"] Failed to send SubscribeRequest[" << source_index
328 <<
"] after " << max_retries <<
" attempts!\n";
334 template<
size_t... Is>
337 ((index == Is ? (result = Registry::template get_message_id<std::tuple_element_t<Is, InputTypesTuple>>(),
true) :
false) || ...);
342 if constexpr (has_multi_input) {
345 return Registry::template get_message_id<InputData>();
Mailbox that takes a MessageRegistry and exposes payload-only interface.
Subscription protocol handler.
void handle_subscribe_reply(const SubscribeReplyType &reply)
Handle incoming SubscribeReply (consumer side)
const ModuleConfig * config_
void subscribe_to_source(uint8_t source_system_id, uint8_t source_instance_id)
Legacy single-input subscription (backward compatible)
uint32_t get_input_type_id_at_index(size_t index) const
void subscribe_to_source_impl(uint8_t source_system_id, uint8_t source_instance_id, size_t source_index)
Internal implementation: send SubscribeRequest to one source.
void unsubscribe_from_multi_input_source(const MultiInputConfig::InputSource &source)
Unsubscribe from multi-input source.
RegistryMailbox< SystemRegistry > * work_mailbox_
std::mutex subscription_mutex_
void handle_subscribe_request(const SubscribeRequestType &req, SubscriberMgr &sub_mgr, std::size_t output_idx=0)
Handle incoming SubscribeRequest (producer side)
void handle_unsubscribe_request(const UnsubscribeRequestType &req, SubscriberMgr &sub_mgr)
Handle incoming UnsubscribeRequest (producer side)
void unsubscribe_from_source(uint8_t source_system_id, uint8_t source_instance_id)
Unsubscribe from single-input source.
void set_module_name(const std::string &name)
void set_config(const ModuleConfig *cfg)
std::vector< SubscriptionState > input_subscriptions_
uint32_t get_input_type_id_at_index_impl(size_t index, std::index_sequence< Is... >) const
Helper: Get message ID for input type at runtime index.
void subscribe_to_all_sources()
Subscribe to all configured input sources.
void set_work_mailbox(RegistryMailbox< SystemRegistry > *mbx)
CommRaT - Modern C++ Real-Time Communication Framework.
constexpr uint8_t get_data_mbx_base(uint8_t num_outputs)
Get DATA mailbox base index (after all CMD mailboxes)
uint8_t input_system_id(size_t index) const
Get source system_id at index (MultiInput only)
bool has_single_input() const
uint8_t system_id() const
Get system_id - NoOutput or SimpleOutput only.
uint8_t instance_id() const
Get instance_id - NoOutput or SimpleOutput only.
const std::vector< MultiInputConfig::InputSource > & input_sources() const
Get input sources (MultiInput only)
uint8_t source_system_id() const
Get source_system_id (SingleInput only)
uint8_t source_instance_id() const
Get source_instance_id (SingleInput only)
std::chrono::milliseconds period
uint8_t input_instance_id(size_t index) const
Get source instance_id at index (MultiInput only)
Reply to subscription request.
int64_t actual_period_ms
Actual update period in ms.
bool success
True if subscription succeeded.
Request to subscribe to continuous data from a producer module.
uint32_t subscriber_base_addr
Consumer's base address ([type][sys][inst][mbx=0])
uint8_t mailbox_index
Which mailbox to send data to (DATA mailbox index)
Subscription state tracker for multi-input modules.
uint32_t actual_period_ms
Acknowledgment of unsubscribe request.
bool success
Always true unless error.
Request to unsubscribe from continuous data.
uint32_t subscriber_base_addr
Consumer's base mailbox address (no mailbox_index)