CommRaT 2.0.0
C++20 Real-Time Messaging Framework
Loading...
Searching...
No Matches
multi_input_processor.hpp
Go to the documentation of this file.
1
10#pragma once
11
14#include <optional>
15#include <tuple>
16
17namespace commrat {
18
33template<typename ModuleType, typename InputTypesTuple, typename OutputData, typename OutputTypesTuple, std::size_t InputCount>
35protected:
// Receive the next message from the primary input mailbox (the tuple slot at
// PrimaryIdx inside module.input_mailboxes_).
//
// NOTE(review): this listing is an extract of a Doxygen source page and is
// missing listing lines 45 and 49, i.e. the function signature and the body of
// the early-exit branch below. The declaration index at the end of this page
// gives the signature as
//   auto receive_primary_input()
//       -> MailboxResult<TimsMessage<std::tuple_element_t<PrimaryIdx, InputTypesTuple>>>
// What is returned when input_mailboxes_ is unset cannot be determined from
// this listing - confirm against the real header.
44 template<std::size_t PrimaryIdx>
// CRTP downcast: the mailboxes are owned by the derived module type.
46 auto& module = static_cast<ModuleType&>(*this);
47
// Guard for mailboxes that are not available (branch body absent from this listing).
48 if (!module.input_mailboxes_) {
50 }
51
// Payload type and mailbox are both selected by the compile-time primary index.
52 using PrimaryType = std::tuple_element_t<PrimaryIdx, InputTypesTuple>;
53 auto& primary_mailbox = std::get<PrimaryIdx>(*module.input_mailboxes_);
54
55 // BLOCKING receive - drives execution rate
56 return primary_mailbox.template receive<PrimaryType>();
57 }
58
70 template<std::size_t PrimaryIdx, typename PrimaryMsgType>
71 std::optional<InputTypesTuple> gather_all_inputs(const PrimaryMsgType& primary_msg) {
72 auto& module = static_cast<ModuleType&>(*this);
73
74 if (!module.input_mailboxes_) {
75 return std::nullopt;
76 }
77
78 // Create tuple to hold all inputs
79 InputTypesTuple all_inputs{};
80
81 // Place primary input at its index
82 std::get<PrimaryIdx>(all_inputs) = primary_msg.payload;
83
84 // Phase 6.10: Sync secondary inputs using getData with primary timestamp from header
85 // TimsMessage.header.timestamp is the authoritative timestamp
86 bool all_synced = sync_secondary_inputs<PrimaryIdx>(primary_msg.header.timestamp, all_inputs);
87
88 if (!all_synced) {
89 return std::nullopt;
90 }
91
92 return all_inputs;
93 }
94
104 template<typename O = OutputData,
105 typename = std::enable_if_t<!std::is_void_v<O>>>
106 void call_multi_input_process(const InputTypesTuple& inputs, O& output) {
107 call_multi_input_process_impl(inputs, output, std::make_index_sequence<InputCount>{});
108 }
109
118 void call_multi_input_multi_output_process(const InputTypesTuple& inputs, OutputTypesTuple& outputs) {
119 call_multi_input_multi_output_process_impl(inputs, outputs,
120 std::make_index_sequence<InputCount>{},
121 std::make_index_sequence<std::tuple_size_v<OutputTypesTuple>>{});
122 }
123
124private:
128 template<std::size_t PrimaryIdx>
129 bool sync_secondary_inputs(uint64_t primary_timestamp, InputTypesTuple& all_inputs) {
130 return sync_secondary_inputs_impl<PrimaryIdx>(primary_timestamp, all_inputs,
131 std::make_index_sequence<InputCount>{});
132 }
133
137 template<std::size_t PrimaryIdx, std::size_t... Is>
138 bool sync_secondary_inputs_impl(uint64_t primary_timestamp, InputTypesTuple& all_inputs,
139 std::index_sequence<Is...>) {
140 // For each input index (except primary), call getData
141 bool all_success = true;
142
143 // Fold expression: process each secondary input
144 ((Is != PrimaryIdx ?
145 (all_success = sync_input_at_index<Is>(primary_timestamp, all_inputs) && all_success) :
146 true), ...);
147
148 return all_success;
149 }
150
// Synchronize the single input slot `Index` to the primary timestamp.
//
// Queries the mailbox at position `Index` through getData<InputType>(...),
// passing the primary timestamp and module.config_.sync_tolerance(). On
// success the input metadata is updated and the payload is copied into
// all_inputs; on failure the input is marked invalid on the module.
//
// @tparam Index              Tuple position of the input to synchronize.
// @param  primary_timestamp  Timestamp taken from the primary message.
// @param  all_inputs         Tuple that receives the payload at position Index.
// @return true if getData produced a value, false otherwise.
//
// NOTE(review): module.input_mailboxes_ is dereferenced here without a check;
// the visible caller chain (gather_all_inputs) tests it first.
157 template<std::size_t Index>
158 bool sync_input_at_index(uint64_t primary_timestamp, InputTypesTuple& all_inputs) {
159 auto& module = static_cast<ModuleType&>(*this);
160 using InputType = std::tuple_element_t<Index, InputTypesTuple>;
161 auto& mailbox = std::get<Index>(*module.input_mailboxes_);
162
163 // Non-blocking getData with tolerance
164 auto result = mailbox.template getData<InputType>(
165 primary_timestamp,
166 module.config_.sync_tolerance(),
// NOTE(review): listing line 167 (the argument after the tolerance) is absent
// from this extract. Presumably it selects an interpolation mode such as
// NEAREST, which appears in the index at the end of this page - confirm
// against the real header.
168 );
169
170 if (!result.has_value()) {
171 // Phase 6.10: Mark input as invalid
172 module.mark_input_invalid(Index);
173 return false; // getData failed
174 }
175
176 // Phase 6.10: Populate metadata for this input
177 // Index matches position in Inputs<T1, T2, T3, ...>
178 // getData succeeded - data is "new" (successfully retrieved from buffer)
179 // Note: is_new_data = true means getData returned a value (not nullopt)
180 // is_new_data = false would indicate using fallback/default data
181 module.update_input_metadata(Index, result.value(), true);
182
183 // Store payload in tuple
184 std::get<Index>(all_inputs) = result->payload;
185 return true;
186 }
187
192 template<std::size_t... Is, typename O = OutputData,
193 typename = std::enable_if_t<!std::is_void_v<O>>>
194 void call_multi_input_process_impl(const InputTypesTuple& inputs, O& output, std::index_sequence<Is...>) {
195 auto& module = static_cast<ModuleType&>(*this);
196
197 // Unpack tuple and call process(const T1&, const T2&, ..., Output&)
198 using Base = MultiInputProcessorBase<InputTypesTuple, OutputData, InputCount>;
199 static_cast<Base*>(&module)->process(std::get<Is>(inputs)..., output);
200 }
201
205 template<std::size_t... InputIs, std::size_t... OutputIs>
206 void call_multi_input_multi_output_process_impl(const InputTypesTuple& inputs, OutputTypesTuple& outputs,
207 std::index_sequence<InputIs...>,
208 std::index_sequence<OutputIs...>) {
209 auto& module = static_cast<ModuleType&>(*this);
210
211 // Unpack both tuples and call process(const T1&, ..., O1&, O2&, ...)
212 using Base = MultiInputProcessorBase<InputTypesTuple, OutputTypesTuple, InputCount>;
213 static_cast<Base*>(&module)->process(std::get<InputIs>(inputs)..., std::get<OutputIs>(outputs)...);
214 }
215};
216
217} // namespace commrat
Multi-input processing mixin.
std::optional< InputTypesTuple > gather_all_inputs(const PrimaryMsgType &primary_msg)
Gather all inputs synchronized to primary timestamp.
auto receive_primary_input() -> MailboxResult< TimsMessage< std::tuple_element_t< PrimaryIdx, InputTypesTuple > > >
Receive from primary input mailbox.
void call_multi_input_multi_output_process(const InputTypesTuple &inputs, OutputTypesTuple &outputs)
Call multi-input process with multi-output.
void call_multi_input_process(const InputTypesTuple &inputs, O &output)
Call multi-input process with single output.
CommRaT - Modern C++ Real-Time Communication Framework.
@ NEAREST
Return closest message by timestamp.