CommRaT 2.0.0
C++20 Real-Time Messaging Framework
Loading...
Searching...
No Matches
subscription.hpp
Go to the documentation of this file.
1
9#pragma once
10
14#include <iostream>
15#include <vector>
16#include <mutex>
17#include <algorithm>
18#include <optional>
19#include <thread>
20#include <chrono>
21
22namespace commrat {
23
24// Forward declarations for mailbox types
25template<typename Registry>
26class RegistryMailbox;
27
32 bool subscribed{false}; // SubscribeRequest sent
33 bool reply_received{false}; // SubscribeReply received
34 uint32_t actual_period_ms{0}; // Period from reply
35};
36
50template<
51 typename Registry,
52 bool has_continuous_input,
53 bool has_multi_input,
54 typename InputData = void,
55 typename InputTypesTuple = std::tuple<>,
56 size_t InputCount = 0,
57 typename OutputData = void,
58 typename OutputTypesTuple = std::tuple<>
59>
61protected:
66
67 std::vector<SubscriptionState> input_subscriptions_;
68 mutable std::mutex subscription_mutex_;
69
70 // Reference to module's config and mailboxes (set by derived class)
71 const ModuleConfig* config_{nullptr};
73 std::string module_name_;
74
75 // Helper removed - work_mailbox_ is now properly typed
76
77public:
78 void set_config(const ModuleConfig* cfg) { config_ = cfg; }
80 void set_module_name(const std::string& name) { module_name_ = name; }
81
89 static_assert(has_continuous_input || has_multi_input,
90 "subscribe_to_all_sources() only for continuous or multi-input modules");
91
92 if constexpr (has_multi_input) {
93 // Multi-input: subscribe to each source in input_sources
94 auto& sources = config_->input_sources();
95 if (sources.empty()) {
96 std::cerr << "[" << module_name_ << "] ERROR: Multi-input module but input_sources is empty!\n";
97 return;
98 }
99
100 std::lock_guard<std::mutex> lock(subscription_mutex_);
101 input_subscriptions_.resize(sources.size());
102
103 for (size_t i = 0; i < sources.size(); ++i) {
104 auto& source = sources[i]; // Non-const to populate input_index
105 source.input_index = i; // Save the index for unsubscribe
107 }
108 } else if constexpr (has_continuous_input) {
109 // Single-input: use config fields
110 if (config_->has_single_input()) {
111 input_subscriptions_.resize(1);
113 }
114 }
115 }
116
120 void subscribe_to_source(uint8_t source_system_id, uint8_t source_instance_id) {
121 static_assert(has_continuous_input, "subscribe_to_source() only available for continuous input modules");
122 input_subscriptions_.resize(1);
123 subscribe_to_source_impl(source_system_id, source_instance_id, 0);
124 }
125
// Unsubscribe from the single continuous-input source.
//
// Builds a request carrying this module's base address (computed with
// calculate_base_address<OutputData, OutputTypesTuple, Registry>) and sends
// it through work_mailbox_ to the source's WORK mailbox.
//
// Parameters:
//   source_system_id   - system id of the producer to unsubscribe from
//   source_instance_id - instance id of the producer to unsubscribe from
//
// Only available when has_continuous_input is true (static_assert below).
129 void unsubscribe_from_source(uint8_t source_system_id, uint8_t source_instance_id) {
130 static_assert(has_continuous_input, "unsubscribe_from_source() only available for continuous input modules");
131
132 uint32_t our_base_addr = commrat::calculate_base_address<OutputData, OutputTypesTuple, Registry>(
134
136 .subscriber_base_addr = our_base_addr
137 };
138
139 // Calculate source WORK mailbox
// NOTE(review): both variables are declared without an initializer and are
// assigned only inside the `if constexpr` branch below. If this function is
// instantiated with has_continuous_input && has_multi_input both true, the
// send() at the end reads source_work_mbx uninitialized - confirm that
// combination cannot occur, or initialize / static_assert against it.
140 uint32_t source_base;
141 uint32_t source_work_mbx;
142
143 if constexpr (has_continuous_input && !has_multi_input) {
144 // Single continuous input
// Source base address layout as built here:
//   bits 31..16 = low 16 bits of the InputData message id
//   bits 15..8  = source_system_id
//   bits  7..0  = source_instance_id
// The WORK mailbox is that base plus the numeric value of MailboxType::WORK.
145 constexpr uint32_t source_data_type_id = Registry::template get_message_id<InputData>();
146 constexpr uint16_t source_data_type_id_low = static_cast<uint16_t>(source_data_type_id & 0xFFFF);
147 source_base = (static_cast<uint32_t>(source_data_type_id_low) << 16) | (source_system_id << 8) | source_instance_id;
148 source_work_mbx = source_base + static_cast<uint8_t>(MailboxType::WORK);
149 }
150
151 // Send unsubscribe request from work mailbox (SystemRegistry messages)
// The value returned by send() is not inspected here.
152 work_mailbox_->send(request, source_work_mbx);
153 }
154
164 static_assert(has_multi_input, "unsubscribe_from_multi_input_source() only for multi-input modules");
165
166 uint32_t our_base_addr = commrat::calculate_base_address<OutputData, OutputTypesTuple, Registry>(
168
170 .subscriber_base_addr = our_base_addr
171 };
172
173 // Use the input_index to get the correct type ID
174 // Multi-input: use the input type at source.input_index
175 uint32_t source_data_type_id = get_input_type_id_at_index(source.input_index);
176
177 uint16_t source_data_type_id_low = static_cast<uint16_t>(source_data_type_id & 0xFFFF);
178 uint32_t source_base = (static_cast<uint32_t>(source_data_type_id_low) << 16) |
179 (source.system_id << 8) | source.instance_id;
180 uint32_t source_work_mbx = source_base + static_cast<uint8_t>(MailboxType::WORK);
181
182 // Send unsubscribe request from work mailbox (SystemRegistry messages)
183 work_mailbox_->send(request, source_work_mbx);
184 }
185
189 template<typename SubscriberMgr>
190 void handle_subscribe_request(const SubscribeRequestType& req, SubscriberMgr& sub_mgr, std::size_t output_idx = 0) {
191 try {
192 // req.subscriber_base_addr is the subscriber's base address
193 // req.mailbox_index tells us which DATA mailbox to send to
194 // Phase 7: Route to correct output-specific subscriber list
195 sub_mgr.add_subscriber_to_output(req.subscriber_base_addr, req.mailbox_index, output_idx);
196 uint32_t subscriber_data_mbx = req.subscriber_base_addr | req.mailbox_index;
197 std::cout << "[" << module_name_ << "] Added subscriber to output-specific list, "
198 << "will send to DATA mailbox=0x" << std::hex << subscriber_data_mbx << std::dec << "\n";
199
200 SubscribeReplyType reply{
201 .actual_period_ms = config_->period.count(),
202 .success = true,
203 .error_code = 0
204 };
205
206 // Send reply to subscriber's WORK mailbox (base + 1)
207 uint32_t subscriber_work_mbx = req.subscriber_base_addr + static_cast<uint8_t>(MailboxType::WORK);
208 work_mailbox_->send(reply, subscriber_work_mbx);
209 } catch (...) {
210 std::cout << "[" << module_name_ << "] Failed to add subscriber: 0x" << std::hex << req.subscriber_base_addr << std::dec << "\n";
211
212 SubscribeReplyType reply{
213 .actual_period_ms = 0,
214 .success = false,
215 .error_code = 1 // Max subscribers exceeded
216 };
217
218 uint32_t subscriber_work_mbx = req.subscriber_base_addr + static_cast<uint8_t>(MailboxType::WORK);
219 work_mailbox_->send(reply, subscriber_work_mbx);
220 }
221 }
222
227 std::lock_guard<std::mutex> lock(subscription_mutex_);
228
229 // Phase 6.6: Mark subscription as complete
230 // For now, mark the first non-replied subscription
231 // TODO: Need to track which reply corresponds to which source
232 for (auto& sub : input_subscriptions_) {
233 if (sub.subscribed && !sub.reply_received) {
234 sub.reply_received = true;
235 sub.actual_period_ms = reply.actual_period_ms;
236 std::cout << "[" << module_name_ << "] SubscribeReply received: "
237 << (reply.success ? "SUCCESS" : "FAILED")
238 << ", actual_period_ms=" << reply.actual_period_ms << "\n";
239 return;
240 }
241 }
242 }
243
247 template<typename SubscriberMgr>
248 void handle_unsubscribe_request(const UnsubscribeRequestType& req, SubscriberMgr& sub_mgr) {
249 sub_mgr.remove_subscriber(req.subscriber_base_addr);
250
251 UnsubscribeReplyType reply{.success = true};
252 uint32_t subscriber_work_mbx = req.subscriber_base_addr + static_cast<uint8_t>(MailboxType::WORK);
253 work_mailbox_->send(reply, subscriber_work_mbx);
254 }
255
256protected:
// Internal implementation: send a SubscribeRequest to one source.
//
// Parameters:
//   source_system_id   - system id of the producer
//   source_instance_id - instance id of the producer
//   source_index       - slot of this source in input_subscriptions_; for
//                        multi-input modules it also selects the input type
//                        and the DATA mailbox offset
//
// The request is sent through work_mailbox_ to the producer's WORK mailbox.
// Sending is attempted up to 5 times with a 100 ms sleep between attempts,
// so this call can block the calling thread. On the first successful send
// the matching input_subscriptions_ entry (if source_index is in range) is
// marked subscribed with reply_received cleared. Progress and failures are
// logged to std::cout; nothing is returned to the caller.
//
// NOTE(review): input_subscriptions_ is written here without taking
// subscription_mutex_ in this function - presumably the caller is expected
// to provide any needed synchronization; confirm.
260 void subscribe_to_source_impl(uint8_t source_system_id, uint8_t source_instance_id,
261 size_t source_index) {
262 // Calculate subscriber_base_addr and source_data_type_id
263 uint32_t source_data_type_id;
264
265 // BOTH single-input and multi-input now use OUTPUT type for base address
266 // This prevents collisions (module identity = OUTPUT type)
267 uint32_t subscriber_base_addr = commrat::calculate_base_address<OutputData, OutputTypesTuple, Registry>(
269
270 // Calculate actual DATA mailbox index (after CMD mailboxes)
271 constexpr uint8_t num_outputs = std::tuple_size_v<OutputTypesTuple>;
272 uint8_t data_mailbox_index;
273
274 if constexpr (has_multi_input) {
275 // Multi-input: Specify which input index for DATA mailbox offset
276 source_data_type_id = get_input_type_id_at_index(source_index);
277 // DATA mailboxes start after CMD mailboxes: base + num_outputs + input_index
278 data_mailbox_index = get_data_mbx_base(num_outputs) + static_cast<uint8_t>(source_index);
279 }
280 else if constexpr (has_continuous_input) {
281 // Single continuous input: input_index = 0 (default DATA mailbox)
282 source_data_type_id = Registry::template get_message_id<InputData>();
283 data_mailbox_index = get_data_mbx_base(num_outputs) + 0;
284 }
285 else {
286 static_assert(has_continuous_input || has_multi_input, "Invalid input configuration");
287 return;
288 }
289
// The requested period is taken from this module's own config.
290 SubscribeRequestType request{
291 .subscriber_base_addr = subscriber_base_addr,
292 .mailbox_index = data_mailbox_index, // Actual DATA mailbox index
293 .requested_period_ms = config_->period.count()
294 };
295
// Source base address layout as built here:
//   bits 31..16 = low 16 bits of the source's data type id
//   bits 15..8  = source_system_id
//   bits  7..0  = source_instance_id
// The WORK mailbox is that base plus the numeric value of MailboxType::WORK.
296 uint16_t source_data_type_id_low = static_cast<uint16_t>(source_data_type_id & 0xFFFF);
297 uint32_t source_base = (static_cast<uint32_t>(source_data_type_id_low) << 16) |
298 (source_system_id << 8) | source_instance_id;
299 uint32_t source_work_mbx = source_base + static_cast<uint8_t>(MailboxType::WORK);
300
301 std::cout << "[" << module_name_ << "] Sending SubscribeRequest[" << source_index
302 << "] to source WORK mailbox " << source_work_mbx << "\n";
303
304 // Retry a few times in case the producer's mailbox isn't ready yet
305 int max_retries = 5;
306 for (int i = 0; i < max_retries; ++i) {
307 // Send subscribe request from work mailbox (SystemRegistry messages)
308 auto result = work_mailbox_->send(request, source_work_mbx);
309 if (result) {
310 std::cout << "[" << module_name_ << "] SubscribeRequest[" << source_index
311 << "] sent successfully\n";
312 // Mark subscription as sent (reply not yet received)
// An out-of-range source_index is skipped silently: the request has been
// sent but no table entry records it.
313 if (source_index < input_subscriptions_.size()) {
314 input_subscriptions_[source_index].subscribed = true;
315 input_subscriptions_[source_index].reply_received = false;
316 }
317 return;
318 }
319
// No sleep after the final attempt; fall through to the failure log below.
320 if (i < max_retries - 1) {
321 std::cout << "[" << module_name_ << "] Failed to send SubscribeRequest (attempt "
322 << (i + 1) << "/" << max_retries << "), retrying...\n";
323 std::this_thread::sleep_for(std::chrono::milliseconds(100));
324 }
325 }
326
327 std::cout << "[" << module_name_ << "] Failed to send SubscribeRequest[" << source_index
328 << "] after " << max_retries << " attempts!\n";
329 }
330
334 template<size_t... Is>
335 uint32_t get_input_type_id_at_index_impl(size_t index, std::index_sequence<Is...>) const {
336 uint32_t result = 0;
337 ((index == Is ? (result = Registry::template get_message_id<std::tuple_element_t<Is, InputTypesTuple>>(), true) : false) || ...);
338 return result;
339 }
340
341 uint32_t get_input_type_id_at_index(size_t index) const {
342 if constexpr (has_multi_input) {
343 return get_input_type_id_at_index_impl(index, std::make_index_sequence<InputCount>{});
344 } else {
345 return Registry::template get_message_id<InputData>();
346 }
347 }
348};
349
350} // namespace commrat
Mailbox that takes a MessageRegistry and exposes payload-only interface.
Subscription protocol handler.
void handle_subscribe_reply(const SubscribeReplyType &reply)
Handle incoming SubscribeReply (consumer side)
const ModuleConfig * config_
void subscribe_to_source(uint8_t source_system_id, uint8_t source_instance_id)
Legacy single-input subscription (backward compatible)
uint32_t get_input_type_id_at_index(size_t index) const
void subscribe_to_source_impl(uint8_t source_system_id, uint8_t source_instance_id, size_t source_index)
Internal implementation: send SubscribeRequest to one source.
void unsubscribe_from_multi_input_source(const MultiInputConfig::InputSource &source)
Unsubscribe from multi-input source.
RegistryMailbox< SystemRegistry > * work_mailbox_
void handle_subscribe_request(const SubscribeRequestType &req, SubscriberMgr &sub_mgr, std::size_t output_idx=0)
Handle incoming SubscribeRequest (producer side)
void handle_unsubscribe_request(const UnsubscribeRequestType &req, SubscriberMgr &sub_mgr)
Handle incoming UnsubscribeRequest (producer side)
void unsubscribe_from_source(uint8_t source_system_id, uint8_t source_instance_id)
Unsubscribe from single-input source.
void set_module_name(const std::string &name)
void set_config(const ModuleConfig *cfg)
std::vector< SubscriptionState > input_subscriptions_
uint32_t get_input_type_id_at_index_impl(size_t index, std::index_sequence< Is... >) const
Helper: Get message ID for input type at runtime index.
void subscribe_to_all_sources()
Subscribe to all configured input sources.
void set_work_mailbox(RegistryMailbox< SystemRegistry > *mbx)
CommRaT - Modern C++ Real-Time Communication Framework.
constexpr uint8_t get_data_mbx_base(uint8_t num_outputs)
Get DATA mailbox base index (after all CMD mailboxes)
uint8_t input_system_id(size_t index) const
Get source system_id at index (MultiInput only)
bool has_single_input() const
uint8_t system_id() const
Get system_id - NoOutput or SimpleOutput only.
uint8_t instance_id() const
Get instance_id - NoOutput or SimpleOutput only.
const std::vector< MultiInputConfig::InputSource > & input_sources() const
Get input sources (MultiInput only)
uint8_t source_system_id() const
Get source_system_id (SingleInput only)
uint8_t source_instance_id() const
Get source_instance_id (SingleInput only)
std::chrono::milliseconds period
uint8_t input_instance_id(size_t index) const
Get source instance_id at index (MultiInput only)
Reply to subscription request.
int64_t actual_period_ms
Actual update period in ms.
bool success
True if subscription succeeded.
Request to subscribe to continuous data from a producer module.
uint32_t subscriber_base_addr
Consumer's base address ([type][sys][inst][mbx=0])
uint8_t mailbox_index
Which mailbox to send data to (DATA mailbox index)
Subscription state tracker for multi-input modules.
Acknowledgment of unsubscribe request.
bool success
Always true unless error.
Request to unsubscribe from continuous data.
uint32_t subscriber_base_addr
Consumer's base mailbox address (no mailbox_index)