CommRaT (Communication Runtime) is a C++20 real-time messaging framework that provides type-safe, compile-time validated message passing for deterministic systems. Built on top of TiMS (TIMS Interprocess Message System), it enables you to build robust distributed applications with guaranteed real-time performance.
Table of Contents
- Introduction
- Core Concepts
- Your First Module
- Module Types and Processing Modes
- I/O Specifications
- Message Flow and Subscription
- Multi-Input Synchronization
- Timestamp Management
- Command Handling
- Configuration and Deployment
- Best Practices
- Troubleshooting
1. Introduction
What is CommRaT?
CommRaT is a messaging framework designed for real-time embedded systems where deterministic behavior, low latency, and type safety are critical. Unlike traditional publish-subscribe systems (ROS, DDS, MQTT), CommRaT provides:
- Compile-time validation: Message types, IDs, and registries validated at compile time
- Zero runtime overhead: No dynamic allocation, no runtime type checking
- Deterministic execution: Bounded execution time, no blocking I/O in hot paths
- Real-time safe: Lock-free where possible, priority inheritance mutexes
- Type safety: C++20 concepts enforce correct API usage
Key Features
1. Compile-Time Message Registry
using MyApp = CommRaT<
Message::Data<TemperatureData>,
Message::Data<FilteredData>,
Message::Command<ResetCmd>
>;
2. MailboxSet Architecture
CommRaT uses a MailboxSet per output type design:
Each output type gets 3 mailboxes:
- CMD Mailbox (base+0): Commands for this output type
- WORK Mailbox (base+16): Subscription protocol for this output type
- PUBLISH Mailbox (base+32): Publishes this output type to subscribers
Plus a shared DATA mailbox for all inputs:
- DATA Mailbox (base+48): Receives input data (shared across output types)
Single-Output Module Example:
class Sensor : public Module<Output<TempData>, PeriodicInput>
Has 1 MailboxSet = 3 mailboxes (CMD, WORK, PUBLISH) Total: 3 mailboxes
Multi-Output Module Example:
class Fusion : public Module<Outputs<Raw, Filtered, Diag>, Input<SensorData>>
Has 3 MailboxSets × 3 = 9 mailboxes + 1 DATA = 10 total mailboxes
- Each output type (Raw, Filtered, Diag) has its own CMD, WORK, PUBLISH
- All share one DATA mailbox for receiving SensorData
- Subscribers choose which output type to subscribe to
This design allows independent subscription per output type - a subscriber can request only FilteredData without receiving Raw or Diag.
3. Blocking Receives with Zero CPU Usage
void command_loop() {
while (running_) {
cmd_mailbox_.receive_any([this](auto&& msg) {
handle_command(msg);
});
}
}
4. Multi-Input Sensor Fusion
class SensorFusion : public MyApp::Module<
Output<FusedData>,
Inputs<IMUData, GPSData, LidarData>,
PrimaryInput<IMUData>
> {
protected:
void process(
const IMUData& imu,
const GPSData& gps,
const LidarData& lidar,
FusedData& output
) override {
output = fuse_sensors(imu, gps, lidar);
}
};
When to Use CommRaT
CommRaT is ideal for:
- Real-time control systems (robotics, autonomous vehicles)
- Sensor fusion and data processing pipelines
- Embedded systems with hard timing constraints
- Applications requiring deterministic message delivery
- Systems where type safety prevents catastrophic errors
CommRaT may NOT be ideal for:
- Web services or microservices (use gRPC, REST)
- Data analytics pipelines (use Kafka, RabbitMQ)
- Systems without real-time requirements (use ROS 2, ZeroMQ)
- Applications requiring dynamic discovery (CommRaT uses static configuration)
Comparison with Other Frameworks
| Feature | CommRaT | ROS 2 | DDS | ZeroMQ |
| Real-time guarantees | Yes (compile-time) | Partial | Yes | No |
| Zero dynamic allocation | Yes | No | No | No |
| Compile-time validation | Yes | No | No | No |
| Type safety | Strong (C++20 concepts) | Moderate | Weak | None |
| Learning curve | Moderate | Steep | Steep | Easy |
| Deployment complexity | Low (static config) | High (discovery) | High | Moderate |
| Performance | High (zero-copy) | Moderate | High | Moderate |
Architecture Overview
┌─────────────────────────────────────────────────────────────┐
│ Application │
│ using MyApp = CommRaT<MessageA, MessageB, CommandC>; │
└─────────────────────────────────────────────────────────────┘
│
┌─────────────────────┼─────────────────────┐
│ │ │
┌────▼─────┐ ┌────▼─────┐ ┌────▼─────┐
│ Producer │ │ Consumer │ │ Filter │
│ Module │ │ Module │ │ Module │
└──────────┘ └──────────┘ └──────────┘
│ ▲ ▲
│ Publish │ Subscribe │
│ │ │
┌────▼─────────────────────┴─────────────────────┴─────┐
│ TiMS (Message Passing Layer) │
│ - 3 mailboxes per module (CMD/WORK/DATA) │
│ - Blocking receives (0% CPU idle) │
│ - Zero-copy when possible │
└──────────────────────────────────────────────────────┘
What You'll Learn
This guide will take you from zero to productive with CommRaT:
- Sections 1-3: Understand core concepts and create your first working system
- Sections 4-6: Master different module types and message flow patterns
- Sections 7-9: Learn advanced features (multi-input, timestamps, commands)
- Sections 10-12: Deploy, optimize, and troubleshoot real systems
By the end, you'll be able to design and implement real-time messaging systems with confidence.
2. Core Concepts
Understanding CommRaT requires familiarity with five fundamental concepts: Messages, Modules, Mailboxes, Registries, and the Subscription Protocol.
2.1 Messages
Messages are the data structures you send between modules. Every message is a plain C++ struct (POD type) with no virtual functions, pointers, or dynamic allocation.
Basic Message Structure:
struct TemperatureData {
uint64_t timestamp;
uint32_t sensor_id;
float temperature_c;
float confidence;
};
Message Wrapper: CommRaT automatically wraps your data in a TimsMessage<T> that includes metadata:
template<typename T>
struct TimsMessage {
TimsHeader header;
T payload;
};
Key Properties:
- Serializable: Must be compatible with SeRTial serialization (POD types work automatically)
- Fixed-size or bounded: No
std::string, std::vector (use sertial::fixed_vector<T, N> instead)
- Real-time safe: No heap allocations when copying or serializing
2.2 Application Definition
The CommRaT<...> template defines your entire application's message types:
using MyApp = CommRaT<
Message::Data<TemperatureData>,
Message::Data<FilteredData>,
Message::Command<ResetCmd>
>;
What This Does:
- Validates uniqueness: Compile error if duplicate message types
- Assigns message IDs: Each type gets unique 32-bit ID
- Generates serialization: Automatic serialization/deserialization
- Creates type registry: Compile-time lookup of types by ID
Message ID Format:
[prefix:8][subprefix:8][local_id:16]
└─ 0x01 = UserDefined
└─ 0x00 = Data
└─ Auto-assigned unique ID
2.3 Modules
A Module is a processing unit that receives messages, processes them, and optionally produces output. Every module runs in its own thread(s) and has its own mailboxes.
Module Signature:
class MyModule : public MyApp::Module<
OutputSpec,
InputSpec,
...Commands
> {
protected:
};
Module Anatomy:
class TemperatureSensor : public MyApp::Module<
Output<TemperatureData>,
PeriodicInput
> {
protected:
void process(TemperatureData& output) override {
float temp = read_sensor();
output = TemperatureData{
.timestamp = Time::now(),
.sensor_id = sensor_id_,
.temperature_c = temp,
.confidence = 0.95
};
}
private:
uint32_t sensor_id_;
};
Module Lifecycle:
- Construction: Module initialized, mailboxes created
- start(): Spawns threads (command_loop, work_loop, data_thread)
- Running: Processes messages, publishes outputs
- stop(): Signals threads to exit
- Destruction: Threads joined, mailboxes cleaned up
2.4 Three-Mailbox Architecture
Each module has three separate mailboxes to prevent interference:
Module Address Space:
┌─────────────────────────────────────────┐
│ Base Address = [type_id:16][sys:8][inst:8] │
├─────────────────────────────────────────┤
│ CMD Mailbox (base + 0) │ ← User commands
│ - Commands from other modules │
│ - Control messages │
├─────────────────────────────────────────┤
│ WORK Mailbox (base + 16) │ ← Subscription protocol
│ - SubscribeRequest from consumers │
│ - SubscribeReply confirmations │
│ - UnsubscribeRequest on shutdown │
├─────────────────────────────────────────┤
│ DATA Mailbox (base + 32) │ ← High-frequency data
│ - ContinuousInput messages │
│ - Published data from producers │
└─────────────────────────────────────────┘
Why Three Mailboxes?
- Separation of concerns: Commands don't interfere with data streams
- Priority handling: Can prioritize WORK over DATA
- Blocking receives: Each mailbox can block independently (0% CPU when idle)
2.5 Message Registry
The MessageRegistry<...> (internal to CommRaT<...>) is a compile-time map from message types to IDs:
constexpr uint32_t msg_id = MyApp::get_message_id<TemperatureData>();
auto serialized = MyApp::serialize(temp_data);
auto result = MyApp::deserialize<TemperatureData>(buffer);
Registry Features:
- Compile-time validation: All types checked at compile time
- Unique ID assignment: Guaranteed no collisions
- Type safety: Can't deserialize wrong type
- Zero overhead: No runtime type lookups
2.6 Subscription Protocol
CommRaT uses an explicit subscription protocol (not automatic discovery like ROS):
Step 1: Consumer Sends SubscribeRequest
SubscribeRequest req{
.subscriber_base_address = my_base_address,
.message_type_id = MyApp::get_message_id<TemperatureData>()
};
work_mailbox_.send(req, producer_work_mailbox);
Step 2: Producer Adds Subscriber
void handle_subscribe_request(const SubscribeRequest& req) {
subscribers_.push_back(req.subscriber_base_address + 48);
SubscribeReply reply{ .success = true };
work_mailbox_.send(reply, req.subscriber_base_address + 16);
}
Step 3: Producer Publishes to Subscribers
void publish(const TemperatureData& data) {
for (uint32_t subscriber_data_mbx : subscribers_) {
publish_mailbox_.send(data, subscriber_data_mbx);
}
}
Step 4: Consumer Receives on DATA Mailbox (base+48)
void continuous_loop() {
while (running_) {
data_mailbox_.receive_any([this](auto&& msg) {
process(msg.message);
});
}
}
2.7 Compile-Time Guarantees
CommRaT validates many properties at compile time, catching errors before runtime:
MyApp::serialize(UnregisteredType{});
class BadModule : public MyApp::Module<Output<WrongType>, PeriodicInput> {
void process(TemperatureData& output) override { ... }
};
using BadApp = CommRaT<
Message::Data<TemperatureData>,
Message::Data<TemperatureData>
>;
2.8 Real-Time Safety
CommRaT enforces real-time safety through design constraints:
FORBIDDEN in process() functions:
void process(TemperatureData& output) override {
std::vector<float> readings;
readings.push_back(temp);
std::cout << "Temperature: " << temp << "\n";
while (sensor_ready()) { }
std::lock_guard<std::mutex> lock(mtx_);
}
ALLOWED in process() functions:
void process(TemperatureData& output) override {
std::array<float, 10> readings;
sertial::fixed_vector<float, 100> buffer;
constexpr int scale = 100;
uint64_t count = counter_.fetch_add(1, std::memory_order_relaxed);
float filtered = alpha_ * temp + (1 - alpha_) * prev_temp_;
output = TemperatureData{ };
}
Key Takeaways
- Messages are plain POD structs, wrapped in
TimsMessage<T> with metadata
- CommRaT<...> defines your application and validates all message types at compile time
- Modules are processing units with 3 mailboxes (CMD/WORK/DATA)
- Subscription is explicit: Consumer requests, producer acknowledges, then publishes
- Compile-time validation catches type errors before runtime
- Real-time safety requires no dynamic allocation, no blocking I/O in process()
3. Your First Module
Let's build a complete working system: a temperature sensor that publishes data, and a monitor that receives and displays it. This tutorial will take about 10 minutes and requires only basic C++ knowledge.
3.1 Project Setup
Step 1: Create project directory
mkdir my_first_commrat
cd my_first_commrat
Step 2: Create CMakeLists.txt
cmake_minimum_required(VERSION 3.20)
project(MyFirstCommRaT CXX)
set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
# Find CommRaT (adjust path to your installation)
find_package(CommRaT REQUIRED)
add_executable(temperature_system
temperature_system.cpp
)
target_link_libraries(temperature_system
CommRaT::commrat
tims
pthread
)
3.2 Define Your Message
Create temperature_system.cpp:
#include <iostream>
#include <csignal>
#include <atomic>
std::atomic<bool> shutdown_requested{false};
void signal_handler(int signal) {
if (signal == SIGINT || signal == SIGTERM) {
shutdown_requested.store(true);
}
}
struct TemperatureReading {
uint64_t timestamp;
uint32_t sensor_id;
float temperature_c;
float humidity_percent;
};
>;
CommRaT Application Template - Main User-Facing Interface.
Main CommRaT header - include this to get everything you need.
Message definition with compile-time ID assignment.
What just happened?
- You defined a plain C++ struct with your sensor data
- You created a CommRaT application that knows about your message type
- The compiler validated your types and assigned a unique ID to TemperatureReading
- All serialization code was generated automatically
3.3 Create a Producer Module
A producer generates data periodically:
class TemperatureSensor : public TempApp::Module<
commrat::Output<TemperatureReading>,
commrat::PeriodicInput
> {
public:
: Module(config)
, sensor_id_(sensor_id)
{
std::cout << "[Sensor] Initialized sensor_id=" << sensor_id_ << "\n";
}
protected:
void process(TemperatureReading& output) override {
float temp = 20.0f + (rand() % 100) / 10.0f;
float humidity = 40.0f + (rand() % 200) / 10.0f;
output = TemperatureReading{
.sensor_id = sensor_id_,
.temperature_c = temp,
.humidity_percent = humidity
};
std::cout << "[Sensor] Generated: " << temp << "°C, "
<< humidity << "% humidity\n";
}
private:
uint32_t sensor_id_;
};
static Timestamp now() noexcept
Get current timestamp in nanoseconds.
Key points:
- Inherits from
TempApp::Module<Output<...>, PeriodicInput>
- Overrides
process(OutputData& output) which is called automatically every config_.period
- Writes to output parameter - no explicit publish call needed
- Must use
override keyword (process is virtual)
3.4 Create a Consumer Module
A consumer receives and processes data:
class TemperatureMonitor : public TempApp::Module<
commrat::Output<TemperatureReading>,
commrat::Input<TemperatureReading>
> {
public:
: Module(config)
, count_(0)
{
std::cout << "[Monitor] Initialized\n";
}
protected:
void process(const TemperatureReading& input, TemperatureReading& output) override {
count_++;
std::cout << "[Monitor] #" << count_
<< " Sensor " << reading.sensor_id
<< ": " << reading.temperature_c << "°C, "
<< reading.humidity_percent << "% humidity\n";
if (reading.temperature_c > 28.0f) {
std::cout << " WARNING: High temperature!\n";
}
return reading;
}
private:
uint32_t count_;
};
Key points:
- Inherits from
TempApp::Module<Output<TemperatureReading>, Input<TemperatureReading>>
- Overrides
process_continuous(const TemperatureReading&)
- Called automatically for each received message
- Blocks efficiently (0% CPU when no messages)
3.5 Wire It Together
Now create the main function to configure and run both modules:
int main() {
std::signal(SIGINT, signal_handler);
std::signal(SIGTERM, signal_handler);
std::cout << "=== CommRaT Temperature System ===\n\n";
.system_id = 10,
.instance_id = 1,
};
.system_id = 20,
.instance_id = 1,
.source_system_id = 10,
.source_instance_id = 1
};
TemperatureSensor sensor(sensor_config, 1);
TemperatureMonitor monitor(monitor_config);
sensor.start();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
monitor.start();
std::cout << "\nRunning... (Press Ctrl+C to stop)\n\n";
int seconds = 0;
while (!shutdown_requested.load() && seconds < 5) {
std::this_thread::sleep_for(std::chrono::seconds(1));
seconds++;
}
std::cout << "\nShutting down...\n";
monitor.stop();
sensor.stop();
std::cout << "Done!\n";
return 0;
}
std::chrono::milliseconds Milliseconds
3.6 Build and Run
Build:
mkdir build && cd build
cmake ..
make
Run:
Expected Output:
=== CommRaT Temperature System ===
[Sensor] Initialized sensor_id=1
[Monitor] Initialized
Running for 5 seconds...
[Sensor] Generated: 23.4°C, 52.1% humidity
[Monitor] #1 Sensor 1: 23.4°C, 52.1% humidity
[Sensor] Generated: 27.8°C, 48.3% humidity
[Monitor] #2 Sensor 1: 27.8°C, 48.3% humidity
[Sensor] Generated: 29.1°C, 55.7% humidity
[Monitor] #3 Sensor 1: 29.1°C, 55.7% humidity
WARNING: High temperature!
[Sensor] Generated: 21.5°C, 43.9% humidity
[Monitor] #4 Sensor 1: 21.5°C, 43.9% humidity
...
Shutting down...
Done!
3.7 What Happened Under the Hood
Let's trace the message flow:
1. Initialization (0-10ms):
main()
→ TemperatureSensor constructor
→ Creates 3 mailboxes (CMD, WORK, DATA)
→ Base address: [TemperatureReading_ID:16][10:8][1:8]
→ TemperatureMonitor constructor
→ Creates 3 mailboxes (CMD, WORK, DATA)
2. Module Start (10-20ms):
sensor.start()
→ Spawns command_loop() thread (handles commands)
→ Spawns work_loop() thread (handles subscriptions)
→ Spawns data_thread_ (calls process() every 100ms)
monitor.start()
→ Spawns command_loop() thread
→ Spawns work_loop() thread
→ Spawns data_thread_ (calls continuous_loop)
→ Sends SubscribeRequest to sensor's WORK mailbox
3. Subscription (20-30ms):
Sensor's work_loop receives SubscribeRequest
→ Adds monitor's DATA mailbox (base+48) to subscribers_ list
→ Sends SubscribeReply to monitor's WORK mailbox (base+16)
Monitor's work_loop receives SubscribeReply
→ Subscription confirmed
→ Begins blocking on DATA mailbox (base+48)
4. Data Flow (every 100ms):
Sensor's data_thread (timer fires every 100ms)
→ Calls process()
→ User code generates TemperatureReading
→ Module automatically publishes to all subscribers
→ Serializes TemperatureReading
→ Sends from PUBLISH mailbox (base+32) to monitor's DATA mailbox (base+48)
Monitor's continuous_loop (blocked on receive)
→ Receives message on DATA mailbox (base+48)
→ Deserializes TemperatureReading
→ Calls process_continuous(reading)
→ User code displays data
→ Blocks again waiting for next message
5. Shutdown (5000ms):
main() calls sensor.stop()
→ Sets running_ = false
→ Threads detect flag and exit
→ Joins all threads
main() calls monitor.stop()
→ Sends UnsubscribeRequest to sensor
→ Exits threads
3.8 Common First-Time Mistakes
Mistake 1: Forgetting override keyword
void process(TemperatureReading& output) {
output = reading;
}
Fix: Always use override for process methods (they're virtual).
Mistake 2: Wrong output type
class BadSensor : public TempApp::Module<Output<TemperatureReading>, PeriodicInput> {
void process(OtherData& output) override {
output = OtherData{};
}
};
Fix: Return type must match Output<T> specification.
Mistake 3: Mismatched system IDs
ModuleConfig monitor_config{
.system_id = 20,
.source_system_id = 99,
.source_instance_id = 1
};
Fix: source_system_id must match producer's system_id.
Mistake 4: Blocking in process()
void process(TemperatureReading& output) override {
std::this_thread::sleep_for(std::chrono::seconds(1));
output = reading;
}
Fix: Never block in process() - use CommRaT's timing primitives or configure period.
3.9 Extending Your System
Add another sensor:
ModuleConfig sensor2_config{
.name = "TempSensor2",
.system_id = 11,
.instance_id = 1,
.period = Milliseconds(100)
};
TemperatureSensor sensor2(sensor2_config, 2);
sensor2.start();
Monitor both sensors:
ModuleConfig monitor2_config{
.name = "TempMonitor2",
.system_id = 21,
.source_system_id = 11,
.source_instance_id = 1
};
TemperatureMonitor monitor2(monitor2_config);
monitor2.start();
Add a filter module:
class TemperatureFilter : public TempApp::Module<
Output<TemperatureReading>,
Input<TemperatureReading>
> {
void process(const TemperatureReading& raw, TemperatureReading& output) override {
filtered_temp_ = 0.7f * filtered_temp_ + 0.3f * raw.temperature_c;
output = TemperatureReading{
.timestamp = Time::now(),
.sensor_id = raw.sensor_id,
.temperature_c = filtered_temp_,
.humidity_percent = raw.humidity_percent
};
}
private:
float filtered_temp_ = 20.0f;
};
Key Takeaways
- Messages are plain POD structs - define your data structure naturally
- CommRaT<...> registers all types and generates serialization automatically
- Producer modules use
PeriodicInput and override process(OutputData& output)
- Consumer modules use
Input<T> and override process(const InputData& input, OutputData& output)
- Configuration uses system_id/instance_id for addressing and source IDs for subscription
- Subscription happens automatically in
start() based on configuration
- Shutdown is clean - just call
stop() on all modules
Next: Section 4 explores all module types (Periodic, Continuous, Loop) and processing modes.
4. Module Types and Processing Modes
CommRaT modules come in different flavors based on how they process data. Understanding these patterns helps you choose the right architecture for your application.
4.1 The Three Processing Modes
Every module has an InputSpec that determines its processing behavior:
| InputSpec | When process() Called | Use Case | Example |
| PeriodicInput | Timer fires (every period) | Data generation, periodic sampling | Sensor reading, heartbeat |
| Input<T> | Message received | Data transformation, filtering | Signal processing, fusion |
| LoopInput | As fast as possible | Maximum throughput | High-speed data forwarding |
4.2 PeriodicInput: Timer-Driven Processing
When to use: Generate data at fixed intervals, periodic tasks.
Signature:
class MyModule : public MyApp::Module<Output<OutputType>, PeriodicInput> {
protected:
void process(OutputType& output) override {
output = OutputType{ };
}
};
Configuration:
ModuleConfig config{
.name = "PeriodicModule",
.system_id = 10,
.instance_id = 1,
.period = Milliseconds(100)
};
Example: Heartbeat Generator
struct HeartbeatMsg {
uint64_t timestamp;
uint32_t sequence_number;
uint32_t process_id;
};
class Heartbeat : public MyApp::Module<Output<HeartbeatMsg>, PeriodicInput> {
public:
Heartbeat(const ModuleConfig& config)
: Module(config), seq_(0) {}
protected:
void process(HeartbeatMsg& output) override {
output = HeartbeatMsg{
.timestamp = Time::now(),
.sequence_number = seq_++,
.process_id = getpid()
};
}
private:
uint32_t seq_;
};
ModuleConfig config{
};
Heartbeat hb(config);
hb.start();
std::chrono::seconds Seconds
Characteristics:
- Deterministic timing: Process runs at exact intervals (handled by CommRaT's scheduler)
- No blocking: If process() takes longer than period, warning logged
- CPU efficient: Thread sleeps between periods (0% CPU when idle)
- Timestamp assignment:
TimsHeader.timestamp = Time::now() at generation moment
4.3 Input<T>: Event-Driven Processing
When to use: React to incoming messages, transform data.
Signature:
class MyModule : public MyApp::Module<OutputSpec, Input<InputType>> {
protected:
OutputType process_continuous(const InputType& input) override {
return OutputType{ };
}
};
Configuration:
ModuleConfig config{
.name = "ContinuousModule",
.system_id = 20,
.instance_id = 1,
.source_system_id = 10,
.source_instance_id = 1,
.period = Duration(0)
};
Example: Temperature Filter
struct RawTemperature {
uint64_t timestamp;
float value_c;
};
struct FilteredTemperature {
uint64_t timestamp;
float value_c;
float confidence;
};
class TemperatureFilter : public MyApp::Module<
Output<FilteredTemperature>,
Input<RawTemperature>
> {
public:
TemperatureFilter(const ModuleConfig& config)
: Module(config), alpha_(0.3f), prev_temp_(20.0f) {}
protected:
void process(const RawTemperature& raw, FilteredTemperature& output) override {
float filtered = alpha_ * raw.value_c + (1 - alpha_) * prev_temp_;
prev_temp_ = filtered;
float confidence = 1.0f - std::min(std::abs(filtered - raw.value_c) / 10.0f, 1.0f);
output = FilteredTemperature{
.timestamp = Time::now(),
.value_c = filtered,
.confidence = confidence
};
}
private:
float alpha_;
float prev_temp_;
};
Characteristics:
- Event-driven: Executes only when data arrives (0% CPU when no messages)
- Blocking receive: Thread blocks efficiently until message available
- Guaranteed delivery: Every published message triggers process
- Timestamp propagation: Output timestamp = input.header.timestamp (exact propagation)
- Order preserved: Messages processed in arrival order
4.4 LoopInput: Maximum Throughput Processing
When to use: Process data as fast as possible, no timing constraints.
Signature:
class MyModule : public MyApp::Module<Output<OutputType>, LoopInput> {
protected:
void process(OutputType& output) override {
output = OutputType{ };
}
};
Configuration:
ModuleConfig config{
.name = "LoopModule",
.system_id = 30,
.instance_id = 1,
.period = Duration(0)
};
Example: Data Generator
struct DataPacket {
uint64_t timestamp;
uint64_t packet_id;
std::array<float, 100> samples;
};
class HighSpeedGenerator : public MyApp::Module<Output<DataPacket>, LoopInput> {
public:
HighSpeedGenerator(const ModuleConfig& config)
: Module(config), packet_id_(0) {}
protected:
void process(DataPacket& output) override {
DataPacket packet{
.timestamp = Time::now(),
.packet_id = packet_id_++
};
for (size_t i = 0; i < 100; ++i) {
packet.samples[i] = std::sin(2 * M_PI * i / 100.0);
}
output = packet;
}
private:
uint64_t packet_id_;
};
Characteristics:
- Maximum throughput: No artificial delays
- High CPU usage: Runs continuously (100% CPU on one core)
- Non-deterministic timing: Rate depends on processing speed
- Use with caution: Can starve other processes
WARNING: LoopInput should be used sparingly. Most applications should use PeriodicInput or Input<T> for predictable behavior and efficient CPU usage.
4.5 Output Specifications
Modules can produce zero, one, or multiple outputs:
No Output (Monitor/Sink)
class DataLogger : public MyApp::Module<Output<void>, Input<SensorData>> {
protected:
void process(const SensorData& data) override {
log_to_file(data);
}
};
Single Output (Most Common)
class Filter : public MyApp::Module<Output<FilteredData>, Input<RawData>> {
protected:
void process(const RawData& raw, FilteredData& output) override {
return apply_filter(raw);
}
};
Multiple Outputs (Phase 5.3+)
class SignalSplitter : public MyApp::Module<
Outputs<DataA, DataB>,
Input<CombinedData>
> {
protected:
void process(const CombinedData& input, DataA& out1, DataB& out2) override {
out1 = extract_channel_a(input);
out2 = extract_channel_b(input);
}
};
4.6 InputSpec vs ProcessingMode: Understanding the Difference
Common confusion: "What's the difference between InputSpec and ProcessingMode?"
InputSpec (compile-time): Specifies what data comes in
PeriodicInput - No input data (self-generating)
Input<T> - One input type
Inputs<T, U, V> - Multiple input types
ProcessingMode (runtime): Specifies when to process
Periodic - Timer-driven (for PeriodicInput)
Continuous - Event-driven (for Input<T>)
Loop - As-fast-as-possible (for LoopInput)
The InputSpec determines the ProcessingMode automatically:
PeriodicInput → ProcessingMode::Periodic
Input<T> → ProcessingMode::Continuous
LoopInput → ProcessingMode::Loop
You don't specify ProcessingMode directly - it's inferred from InputSpec.
4.7 Combining Inputs and Outputs
Valid combinations:
class Generator : public MyApp::Module<Output<Data>, PeriodicInput>;
class Transformer : public MyApp::Module<Output<DataB>, Input<DataA>>;
class Sink : public MyApp::Module<Output<void>, Input<Data>>;
class Splitter : public MyApp::Module<Outputs<DataA, DataB>, Input<Combined>>;
class Fusion : public MyApp::Module<Output<Fused>, Inputs<DataA, DataB, DataC>>;
4.8 Choosing the Right Module Type
Decision tree:
Does your module generate data from scratch?
├─ YES → Use PeriodicInput
│ └─ Set .period to desired rate
│
└─ NO → Does it receive messages?
├─ YES → Need maximum throughput?
│ ├─ NO → Use Input<T>
│ │ └─ Subscribe with source_system_id
│ │
│ └─ YES → Use LoopInput (rarely needed)
│ └─ Monitor CPU usage carefully
│
└─ NO → Invalid (module must do something)
Examples by use case:
| Use Case | Module Type | Rationale |
| Read sensor every 10ms | PeriodicInput | Fixed sampling rate |
| Filter incoming data | Input<T> | React to each message |
| Log messages to disk | Input<T> | Process each message |
| Generate test patterns | PeriodicInput | Controlled generation rate |
| Fuse IMU + GPS | Inputs<IMU, GPS> | Multiple synchronized inputs |
| Stress test system | LoopInput | Maximum load generation |
4.9 Performance Characteristics
PeriodicInput:
- Latency: Fixed (= period)
- Throughput: 1 / period messages/sec
- CPU Usage: Low (sleeps between periods)
- Jitter: Low (timer-driven)
Input<T>:****
- **Latency: Minimal (processes immediately on arrival)
- Throughput: Depends on publisher rate
- CPU Usage: 0% when idle, scales with message rate
- Jitter: Minimal (event-driven)
LoopInput:
- Latency: Minimal (no delays)
- Throughput: Maximum (limited by CPU)
- CPU Usage: 100% (continuous loop)
- Jitter: High (depends on system load)
4.10 Real-World Example: Sensor Fusion Pipeline
Let's combine multiple module types in a realistic system:
class IMUSensor : public MyApp::Module<Output<IMUData>, PeriodicInput> {
protected:
void process(IMUData& output) override {
output = read_imu_hardware();
}
};
class IMUFilter : public MyApp::Module<Output<FilteredIMU>, Input<IMUData>> {
protected:
void process(const IMUData& raw, FilteredIMU& output) override {
output = kalman_filter_.update(raw);
}
private:
KalmanFilter kalman_filter_;
};
class PoseFusion : public MyApp::Module<
Output<PoseEstimate>,
Inputs<FilteredIMU, GPSData>,
PrimaryInput<FilteredIMU>
> {
protected:
void process(
const FilteredIMU& imu,
const GPSData& gps,
PoseEstimate& output
) override {
output = fuse_sensors(imu, gps);
}
};
class PoseLogger : public MyApp::Module<Output<void>, Input<PoseEstimate>> {
protected:
void process(const PoseEstimate& pose) override {
write_to_log(pose);
}
};
Pipeline:
IMUSensor (10ms) → IMUFilter → PoseFusion ← GPSData
↓
PoseLogger
Key Takeaways
- PeriodicInput: Timer-driven, for data generation at fixed rates
- Input<T>: Event-driven, for message processing and transformation
- LoopInput: Maximum throughput, use sparingly (high CPU usage)
- InputSpec determines ProcessingMode automatically - you don't specify both
- Combine module types to build complex pipelines
- Choose based on requirements: Timing constraints → Periodic, React to events → Continuous
Next: Section 5 dives deep into I/O specifications for advanced patterns.
5. I/O Specifications
CommRaT modules are defined by their input and output specifications. These determine what data a module produces, consumes, and how it interacts with other modules.
5.1 Output Specifications
- Output<T>: Module produces a single message type T.
- Outputs<T, U, ...>: Module produces multiple message types (multi-output).
- Output<void>: Module produces no output (sink/monitor).
Examples:
class Producer : public MyApp::Module<Output<DataA>, PeriodicInput> { ... };
class Splitter : public MyApp::Module<Outputs<DataA, DataB>, Input<CombinedData>> { ... };
class Logger : public MyApp::Module<Output<void>, Input<DataA>> { ... };
5.2 Input Specifications
- PeriodicInput: No input, generates data periodically.
- Input<T>: Receives a single message type T (continuous input).
- Inputs<T, U, ...>: Receives multiple message types (multi-input fusion).
- LoopInput: No input, runs as fast as possible.
Examples:
class Sensor : public MyApp::Module<Output<SensorData>, PeriodicInput> { ... };
class Filter : public MyApp::Module<Output<FilteredData>, Input<SensorData>> { ... };
class Fusion : public MyApp::Module<Output<FusedData>, Inputs<IMUData, GPSData>> { ... };
class StressTest : public MyApp::Module<Output<Data>, LoopInput> { ... };
5.3 Combining Inputs and Outputs
You can combine input and output specs for advanced patterns:
- Generator: Output<T>, PeriodicInput
- Transformer: Output<T>, Input
- Sink: Output<void>, Input<T>
- Multi-output: Outputs<T, U>, Input<V>
- Multi-input fusion: Output<T>, Inputs<U, V, W>
Example: Multi-output producer
class MultiProducer : public MyApp::Module<Outputs<DataA, DataB>, PeriodicInput> {
protected:
void process(DataA& outA, DataB& outB) override {
outA = generate_a();
outB = generate_b();
}
};
Example: Multi-input fusion
class Fusion : public MyApp::Module<Output<FusedData>, Inputs<IMUData, GPSData>, PrimaryInput<IMUData>> {
protected:
void process(const IMUData& imu, const GPSData& gps, FusedData& output) override {
return fuse(imu, gps);
}
};
5.4 Input Metadata and Accessors
For each input, CommRaT provides metadata accessors:
get_input_metadata<Index>() - Get metadata for input at index
get_input_metadata<Type>() - Get metadata for input of unique type
get_input_timestamp<Index>() - Get timestamp for input
has_new_data<Index>() - Check freshness
is_input_valid<Index>() - Check validity
Example:
uint64_t imu_ts = get_input_timestamp<IMUData>();
bool gps_fresh = has_new_data<1>();
5.5 Real-Time Safety in I/O Specs
- All input/output specs are validated at compile time
- No dynamic allocation in hot paths
- Type mismatches are compile errors
- Multi-output and multi-input patterns are deterministic
Key Takeaways
- Output/Input specs define module data flow
- Multi-output and multi-input enable advanced pipelines
- Metadata accessors provide timestamps, freshness, validity
- Compile-time validation ensures correctness and real-time safety
6. Message Flow and Subscription
CommRaT uses a 3-mailbox architecture with an automatic subscription protocol to enable efficient, deterministic message delivery between modules. This section explains how messages flow through the system and how modules discover and connect to each other.
6.1 The Three-Mailbox Architecture
Each module has three separate mailboxes, each serving a distinct purpose:
CMD mailbox: base_address + 0
WORK mailbox: base_address + 16
DATA mailbox: base_address + 32
Why three mailboxes?
- Separation of concerns: Commands, control, and data don't interfere
- Real-time predictability: High-priority data unaffected by control messages
- Blocking efficiency: Each mailbox can block independently (0% CPU when idle)
Mailbox roles:
| Mailbox | Purpose | Messages | Threading |
| CMD | User commands | Custom command types | command_loop() thread |
| WORK | Subscription control | SubscribeRequest, SubscribeReply, UnsubscribeRequest | work_loop() thread |
| DATA | Data streaming | User data messages (from subscriptions) | Processed in data_thread_ |
6.2 Hierarchical Addressing
CommRaT uses hierarchical addressing to uniquely identify modules and their mailboxes:
Module identity:
.system_id = 10,
.instance_id = 1
};
Address calculation:
uint16_t type_id_low = get_message_id<TemperatureData>() & 0xFFFF;
uint32_t base = (type_id_low << 16) | (system_id << 8) | instance_id;
uint32_t cmd_mailbox = base + 0;
uint32_t work_mailbox = base + 16;
uint32_t data_mailbox = base + 32;
Key insight: The base address encodes the primary output type, enabling type-specific message delivery for multi-output producers.
6.3 The Subscription Protocol
When a consumer module wants to receive data from a producer, it automatically initiates a 4-step subscription handshake:
Consumer Producer
| |
| 1. SubscribeRequest |
| (to producer's WORK mailbox) |
|--------------------------------->|
| | 2. Add subscriber
| | to list
| 3. SubscribeReply |
| (to consumer's WORK mailbox) |
|<---------------------------------|
| |
| 4. Data messages |
| (to consumer's DATA mailbox) |
|<---------------------------------|
|<---------------------------------|
|<---------------------------------|
Step 1: SubscribeRequest
Consumer sends subscription request to producer's WORK mailbox:
SubscribeRequestPayload request{
.subscriber_mailbox_id = consumer_data_mailbox,
.requested_period_ms = 0
};
Step 2: Producer Adds Subscriber
Producer's work_loop() receives request and adds consumer to subscriber list:
void handle_subscribe_request(const SubscribeRequestPayload& req) {
add_subscriber(req.subscriber_mailbox_id);
SubscribeReplyPayload reply{
.actual_period_ms = config_.period.count() / 1'000'000,
.success = true,
.error_code = 0
};
work_mailbox_.send(reply, );
}
Step 3: SubscribeReply
Consumer receives confirmation on its WORK mailbox:
void handle_subscribe_reply(const SubscribeReplyPayload& reply) {
if (reply.success) {
std::cout << "Subscription confirmed! Period: "
<< reply.actual_period_ms << " ms\n";
} else {
std::cerr << "Subscription failed: error " << reply.error_code << "\n";
}
}
Step 4: Data Delivery
Producer publishes messages to all subscribers' DATA mailboxes:
for (uint32_t subscriber : subscribers_) {
cmd_mailbox_.send(output_message, subscriber);
}
6.4 Automatic Subscription Setup
The subscription happens automatically when you call module.start() on a consumer module with Input<T> or Inputs<T, U, V>:
.system_id = 20,
.instance_id = 1,
.source_system_id = 10,
.source_instance_id = 1
};
MyConsumer consumer(consumer_config);
consumer.start();
What happens in start():
- Module spawns three threads:
command_loop(), work_loop(), data_thread_
data_thread_ calculates producer's WORK mailbox address
- Sends
SubscribeRequest to producer
- Waits for
SubscribeReply on WORK mailbox
- Begins processing incoming data on DATA mailbox
6.5 Message Delivery Mechanisms
Periodic Producer:
class Sensor : public MyApp::Module<Output<SensorData>, PeriodicInput> {
protected:
void process(SensorData& output) override {
output = read_sensor();
}
};
periodic_loop() calls process() at fixed intervals
- Result wrapped in
TimsMessage<SensorData> with timestamp = Time::now()
- Sent to all subscribers' DATA mailboxes
Continuous Consumer:
class Filter : public MyApp::Module<Output<FilteredData>, Input<SensorData>> {
protected:
void process(const SensorData& input, FilteredData& output) override {
output = apply_filter(input);
}
};
continuous_loop() blocks on DATA mailbox receive()
- Receives
TimsMessage<SensorData> from producer
- Unwraps payload, calls
process(payload, output)
- Result wrapped with
timestamp = input.header.timestamp (exact propagation)
- Sent to filter's subscribers
Result: Zero polling, zero CPU usage when idle, deterministic message delivery.
6.6 Unsubscription and Cleanup
On shutdown, consumers automatically unsubscribe:
Unsubscribe steps:
- Consumer sends
UnsubscribeRequest to producer's WORK mailbox
- Producer removes consumer from subscriber list
- Producer sends
UnsubscribeReply to consumer's WORK mailbox
- Consumer stops threads and closes mailboxes
Clean shutdown pattern:
std::signal(SIGINT, signal_handler);
producer.start();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
consumer.start();
while (!shutdown_requested.load()) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
consumer.stop();
producer.stop();
6.7 Multi-Output Type-Specific Delivery
For multi-output producers, subscribers receive only their subscribed type:
class MultiProducer : public MyApp::Module<Outputs<DataA, DataB>, PeriodicInput> {
protected:
void process(DataA& outA, DataB& outB) override {
outA = generate_a();
outB = generate_b();
}
};
Type filtering:
- Consumer's base address encodes expected message type (lower 16 bits)
- Producer extracts expected type from subscriber address
- Only sends outputs that match subscriber's expected type
Example:
.instance_id = 1,
.source_system_id = 10,
.source_instance_id = 1
};
.instance_id = 1,
.source_system_id = 10,
.source_instance_id = 1,
.source_primary_output_type_id = MyApp::get_message_id<DataA>()
};
uint8_t system_id() const
Get system_id - NoOutput or SimpleOutput only.
Why source_primary_output_type_id?
- Multi-output producer's base address uses first output type (DataA)
- Consumer B wants DataB but must find producer at DataA's address
source_primary_output_type_id tells consumer which address to use
6.8 Error Handling
Subscription failures:
SubscribeReplyPayload reply{
.actual_period_ms = 0,
.success = false,
.error_code = 1
};
Common errors:
- Max subscribers reached: Producer has limited subscriber capacity
- Producer not running: Consumer sends request before producer starts
- Type mismatch: Consumer expects wrong message type (caught at compile time)
- Address collision: Two modules with same system_id/instance_id
Best practices:
- Start producers before consumers (with delay)
- Check
SubscribeReply.success in critical systems
- Use unique system_id/instance_id combinations
- Monitor mailbox errors via error callbacks
6.9 Message Flow Summary
Producer → Consumer Data Flow:
1. Producer generates data (periodic or continuous)
2. Wraps payload in TimsMessage with header (timestamp, seq_number)
3. Serializes using SeRTial (zero-copy when possible)
4. Sends to each subscriber's DATA mailbox (TiMS send)
5. Consumer's DATA mailbox receives (blocking)
6. Deserializes message (zero-copy when possible)
7. Unwraps payload from TimsMessage
8. Calls process(payload, output)
9. Result wrapped and published to consumer's subscribers
Subscription Control Flow:
1. Consumer.start() sends SubscribeRequest to producer's WORK mailbox
2. Producer.work_loop() receives request, adds subscriber
3. Producer sends SubscribeReply to consumer's WORK mailbox
4. Consumer.work_loop() receives reply, confirms subscription
5. On shutdown, consumer sends UnsubscribeRequest
6. Producer removes subscriber, sends UnsubscribeReply
6.10 Key Takeaways
- Three mailboxes (CMD, WORK, DATA) separate concerns and enable real-time predictability
- Hierarchical addressing (system_id, instance_id) uniquely identifies modules
- Automatic subscription happens on
module.start() for Input<T> modules
- Type-specific delivery enables multi-output producers with type filtering
- Blocking receives ensure 0% CPU when idle, deterministic message delivery
- Clean shutdown requires reverse-order stop() and unsubscribe protocol
7. Multi-Input Synchronization
Many real-world applications require sensor fusion - combining data from multiple sources with different update rates to produce a unified output. CommRaT's multi-input pattern provides time-synchronized data fusion with automatic timestamp alignment.
7.1 The Challenge: Asynchronous Sensors
Consider a robotics system with three sensors:
- IMU: 100Hz (10ms period) - fast, drives robot control loop
- GPS: 5Hz (200ms period) - slow, provides absolute position
- Lidar: 20Hz (50ms period) - medium, provides obstacle detection
Problem: How do you fuse these asynchronous streams?
Naive approach (broken):
class BadFusion : public MyApp::Module<Output<FusedData>, Input<IMUData>> {
GPSData latest_gps_;
LidarData latest_lidar_;
void process(const IMUData& imu, FusedData& output) override {
output = fuse(imu, latest_gps_, latest_lidar_);
}
};
CommRaT solution: Multi-input with automatic time synchronization.
7.2 Multi-Input Specification
Use Inputs<T, U, V> to declare multiple input types:
class SensorFusion : public MyApp::Module<
Output<FusedData>,
Inputs<IMUData, GPSData, LidarData>,
PrimaryInput<IMUData>
> {
protected:
void process(
const IMUData& imu,
const GPSData& gps,
const LidarData& lidar,
FusedData& output,
) override {
output = fuse_sensors(imu, gps, lidar);
}
};
Key concepts:
- Primary input (IMUData): Drives execution rate via blocking receive
- Secondary inputs (GPSData, LidarData): Fetched via
getData(timestamp) to match primary
- Automatic synchronization: Module ensures all inputs aligned to primary timestamp
7.3 Primary vs Secondary Inputs
Primary Input:
- Blocking receive: Module blocks until primary message arrives
- Execution driver: Sets the fusion rate (e.g., 100Hz for IMU)
- Timestamp source: All inputs synchronized to primary's timestamp
- Designated by:
PrimaryInput<IMUData> template parameter
Secondary Inputs:
- Non-blocking getData: Fetches best-match message from history buffer
- Time-aligned: Retrieved based on primary input's timestamp
- May be stale: If no recent message within tolerance, returns older data
- Freshness tracked:
has_new_data<Index>() indicates if data is fresh
Why this design?
- Fast sensors drive execution (low latency)
- Slow sensors don't block processing (deterministic timing)
- All data temporally coherent (no race conditions)
7.4 Configuration for Multi-Input Modules
Multi-input modules require input_sources and sync_tolerance:
.system_id = 20,
.instance_id = 1,
.input_sources = {
{10, 1},
{11, 1},
{12, 1}
},
.sync_tolerance = 50'000'000
};
Configuration fields:
| Field | Purpose | Example |
input_sources | System/instance IDs of input producers | {{10,1}, {11,1}, {12,1}} |
sync_tolerance | Max timestamp difference for getData | 50'000'000 (50ms in ns) |
period | Primary input's expected rate | Milliseconds(10) (100Hz) |
Order matters: First input_source is PRIMARY, rest are SECONDARY (unless PrimaryInput specified).
7.5 How getData Synchronization Works
Under the hood:
- HistoricalMailbox: Each secondary input has a circular buffer (default: 100 messages)
- Automatic buffering: Every received message stored with timestamp
- getData(timestamp, tolerance): Finds closest message within tolerance
- Best-match algorithm: Returns message with smallest
|msg.timestamp - requested_timestamp|
Example timeline:
Time (ms): 0 50 100 150 200 250 300
IMU: ●----●----●----●----●----●----● (100Hz primary)
GPS: ●---------●---------●---------● (5Hz secondary)
Lidar: ●----●----●----●----●----●----● (20Hz secondary, actually every 50ms)
At t=150ms (IMU arrives):
- Primary: receive() blocks, gets IMU@150ms
- GPS: getData(150ms, 50ms) → returns GPS@200ms (closest within tolerance)
- Lidar: getData(150ms, 50ms) → returns Lidar@150ms (exact match)
Tolerance selection:
- Too small: Secondary inputs often invalid (getData fails)
- Too large: Temporal misalignment (stale data accepted)
- Rule of thumb: 2-3x slowest sensor period (e.g., GPS@5Hz → 50ms tolerance)
7.6 Detecting Fresh vs Stale Data
Use metadata accessors to check secondary input freshness:
void process(
const IMUData& imu,
const GPSData& gps,
const LidarData& lidar,
FusedData& output
) override {
if (has_new_data<1>()) {
std::cout << "GPS updated!\n";
} else {
std::cout << "GPS stale (reusing old data)\n";
}
if (!is_input_valid<2>()) {
std::cerr << "Lidar getData failed (no data within tolerance)\n";
}
auto gps_meta = get_input_metadata<GPSData>();
uint64_t age_ns = imu_meta.timestamp - gps_meta.timestamp;
std::cout << "GPS age: " << age_ns / 1'000'000 << " ms\n";
output = fuse_sensors(imu, gps, lidar);
}
Metadata for multi-input:
| Accessor | Returns | Use Case |
get_input_metadata<Index>() | Full metadata struct | Comprehensive input state |
get_input_timestamp<Index>() | uint64_t timestamp | Calculate data age |
has_new_data<Index>() | bool (true if fresh) | Detect sensor updates |
is_input_valid<Index>() | bool (true if getData succeeded) | Handle optional inputs |
7.7 Complete Multi-Input Example
Sensor fusion module:
#include <commrat/timestamp.hpp>
#include <iostream>
struct IMUData {
uint64_t timestamp;
double accel_x, accel_y, accel_z;
double gyro_x, gyro_y, gyro_z;
};
struct GPSData {
uint64_t timestamp;
double latitude, longitude, altitude;
};
struct FusedPose {
uint64_t timestamp;
double x, y, z;
double roll, pitch, yaw;
float confidence;
};
>;
class SensorFusion : public FusionApp::Module<
commrat::Output<FusedPose>,
commrat::Inputs<IMUData, GPSData>,
commrat::PrimaryInput<IMUData>
> {
public:
: FusionApp::Module<
commrat::Output<FusedPose>,
commrat::Inputs<IMUData, GPSData>,
commrat::PrimaryInput<IMUData>>(config) {
std::cout << "[Fusion] Initialized\n";
}
protected:
void process(
const IMUData& imu,
const GPSData& gps,
FusedPose& output
) override {
bool gps_fresh = has_new_data<1>();
FusedPose pose{
.timestamp = get_input_timestamp<0>(),
.x = gps.latitude * 111000.0,
.y = gps.longitude * 111000.0,
.z = gps.altitude,
.roll = std::atan2(imu.accel_y, imu.accel_z),
.pitch = std::atan2(-imu.accel_x, std::sqrt(imu.accel_y*imu.accel_y + imu.accel_z*imu.accel_z)),
.yaw = 0.0,
.confidence = gps_fresh ? 0.9f : 0.5f
};
std::cout << "[Fusion] Fused pose @ " << pose.timestamp / 1'000'000 << " ms"
<< " (GPS " << (gps_fresh ? "FRESH" : "STALE") << ")\n";
output = pose;
}
};
CommRaT - Modern C++ Real-Time Communication Framework.
Configuration and main:
int main() {
std::atomic<bool> shutdown{false};
std::signal(SIGINT, [](int) { shutdown.store(true); });
.system_id = 10,
.instance_id = 1,
};
.system_id = 11,
.instance_id = 1,
};
.system_id = 20,
.instance_id = 1,
.input_sources = {
{10, 1},
{11, 1}
},
.sync_tolerance = 50'000'000
};
IMUSensor imu(imu_config);
GPSSensor gps(gps_config);
SensorFusion fusion(fusion_config);
imu.start();
gps.start();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
fusion.start();
while (!shutdown.load()) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
fusion.stop();
gps.stop();
imu.stop();
return 0;
}
7.8 Advanced Multi-Input Patterns
Optional secondary inputs:
void process(
const IMUData& imu,
const GPSData& gps,
const MagnetometerData& mag,
FusedData& output
) override {
if (is_input_valid<2>()) {
output = fuse_with_mag(imu, gps, mag);
} else {
output = fuse_without_mag(imu, gps);
}
}
Adaptive tolerance:
void process(const IMUData& imu, const GPSData& gps, FusedData& output) override {
auto gps_meta = get_input_metadata<GPSData>();
uint64_t gps_age = get_input_timestamp<IMUData>() - gps_meta.timestamp;
if (gps_age > 100'000'000) {
std::cerr << "WARNING: GPS very stale (" << gps_age / 1'000'000 << " ms)\n";
}
return weighted_fusion(imu, gps, calculate_weights(gps_age));
}
Three or more inputs:
class MultiSensorFusion : public MyApp::Module<
Output<FusedData>,
Inputs<IMUData, GPSData, LidarData, CameraData>,
PrimaryInput<IMUData>
> {
protected:
void process(
const IMUData& imu,
const GPSData& gps,
const LidarData& lidar,
const CameraData& camera,
FusedData& output
) override {
return fuse_all(imu, gps, lidar, camera);
}
};
7.9 Performance Considerations
HistoricalMailbox overhead:
- Buffer size: Default 100 messages per input (configurable)
- Memory:
sizeof(TimsMessage<T>) * 100 * num_secondary_inputs
- getData complexity: O(log N) binary search in circular buffer
- Real-time safe: No dynamic allocation after initialization
Best practices:
- Choose primary wisely: Use fastest sensor as primary (lowest latency)
- Minimize secondaries: Each secondary input adds getData overhead
- Tune buffer size: Match buffer to expected message rates
- Monitor freshness: Log stale data warnings for debugging
- Handle failures: Check
is_input_valid() for optional inputs
7.10 Multi-Input vs Single-Input Patterns
When to use multi-input:
- Sensor fusion with different update rates
- Combining asynchronous data streams
- Time-critical applications requiring synchronization
- Kalman filtering, SLAM, state estimation
When to use single-input:
- Simple pipeline (A → B → C)
- Same update rate throughout
- Stateless processing (no fusion needed)
- Lowest latency required (no getData overhead)
Hybrid pattern:
class FastFilter : public MyApp::Module<Output<FilteredIMU>, Input<IMUData>> { ... };
class SlowFusion : public MyApp::Module<Output<FusedState>, Inputs<FilteredIMU, GPSData>> { ... };
7.11 Key Takeaways
- Inputs<T, U, V> enables multi-input with automatic time synchronization
- PrimaryInput<T> designates which input drives execution (blocking receive)
- Secondary inputs use getData(timestamp, tolerance) for synchronization
- HistoricalMailbox buffers recent messages for temporal queries
- Metadata accessors (
has_new_data<>(), is_input_valid<>()) track freshness and validity
- Tolerance tuning balances data freshness vs synchronization success rate
- Real-time safe with bounded execution time and no dynamic allocation
8. Timestamp Management
Timestamps are the single source of truth for message timing in CommRaT. Every message has exactly one timestamp stored in the TimsHeader, enabling precise time synchronization, latency measurement, and temporal reasoning. This section explains how timestamps work and how to use them effectively.
8.1 Single Source of Truth: TimsHeader.timestamp
Core principle: Every message has exactly one timestamp in its header, never in the payload.
template<typename PayloadT>
struct TimsMessage {
TimsHeader header;
PayloadT payload;
};
struct TimsHeader {
uint32_t msg_type;
uint32_t msg_size;
uint64_t timestamp;
uint32_t sequence_number;
uint32_t flags;
};
Why header-only timestamps?
- No duplication: One timestamp per message, no redundancy
- Automatic management: Module sets timestamp, not user code
- Type agnostic: Works for any payload type
- Clean payloads: User data structures remain simple POD types
WRONG - Don't do this:
struct SensorData {
uint64_t timestamp;
float temperature;
float pressure;
};
RIGHT - Clean payload:
struct SensorData {
float temperature;
float pressure;
};
8.2 Automatic Timestamp Assignment
CommRaT automatically assigns timestamps based on module type:
| Module Type | Timestamp Assignment | Meaning |
| PeriodicInput | Time::now() | Data generation moment |
| ContinuousInput | input.header.timestamp | Exact propagation from input |
| Multi-input | primary.header.timestamp | Synchronization point |
| LoopInput | Time::now() | Data generation moment |
You never set timestamps manually - the module does it automatically before sending.
Example: Periodic producer
class Sensor : public MyApp::Module<Output<SensorData>, PeriodicInput> {
protected:
void process(SensorData& output) override {
output = SensorData{
.temperature = read_sensor(),
.pressure = read_pressure()
};
}
};
Example: Continuous transformer
class Filter : public MyApp::Module<Output<FilteredData>, Input<SensorData>> {
protected:
void process(const SensorData& input, FilteredData& output) override {
output = FilteredData{
.filtered_value = apply_filter(input.temperature)
};
}
};
Example: Multi-input fusion
class Fusion : public MyApp::Module<Output<FusedData>, Inputs<IMUData, GPSData>, PrimaryInput<IMUData>> {
protected:
void process(const IMUData& imu, const GPSData& gps, FusedData& output) override {
output = FusedData{
.position = fuse_position(imu, gps),
.velocity = fuse_velocity(imu, gps)
};
}
};
8.3 Accessing Input Timestamps
Use metadata accessors to access input timestamps in your process() functions:
Index-based access (always works):
void process(const IMUData& imu, const GPSData& gps, FusedData& output) override {
auto imu_meta = get_input_metadata<0>();
auto gps_meta = get_input_metadata<1>();
uint64_t imu_ts = imu_meta.timestamp;
uint64_t gps_ts = gps_meta.timestamp;
uint64_t gps_age_ns = imu_ts - gps_ts;
std::cout << "GPS age: " << gps_age_ns / 1'000'000 << " ms\n";
output = fuse(imu, gps);
}
Type-based access (when types unique):
void process(const IMUData& imu, const GPSData& gps, FusedData& output) override {
uint64_t imu_ts = get_input_timestamp<IMUData>();
uint64_t gps_ts = get_input_timestamp<GPSData>();
output = fuse(imu, gps);
}
Metadata structure:
template<typename T>
struct InputMetadata {
uint64_t timestamp;
uint32_t sequence_number;
uint32_t message_id;
bool is_new_data;
bool is_valid;
};
8.4 Timestamp Units and Conversions
CommRaT uses nanoseconds since epoch (UNIX timestamp in nanoseconds):
uint64_t now_ns = Time::now();
uint64_t now_ms = now_ns / 1'000'000;
double now_s = now_ns / 1'000'000'000.0;
Duration ten_ms = Milliseconds(10);
Duration one_sec = Seconds(1);
Duration tolerance = Milliseconds(50);
Common operations:
uint64_t age_ns = Time::now() - message.header.timestamp;
double age_ms = age_ns / 1'000'000.0;
uint64_t generation_time = sensor_msg.header.timestamp;
uint64_t processing_time = Time::now();
uint64_t latency_ns = processing_time - generation_time;
constexpr uint64_t MAX_AGE_NS = 100'000'000;
if ((Time::now() - msg.header.timestamp) > MAX_AGE_NS) {
std::cerr << "Message too old!\n";
}
8.5 Monotonicity and Sequence Numbers
Monotonicity guarantee: Timestamps from a single producer are monotonically increasing.
message_1.header.timestamp <= message_2.header.timestamp
message_1.header.seq_number < message_2.header.seq_number
Sequence numbers:
- Start at 0 for each module
- Increment by 1 for each sent message
- Enable message loss detection
Detecting message loss:
class Monitor : public MyApp::Module<Output<void>, Input<SensorData>> {
uint32_t last_seq_{0};
protected:
void process(const SensorData& input, SensorData& output) override {
auto meta = get_input_metadata<0>();
uint32_t expected_seq = last_seq_ + 1;
if (meta.sequence_number != expected_seq) {
uint32_t dropped = meta.sequence_number - expected_seq;
std::cerr << "WARNING: Dropped " << dropped << " messages!\n";
}
last_seq_ = meta.sequence_number;
output = input;
}
};
Verifying monotonicity (debugging):
void process(const SensorData& input, FilteredData& output) override {
static uint64_t last_ts = 0;
uint64_t current_ts = get_input_timestamp<0>();
if (current_ts < last_ts) {
std::cerr << "ERROR: Non-monotonic timestamp! "
<< current_ts << " < " << last_ts << "\n";
}
last_ts = current_ts;
output = apply_filter(input);
}
8.6 Time Synchronization Across Modules
Within single host: Timestamps use monotonic clock (no drift, immune to NTP updates).
Across hosts (future): Time synchronization protocols (PTP, NTP) can be integrated via custom timestamp source.
Current implementation:
class Time {
public:
static uint64_t now() {
auto now = std::chrono::steady_clock::now();
return std::chrono::duration_cast<std::chrono::nanoseconds>(
now.time_since_epoch()
).count();
}
};
For multi-host systems:
- Use PTP hardware timestamping (IEEE 1588)
- Implement custom
Time::now() using PTP clock
- Ensure all hosts synchronized to sub-microsecond accuracy
8.7 Debugging with Timestamps
Logging message flow:
void process(const SensorData& input, FilteredData& output) override {
uint64_t recv_time = Time::now();
uint64_t msg_time = get_input_timestamp<0>();
uint64_t latency_us = (recv_time - msg_time) / 1000;
std::cout << "[Filter] Received message @ " << msg_time / 1'000'000 << " ms"
<< " (latency: " << latency_us << " µs)\n";
output = apply_filter(input);
}
Timestamp tracing (full pipeline):
SensorData sensor_output = read_sensor();
FilteredData filter_output = apply_filter(sensor_output);
void process_continuous(const FilteredData& input) {
uint64_t now = Time::now();
uint64_t origin = get_input_timestamp<0>();
uint64_t e2e_latency_us = (now - origin) / 1000;
std::cout << "End-to-end latency: " << e2e_latency_us << " µs\n";
}
Performance profiling:
class ProfilingFilter : public MyApp::Module<Output<FilteredData>, Input<SensorData>> {
std::array<uint64_t, 1000> latencies_;
size_t idx_{0};
protected:
FilteredData process_continuous(const SensorData& input) override {
uint64_t start = Time::now();
FilteredData result = apply_filter(input);
uint64_t end = Time::now();
uint64_t processing_time_ns = end - start;
latencies_[idx_++ % 1000] = processing_time_ns;
if (idx_ % 1000 == 0) {
uint64_t avg = std::accumulate(latencies_.begin(), latencies_.end(), 0ULL) / 1000;
std::cout << "Avg processing time: " << avg / 1000 << " µs\n";
}
return result;
}
};
8.8 Common Timestamp Pitfalls
Pitfall 1: Payload timestamps
struct SensorData {
uint64_t timestamp;
float temperature;
};
struct SensorData {
float temperature;
};
Pitfall 2: Manual timestamp setting
void process(SensorData& output) override {
SensorData data{.temperature = read_sensor()};
data.timestamp = Time::now();
output = data;
}
void process(SensorData& output) override {
return SensorData{.temperature = read_sensor()};
}
Pitfall 3: Stale data without checking
void process(const IMUData& imu, const GPSData& gps, FusedData& output) override {
output = fuse(imu, gps);
}
void process(const IMUData& imu, const GPSData& gps, FusedData& output) override {
uint64_t gps_age = get_input_timestamp<0>() - get_input_timestamp<1>();
if (gps_age > 100'000'000) {
std::cerr << "GPS stale: " << gps_age / 1'000'000 << " ms\n";
}
output = fuse(imu, gps);
}
Pitfall 4: Timezone confusion
uint64_t ts_ns = get_input_timestamp<0>();
time_t ts_s = ts_ns / 1'000'000'000;
struct tm* local = localtime(&ts_s);
uint64_t start_ts = message1.header.timestamp;
uint64_t end_ts = message2.header.timestamp;
uint64_t duration_ns = end_ts - start_ts;
8.9 Best Practices
- Never add timestamp fields to payloads - use
TimsHeader.timestamp only
- Access via metadata accessors -
get_input_timestamp<>() or get_input_metadata<>()
- Check freshness for multi-input - use
has_new_data<>() and timestamp age
- Monitor sequence numbers - detect message loss in critical systems
- Use nanosecond precision - CommRaT timestamps are 64-bit nanoseconds
- Profile with timestamps - measure processing latency and end-to-end delays
- Verify monotonicity - add assertions in debug builds
8.10 Key Takeaways
- Single source of truth:
TimsHeader.timestamp is the ONLY timestamp (never in payload)
- Automatic assignment: Module sets timestamp based on input type (periodic/continuous/multi)
- Metadata accessors:
get_input_timestamp<>(), get_input_metadata<>() for timestamp queries
- Nanosecond precision: All timestamps in nanoseconds since epoch (uint64_t)
- Monotonicity: Timestamps from single producer strictly increasing
- Sequence numbers: Enable message loss detection
- Freshness tracking:
has_new_data<>() and timestamp age for multi-input