Advanced: Health Status, Fallbacks & Events¶
This guide covers the health monitoring, fallback recovery, and event systems that make Kompass components resilient. Read Creating a Custom Component first for the basics.
Health Status¶
Every component has a health_status object (an instance of Status from ros_sugar.core.status) that tracks the component’s operational state. The status is published on the {node_name}/status topic and drives the fallback system.
Status Levels¶
| Code | Constant | Meaning |
|---|---|---|
| 0 | | Running normally |
| 1 | | An algorithm used by the component failed |
| 2 | | The component itself (or another component) failed |
| 3 | | An external dependency failed (e.g. missing topic) |
| 4 | | Unspecified failure |
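When logging or writing tests, it can be easier to refer to these codes by name than by number. The sketch below mirrors the table as a plain Python IntEnum. The class name, member names, and the describe() helper are illustrative assumptions for this guide, not the constants defined by ros_sugar.

```python
from enum import IntEnum


class StatusLevel(IntEnum):
    """Hypothetical names for the status codes listed in the table above."""

    HEALTHY = 0          # Running normally
    ALGORITHM_FAIL = 1   # An algorithm used by the component failed
    COMPONENT_FAIL = 2   # The component itself (or another component) failed
    SYSTEM_FAIL = 3      # An external dependency failed (e.g. missing topic)
    GENERAL_FAIL = 4     # Unspecified failure


def describe(code: int) -> str:
    """Turn a raw status code (for example health_status.value) into a label."""
    return StatusLevel(code).name.replace("_", " ").lower()
```

A call such as describe(self.health_status.value) could then be used in log messages instead of the bare integer.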
Setting Status¶
Use these methods inside your component to report health:
# Everything is fine
self.health_status.set_healthy()

# Algorithm failure -- include the algorithm name(s) for diagnostics
self.health_status.set_fail_algorithm(
    algorithm_names=["MyAlgorithm"]
)

# Component failure -- defaults to self, or specify other component(s)
self.health_status.set_fail_component()
self.health_status.set_fail_component(
    component_names=["other_component"]
)

# System failure -- an external dependency is unavailable
self.health_status.set_fail_system(
    topic_names=["/map", "/odom"]
)

# Generic failure (no specific category)
self.health_status.set_failure()
Checking Status¶
self.health_status.is_healthy # bool
self.health_status.is_algorithm_fail # bool
self.health_status.is_component_fail # bool
self.health_status.is_system_fail # bool
self.health_status.is_general_fail # bool
self.health_status.value # int (0-4)
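For unit tests that should run without ROS, a small stand-in exposing the same method and property names can be enough to exercise status-handling logic. The following is a minimal sketch under that assumption. FakeStatus and its details attribute are hypothetical and are not the ros_sugar Status class; the stand-in only records the last level that was set.

```python
class FakeStatus:
    """Test double that mimics the setters and checks shown above.

    Not the real ros_sugar Status: it stores a level (0-4) and the
    names passed to the most recent set_fail_* call.
    """

    def __init__(self) -> None:
        self.value = 0       # same 0-4 range as the Status Levels table
        self.details = []    # hypothetical attribute, for test assertions

    def _set(self, value, names=None) -> None:
        self.value = value
        self.details = list(names or [])

    def set_healthy(self) -> None:
        self._set(0)

    def set_fail_algorithm(self, algorithm_names=None) -> None:
        self._set(1, algorithm_names)

    def set_fail_component(self, component_names=None) -> None:
        self._set(2, component_names)

    def set_fail_system(self, topic_names=None) -> None:
        self._set(3, topic_names)

    def set_failure(self) -> None:
        self._set(4)

    @property
    def is_healthy(self) -> bool:
        return self.value == 0

    @property
    def is_algorithm_fail(self) -> bool:
        return self.value == 1

    @property
    def is_component_fail(self) -> bool:
        return self.value == 2

    @property
    def is_system_fail(self) -> bool:
        return self.value == 3

    @property
    def is_general_fail(self) -> bool:
        return self.value == 4
```

A test can assign an instance to the attribute the code under test reads, call the step logic, and assert on value or on the is_* properties.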
Where to Update Status¶
The key principle: set unhealthy when something goes wrong, set healthy when it recovers. Here is the pattern used by existing components:
In _execution_step() – the most common location¶
def _execution_step(self):
    # Check inputs are available
    if not self.got_all_inputs():
        missing = self.get_missing_inputs()
        self.health_status.set_fail_system(topic_names=missing)
        return

    # Run the algorithm
    try:
        success = self._my_algorithm.compute(self.robot_state)
    except Exception as e:
        self.get_logger().error(f"Algorithm failed: {e}")
        self.health_status.set_fail_algorithm(
            algorithm_names=["MyAlgorithm"]
        )
        return

    if not success:
        self.health_status.set_fail_algorithm(
            algorithm_names=["MyAlgorithm"]
        )
        return

    # Success -- clear any previous failure
    self.health_status.set_healthy()

    # Publish results (`result` stands for whatever output your algorithm produced)
    self.get_publisher(TopicsKeys.GOAL_POINT).publish(result)
In lifecycle transitions – handled automatically¶
The base class resets the status to healthy on configure, activate, and deactivate. On error, it calls set_fail_component(). You generally don’t need to touch the status in lifecycle methods.
Status Publishing¶
For TIMED components, the status is published automatically at the fallback check rate. For non-timed components (e.g. ACTION_SERVER), publish manually after updating:
self.health_status.set_fail_algorithm(algorithm_names=["MyAlgorithm"])
self.health_status_publisher.publish(self.health_status())
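Because the update and the publish call always go together in non-timed components, one option is to wrap the pair in a helper so the publish step cannot be forgotten. The sketch below assumes only the two attributes used in the snippet above (health_status and health_status_publisher). The helper name update_and_publish is hypothetical, and a MagicMock stands in for a real component in the demonstration.

```python
from unittest.mock import MagicMock


def update_and_publish(component, update) -> None:
    """Apply a status update, then publish the new status immediately.

    `update` is a callable that receives the component's health_status
    and calls one of its setters.
    """
    update(component.health_status)
    component.health_status_publisher.publish(component.health_status())


# Demonstration with a mock standing in for a real component
component = MagicMock()
update_and_publish(
    component,
    lambda status: status.set_fail_algorithm(algorithm_names=["MyAlgorithm"]),
)
```

Inside a component method, the same call would take self as the first argument.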
Fallbacks¶
Fallbacks are actions that execute automatically when the health status becomes unhealthy. They provide self-recovery without external intervention.
Defining Fallbacks¶
Use the component’s setter methods before launching:
from ros_sugar.core import Action

my_component = MyComponent(component_name="my_node")

# On algorithm failure: restart the component (retry up to 3 times)
my_component.on_algorithm_fail(
    action=Action(my_component.restart),
    max_retries=3,
)

# On system failure: just broadcast status (let the Monitor handle it)
my_component.on_system_fail(
    action=Action(my_component.broadcast_status),
    max_retries=None,  # None = unlimited retries
)

# On component failure: try reconfiguring with a new config, then restart
# (fallback_config is a config object you define elsewhere)
my_component.on_component_fail(
    action=[
        Action(my_component.reconfigure, args=(fallback_config,)),
        Action(my_component.restart),
    ],
    max_retries=2,
)

# Catch-all for any failure type not covered above
my_component.on_fail(
    action=Action(my_component.broadcast_status),
    max_retries=None,
)

# When all fallbacks are exhausted
my_component.on_giveup(
    action=Action(my_component.stop),
    max_retries=1,
)
Fallback Hierarchy¶
| Method | Triggers on | Priority |
|---|---|---|
| | | Checked first |
| | | Checked second |
| | | Checked third |
| | Any failure without a specific handler | Catch-all |
| | All fallbacks exhausted | Last resort |
The fallback check runs on a timer (default 100 Hz) while the component is active. When health_status is not healthy, the system:
1. Looks for a handler matching the specific failure type.
2. Falls back to on_fail() if no specific handler exists.
3. Retries the current action up to max_retries.
4. For action lists, moves to the next action when retries are exhausted.
5. Calls on_giveup() when everything is exhausted.
6. If the action returns True, status resets to healthy automatically.
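The steps above can be modelled in a few lines of plain Python. This is a simplified sketch of the described behaviour, not the ros_sugar implementation. FallbackPlan and fallback_tick are hypothetical names, failure types are plain strings, max_retries is assumed to count attempts per action, and counters are not reset after a recovery.

```python
from typing import Callable, Dict, List, Optional

ActionFn = Callable[[], bool]


class FallbackPlan:
    """Actions registered for one failure type, with a retry budget per action."""

    def __init__(self, actions: List[ActionFn], max_retries: Optional[int]) -> None:
        self.actions = actions
        self.max_retries = max_retries  # None = unlimited
        self.index = 0                  # position of the current action
        self.attempts = 0               # attempts made on the current action

    def step(self) -> Optional[bool]:
        """Run the current action once.

        Returns True if it recovered, False if it failed, or None when
        every action has used up its retries.
        """
        while self.index < len(self.actions):
            if self.max_retries is None or self.attempts < self.max_retries:
                self.attempts += 1
                return bool(self.actions[self.index]())
            # Retries exhausted: move on to the next action in the list
            self.index += 1
            self.attempts = 0
        return None


def fallback_tick(
    failure: str,
    plans: Dict[str, FallbackPlan],
    on_fail: Optional[FallbackPlan],
    on_giveup: Callable[[], None],
) -> str:
    """One timer tick while unhealthy; returns the resulting state."""
    plan = plans.get(failure) or on_fail  # specific handler, else catch-all
    outcome = plan.step() if plan else None
    if outcome is None:
        on_giveup()
        return "gave_up"
    return "healthy" if outcome else failure
```

In this model, an action that succeeds on its third attempt with max_retries=3 yields the failure state on two ticks and "healthy" on the third; a fourth failed attempt would instead lead to on_giveup.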
Action Lists (Sequential Fallbacks)¶
When you pass a list of actions, they execute in order. Each action is retried max_retries times before moving to the next:
# Try broadcast first (2 retries), then restart (2 retries), then reconfigure (2 retries)
my_component.on_algorithm_fail(
    action=[
        Action(my_component.broadcast_status),
        Action(my_component.restart),
        Action(my_component.reconfigure, args=(safe_config,)),
    ],
    max_retries=2,
)
Note: For action lists, max_retries=None is automatically converted to max_retries=1 to prevent getting stuck on the first action forever.
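The conversion rule in the note can be stated as a small function. This is only an illustration of the rule as described here; effective_max_retries is a hypothetical name and not part of the library.

```python
from typing import Optional


def effective_max_retries(action: object, max_retries: Optional[int]) -> Optional[int]:
    """Return the retry budget that applies after the list rule.

    A list (or tuple) of actions never keeps an unlimited budget,
    because the first action would otherwise be retried forever.
    """
    is_action_list = isinstance(action, (list, tuple))
    if is_action_list and max_retries is None:
        return 1
    return max_retries
```

A single action keeps max_retries=None (unlimited), while the same value on a list becomes 1, so each action in the list is tried once before the next one runs.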
Built-in Component Actions¶
These methods are available on every component and can be used as fallback actions:
| Action | Description |
|---|---|
| | Activate the component (lifecycle transition) |
| | Deactivate the component |
| | Stop then start (optional delay between) |
| | Apply a new config (optionally while running) |
| | Change a single parameter |
| | Change multiple parameters |
| | Publish current status (default fallback) |
Default Behavior¶
If you don’t configure any fallbacks, the component uses broadcast_status() as the default on_fail action with unlimited retries. This publishes the failure status so external systems (like a Monitor node) can observe and react.
Events and Actions¶
Events allow components to react to data-driven conditions. An Event pairs a trigger condition with one or more Action callbacks.
Defining Events¶
from ros_sugar.core import Event, Action
from ros_sugar.io import Topic

# Topic-based: triggers whenever a message arrives
sensor_topic = Topic(name="/emergency", msg_type="Bool")
event = Event(event_condition=sensor_topic)

# Action-based: polls a method at a given rate
event = Event(
    event_condition=Action(my_component.check_battery),
    check_rate=1.0,  # Poll at 1 Hz
)
Event Options¶
| Parameter | Default | Description |
|---|---|---|
| | | Only trigger when the value changes (not on every message) |
| | | Only trigger once during the component’s lifetime |
| | | Minimum delay (seconds) between consecutive triggers |
| | | Poll rate (Hz) for action-based events |
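To make the interaction of the first three options concrete, here is a small standalone model of the gating logic they describe. It is a sketch only: EventGate and the option names on_change, once, and min_delay are chosen for this example and are not taken from the Event API, and the model assumes the first value received counts as a change.

```python
from typing import Any, Optional


class EventGate:
    """Decides whether a newly observed value should fire the event's actions."""

    _UNSET = object()  # marker meaning "no value seen yet"

    def __init__(self, on_change: bool = False, once: bool = False,
                 min_delay: float = 0.0) -> None:
        self.on_change = on_change   # fire only when the value differs from the last one
        self.once = once             # fire at most one time
        self.min_delay = min_delay   # seconds required between two triggers
        self._last_value: Any = self._UNSET
        self._last_fired: Optional[float] = None

    def should_fire(self, value: Any, now: float) -> bool:
        """Record `value` observed at time `now` and report whether to trigger."""
        changed = value != self._last_value
        self._last_value = value
        if self.on_change and not changed:
            return False
        if self.once and self._last_fired is not None:
            return False
        if self._last_fired is not None and now - self._last_fired < self.min_delay:
            return False
        self._last_fired = now
        return True
```

Passing the timestamp explicitly keeps the model deterministic, which makes the delay behaviour easy to check in a test.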
Wiring Events to Actions¶
Events and actions are connected at the Launcher level, not inside individual components:
from ros_sugar.launch import Launcher

launcher = Launcher(namespace="robot")

# Define event + response actions
emergency_event = Event(event_condition=emergency_topic)
stop_action = Action(controller.stop)

launcher.add_pkg(
    components=[planner, controller],
    package_name="kompass",
    events_actions={emergency_event: [stop_action]},
)
This keeps components decoupled – they don’t need to know about each other’s events.
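The decoupling comes from the mapping itself: whoever raises an event needs no reference to the code that responds to it. The toy dispatcher below illustrates the idea with plain strings standing in for Event objects and plain callables standing in for Action objects. It is a hypothetical model, not how the Launcher is implemented.

```python
from typing import Callable, Dict, List


def dispatch(event_name: str,
             events_actions: Dict[str, List[Callable[[], None]]]) -> int:
    """Run every action registered for `event_name`; return how many ran."""
    actions = events_actions.get(event_name, [])
    for action in actions:
        action()
    return len(actions)


# The wiring lives in one place, outside of any component
log: List[str] = []
events_actions = {"emergency": [lambda: log.append("controller stopped")]}
dispatch("emergency", events_actions)
```

Adding or replacing a response only changes the mapping, not the components involved.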
Putting It All Together¶
Here is a complete example showing a component with health status management, fallbacks, and events:
from typing import Dict, Optional
from attrs import define, field

from ros_sugar.core import Action, Event
from ros_sugar.io import Topic
from ros_sugar.launch import Launcher

from kompass.components.component import Component
from kompass.components.defaults import TopicsKeys
from kompass.config import ComponentConfig, ComponentRunType


@define
class MyControllerConfig(ComponentConfig):
    max_consecutive_failures: int = field(default=5)


class MyController(Component):
    def __init__(self, component_name="my_controller", config=None, **kwargs):
        super().__init__(
            component_name=component_name,
            config=config or MyControllerConfig(),
            allowed_run_types=[ComponentRunType.TIMED],
            **kwargs,
        )

    def init_variables(self):
        self._failure_count = 0

    def _execution_step(self):
        location_cb = self.get_callback(TopicsKeys.ROBOT_LOCATION)
        if location_cb is None or not location_cb.got_msg:
            self.health_status.set_fail_system(
                topic_names=[self.in_topic_name(TopicsKeys.ROBOT_LOCATION)]
            )
            return

        data = location_cb.get_output()
        success = self._run_control(data)

        if success:
            self._failure_count = 0
            self.health_status.set_healthy()
        else:
            self._failure_count += 1
            if self._failure_count > self.config.max_consecutive_failures:
                self.health_status.set_fail_algorithm(
                    algorithm_names=["MyControlAlgorithm"]
                )
            # Otherwise stay in current status -- transient failures
            # don't trigger fallbacks immediately

    def _run_control(self, data) -> bool:
        # Your control logic here
        ...


# --- Setup fallbacks and launch ---
controller = MyController()

# Recovery strategy: restart on algorithm failure, broadcast on system issues
controller.on_algorithm_fail(
    action=[
        Action(controller.restart),
        Action(controller.reconfigure, args=(MyControllerConfig(),)),
    ],
    max_retries=3,
)
controller.on_system_fail(
    action=Action(controller.broadcast_status),
    max_retries=None,
)
controller.on_giveup(
    action=Action(controller.stop),
    max_retries=1,
)

# Launch
launcher = Launcher(namespace="robot")
launcher.add_pkg(
    components=[controller],
    package_name="kompass",
)
launcher.bringup()
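The consecutive-failure logic in _execution_step() can be isolated and tested without ROS. The sketch below extracts it into a small helper; FailureCounter is a hypothetical name introduced here, and the comparison mirrors the strict `>` used in the example above.

```python
class FailureCounter:
    """Counts consecutive failures and reports when the limit is exceeded."""

    def __init__(self, max_consecutive_failures: int = 5) -> None:
        self.max_consecutive_failures = max_consecutive_failures
        self.count = 0

    def record(self, success: bool) -> bool:
        """Record one execution step.

        Returns True when the failure should be reported through
        health_status, and False while it still counts as transient.
        """
        if success:
            self.count = 0  # any success clears the streak
            return False
        self.count += 1
        return self.count > self.max_consecutive_failures
```

Because the comparison is strict, the default limit of 5 tolerates five consecutive failures and reports on the sixth. Use `>=` instead if reaching the limit should itself trigger the failure.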
Health Status Flow¶
_execution_step()
│
├── Missing input? ──▶ set_fail_system(topic_names=[...])
│       │
│       ▼
│   fallback timer checks health_status
│       │
│       ├── on_system_fail defined? ──▶ execute action
│       └── no? ──▶ on_fail (broadcast_status)
│
├── Algorithm failed? ──▶ set_fail_algorithm(algorithm_names=[...])
│       │
│       ▼
│   fallback timer checks health_status
│       │
│       ├── on_algorithm_fail defined? ──▶ execute action
│       │       │
│       │       ├── action returns True ──▶ set_healthy()
│       │       └── retries exhausted ──▶ next action or giveup
│       └── no? ──▶ on_fail
│
└── Success ──▶ set_healthy()