kompass.components.component

Module Contents

Classes

Component

Component is the main execution entity in Kompass, every Component is equivalent to a ROS2 Lifecycle Node.

API

class kompass.components.component.Component(component_name: str, config: Optional[kompass.config.ComponentConfig] = None, config_file: Optional[str] = None, inputs: Optional[Dict[kompass.components.defaults.TopicsKeys, Union[kompass.components.ros.Topic, List[kompass.components.ros.Topic], None]]] = None, outputs: Optional[Dict[kompass.components.defaults.TopicsKeys, Union[kompass.components.ros.Topic, List[kompass.components.ros.Topic], None]]] = None, fallbacks: Optional[ros_sugar.core.ComponentFallbacks] = None, allowed_inputs: Optional[Dict[str, ros_sugar.io.AllowedTopics]] = None, allowed_outputs: Optional[Dict[str, ros_sugar.io.AllowedTopics]] = None, allowed_run_types: Optional[List[kompass.config.ComponentRunType]] = None, callback_group=None, **kwargs)

Bases: ros_sugar.core.BaseComponent

Component is the main execution entity in Kompass, every Component is equivalent to a ROS2 Lifecycle Node.

A Component requires a set of input and/or output topic(s). All the functionalities implemented in ROS2 nodes can be implemented in the Component. Inputs/Outputs are attrs classes with the attribute name representing a unique input/output key name and the value equal to the Topic.

  • Example:

        from kompass.topic import Topic, create_topics_config
        from kompass.components.component import Component
    
        NewTopicsClass = create_topics_config(
                            "ClassName",
                            input_1=Topic(name="/plan", msg_type="Path"),
                            input_2=Topic(name="/location", msg_type="Odometry"),
                        )
        inputs = NewTopicsClass()
        my_component = Component(component_name='my_node', inputs=inputs)
    

Components in Kompass can be defined to accept only restricted types of inputs/outputs to help lock the functionality of a specific Component implementation.

  • Example:

        from kompass.topic import AllowedTopic, RestrictedTopicsConfig
    
        class AllowedInputs(RestrictedTopicsConfig):
            PLAN = AllowedTopic(key="input_1", types=["Path"])
            LOCATION = AllowedTopic(key="input_2", types=["Odometry", "PoseStamped", "Pose"])
    
        my_component = Component(component_name='my_node', allowed_inputs=AllowedInputs)
    
custom_on_configure()

Component custom configuration method to set the core debug level

property robot: kompass.config.RobotConfig

Getter of robot config

Returns:

Robot configuration

Return type:

RobotConfig

property run_type: kompass.config.ComponentRunType

Component run type: Timed, ActionServer or Event

Returns:

Timed, ActionServer or Server

Return type:

str

property inputs_keys: List[kompass.components.defaults.TopicsKeys]

Getter of component inputs key names that should be used in the inputs dictionary

property outputs_keys: List[kompass.components.defaults.TopicsKeys]

Getter of component outputs key names that should be used in the inputs dictionary

inputs(**kwargs)

Set component input streams (topics) : kwargs[topic_keyword]=Topic()

outputs(**kwargs)

Set component output streams (topics)

config_from_file(config_file: str)

Configure component from file

Parameters:

config_file (str) – Path to file (yaml, json, toml)

property odom_tf_listener: Optional[ros_sugar.tf.TFListener]

Gets a transform listener for Odometry (from odom to world)

Returns:

Return type:

TFListener

property scan_tf_listener: ros_sugar.tf.TFListener

Gets a transform listener for LaserScan (from scan to robot base)

Returns:

Return type:

TFListener

property depth_tf_listener: ros_sugar.tf.TFListener

Gets a transform listener for LaserScan (from scan to robot base)

Returns:

Return type:

TFListener

property pc_tf_listener: ros_sugar.tf.TFListener

Gets a transform listener for LaserScan (from scan to robot base)

Returns:

Return type:

TFListener

get_transform_listener(src_frame: str, goal_frame: str) ros_sugar.tf.TFListener

Gets a transform listener

Parameters:
  • src_frame (str) – Source coordinates frame

  • goal_frame (str) – Goal coordinates frame

Returns:

TF listener object

Return type:

TFListener

in_topic_name(key: Union[str, kompass.components.defaults.TopicsKeys]) Union[str, List[str], None]

Get the topic(s) name(s) corresponding to an input key name

Parameters:

key (str) – Input key name

Returns:

Topic(s) name(s)

Return type:

Union[str, List[str], None]

out_topic_name(key: Union[str, kompass.components.defaults.TopicsKeys]) Union[str, List[str], None]

Get the topic(s) name(s) corresponding to an output key name

Parameters:

key (str) – Output key name

Returns:

Topic(s) name(s)

Return type:

Union[str, List[str], None]

get_in_topic(key: Union[str, kompass.components.defaults.TopicsKeys]) Union[kompass.components.ros.Topic, List[kompass.components.ros.Topic], None]

Get the topic(s) corresponding to an input key name

Parameters:

key (str) – Input key name

Returns:

Topic(s)

Return type:

Union[Topic, List[Topic], None]

get_out_topic(key: Union[str, kompass.components.defaults.TopicsKeys]) Union[kompass.components.ros.Topic, List[kompass.components.ros.Topic], None]

Get the topic(s) corresponding to an output key name

Parameters:

key (str) – Output key name

Returns:

Topic(s)

Return type:

Union[Topic, List[Topic], None]

get_callback(key: Union[str, kompass.components.defaults.TopicsKeys], idx: int = 0) Optional[kompass.callbacks.GenericCallback]

Get callback with given input key name

Parameters:
  • key (str) – Input key name

  • idx (int, optional) – Index of the input of multiple inputs correspond to the same key, defaults to 0

Raises:

KeyError – If key is not found in component inputs

Returns:

Callback object

Return type:

GenericCallback

get_publisher(key: Union[str, kompass.components.defaults.TopicsKeys], idx: int = 0) ros_sugar.io.Publisher

Get publisher with given output key name

Parameters:
  • key (str) – Output topic key name

  • idx (int, optional) – Index of the output of multiple inputs correspond to the same key, defaults to 0

Raises:

KeyError – If key is not found in component outputs

Returns:

Publisher object

Return type:

Publisher

callbacks_inputs_check(inputs_to_check: Optional[List[str]] = None, inputs_to_exclude: Optional[List[str]] = None) bool

Check that all node inputs are provided before executing callback