Context-Aware Actions¶
In previous tutorials, our actions were Static, i.e. the action argument were pre-defined in the recipe. For example, in the External Reflexes Tutorial we defined an event when a “Person is Detected” \(\rightarrow\) execute action to trigger the vision follower action server with a pre-defined request message with the target ‘person’. But what if the action depends on what object is detected?
Real-world autonomy requires context aware actions, where the action arguments can be fetched from the system or environment at the time of execution
Instead of writing separate hard-coded actions for every possibility, Kompass allows you to use Dynamic Data Injection from Topics directly in your Actions. In the Point Navigation tutorial, we manually set goals using Rviz clicked_point. In this tutorial, we will upgrade the Turtlebot3 to understand Semantic Commands.
The Setup¶
Prerequisites: Completed Point Navigation.
We will use the exact same set of component as in the Point Navigation tutorial, plus a simulated “Command” topic.
from kompass.ros import Topic
# Define the Command Source
# Simulates a voice command or fleet management instruction
# Examples: "kitchen", "reception", "station_A"
command_topic = Topic(name="/user_command", msg_type="String")
The Context-Aware Logic¶
We need a function that translates a Place Name (String) into a Goal Pose (Coordinates).
from geomerty_msgs.msg import PoseStamped
import subprocess
# A simple "Map" of our environment
# In a real app, this can come from a database or a "semantic memory"
WAYPOINTS = {
"kitchen": {"x": 2.0, "y": 0.5},
"reception": {"x": 0.0, "y": 0.0},
"station_a": {"x": -1.5, "y": 2.0}
}
def navigate_to_location(location_name: str):
"""
Looks up the location coordinates and sends them to the planner.
"""
key = location_name.strip().lower()
if key not in WAYPOINTS:
print(f"Unknown location: {key}")
return
coords = WAYPOINTS[key]
x = coords["x"]
y = coords["y"]
# Construct the Bash Command
# We publish to /clicked_point (PointStamped) which the Planner listens to
# Corrected f-string with proper escaping
topic_cmd = (
f"ros2 topic pub --once /clicked_point geometry_msgs/msg/PointStamped "
f"'{{header: {{frame_id: \"map\"}}, point: {{x: {x}, y: {y}, z: 0.0}}}}'"
)
print(f"Executing: {topic_cmd}")
subprocess.run(topic_cmd, shell=True)
Define an Event that triggers whenever a new command arrives:
# Trigger on ANY new message on the command topic, if the local mapper is healthy
event_command_received = Event(
event_condition=(command_topic & (mapper.status_topic.msg.status == ComponentStatus.STATUS_HEALTHY)),
)
Define the action:
action_process_command = Action(
method=navigate_to_location,
# DYNAMIC INJECTION:
# We pass 'command_topic.msg.data' as the argument.
# Sugarcoat will fetch the actual string ("kitchen") when the event fires.
args=(command_topic.msg.data,)
)
Complete Recipe¶
Launch this script, then publish a string to /user_command (e.g., ros2 topic pub /user_command std_msgs/String "data: kitchen" --once) to see the robot move!
1import numpy as np
2from kompass.robot import (
3 AngularCtrlLimits,
4 LinearCtrlLimits,
5 RobotGeometry,
6 RobotType,
7)
8from kompass.config import RobotConfig, RobotFrames
9from kompass.components import (
10 Controller,
11 DriveManager,
12 Planner,
13 PlannerConfig,
14)
15from kompass.control import ControllersID
16from kompass.ros import Topic, Launcher, Action
17
18# Import the ComponentStatus message to access the status values
19from automatika_ros_sugar.msg import ComponentStatus
20
21# Import the reconfiguration action
22from kompass.actions import update_parameter
23
24from geomerty_msgs.msg import PoseStamped
25import subprocess
26
27# A simple "Map" of our environment
28# In a real app, this can come from a database or a "semantic memory"
29WAYPOINTS = {
30 "kitchen": {"x": 2.0, "y": 0.5},
31 "reception": {"x": 0.0, "y": 0.0},
32 "station_a": {"x": -1.5, "y": 2.0}
33}
34
35def navigate_to_location(location_name: str):
36 """
37 Looks up the location coordinates and sends them to the planner.
38 """
39 key = location_name.strip().lower()
40 if key not in WAYPOINTS:
41 print(f"Unknown location: {key}")
42 return
43
44 coords = WAYPOINTS[key]
45 x = coords["x"]
46 y = coords["y"]
47
48 # Construct the Bash Command
49 # We publish to /clicked_point (PointStamped) which the Planner listens to
50 topic_cmd = (
51 f"ros2 topic pub --once /clicked_point geometry_msgs/msg/PointStamped "
52 f"'{{header: {{frame_id: \"map\"}}, point: {{x: {x}, y: {y}, z: 0.0}}}}'"
53 )
54 print(f"Executing: {topic_cmd}")
55 subprocess.run(topic_cmd, shell=True)
56
57# Define the Command Source
58# Simulates a voice command or fleet management instruction
59# Examples: "kitchen", "reception", "station_A"
60command_topic = Topic(name="/user_command", msg_type="String")
61
62# Setup your robot configuration
63my_robot = RobotConfig(
64 model_type=RobotType.DIFFERENTIAL_DRIVE,
65 geometry_type=RobotGeometry.Type.CYLINDER,
66 geometry_params=np.array([0.1, 0.3]),
67 ctrl_vx_limits=LinearCtrlLimits(max_vel=0.2, max_acc=1.5, max_decel=2.5),
68 ctrl_omega_limits=AngularCtrlLimits(
69 max_vel=0.4, max_acc=2.0, max_decel=2.0, max_steer=np.pi / 3)
70)
71
72# Set the robot frames
73robot_frames = RobotFrames(
74 robot_base='base_link',
75 odom='odom',
76 world='map',
77 scan='LDS-01')
78
79# Setup components with default config, inputs and outputs
80planner_config = PlannerConfig(loop_rate=1.0) # 1 Hz
81planner = Planner(component_name="planner", config=planner_config)
82
83# Set Planner goal input to Rviz Clicked point
84goal_topic = Topic(name="/clicked_point", msg_type="PointStamped")
85planner.inputs(goal_point=goal_topic)
86
87# Get a default controller component
88controller = Controller(component_name="controller")
89
90# Configure Controller to use local map instead of direct sensor information
91controller.direct_sensor = False
92
93# Select the primary control algorithm
94controller.algorithm = ControllersID.DWA
95
96# Define the action
97switch_algorithm_action = Action(method=controller.set_algorithm, args=(ControllersID.PURE_PURSUIT,))
98
99# Set the Controller 'algorithm-level' failure fallback
100controller.on_algorithm_fail(action=[Action(controller.restart), switch_algorithm_action], max_retries=1)
101
102# Get the default DriveManager
103driver = DriveManager(component_name="drive_manager")
104
105# Publish Twist or TwistStamped from the DriveManager based on the distribution
106if "ROS_DISTRO" in os.environ and (
107 os.environ["ROS_DISTRO"] in ["rolling", "jazzy", "kilted"]
108):
109 cmd_msg_type : str = "TwistStamped"
110else:
111 cmd_msg_type = "Twist"
112
113driver.outputs(robot_command=Topic(name="/cmd_vel", msg_type=cmd_msg_type))
114
115# Set the DriveManager System-Level failure Fallback
116# Not setting max_retires -> will attempt restarting infinity
117driver.on_system_fail(Action(driver.restart))
118
119# Get a default Local Mapper component
120mapper = LocalMapper(component_name="mapper")
121
122# Define Events
123# 1. Controller Algorithm Failure
124# We set a `on_change` True to trigger the event only when the status changes to algorithm failure the first time
125# i.e. the associated action will not keep triggering while the controller keeps detecting the failure
126event_controller_fail = Event(
127 controller.status_topic.msg.status
128 == ComponentStatus.STATUS_FAILURE_ALGORITHM_LEVEL,
129 keep_event_delay=60.0
130 )
131# 2. Mapper is NOT Healthy
132# We set `handle_once` to True to trigger this event only ONCE during the lifetime oft he system
133event_mapper_fault = Event(
134 mapper_status.msg.status != ComponentStatus.STATUS_HEALTHY,
135 handle_once=True
136)
137
138# Trigger on ANY new message on the command topic, if the local mapper is healthy
139event_command_received = Event(
140 event_condition=(command_topic & (mapper.status_topic.msg.status == ComponentStatus.STATUS_HEALTHY)),
141)
142
143# Define Actions
144# 1. Drive Manager's Unblock method
145# This is a pre-defined method in DriveManager that executes a recovery maneuver
146unblock_action = Action(method=driver.move_to_unblock)
147
148# 2. Action to reconfigure Controller to use direct sensor data
149activate_direct_sensor_mode_action = update_parameter(
150 component=controller, param_name="use_direct_sensor", new_value=True
151)
152
153action_process_command = Action(
154 method=navigate_to_location,
155 # DYNAMIC INJECTION:
156 # We pass 'command_topic.msg.data' as the argument.
157 # Sugarcoat will fetch the actual string ("kitchen") when the event fires.
158 args=(command_topic.msg.data,)
159)
160
161
162# Define your events/action dictionary
163events_actions = {
164 event_controller_fail: unblock_action,
165 event_mapper_fault: activate_direct_sensor_mode_action,
166 event_command_received: action_process_command
167}
168
169# Init a launcher
170launcher = Launcher()
171
172# Pass kompass components to the launcher and register the events
173launcher.kompass(
174 components=[planner, controller, driver, mapper],
175 activate_all_components_on_start=True,
176 multi_processing=True,
177 events_actions=events_actions
178 )
179
180# Set the robot
181launcher.robot = robot_config
182
183# Set the frames
184launcher.frames = frames_config
185
186# Optional Generic Fallback Policy: If any component fails -> restart it with unlimited retries
187# launcher.on_fail(action_name="restart")
188
189# After all configuration is done bringup the stack
190launcher.bringup()