CommRaT 2.0.0
C++20 Real-Time Messaging Framework
Loading...
Searching...
No Matches
loop_executor.hpp
Go to the documentation of this file.
1
14#pragma once
15
17#include <iostream>
18#include <thread>
19#include <atomic>
20
21namespace commrat {
22
35template<
36 typename ModuleType // CRTP: Derived Module type
37>
39protected:
40 ModuleType& module() { return static_cast<ModuleType&>(*this); }
41 const ModuleType& module() const { return static_cast<const ModuleType&>(*this); }
42
43public:
53 auto& mod = module();
54 std::cout << "[" << mod.config_.name << "] periodic_loop started, period="
55 << mod.config_.period.count() << "ms\n";
56
57 uint32_t iteration = 0;
58 while (mod.running_) {
59 if (iteration < 3) {
60 std::cout << "[" << mod.config_.name << "] periodic_loop iteration " << iteration << "\n";
61 }
62
63 // Phase 6.10: Capture timestamp at data generation moment
64 uint64_t generation_timestamp = Time::now();
65
66 if constexpr (ModuleType::has_multi_output) {
67 // Multi-output: create tuple and call process with references
68 typename ModuleType::OutputTypesTuple outputs{};
69 // Unpack tuple and call multi-output process(Ts&...) via virtual dispatch
70 // Must use MultiOutputProcessorBase explicitly to avoid ambiguity with SingleOutputProcessorBase
71 using MultiOutBase = MultiOutputProcessorBase<
72 typename ModuleType::OutputTypesTuple,
73 typename ModuleType::InputData
74 >;
75 std::apply([&mod](auto&... args) {
76 static_cast<MultiOutBase&>(mod).process(args...);
77 }, outputs);
78 // Phase 6.10: Publish with automatic header.timestamp
79 mod.publish_multi_outputs_with_timestamp(outputs, generation_timestamp);
80 } else {
81 // Single output: call process() with output reference
82 typename ModuleType::OutputData output{};
83 mod.process(output); // Virtual call to derived class
84 // Phase 6.10: Wrap in TimsMessage with header.timestamp = generation time
85 auto tims_msg = mod.create_tims_message(std::move(output), generation_timestamp);
86 mod.publish_tims_message(tims_msg);
87 }
88
89 std::this_thread::sleep_for(mod.config_.period);
90 iteration++;
91 }
92
93 std::cout << "[" << mod.config_.name << "] periodic_loop ended after " << iteration << " iterations\n";
94 }
95
104 void free_loop() {
105 auto& mod = module();
106
107 while (mod.running_) {
108 // Phase 6.10: Capture timestamp at data generation moment
109 uint64_t generation_timestamp = Time::now();
110
111 if constexpr (ModuleType::has_multi_output) {
112 // Multi-output: create tuple and call process with references
113 typename ModuleType::OutputTypesTuple outputs{};
114 // Must use MultiOutputProcessorBase explicitly to avoid ambiguity
115 using MultiOutBase = MultiOutputProcessorBase<
116 typename ModuleType::OutputTypesTuple,
117 typename ModuleType::InputData
118 >;
119 std::apply([&mod](auto&... args) {
120 static_cast<MultiOutBase&>(mod).process(args...);
121 }, outputs);
122 mod.publish_multi_outputs_with_timestamp(outputs, generation_timestamp);
123 } else {
124 // Single output: call process() with virtual dispatch
125 // TODO: think about where to buffer output memory
126 typename ModuleType::OutputData output{};
127 mod.process(output);
128 auto tims_msg = mod.create_tims_message(std::move(output), generation_timestamp);
129 mod.publish_tims_message(tims_msg);
130 }
131 }
132 }
133
143 auto& mod = module();
144 std::cout << "[" << mod.config_.name << "] continuous_loop started, waiting for data...\n";
145
146 while (mod.running_) {
147 // BLOCKING receive on data mailbox - no timeout, waits for data
148 auto result = mod.data_mailbox_->template receive<typename ModuleType::InputData>();
149
150 if (result) {
151 // Phase 6.10: Populate metadata BEFORE process call
152 // Single continuous input always uses index 0
153 mod.update_input_metadata(0, result.value(), true); // Always new data for continuous
154
155 typename ModuleType::OutputData output{};
156 mod.process_dispatch(result->payload, output);
157 // Phase 6.10: Use input timestamp from header (data validity time)
158 auto tims_msg = mod.create_tims_message(std::move(output), result->header.timestamp);
159 mod.publish_tims_message(tims_msg);
160 }
161 }
162
163 std::cout << "[" << mod.config_.name << "] continuous_loop ended\n";
164 }
165
176 auto& mod = module();
177 static_assert(ModuleType::has_multi_input, "multi_input_loop only for multi-input modules");
178
179 std::cout << "[" << mod.config_.name << "] multi_input_loop started ("
180 << ModuleType::InputCount << " inputs)\n";
181
182 // Identify primary input index
183 constexpr size_t primary_idx = ModuleType::get_primary_input_index();
184 std::cout << "[" << mod.config_.name << "] Primary input index: " << primary_idx << "\n";
185
186 uint32_t loop_iteration = 0;
187 while (mod.running_) {
188 // Step 1: BLOCK on primary input (drives execution)
189 if (loop_iteration < 3) {
190 std::cout << "[" << mod.config_.name << "] Waiting for primary input... (iteration "
191 << loop_iteration << ")\n";
192 }
193
194 auto primary_result = mod.template receive_primary_input<primary_idx>();
195
196 if (!primary_result.has_value()) {
197 if (loop_iteration < 3) {
198 std::cout << "[" << mod.config_.name << "] No primary data received\n";
199 }
200 loop_iteration++;
201 continue;
202 }
203
204 if (loop_iteration < 3) {
205 std::cout << "[" << mod.config_.name << "] Primary input received!\n";
206 }
207
208 // Phase 6.10: Populate primary metadata
209 mod.update_input_metadata(0, primary_result.value(), true);
210
211 // Step 2: Sync all secondary inputs
212 auto all_inputs = mod.template gather_all_inputs<primary_idx>(primary_result.value());
213
214 if (!all_inputs) {
215 if (loop_iteration < 3) {
216 std::cout << "[" << mod.config_.name << "] Failed to sync inputs\n";
217 }
218 loop_iteration++;
219 continue;
220 }
221
222 if (loop_iteration < 3) {
223 std::cout << "[" << mod.config_.name << "] All inputs synced, calling process()\n";
224 }
225
226 // Phase 6.10: Extract primary timestamp (synchronization point)
227 uint64_t primary_timestamp = primary_result->header.timestamp;
228
229 // Step 3: Call process with all inputs
230 if constexpr (ModuleType::has_multi_output) {
231 typename ModuleType::OutputTypesTuple outputs{};
232 mod.call_multi_input_multi_output_process(*all_inputs, outputs);
233 mod.publish_multi_outputs_with_timestamp(outputs, primary_timestamp);
234 } else {
235 typename ModuleType::OutputData output{};
236 mod.call_multi_input_process(*all_inputs, output);
237 auto tims_msg = mod.create_tims_message(std::move(output), primary_timestamp);
238 mod.publish_tims_message(tims_msg);
239 }
240
241 loop_iteration++;
242 }
243
244 std::cout << "[" << mod.config_.name << "] multi_input_loop ended\n";
245 }
246};
247
248} // namespace commrat
Loop execution logic for Module data threads.
void periodic_loop()
Periodic loop - time-driven data generation.
void continuous_loop()
Continuous loop - event-driven single input processing.
void free_loop()
Free loop - maximum throughput data generation.
const ModuleType & module() const
void multi_input_loop()
Multi-input loop - synchronized multi-input processing.
static Timestamp now() noexcept
Get current timestamp in nanoseconds.
Definition timestamp.hpp:72
CommRaT - Modern C++ Real-Time Communication Framework.
Unified timestamp and time utility abstractions for CommRaT.