CommRaT 2.0.0
C++20 Real-Time Messaging Framework
Loading...
Searching...
No Matches
historical_mailbox.hpp
Go to the documentation of this file.
1
17#pragma once
18
19#include "registry_mailbox.hpp"
21#include "../platform/threading.hpp"
22#include <optional>
23#include <memory>
24
25namespace commrat {
26
62template<typename UserRegistry, std::size_t HistorySize = 100>
64public:
65 using Registry = UserRegistry;
67
68 // Define history buffer tuple type here (where TimestampedRingBuffer is known)
69 template<std::size_t... Is>
70 static auto make_history_tuple_type(std::index_sequence<Is...>) {
71 return std::tuple<
72 std::unique_ptr<TimestampedRingBuffer<TimsMessage<typename Registry::template type_at<Is>>, HistorySize>>...
73 >{};
74 }
75
77 std::make_index_sequence<Registry::num_types>{}
78 ));
79
80 // ========================================================================
81 // Construction and Configuration
82 // ========================================================================
83
90 const MailboxConfig& config,
91 Milliseconds default_tolerance = Milliseconds(50)
92 ) : mailbox_(config), default_tolerance_(default_tolerance) {
93 // Initialize history buffers for all registered types
94 init_history_buffers();
95 }
96
102 auto result = mailbox_.start();
103 if (!result) {
104 std::cerr << "[HistoricalMailbox] Start failed for mailbox "
105 << mailbox_.mailbox_id() << " - error "
106 << static_cast<int>(result.get_error()) << "\n";
107 } else {
108 std::cout << "[HistoricalMailbox] Started mailbox "
109 << mailbox_.mailbox_id() << " successfully\n";
110 }
111 return result;
112 }
113
117 void stop() {
118 mailbox_.stop();
119 }
120
121 // ========================================================================
122 // Primary Input API (Blocking Receive)
123 // ========================================================================
124
137 template<typename T>
139
140 // Receive from underlying mailbox
141 auto result = mailbox_.template receive<T>();
142
143 if (result) {
144 // Store in history for future getData queries
145 store_in_history(result.value());
146 }
147
148 return result;
149 }
150
151 // ========================================================================
152 // Secondary Input API (Non-Blocking getData)
153 // ========================================================================
154
181 template<typename T>
182 std::optional<TimsMessage<T>> getData(
183 uint64_t timestamp,
184 Milliseconds tolerance = Milliseconds(-1),
186 ) const {
187 auto& buffer = get_history_buffer<T>();
188 return buffer.getData(timestamp, tolerance, mode);
189 }
190
195 template<typename T>
196 std::pair<uint64_t, uint64_t> getTimestampRange() const {
197 const auto& buffer = get_history_buffer<T>();
198 return buffer.getTimestampRange();
199 }
200
204 template<typename T>
206 auto& buffer = get_history_buffer<T>();
207 buffer.clear();
208 }
209
214 clear_all_buffers();
215 }
216
217 // ========================================================================
218 // Pass-Through API to Underlying Mailbox
219 // ========================================================================
220
224 template<typename T>
225 auto send(T& message, uint32_t dest_mailbox) -> MailboxResult<void> {
226 return mailbox_.template send(message, dest_mailbox);
227 }
228
232 uint32_t get_mailbox_id() const {
233 return mailbox_.mailbox_id();
234 }
235
239 bool is_initialized() const {
240 return mailbox_.is_initialized();
241 }
242
243private:
244 // ========================================================================
245 // Internal Implementation
246 // ========================================================================
247
251 template<typename T>
252 void store_in_history(const TimsMessage<T>& tims_msg) {
253 auto& buffer = get_history_buffer<T>();
254
255 // Store TimsMessage directly - no conversion needed!
256 // Phase 6.10: Timestamp is in header (tims_msg.header.timestamp)
257 buffer.push(tims_msg);
258 }
259
263 template<typename T>
264 const TimestampedRingBuffer<TimsMessage<T>, HistorySize>& get_history_buffer() const {
265 constexpr auto type_index = Registry::template get_type_index<T>();
266 return *std::get<type_index>(history_buffers_);
267 }
268
272 template<typename T>
273 TimestampedRingBuffer<TimsMessage<T>, HistorySize>& get_history_buffer() {
274 constexpr auto type_index = Registry::template get_type_index<T>();
275 return *std::get<type_index>(history_buffers_);
276 }
277
281 void init_history_buffers() {
282 init_buffers_impl(std::make_index_sequence<Registry::num_types>{});
283 }
284
288 template<std::size_t... Is>
289 void init_buffers_impl(std::index_sequence<Is...>) {
290 ((init_buffer<Is>()), ...);
291 }
292
296 template<std::size_t I>
297 void init_buffer() {
298 using PayloadType = typename Registry::template type_at<I>;
299 std::get<I>(history_buffers_) =
300 std::make_unique<TimestampedRingBuffer<TimsMessage<PayloadType>, HistorySize>>(
301 default_tolerance_
302 );
303 }
304
308 void clear_all_buffers() {
309 clear_all_impl(std::make_index_sequence<Registry::num_types>{});
310 }
311
312 template<std::size_t... Is>
313 void clear_all_impl(std::index_sequence<Is...>) {
314 ((std::get<Is>(history_buffers_)->clear()), ...);
315 }
316
317 // ========================================================================
318 // Member Variables
319 // ========================================================================
320
321 MailboxType mailbox_;
322
323 Milliseconds default_tolerance_;
324
331 HistoryBufferTuple history_buffers_;
332};
333
334} // namespace commrat
Mailbox with timestamped history for getData synchronization.
auto send(T &message, uint32_t dest_mailbox) -> MailboxResult< void >
Send a message (pass-through to underlying mailbox)
void clearHistory()
Clear history for type T.
void stop()
Stop the mailbox (pass-through to underlying mailbox)
auto receive() -> MailboxResult< TimsMessage< T > >
Receive a message and automatically store in history.
HistoricalMailbox(const MailboxConfig &config, Milliseconds default_tolerance=Milliseconds(50))
Constructor with mailbox config and default tolerance.
std::optional< TimsMessage< T > > getData(uint64_t timestamp, Milliseconds tolerance=Milliseconds(-1), InterpolationMode mode=InterpolationMode::NEAREST) const
decltype(make_history_tuple_type(std::make_index_sequence< Registry::num_types >{})) HistoryBufferTuple
void clearAllHistory()
Clear all history buffers.
auto start() -> MailboxResult< void >
Start the mailbox (pass-through to underlying mailbox)
bool is_initialized() const
Check if mailbox is initialized.
static auto make_history_tuple_type(std::index_sequence< Is... >)
std::pair< uint64_t, uint64_t > getTimestampRange() const
Get timestamp range currently buffered for type T.
uint32_t get_mailbox_id() const
Get mailbox ID.
RegistryMailbox< UserRegistry > MailboxType
auto start() -> MailboxResult< void >
CommRaT - Modern C++ Real-Time Communication Framework.
InterpolationMode
Interpolation mode for timestamp-based lookup.
@ NEAREST
Return closest message by timestamp.
std::chrono::milliseconds Milliseconds
Definition timestamp.hpp:39
Thread-safe timestamped ring buffer for multi-input synchronization (Phase 6)