kompass.components.component¶
Module Contents¶
Classes¶
Component is the main execution entity in Kompass, every Component is equivalent to a ROS2 Lifecycle Node. |
API¶
- class kompass.components.component.Component(component_name: str, config: Optional[kompass.config.ComponentConfig] = None, config_file: Optional[str] = None, inputs: Optional[Dict[kompass.components.defaults.TopicsKeys, Union[kompass.components.ros.Topic, List[kompass.components.ros.Topic], None]]] = None, outputs: Optional[Dict[kompass.components.defaults.TopicsKeys, Union[kompass.components.ros.Topic, List[kompass.components.ros.Topic], None]]] = None, fallbacks: Optional[ros_sugar.core.ComponentFallbacks] = None, allowed_inputs: Optional[Dict[str, ros_sugar.io.AllowedTopics]] = None, allowed_outputs: Optional[Dict[str, ros_sugar.io.AllowedTopics]] = None, allowed_run_types: Optional[List[kompass.config.ComponentRunType]] = None, callback_group=None, **kwargs)¶
Bases:
ros_sugar.core.BaseComponentComponent is the main execution entity in Kompass, every Component is equivalent to a ROS2 Lifecycle Node.
A Component requires a set of input and/or output topic(s). All the functionalities implemented in ROS2 nodes can be implemented in the Component. Inputs/Outputs are attrs classes with the attribute name representing a unique input/output key name and the value equal to the Topic.
Example:
from kompass.topic import Topic, create_topics_config from kompass.components.component import Component NewTopicsClass = create_topics_config( "ClassName", input_1=Topic(name="/plan", msg_type="Path"), input_2=Topic(name="/location", msg_type="Odometry"), ) inputs = NewTopicsClass() my_component = Component(component_name='my_node', inputs=inputs)
Components in Kompass can be defined to accept only restricted types of inputs/outputs to help lock the functionality of a specific Component implementation.
Example:
from kompass.topic import AllowedTopic, RestrictedTopicsConfig class AllowedInputs(RestrictedTopicsConfig): PLAN = AllowedTopic(key="input_1", types=["Path"]) LOCATION = AllowedTopic(key="input_2", types=["Odometry", "PoseStamped", "Pose"]) my_component = Component(component_name='my_node', allowed_inputs=AllowedInputs)
- custom_on_configure()¶
Component custom configuration method to set the core debug level
- property robot: kompass.config.RobotConfig¶
Getter of robot config
- Returns:
Robot configuration
- Return type:
- property run_type: kompass.config.ComponentRunType¶
Component run type: Timed, ActionServer or Event
- Returns:
Timed, ActionServer or Server
- Return type:
str
- property inputs_keys: List[kompass.components.defaults.TopicsKeys]¶
Getter of component inputs key names that should be used in the inputs dictionary
- property outputs_keys: List[kompass.components.defaults.TopicsKeys]¶
Getter of component outputs key names that should be used in the inputs dictionary
- inputs(**kwargs)¶
Set component input streams (topics) : kwargs[topic_keyword]=Topic()
- outputs(**kwargs)¶
Set component output streams (topics)
- config_from_file(config_file: str)¶
Configure component from file
- Parameters:
config_file (str) – Path to file (yaml, json, toml)
- property odom_tf_listener: Optional[ros_sugar.tf.TFListener]¶
Gets a transform listener for Odometry (from odom to world)
- Returns:
- Return type:
TFListener
- property scan_tf_listener: ros_sugar.tf.TFListener¶
Gets a transform listener for LaserScan (from scan to robot base)
- Returns:
- Return type:
TFListener
- property depth_tf_listener: ros_sugar.tf.TFListener¶
Gets a transform listener for LaserScan (from scan to robot base)
- Returns:
- Return type:
TFListener
- property pc_tf_listener: ros_sugar.tf.TFListener¶
Gets a transform listener for LaserScan (from scan to robot base)
- Returns:
- Return type:
TFListener
- get_transform_listener(src_frame: str, goal_frame: str) ros_sugar.tf.TFListener¶
Gets a transform listener
- Parameters:
src_frame (str) – Source coordinates frame
goal_frame (str) – Goal coordinates frame
- Returns:
TF listener object
- Return type:
TFListener
- in_topic_name(key: Union[str, kompass.components.defaults.TopicsKeys]) Union[str, List[str], None]¶
Get the topic(s) name(s) corresponding to an input key name
- Parameters:
key (str) – Input key name
- Returns:
Topic(s) name(s)
- Return type:
Union[str, List[str], None]
- out_topic_name(key: Union[str, kompass.components.defaults.TopicsKeys]) Union[str, List[str], None]¶
Get the topic(s) name(s) corresponding to an output key name
- Parameters:
key (str) – Output key name
- Returns:
Topic(s) name(s)
- Return type:
Union[str, List[str], None]
- get_in_topic(key: Union[str, kompass.components.defaults.TopicsKeys]) Union[kompass.components.ros.Topic, List[kompass.components.ros.Topic], None]¶
Get the topic(s) corresponding to an input key name
- get_out_topic(key: Union[str, kompass.components.defaults.TopicsKeys]) Union[kompass.components.ros.Topic, List[kompass.components.ros.Topic], None]¶
Get the topic(s) corresponding to an output key name
- get_callback(key: Union[str, kompass.components.defaults.TopicsKeys], idx: int = 0) Optional[kompass.callbacks.GenericCallback]¶
Get callback with given input key name
- Parameters:
key (str) – Input key name
idx (int, optional) – Index of the input of multiple inputs correspond to the same key, defaults to 0
- Raises:
KeyError – If key is not found in component inputs
- Returns:
Callback object
- Return type:
GenericCallback
- get_publisher(key: Union[str, kompass.components.defaults.TopicsKeys], idx: int = 0) ros_sugar.io.Publisher¶
Get publisher with given output key name
- Parameters:
key (str) – Output topic key name
idx (int, optional) – Index of the output of multiple inputs correspond to the same key, defaults to 0
- Raises:
KeyError – If key is not found in component outputs
- Returns:
Publisher object
- Return type:
Publisher
- callbacks_inputs_check(inputs_to_check: Optional[List[str]] = None, inputs_to_exclude: Optional[List[str]] = None) bool¶
Check that all node inputs are provided before executing callback