Data Types, Callbacks & Publishers¶
Kompass uses a type-driven I/O system built on ros-sugar. Each ROS message type is represented by a SupportedType class that bundles three things together:
The ROS message type (
_ros_type) – the underlying ROS 2 message class.A callback (
callback) – how to process incoming messages of this type.A converter (
convert()) – how to create outgoing messages for publishing.
When a Topic is configured with msg_type="Odometry", the system automatically uses the Odometry SupportedType to set up the right callback for subscriptions and the right converter for publishing.
How It Works¶
Subscribing (input):
ROS message ──▶ GenericCallback.callback(msg) ──▶ stores raw msg
│
component calls get_output() ──▶ _get_output() ──▶ post_processors ──▶ processed data
Publishing (output):
component calls publisher.publish(data) ──▶ pre_processors ──▶ SupportedType.convert() ──▶ ROS message
Concrete Example: Controller Data Flow¶
Input:
sensor_msgs/LaserScanarrives on/scantopic.Callback:
LaserScanCallbackconverts it tokompass_core.datatypes.LaserScanData.Algorithm: DWA controller reads
LaserScanDataalong withRobotState(from odometry) and a referencePath.Output: DWA computes velocity commands;
Twist.convert(vx, vy, omega)createsgeometry_msgs/Twist.Publish: Result is published on
/controltopic.
Built-in Types¶
Kompass extends the standard ros-sugar types with navigation-specific processing:
Type class |
ROS message |
Callback |
|
|
|---|---|---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
– |
|
|
|
|
– |
|
|
|
raw msg |
|
|
|
|
|
|
|
|
|
raw msg |
|
|
|
|
raw msg |
|
|
|
|
|
|
|
|
|
|
– |
|
|
|
|
– |
Note
Trackings and Detections require the optional automatika_embodied_agents package.
kompass-core Data Types¶
The callback outputs above use data structures from kompass_core – these are pure Python/C++ types with no ROS dependency, keeping kompass-core framework-agnostic:
RobotState(kompass_core.models) – position (x, y, yaw), velocity (vx, vy, omega), and speed.LaserScanData(kompass_core.datatypes) – structured laser scan with angle/range arrays.PointCloudData(kompass_core.datatypes) – 3D point cloud representation with field offsets.Bbox2D(kompass_core.datatypes) – 2D bounding box for vision-based tracking.
Creating a Custom Callback¶
A callback controls how incoming ROS messages are processed before your component sees them. To create one, subclass GenericCallback and implement _get_output().
Step 1: Implement the Callback¶
# In kompass/kompass/callbacks.py (or your own module)
from typing import Optional
import numpy as np
from ros_sugar.io import GenericCallback
class MyImuCallback(GenericCallback):
"""Processes sensor_msgs/Imu into a simplified dict."""
def __init__(self, input_topic, node_name: Optional[str] = None):
super().__init__(input_topic, node_name)
def _get_output(self, **_) -> Optional[dict]:
if not self.msg:
return None
return {
"orientation_yaw": 2 * np.arctan2(
self.msg.orientation.z, self.msg.orientation.w
),
"angular_velocity": self.msg.angular_velocity.z,
"linear_acceleration": np.array([
self.msg.linear_acceleration.x,
self.msg.linear_acceleration.y,
]),
}
Key points:
self.msgholds the raw ROS message (set automatically by the base class).Return
Nonewhen no message is available yet._get_output()can accept keyword arguments (e.g.transformation) that are passed fromget_output().
Step 2: Support TF Transforms (Optional)¶
If your callback needs to apply TF transforms, add a transformation property:
class MyTransformableCallback(GenericCallback):
def __init__(self, input_topic, node_name=None, transformation=None):
super().__init__(input_topic, node_name)
self.__tf = transformation
@property
def transformation(self):
return self.__tf
@transformation.setter
def transformation(self, transform):
self.__tf = transform
def _get_output(self, transformation=None, **_):
if not self.msg:
return None
if transformation or self.transformation:
return self._transform(self.msg, transformation or self.transformation)
return self._process(self.msg)
def _process(self, msg):
# Convert raw message to your data type
...
def _transform(self, msg, transform):
# Apply TF transform, then convert
...
The component’s TF listeners (e.g. odom_tf_listener) set the transformation property at runtime, and get_output() passes it through.
Creating a Custom SupportedType¶
A SupportedType ties together the ROS message class, callback, and publisher converter.
Step 1: Define the Type¶
# In kompass/kompass/data_types.py (or your own module)
from ros_sugar.supported_types import SupportedType
from sensor_msgs.msg import Imu as ROSImu
from .callbacks import MyImuCallback
class Imu(SupportedType):
"""Support for sensor_msgs/Imu messages."""
_ros_type = ROSImu
callback = MyImuCallback
@classmethod
def convert(cls, orientation_yaw: float, angular_velocity: float, **_) -> ROSImu:
"""Convert component data back to a ROS Imu message for publishing."""
msg = ROSImu()
msg.orientation.z = np.sin(orientation_yaw / 2)
msg.orientation.w = np.cos(orientation_yaw / 2)
msg.angular_velocity.z = angular_velocity
return msg
Three things to define:
Attribute |
Purpose |
|---|---|
|
The ROS 2 message class (used for subscription/publisher creation) |
|
Your callback class (used when subscribing to this message type) |
|
Classmethod that creates a ROS message from component data (used when publishing) |
If your type is input-only (no publishing), you can skip convert() – the base class returns the raw output as-is.
If your type is output-only (no custom processing), you can use GenericCallback as the callback.
Step 2: Register the Type¶
Types defined in kompass/kompass/data_types.py are registered automatically at import time. The registration happens in kompass.components.defaults:
from ros_sugar.supported_types import add_additional_datatypes
from ros_sugar.io import get_all_msg_types
from .. import data_types
add_additional_datatypes(get_all_msg_types(data_types))
get_all_msg_types(data_types) introspects the module and collects all SupportedType subclasses. So for types added directly to data_types.py, you just need to add the class name to __all__ – no manual registration call needed.
For types defined outside kompass.data_types (e.g. in your own package), register explicitly:
from ros_sugar.supported_types import add_additional_datatypes
add_additional_datatypes([Imu])
After registration, the type name works as a string in Topic:
Topic(name="/imu", msg_type="Imu")
Step 3: Use in a Component¶
# In allowed inputs
my_allowed_inputs = {
TopicsKeys.SPATIAL_SENSOR: AllowedTopics(types=["Imu"]),
}
# In default topics
my_default_inputs = {
TopicsKeys.SPATIAL_SENSOR: Topic(name="/imu", msg_type="Imu"),
}
# In _execution_step
def _execution_step(self):
imu_cb = self.get_callback(TopicsKeys.SPATIAL_SENSOR)
if imu_cb and imu_cb.got_msg:
imu_data = imu_cb.get_output() # Returns your dict from _get_output()
Processing Pipelines¶
Both callbacks and publishers support processing pipelines for chaining transformations.
Post-Processors (Callbacks)¶
Post-processors run after _get_output(), transforming the result before it reaches your component:
def clip_scan_ranges(scan_data: LaserScanData) -> LaserScanData:
"""Clip laser scan to 5m max range."""
scan_data.ranges = scan_data.ranges.clip(max=5.0)
return scan_data
# Attach to a callback
callback = self.get_callback(TopicsKeys.SPATIAL_SENSOR)
callback.add_post_processors([clip_scan_ranges])
# Now get_output() returns clipped data
data = callback.get_output()
Rules:
Each processor receives the output of the previous one.
The return type must match the input type.
If any processor returns
None,get_output()returnsNone.
Pre-Processors (Publishers)¶
Pre-processors run before convert(), transforming component data before it becomes a ROS message:
def clamp_velocity(vx, vy, omega):
"""Clamp velocity commands to safe limits."""
return (
max(-1.0, min(1.0, vx)),
max(-0.5, min(0.5, vy)),
max(-2.0, min(2.0, omega)),
)
# Attach to a publisher
publisher = self.get_publisher(TopicsKeys.INTERMEDIATE_CMD)
publisher.add_pre_processors([clamp_velocity])
# Now publish() clamps before converting
publisher.publish(vx, vy, omega)
Rules:
Each processor receives the output tuple of the previous one.
The return types must match the input types.
If any processor returns
None, the message is not published.
Event-Triggered Callbacks¶
You can attach a function that fires every time a message arrives, before get_output() is called:
def on_new_path(msg, topic, output):
"""Called whenever a new path message arrives."""
self._controller.set_path(output)
callback = self.get_callback(TopicsKeys.GLOBAL_PLAN)
callback.on_callback_execute(on_new_path, get_processed=True)
Parameters passed to your function:
msg– the raw ROS messagetopic– theTopicobjectoutput– the processed output from_get_output()(only ifget_processed=True)
Config Bridging Patterns¶
Algorithm configuration classes in kompass-core (e.g., DWAConfig, PurePursuitConfig) are attrs @define classes. Kompass re-exports them directly:
# kompass/kompass/control.py
from kompass_core.control import (
ControllersID,
DWAConfig,
PurePursuitConfig,
StanleyConfig,
DVZConfig,
VisionRGBFollowerConfig,
VisionRGBDFollowerConfig,
)
At runtime, the Controller component looks up the corresponding config class from ControlConfigClasses[config.algorithm] and instantiates it, either from defaults or from a YAML/JSON/TOML configuration file. This keeps algorithm configuration decoupled from the component framework while allowing full configuration via Kompass’s standard file-based config system.
GPU Acceleration via SYCL¶
kompass-core algorithms that operate on spatial data (local mapping, DWA obstacle checking, etc.) can leverage GPU acceleration through AdaptiveCpp (SYCL). The acceleration is transparent to the data type layer:
The same
LaserScanData/PointCloudDatatypes are used regardless of CPU or GPU execution.GPU memory transfers are handled internally by
kompass-core’s C++ layer.No code changes are needed in the Kompass Python layer to enable GPU acceleration.
Complete Example: Adding a Custom Sensor Type¶
# --- callbacks.py ---
from typing import Optional
import numpy as np
from ros_sugar.io import GenericCallback
class UltrasonicCallback(GenericCallback):
"""Processes sensor_msgs/Range into a distance float."""
def __init__(self, input_topic, node_name=None):
super().__init__(input_topic, node_name)
def _get_output(self, **_) -> Optional[float]:
if not self.msg:
return None
# Clamp to valid range
return float(np.clip(
self.msg.range,
self.msg.min_range,
self.msg.max_range,
))
# --- data_types.py ---
from ros_sugar.supported_types import SupportedType, add_additional_datatypes
from sensor_msgs.msg import Range as ROSRange
from .callbacks import UltrasonicCallback
class Ultrasonic(SupportedType):
_ros_type = ROSRange
callback = UltrasonicCallback
@classmethod
def convert(cls, distance: float, **_) -> ROSRange:
msg = ROSRange()
msg.range = distance
return msg
# Register at import time
add_additional_datatypes([Ultrasonic])
# --- component.py ---
from kompass.components.component import Component
from kompass.components.defaults import TopicsKeys
from ros_sugar.io import AllowedTopics, Topic
class ProximityMonitor(Component):
def __init__(self, **kwargs):
super().__init__(
component_name="proximity_monitor",
inputs={
TopicsKeys.SPATIAL_SENSOR: Topic(
name="/ultrasonic", msg_type="Ultrasonic"
),
},
allowed_inputs={
TopicsKeys.SPATIAL_SENSOR: AllowedTopics(types=["Ultrasonic"]),
},
**kwargs,
)
def _execution_step(self):
cb = self.get_callback(TopicsKeys.SPATIAL_SENSOR)
if cb and cb.got_msg:
distance = cb.get_output() # Returns float from UltrasonicCallback
if distance < 0.3:
self.health_status.set_fail_algorithm(
algorithm_names=["ProximityCheck"]
)