Working with ROS 2 Messages

These examples demonstrate writing ROS2 messages to an MCAP recording and reading messages without a ROS2 installation. However, you will need to supply the correct datatype and concatenated message definition when writing. When importing message definitions in Python from ROS2, you can use message._type and message.__class__._full_text.

Alternatively, you can use the rosbag2_py library along with the MCAP Storage Plugin to interact with MCAP files in ROS2 packages.

Writing Messages

"""This example demonstrates creating a ROS2 MCAP file without a ROS2 environment."""

import sys

from mcap_ros2.writer import Writer as McapWriter

# In a ROS2 environment, you can access the schema name (datatype) of a message
# instance using `message._type` and the schema text (message definition) using
# `message.__class__._full_text`. We hardcode these values here to avoid
# requiring a ROS2 environment
SCHEMA_NAME = "tf2_msgs/TFMessage"
SCHEMA_TEXT = """\
geometry_msgs/TransformStamped[] transforms
================================================================================
MSG: geometry_msgs/TransformStamped
std_msgs/Header header
string child_frame_id # the frame id of the child frame
Transform transform
================================================================================
MSG: std_msgs/Header
builtin_interfaces/Time stamp
string frame_id
================================================================================
MSG: geometry_msgs/Transform
Vector3 translation
Quaternion rotation
================================================================================
MSG: geometry_msgs/Vector3
float64 x
float64 y
float64 z
================================================================================
MSG: geometry_msgs/Quaternion
float64 x
float64 y
float64 z
float64 w"""

with open(sys.argv[1], "wb") as f:
    writer = McapWriter(f)
    schema = writer.register_msgdef(SCHEMA_NAME, SCHEMA_TEXT)

    for i in range(0, 10):
        writer.write_message(
            topic="/tf",
            schema=schema,
            message={
                "transforms": [
                    {
                        "header": {
                            "stamp": {"sec": 0, "nanosec": i},
                            "frame_id": "parent_frame",
                        },
                        "child_frame_id": "child_frame",
                        "transform": {
                            "translation": {"x": 1.0, "y": 2.0, "z": 3.0},
                            "rotation": {"x": 0.0, "y": 0.0, "z": 0.0, "w": 1.0},
                        },
                    }
                ]
            },
            log_time=i,
            publish_time=i,
            sequence=i,
        )
    writer.finish()

Reading Messages

import sys

from mcap_ros2.decoder import DecoderFactory

from mcap.reader import make_reader


def main():
    with open(sys.argv[1], "rb") as f:
        reader = make_reader(f, decoder_factories=[DecoderFactory()])
        for schema, channel, message, ros_msg in reader.iter_decoded_messages():
            print(f"{channel.topic} {schema.name} [{message.log_time}]: {ros_msg}")


if __name__ == "__main__":
    main()