CommRaT 2.0.0
C++20 Real-Time Messaging Framework
Loading...
Searching...
No Matches
timestamped_ring_buffer.hpp
Go to the documentation of this file.
1
12#pragma once
13
16#include <commrat/messages.hpp> // For TimsMessage
17#include <sertial/containers/ring_buffer.hpp>
18#include <optional>
19#include <cmath>
20#include <algorithm>
21
22namespace commrat {
23
24// ============================================================================
25// Timestamp Accessor Trait (handles both direct .timestamp and .header.timestamp)
26// ============================================================================
27
34template<typename T>
36 static uint64_t get(const T& msg) {
37 return msg.timestamp;
38 }
39};
40
41// Specialization for TimsMessage<T>
42template<typename PayloadT>
43struct TimestampAccessor<TimsMessage<PayloadT>> {
44 static uint64_t get(const TimsMessage<PayloadT>& msg) {
45 return msg.header.timestamp;
46 }
47};
48
53 NEAREST,
54 BEFORE,
55 AFTER,
57};
58
101template<typename T, std::size_t MaxSize = 100>
103 // Compile-time validation
104 static_assert(MaxSize > 0, "MaxSize must be greater than 0");
105
106public:
107 using value_type = T;
108 using size_type = std::size_t;
109
110 // ========================================================================
111 // Construction
112 // ========================================================================
113
119 std::chrono::milliseconds default_tolerance = std::chrono::milliseconds(50)
120 ) : default_tolerance_(default_tolerance) {}
121
122 // ========================================================================
123 // Capacity
124 // ========================================================================
125
131 size_type size() const {
132 SharedLock lock(mutex_);
133 return buffer_.size();
134 }
135
140 static constexpr size_type capacity() {
141 return MaxSize;
142 }
143
149 bool empty() const {
150 SharedLock lock(mutex_);
151 return buffer_.empty();
152 }
153
159 bool full() const {
160 SharedLock lock(mutex_);
161 return buffer_.full();
162 }
163
168 void clear() {
169 UniqueLockShared lock(mutex_);
170 buffer_.clear();
171 oldest_timestamp_ = 0;
172 newest_timestamp_ = 0;
173 }
174
175 // ========================================================================
176 // Modifiers
177 // ========================================================================
178
191 void push(const T& message) {
192 UniqueLockShared lock(mutex_);
193
194 // Validate timestamp ordering (debug only)
195 #ifndef NDEBUG
196 if (!buffer_.empty() && TimestampAccessor<T>::get(message) < newest_timestamp_) {
197 // WARNING: Timestamps must be monotonically increasing
198 // This is a logic error in the producer
199 }
200 #endif
201
202 // Update timestamp bounds
203 if (buffer_.empty()) {
204 oldest_timestamp_ = TimestampAccessor<T>::get(message);
205 } else if (buffer_.full()) {
206 // Overwriting oldest - update lower bound
207 oldest_timestamp_ = TimestampAccessor<T>::get(buffer_[1]); // Second-oldest becomes oldest
208 }
209 newest_timestamp_ = TimestampAccessor<T>::get(message);
210
211 buffer_.push_back(message);
212 }
213
219 void push(T&& message) {
220 UniqueLockShared lock(mutex_);
221
222 #ifndef NDEBUG
223 if (!buffer_.empty() && TimestampAccessor<T>::get(message) < newest_timestamp_) {
224 // WARNING: Timestamp order violation
225 }
226 #endif
227
228 if (buffer_.empty()) {
229 oldest_timestamp_ = TimestampAccessor<T>::get(message);
230 } else if (buffer_.full()) {
231 oldest_timestamp_ = TimestampAccessor<T>::get(buffer_[1]);
232 }
233 newest_timestamp_ = TimestampAccessor<T>::get(message);
234
235 buffer_.push_back(std::move(message));
236 }
237
238 // ========================================================================
239 // Timestamp-Based Lookup (Primary Feature for Phase 6)
240 // ========================================================================
241
276 std::optional<T> getData(
277 uint64_t timestamp,
278 std::chrono::milliseconds tolerance = std::chrono::milliseconds(-1),
280 ) const {
281 SharedLock lock(mutex_);
282
283 if (buffer_.empty()) {
284 return std::nullopt;
285 }
286
287 // Use default tolerance if not specified
288 if (tolerance.count() < 0) {
289 tolerance = default_tolerance_;
290 }
291
292 // Convert tolerance from milliseconds to nanoseconds
293 // Timestamps are in nanoseconds (from Time::now()), so tolerance must match
294 uint64_t tolerance_ns = static_cast<uint64_t>(tolerance.count()) * 1'000'000ULL;
295
296 // Quick bounds check (handle underflow for low timestamps)
297 uint64_t lower_bound = (timestamp >= tolerance_ns) ? (timestamp - tolerance_ns) : 0;
298 uint64_t upper_bound = timestamp + tolerance_ns;
299 if (upper_bound < timestamp) { // Overflow check
300 upper_bound = UINT64_MAX;
301 }
302
303 if (upper_bound < oldest_timestamp_ || lower_bound > newest_timestamp_) {
304 return std::nullopt; // Requested timestamp outside buffer range
305 }
306
307 // Dispatch to mode-specific implementation
308 switch (mode) {
310 return getData_nearest(timestamp, tolerance_ns);
312 return getData_before(timestamp, tolerance_ns);
314 return getData_after(timestamp, tolerance_ns);
316 // Future: Linear interpolation between messages
317 // For now, fall back to NEAREST
318 return getData_nearest(timestamp, tolerance_ns);
319 }
320
321 return std::nullopt;
322 }
323
329 std::pair<uint64_t, uint64_t> getTimestampRange() const {
330 SharedLock lock(mutex_);
331 if (buffer_.empty()) {
332 return {0, 0};
333 }
334 return {oldest_timestamp_, newest_timestamp_};
335 }
336
337private:
338 // ========================================================================
339 // Internal Lookup Implementations
340 // ========================================================================
341
346 std::optional<T> getData_nearest(uint64_t timestamp, uint64_t tolerance_ns) const {
347 if (buffer_.empty()) {
348 return std::nullopt;
349 }
350
351 size_type best_idx = 0;
352 uint64_t best_diff = std::abs(static_cast<int64_t>(TimestampAccessor<T>::get(buffer_[0]) - timestamp));
353
354 // Linear search for minimum time difference
355 for (size_type i = 1; i < buffer_.size(); ++i) {
356 uint64_t diff = std::abs(static_cast<int64_t>(TimestampAccessor<T>::get(buffer_[i]) - timestamp));
357 if (diff < best_diff) {
358 best_diff = diff;
359 best_idx = i;
360 }
361 }
362
363 if (best_diff <= tolerance_ns) {
364 return buffer_[best_idx];
365 }
366
367 return std::nullopt;
368 }
369
374 std::optional<T> getData_before(uint64_t timestamp, uint64_t tolerance_units) const {
375 if (buffer_.empty()) {
376 return std::nullopt;
377 }
378
379 // Search backwards from newest to oldest
380 for (size_type i = buffer_.size(); i > 0; --i) {
381 size_type idx = i - 1;
382 if (TimestampAccessor<T>::get(buffer_[idx]) <= timestamp) {
383 uint64_t diff = timestamp - TimestampAccessor<T>::get(buffer_[idx]);
384 if (diff <= tolerance_units) {
385 return buffer_[idx];
386 }
387 break; // Found newest candidate, but out of tolerance
388 }
389 }
390
391 return std::nullopt;
392 }
393
398 std::optional<T> getData_after(uint64_t timestamp, uint64_t tolerance_units) const {
399 if (buffer_.empty()) {
400 return std::nullopt;
401 }
402
403 // Search forwards from oldest to newest
404 for (size_type i = 0; i < buffer_.size(); ++i) {
405 if (TimestampAccessor<T>::get(buffer_[i]) >= timestamp) {
406 uint64_t diff = TimestampAccessor<T>::get(buffer_[i]) - timestamp;
407 if (diff <= tolerance_units) {
408 return buffer_[i];
409 }
410 break; // Found oldest candidate, but out of tolerance
411 }
412 }
413
414 return std::nullopt;
415 }
416
417 // ========================================================================
418 // Member Variables
419 // ========================================================================
420
421 mutable SharedMutex mutex_;
422 sertial::RingBuffer<T, MaxSize> buffer_;
423
424 // Cached timestamp bounds (avoid scanning on every getData)
425 uint64_t oldest_timestamp_{0};
426 uint64_t newest_timestamp_{0};
427
428 std::chrono::milliseconds default_tolerance_;
429};
430
431} // namespace commrat
Thread-safe timestamped ring buffer with getData lookup.
bool full() const
Check if buffer is full.
void push(const T &message)
Push new message with timestamp.
void clear()
Clear all messages from buffer.
bool empty() const
Check if buffer is empty.
std::pair< uint64_t, uint64_t > getTimestampRange() const
Get timestamp range currently in buffer.
static constexpr size_type capacity()
Get maximum capacity.
void push(T &&message)
Push new message via move.
TimestampedRingBuffer(std::chrono::milliseconds default_tolerance=std::chrono::milliseconds(50))
Constructor with optional sync tolerance.
std::optional< T > getData(uint64_t timestamp, std::chrono::milliseconds tolerance=std::chrono::milliseconds(-1), InterpolationMode mode=InterpolationMode::NEAREST) const
size_type size() const
Get current number of stored messages.
CommRaT - Modern C++ Real-Time Communication Framework.
std::shared_lock< SharedMutex > SharedLock
Scoped shared lock (RAII) - for readers.
InterpolationMode
Interpolation mode for timestamp-based lookup.
@ BEFORE
Return message at or before requested timestamp.
@ NEAREST
Return closest message by timestamp.
@ AFTER
Return message at or after requested timestamp.
@ INTERPOLATE
Linear interpolation (if T supports it - future)
std::unique_lock< SharedMutex > UniqueLockShared
Scoped unique lock (RAII) - for writers.
static uint64_t get(const TimsMessage< PayloadT > &msg)
Trait to access timestamp from different message types.
static uint64_t get(const T &msg)
Unified threading and synchronization abstractions for CommRaT.
Unified timestamp and time utility abstractions for CommRaT.