CommRaT 2.0.0
C++20 Real-Time Messaging Framework
Loading...
Searching...
No Matches
multi_input_infrastructure.hpp
Go to the documentation of this file.
1
10#pragma once
11
15#include <iostream>
16#include <tuple>
17#include <optional>
18#include <thread>
19#include <vector>
20
21namespace commrat {
22
34template<typename ModuleType, typename UserRegistry, typename InputTypesTuple, std::size_t InputCount>
36protected:
37 // Helper: Create HistoricalMailbox type for each input type
// Maps an input type T to the mailbox type used to buffer that input.
// Note: T does not appear on the right-hand side, so every input type maps to the
// same HistoricalMailbox<UserRegistry, 100>. The parameter only allows the alias to
// be expanded once per element of a type pack. Per the TODO below, 100 is the
// (currently hard-coded) history size.
38 template<typename T>
39 using HistoricalMailboxFor = HistoricalMailbox<UserRegistry, 100>; // TODO: Make history size configurable
40
41 // Generate tuple of HistoricalMailbox types from InputTypesTuple
42 template<typename Tuple>
44
// Partial specialization for std::tuple<Ts...>: `type` is a std::tuple that holds
// one HistoricalMailboxFor<T> per element type, in the same order as Ts.
45 template<typename... Ts>
46 struct MakeHistoricalMailboxTuple<std::tuple<Ts...>> {
47 using type = std::tuple<HistoricalMailboxFor<Ts>...>;
48 };
49
51 std::optional<HistoricalMailboxTuple> input_mailboxes_;
52
53 std::vector<std::thread> secondary_input_threads_;
54
61 create_input_mailboxes_impl(std::make_index_sequence<InputCount>{});
62 }
63
70 start_input_mailboxes_impl(std::make_index_sequence<InputCount>{});
71 }
72
81 template<std::size_t PrimaryIdx>
83 start_secondary_threads_impl<PrimaryIdx>(std::make_index_sequence<InputCount>{});
84 }
85
90 for (auto& thread : secondary_input_threads_) {
91 if (thread.joinable()) {
92 thread.join();
93 }
94 }
95 }
96
97private:
101 template<std::size_t Index>
102 auto create_historical_mailbox_for_input() {
103 auto& module = static_cast<ModuleType&>(*this);
104 using InputType = std::tuple_element_t<Index, InputTypesTuple>;
105
106 // RACK-style addressing: [type][sys][inst][mbx_idx]
107 // Base address contains module identity (type/system/instance)
108 using OutputData = typename ModuleType::OutputData;
109 using OutputTypesTuple = typename ModuleType::OutputTypesTuple;
110 uint32_t base_addr = commrat::calculate_base_address<OutputData, OutputTypesTuple, UserRegistry>(
111 module.config_.system_id(), module.config_.instance_id());
112
113 // DATA mailboxes start after all CMD mailboxes
114 // Get num_outputs from ModuleType
115 constexpr uint8_t num_outputs = std::tuple_size_v<OutputTypesTuple>;
116 uint8_t data_mbx_index = get_data_mbx_base(num_outputs) + static_cast<uint8_t>(Index);
117
118 // Full mailbox address = base | mailbox_index
119 uint32_t data_mailbox_id = base_addr | data_mbx_index;
120
121 // Compile-time size for THIS specific input type (memory efficient!)
122 constexpr size_t input_message_size = get_data_mailbox_size<InputType>();
123
124 std::cout << "[" << module.config_.name << "] Creating DATA mailbox[" << Index
125 << "] at 0x" << std::hex << data_mailbox_id << std::dec
126 << " (base=0x" << std::hex << base_addr << std::dec
127 << ", index=" << static_cast<int>(data_mbx_index)
128 << ", size=" << input_message_size << " bytes)\n";
129
130 MailboxConfig mbx_config{
131 .mailbox_id = data_mailbox_id,
132 .message_slots = module.config_.data_message_slots.value(), // Extract from DefaultVal
133 .max_message_size = input_message_size, // Per-input-type size!
134 .send_priority = static_cast<uint8_t>(module.config_.priority),
135 .realtime = module.config_.realtime,
136 .mailbox_name = module.config_.name + "_data_" + std::to_string(Index)
137 };
138
139 return HistoricalMailboxFor<InputType>(
140 mbx_config,
141 module.config_.sync_tolerance()
142 );
143 }
144
148 template<std::size_t... Is>
149 void create_input_mailboxes_impl(std::index_sequence<Is...>) {
150 input_mailboxes_ = std::make_tuple(
151 create_historical_mailbox_for_input<Is>()...
152 );
153 }
154
158 template<std::size_t... Is>
159 void start_input_mailboxes_impl(std::index_sequence<Is...>) {
160 auto& module = static_cast<ModuleType&>(*this);
161
162 if (input_mailboxes_) {
163 // Check each mailbox start result
164 ([&]() {
165 auto result = std::get<Is>(*input_mailboxes_).start();
166 if (!result) {
167 std::cerr << "[" << module.config_.name << "] ERROR: Failed to start input mailbox "
168 << Is << " - error " << static_cast<int>(result.get_error()) << "\n";
169 }
170 }(), ...);
171 }
172 }
173
177 template<std::size_t PrimaryIdx, std::size_t... Is>
178 void start_secondary_threads_impl(std::index_sequence<Is...>) {
179 auto& module = static_cast<ModuleType&>(*this);
180
181 // Start thread for each input except primary
182 ((Is != PrimaryIdx ?
183 (secondary_input_threads_.emplace_back(&ModuleType::template secondary_input_receive_loop<Is>, &module), true) :
184 true), ...);
185 }
186
187protected:
194 template<std::size_t InputIdx>
196 auto& module = static_cast<ModuleType&>(*this);
197 using InputType = std::tuple_element_t<InputIdx, InputTypesTuple>;
198 auto& mailbox = std::get<InputIdx>(*input_mailboxes_);
199
200 std::cout << "[" << module.config_.name << "] secondary_input_receive_loop[" << InputIdx << "] started\n";
201
202 int receive_count = 0;
203 while (module.running_) {
204 // Blocking receive - stores in historical buffer automatically
205 auto result = mailbox.template receive<InputType>();
206 if (!result.has_value()) {
207 std::cout << "[" << module.config_.name << "] secondary_input_receive_loop[" << InputIdx
208 << "] receive failed after " << receive_count << " messages\n";
209 break;
210 }
211 receive_count++;
212 if (receive_count <= 3) {
213 std::cout << "[" << module.config_.name << "] secondary_input_receive_loop[" << InputIdx
214 << "] received message #" << receive_count
215 << ", timestamp=" << result.value().header.timestamp << "\n";
216 }
217 }
218
219 std::cout << "[" << module.config_.name << "] secondary_input_receive_loop[" << InputIdx
220 << "] ended (total: " << receive_count << " messages)\n";
221 }
222};
223
224} // namespace commrat
Mailbox with timestamped history for getData synchronization.
Multi-input infrastructure mixin.
void start_secondary_input_threads()
Start receive threads for secondary inputs.
void join_secondary_input_threads()
Join all secondary input threads.
std::optional< HistoricalMailboxTuple > input_mailboxes_
typename MakeHistoricalMailboxTuple< InputTypesTuple >::type HistoricalMailboxTuple
void secondary_input_receive_loop()
Secondary input receive loop.
std::vector< std::thread > secondary_input_threads_
void initialize_multi_input_mailboxes()
Initialize multi-input mailboxes.
void start_input_mailboxes()
Start all input mailboxes.
RegistryMailbox wrapper with timestamped history for multi-input synchronization (Phase 6....
CommRaT - Modern C++ Real-Time Communication Framework.
constexpr uint8_t get_data_mbx_base(uint8_t num_outputs)
Get DATA mailbox base index (after all CMD mailboxes)