Events For Cross-Component Healing¶
In the Fallbacks tutorial, we learned how a component can heal itself (e.g., restarting itself). But sophisticated autonomy requires more than just self-repair. It requires System-Level Awareness and Behavioral Reflexes.
In this tutorial, we will use simple Events to implement Cross-Component Healing: Components monitoring each other to fix system-level issues.
Scenario A: The “Unstuck” Reflex¶
The Controller gets stuck in a local minimum (e.g., the robot is facing a corner). It will report an ALGORITHM_FAILURE because it cannot find a valid velocity command. We detect this status and ask the DriveManager to execute a blind “Unblock” maneuver (e.g., rotate in place or back up).
Tip
All components health status topics are accessible in the API using component.status_topic
# Import the required primitives
from kompass.ros import Event, Action, Topic
# Import the ComponentStatus message to access the status values
from automatika_ros_sugar.msg import ComponentStatus
# Define Event: Controller reports Algorithm Failure
# We set a `on_change` True to trigger the event only when the status changes to algorithm failure the first time
# i.e. the associated action will not keep triggering while the controller keeps detecting the failure
event_controller_fail = Event(
controller.status_topic.msg.status
== ComponentStatus.STATUS_FAILURE_ALGORITHM_LEVEL,
keep_event_delay=60.0
)
# Define Action: Drive Manager's Unblock method
# This is a pre-defined method in DriveManager that executes a recovery maneuver
unblock_action = Action(method=driver.move_to_unblock)
Scenario B: The “Blind” Reflex¶
The LocalMapper crashes or faces an error failing to provide a high fidelity local map. The Controller, which relies on the map, can detect that the `LocalMapper is “not healthy” and reconfigure itself to use raw sensor data directly (Reactive Mode).
# Import the reconfiguration action
from kompass.actions import update_parameter
# Import the ComponentStatus message to access the status values
from automatika_ros_sugar.msg import ComponentStatus
# Define Event: Mapper is NOT Healthy
# We set `handle_once` to True to trigger this event only ONCE during the lifetime oft he system
event_mapper_fault = Event(
mapper.status_topic.msg.status != ComponentStatus.STATUS_HEALTHY,
handle_once=True
)
# Define Action: Reconfigure Controller to use direct sensor data
activate_direct_sensor_mode_action = update_parameter(
component=controller, param_name="use_direct_sensor", new_value=True
)
Register Your Events/Actions¶
After defining your event/action pairs you only need to register the dictionary with the system launcher when adding your components:
# Define your events/action dictionary
events_actions = {
event_controller_fail: unblock_action,
event_mapper_fault: activate_direct_sensor_mode_action,
}
# Pass kompass components to the launcher and register the events
launcher.kompass(
components=[planner, controller, driver, mapper],
activate_all_components_on_start=True,
multi_processing=True,
events_actions=events_actions
)
Complete Recipe¶
Here is the complete turtlebot3_cross_healing.py script combining the point navigation system, the fallbacks and the cross-component healing events
1import numpy as np
2from kompass.robot import (
3 AngularCtrlLimits,
4 LinearCtrlLimits,
5 RobotGeometry,
6 RobotType,
7)
8from kompass.config import RobotConfig, RobotFrames
9from kompass.components import (
10 Controller,
11 DriveManager,
12 Planner,
13 PlannerConfig,
14)
15from kompass.control import ControllersID
16from kompass.ros import Topic, Launcher, Action
17
18# Import the ComponentStatus message to access the status values
19from automatika_ros_sugar.msg import ComponentStatus
20
21# Import the reconfiguration action
22from kompass.actions import update_parameter
23
24
25# Setup your robot configuration
26my_robot = RobotConfig(
27 model_type=RobotType.DIFFERENTIAL_DRIVE,
28 geometry_type=RobotGeometry.Type.CYLINDER,
29 geometry_params=np.array([0.1, 0.3]),
30 ctrl_vx_limits=LinearCtrlLimits(max_vel=0.2, max_acc=1.5, max_decel=2.5),
31 ctrl_omega_limits=AngularCtrlLimits(
32 max_vel=0.4, max_acc=2.0, max_decel=2.0, max_steer=np.pi / 3)
33)
34
35# Set the robot frames
36robot_frames = RobotFrames(
37 robot_base='base_link',
38 odom='odom',
39 world='map',
40 scan='LDS-01')
41
42# Setup components with default config, inputs and outputs
43planner_config = PlannerConfig(loop_rate=1.0) # 1 Hz
44planner = Planner(component_name="planner", config=planner_config)
45
46# Set Planner goal input to Rviz Clicked point
47goal_topic = Topic(name="/clicked_point", msg_type="PointStamped")
48planner.inputs(goal_point=goal_topic)
49
50# Get a default controller component
51controller = Controller(component_name="controller")
52
53# Configure Controller to use local map instead of direct sensor information
54controller.direct_sensor = False
55
56# Select the primary control algorithm
57controller.algorithm = ControllersID.DWA
58
59# Define the action
60switch_algorithm_action = Action(method=controller.set_algorithm, args=(ControllersID.PURE_PURSUIT,))
61
62# Set the Controller 'algorithm-level' failure fallback
63controller.on_algorithm_fail(action=[Action(controller.restart), switch_algorithm_action], max_retries=1)
64
65# Get the default DriveManager
66driver = DriveManager(component_name="drive_manager")
67
68# Publish Twist or TwistStamped from the DriveManager based on the distribution
69if "ROS_DISTRO" in os.environ and (
70 os.environ["ROS_DISTRO"] in ["rolling", "jazzy", "kilted"]
71):
72 cmd_msg_type : str = "TwistStamped"
73else:
74 cmd_msg_type = "Twist"
75
76driver.outputs(robot_command=Topic(name="/cmd_vel", msg_type=cmd_msg_type))
77
78# Set the DriveManager System-Level failure Fallback
79# Not setting max_retires -> will attempt restarting infinity
80driver.on_system_fail(Action(driver.restart))
81
82# Get a default Local Mapper component
83mapper = LocalMapper(component_name="mapper")
84
85# Define Events
86# 1. Controller Algorithm Failure
87# We set a `on_change` True to trigger the event only when the status changes to algorithm failure the first time
88# i.e. the associated action will not keep triggering while the controller keeps detecting the failure
89event_controller_fail = Event(
90 controller.status_topic.msg.status
91 == ComponentStatus.STATUS_FAILURE_ALGORITHM_LEVEL,
92 keep_event_delay=60.0
93 )
94# 2. Mapper is NOT Healthy
95# We set `handle_once` to True to trigger this event only ONCE during the lifetime oft he system
96event_mapper_fault = Event(
97 mapper_status.msg.status != ComponentStatus.STATUS_HEALTHY,
98 handle_once=True
99)
100
101# Define Actions
102# 1. Drive Manager's Unblock method
103# This is a pre-defined method in DriveManager that executes a recovery maneuver
104unblock_action = Action(method=driver.move_to_unblock)
105
106# 2. Action to reconfigure Controller to use direct sensor data
107activate_direct_sensor_mode_action = update_parameter(
108 component=controller, param_name="use_direct_sensor", new_value=True
109)
110
111# Define your events/action dictionary
112events_actions = {
113 event_controller_fail: unblock_action,
114 event_mapper_fault: activate_direct_sensor_mode_action,
115}
116
117# Init a launcher
118launcher = Launcher()
119
120# Pass kompass components to the launcher and register the events
121launcher.kompass(
122 components=[planner, controller, driver, mapper],
123 activate_all_components_on_start=True,
124 multi_processing=True,
125 events_actions=events_actions
126 )
127
128# Set the robot
129launcher.robot = robot_config
130
131# Set the frames
132launcher.frames = frames_config
133
134# Optional Generic Fallback Policy: If any component fails -> restart it with unlimited retries
135# launcher.on_fail(action_name="restart")
136
137# After all configuration is done bringup the stack
138launcher.bringup()
Next Steps¶
We have handled single-topic events for cross-component healing. Now let’s see how we can use events for external reflexes.