CommRaT 2.0.0
C++20 Real-Time Messaging Framework
Loading...
Searching...
No Matches
lifecycle_manager.hpp
Go to the documentation of this file.
1#pragma once
2
3#include <iostream>
4#include <thread>
5#include <chrono>
6
7namespace commrat {
8
20template<typename ModuleType>
22public:
36 void start() {
37 auto& module = static_cast<ModuleType&>(*this);
38
39 if (module.running_) {
40 return;
41 }
42
43 module.on_init();
44
45 // Start mailboxes (always use MailboxSets, even for single output)
46 module.template start_all_mailbox_sets(std::make_index_sequence<module.num_output_types>{});
47
48 // Only start data mailbox for single ContinuousInput modules
49 if (module.data_mailbox_) {
50 auto data_result = module.data_mailbox_->start();
51 if (!data_result) {
52 throw std::runtime_error("Failed to start data mailbox");
53 }
54 }
55
56 // Phase 6.6: Start multi-input mailboxes
57 if constexpr (module.has_multi_input) {
58 module.start_input_mailboxes();
59 }
60
61 module.running_ = true;
62 module.on_start();
63
64 // Start work thread(s) FIRST to handle subscriptions
65 // Always use per-output work threads (even for single output)
66 std::cout << "[" << module.config_.name << "] Spawning " << module.num_output_types << " output work threads...\n";
67 module.template spawn_all_output_work_threads(std::make_index_sequence<module.num_output_types>{});
68
69 // Start command thread for user commands (only if module has commands)
70 if constexpr (module.num_command_types > 0) {
71 module.command_thread_ = std::thread(&ModuleType::command_loop, &module);
72 }
73
74 // Give threads time to start
75 std::this_thread::sleep_for(std::chrono::milliseconds(10));
76
77 // Subscribe to source(s)
78 if constexpr (module.has_multi_input) {
79 // Phase 6.6: Multi-input subscription
80 module.subscribe_to_all_sources();
81 } else if constexpr (module.has_continuous_input) {
82 // Single-input (backward compatible)
83 if (module.config_.has_single_input()) {
84 module.subscribe_to_source(module.config_.source_system_id(), module.config_.source_instance_id());
85 }
86 }
87
88 // Start data thread based on input mode
89 if constexpr (module.has_periodic_input) {
90 std::cout << "[" << module.config_.name << "] Starting periodic_loop thread...\n";
91 module.data_thread_ = std::thread(&ModuleType::periodic_loop, &module);
92 } else if constexpr (module.has_loop_input) {
93 std::cout << "[" << module.config_.name << "] Starting free_loop thread...\n";
94 module.data_thread_ = std::thread(&ModuleType::free_loop, &module);
95 } else if constexpr (module.has_multi_input) {
96 // Phase 6.6: Multi-input processing
97 std::cout << "[" << module.config_.name << "] Starting multi_input_loop thread...\n";
98 module.data_thread_ = std::thread(&ModuleType::multi_input_loop, &module);
99
100 // Phase 6.9: Start secondary input receive threads
101 // Primary input (index 0) is handled by multi_input_loop's blocking receive
102 // Secondary inputs (indices 1, 2, ...) need background receive loops
103 constexpr size_t primary_idx = ModuleType::get_primary_input_index();
104 module.template start_secondary_input_threads<primary_idx>();
105 } else if constexpr (module.has_continuous_input) {
106 // Single continuous input (backward compatible)
107 std::cout << "[" << module.config_.name << "] Starting continuous_loop thread...\n";
108 module.data_thread_ = std::thread(&ModuleType::continuous_loop, &module);
109 }
110 }
111
126 void stop() {
127 auto& module = static_cast<ModuleType&>(*this);
128
129 if (!module.running_) {
130 return;
131 }
132
133 module.on_stop();
134
135 // Unsubscribe from source(s)
136 if constexpr (module.has_multi_input) {
137 // Multi-input: Unsubscribe from all configured sources
138 for (const auto& source : module.config_.input_sources()) {
139 module.unsubscribe_from_multi_input_source(source);
140 }
141 } else if constexpr (module.has_continuous_input) {
142 // Single continuous input (legacy)
143 if (module.config_.has_single_input()) {
144 module.unsubscribe_from_source(module.config_.source_system_id(), module.config_.source_instance_id());
145 }
146 }
147
148 module.running_ = false;
149
150 // Wait for threads to finish
151 if (module.data_thread_ && module.data_thread_->joinable()) {
152 module.data_thread_->join();
153 }
154
155 // Phase 4: Join secondary input threads (via MultiInputInfrastructure)
156 if constexpr (module.has_multi_input) {
157 module.join_secondary_input_threads();
158 }
159
160 // Join work threads
161 module.join_output_work_threads();
162
163 if (module.command_thread_ && module.command_thread_->joinable()) {
164 module.command_thread_->join();
165 }
166
167 // Stop all mailboxes
168 module.template stop_all_mailbox_sets(std::make_index_sequence<module.num_output_types>{});
169 if (module.data_mailbox_) {
170 module.data_mailbox_->stop();
171 }
172
173 module.on_cleanup();
174 }
175
179 bool is_running() const {
180 const auto& module = static_cast<const ModuleType&>(*this);
181 return module.running_;
182 }
183};
184
185} // namespace commrat
Phase 6: Lifecycle Management CRTP Mixin.
void start()
Start the module.
bool is_running() const
Check if module is currently running.
void stop()
Stop the module.
CommRaT - Modern C++ Real-Time Communication Framework.