CommRaT 2.0.0
C++20 Real-Time Messaging Framework
Loading...
Searching...
No Matches
publishing.hpp
Go to the documentation of this file.
1
12#pragma once
13
#include <commrat/messages.hpp> // TimsMessage definition
#include <commrat/module/helpers/address_helpers.hpp> // encode_address, extract_*
#include <commrat/module/io/multi_output_manager.hpp> // SubscriberInfo
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <mutex>
#include <string>
#include <tuple>
#include <type_traits>
#include <utility>
#include <vector>
23
24namespace commrat {
25
42template<
43 typename UserRegistry,
44 typename OutputData,
45 typename PublishMailboxT = TypedMailbox<UserRegistry>,
46 typename ModuleType = void // Module type for multi-output mailbox access
47>
48class Publisher {
49protected:
50 // References to module resources (set by derived class)
51 // REMOVED: subscriber_manager_ - ALL modules now use MultiOutputManager (post-unification)
52 // REMOVED: publish_mailbox_ - ALL modules now use MailboxSets (post-unification)
53 ModuleType* module_ptr_{nullptr}; // Typed pointer to Module for mailbox/subscriber access
54 std::string module_name_;
55
56public:
57 // REMOVED: set_subscriber_manager() - no longer used after unification
58 // REMOVED: set_publish_mailbox() - no longer used after unification
59 void set_module_ptr(ModuleType* ptr) { module_ptr_ = ptr; }
60 void set_module_name(const std::string& name) { module_name_ = name; }
61
66 template<typename T>
67 static TimsMessage<T> create_tims_message(T&& payload, uint64_t timestamp_ns) {
69 .header = {
70 .msg_type = 0, // serialize() will set this
71 .msg_size = 0, // serialize() will set this
72 .timestamp = timestamp_ns, // ONE SOURCE OF TRUTH
73 .seq_number = 0, // TiMS will set this
74 .flags = 0
75 },
76 .payload = std::forward<T>(payload)
77 };
78 return msg;
79 }
80
87 template<typename T = OutputData>
88 requires (!std::is_void_v<T>)
89 void publish_to_subscribers(T& data) {
90 if constexpr (!std::is_void_v<ModuleType>) {
91 // Use CMD mailbox for publishing (Phase 7)
92 auto& cmd_mbx = module_ptr_->template get_cmd_mailbox_public<0>();
93 // Get output-specific subscriber list (index 0 for single-output)
94 auto subscribers = module_ptr_->get_output_subscribers(0);
95
96 for (const auto& sub : subscribers) {
97 // Calculate destination: base_addr | mailbox_index
98 uint32_t dest_mailbox = sub.base_addr | sub.input_index;
99 auto result = cmd_mbx.send(data, dest_mailbox);
100 if (!result) {
101 std::cerr << "[" << module_name_ << "] Send failed to subscriber base=0x" << std::hex << sub.base_addr
102 << " mbx_idx=" << std::dec << static_cast<int>(sub.input_index)
103 << " dest=0x" << std::hex << dest_mailbox << std::dec
104 << " error=" << static_cast<int>(result.get_error()) << "\n";
105 }
106 }
107 }
108 }
109
115 template<typename T>
117 if constexpr (!std::is_void_v<ModuleType>) {
118 // Use CMD mailbox for publishing (Phase 7)
119 auto& cmd_mbx = module_ptr_->template get_cmd_mailbox_public<0>();
120 // Get output-specific subscriber list (index 0 for single-output)
121 auto subscribers = module_ptr_->get_output_subscribers(0);
122
123 for (const auto& sub : subscribers) {
124 uint32_t dest_mailbox = sub.base_addr | sub.input_index;
125 // Phase 6.10: Send with explicit timestamp from header
126 auto result = cmd_mbx.send(tims_msg.payload, dest_mailbox, tims_msg.header.timestamp);
127 if (!result) {
128 std::cerr << "[" << module_name_ << "] Send failed to subscriber base=0x" << std::hex << sub.base_addr
129 << " mbx_idx=" << std::dec << static_cast<int>(sub.input_index)
130 << " dest=0x" << std::hex << dest_mailbox << std::dec
131 << " error=" << static_cast<int>(result.get_error()) << "\n";
132 }
133 }
134 }
135 }
136
144 template<std::size_t Index, typename OutputType>
145 void send_output_at_index(uint16_t subscriber_type_id_low, OutputType& output, uint32_t dest_mailbox) {
146 // Get the full message ID for this output type
147 constexpr uint32_t output_msg_id = UserRegistry::template get_message_id<OutputType>();
148 constexpr uint16_t output_type_id_low = static_cast<uint16_t>(output_msg_id & 0xFFFF);
149
150 // Only send if types match
151 if (output_type_id_low == subscriber_type_id_low) {
152 // Get the correct publish mailbox for this output index (unified path)
153 if constexpr (!std::is_void_v<ModuleType>) {
154 auto& publish_mbx = module_ptr_->template get_publish_mailbox_public<Index>();
155 auto result = publish_mbx.send(output, dest_mailbox);
156 if (!result) {
157 std::cout << "[" << module_name_ << "] Send failed for output[" << Index << "]\n";
158 }
159 }
160 }
161 }
162
167 template<typename... Ts, std::size_t... Is>
168 void publish_multi_outputs_impl(std::tuple<Ts...>& outputs, std::index_sequence<Is...>) {
169 // Each output index publishes to its own subscriber list
170 (publish_output_at_index<Is>(std::get<Is>(outputs)), ...);
171 }
172
177 template<std::size_t Index, typename OutputType>
178 void publish_output_at_index(OutputType& output) {
179 if constexpr (!std::is_void_v<ModuleType>) {
180 // Get subscribers for this specific output
181 auto subscribers = module_ptr_->get_output_subscribers(Index);
182
183 // Send to each subscriber using CMD mailbox
184 auto& cmd_mbx = module_ptr_->template get_cmd_mailbox_public<Index>();
185 for (const auto& sub : subscribers) {
186 uint32_t dest_mailbox = sub.base_addr | sub.input_index;
187 auto result = cmd_mbx.send(output, dest_mailbox);
188 if (!result) {
189 std::cout << "[" << module_name_ << "] Send failed for output[" << Index
190 << "] to subscriber base=0x" << std::hex << sub.base_addr
191 << " mbx_idx=" << std::dec << static_cast<int>(sub.input_index)
192 << " dest=0x" << std::hex << dest_mailbox << std::dec << "\n";
193 }
194 }
195 }
196 }
197
202 template<typename... Ts>
203 void publish_multi_outputs(std::tuple<Ts...>& outputs) {
204 publish_multi_outputs_impl(outputs, std::index_sequence_for<Ts...>{});
205 }
206
214 template<typename... Ts>
215 void publish_multi_outputs_with_timestamp(std::tuple<Ts...>& outputs, uint64_t timestamp_ns) {
216 // For multi-output, we still use the existing publish_multi_outputs
217 // because each output goes through send_if_type_matches which wraps in TimsMessage
218 // The timestamp is not used here yet - future enhancement could wrap each output
219 publish_multi_outputs(outputs);
220 }
221};
222
223} // namespace commrat
Publishing logic for producer modules.
void set_module_name(const std::string &name)
ModuleType * module_ptr_
void publish_multi_outputs(std::tuple< Ts... > &outputs)
Publish multiple outputs (tuple) to subscribers. Each subscriber receives only outputs matching their ...
void send_output_at_index(uint16_t subscriber_type_id_low, OutputType &output, uint32_t dest_mailbox)
Send output at given index if type matches subscriber's expected type. Phase 7.4: Uses Module's get_pu...
void publish_to_subscribers(T &data)
Single-output publishing (only enabled when OutputData is not void). Publishes data to all subscribers...
static TimsMessage< T > create_tims_message(T &&payload, uint64_t timestamp_ns)
Create TimsMessage with explicit timestamp. Phase 6.10: Single source of truth for timestamps (header....
void publish_multi_outputs_impl(std::tuple< Ts... > &outputs, std::index_sequence< Is... >)
Multi-output publishing implementation. Phase 7: Each output publishes to its own subscriber list.
void set_module_ptr(ModuleType *ptr)
void publish_tims_message(TimsMessage< T > &tims_msg)
Publish TimsMessage<T> for single output. Phase 6.10: Uses explicit timestamp from header. Phase 7: Use...
void publish_output_at_index(OutputType &output)
Publish specific output to its subscribers (Phase 7). Uses CMD mailbox (one per output) and new addres...
std::string module_name_
void publish_multi_outputs_with_timestamp(std::tuple< Ts... > &outputs, uint64_t timestamp_ns)
Publish multi-outputs with explicit timestamp. Phase 6.10: Wraps each output in TimsMessage with times...
CommRaT - Modern C++ Real-Time Communication Framework.