37 auto&
module = static_cast<ModuleType&>(*this);
39 if (module.running_) {
46 module.template start_all_mailbox_sets(std::make_index_sequence<module.num_output_types>{});
49 if (module.data_mailbox_) {
50 auto data_result =
module.data_mailbox_->start();
52 throw std::runtime_error(
"Failed to start data mailbox");
57 if constexpr (
module.has_multi_input) {
58 module.start_input_mailboxes();
61 module.running_ = true;
66 std::cout <<
"[" <<
module.config_.name << "] Spawning " << module.num_output_types << " output work threads...\n";
67 module.template spawn_all_output_work_threads(std::make_index_sequence<module.num_output_types>{});
70 if constexpr (
module.num_command_types > 0) {
71 module.command_thread_ = std::thread(&ModuleType::command_loop, &module);
75 std::this_thread::sleep_for(std::chrono::milliseconds(10));
78 if constexpr (
module.has_multi_input) {
80 module.subscribe_to_all_sources();
81 }
else if constexpr (
module.has_continuous_input) {
83 if (module.config_.has_single_input()) {
84 module.subscribe_to_source(module.config_.source_system_id(), module.config_.source_instance_id());
89 if constexpr (
module.has_periodic_input) {
90 std::cout << "[" << module.config_.name << "] Starting periodic_loop thread...\n";
91 module.data_thread_ = std::thread(&ModuleType::periodic_loop, &module);
92 }
else if constexpr (
module.has_loop_input) {
93 std::cout << "[" << module.config_.name << "] Starting free_loop thread...\n";
94 module.data_thread_ = std::thread(&ModuleType::free_loop, &module);
95 }
else if constexpr (
module.has_multi_input) {
97 std::cout << "[" << module.config_.name << "] Starting multi_input_loop thread...\n";
98 module.data_thread_ = std::thread(&ModuleType::multi_input_loop, &module);
103 constexpr size_t primary_idx = ModuleType::get_primary_input_index();
104 module.template start_secondary_input_threads<primary_idx>();
105 }
else if constexpr (
module.has_continuous_input) {
107 std::cout << "[" << module.config_.name << "] Starting continuous_loop thread...\n";
108 module.data_thread_ = std::thread(&ModuleType::continuous_loop, &module);
127 auto&
module = static_cast<ModuleType&>(*this);
129 if (!module.running_) {
136 if constexpr (
module.has_multi_input) {
138 for (const auto& source : module.config_.input_sources()) {
139 module.unsubscribe_from_multi_input_source(source);
141 }
else if constexpr (
module.has_continuous_input) {
143 if (module.config_.has_single_input()) {
144 module.unsubscribe_from_source(module.config_.source_system_id(), module.config_.source_instance_id());
148 module.running_ = false;
151 if (module.data_thread_ && module.data_thread_->joinable()) {
152 module.data_thread_->join();
156 if constexpr (
module.has_multi_input) {
157 module.join_secondary_input_threads();
161 module.join_output_work_threads();
163 if (module.command_thread_ && module.command_thread_->joinable()) {
164 module.command_thread_->join();
168 module.template stop_all_mailbox_sets(std::make_index_sequence<module.num_output_types>{});
169 if (module.data_mailbox_) {
170 module.data_mailbox_->stop();