Using rosbag2_py to use MCAP with ROS 2

You can use the rosbag2_py library along with the MCAP Storage Plugin to interact with MCAP files in ROS2 packages.

Workspace Setup

To get started, you’ll need to install the following ROS packages for your distribution:

ros-$ROS_DISTRO-ros-base
ros-$ROS_DISTRO-ros2bag
ros-$ROS_DISTRO-rosbag2-transport
ros-$ROS_DISTRO-rosbag2-storage-mcap

Writing Messages

"""script that writes ROS2 messages to MCAP using the rosbag2_py api."""
import argparse

import rosbag2_py
from rclpy.serialization import serialize_message
from std_msgs.msg import String

TOPIC_NAME = "/chatter"


def write_to(output_path: str):
    writer = rosbag2_py.SequentialWriter()
    writer.open(
        rosbag2_py.StorageOptions(uri=output_path, storage_id="mcap"),
        rosbag2_py.ConverterOptions(
            input_serialization_format="cdr", output_serialization_format="cdr"
        ),
    )

    writer.create_topic(
        rosbag2_py.TopicMetadata(
            name=TOPIC_NAME, type="std_msgs/msg/String", serialization_format="cdr"
        )
    )

    start_time = 0
    for i in range(10):
        msg = String()
        msg.data = f"Chatter #{i}"
        timestamp = start_time + (i * 100)
        writer.write(TOPIC_NAME, serialize_message(msg), timestamp)

    del writer


def main():
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("output", help="output directory to create and write to")

    args = parser.parse_args()
    write_to(args.output)


if __name__ == "__main__":
    main()

Reading Messages

"""script that reads ROS2 messages from an MCAP bag using the rosbag2_py API."""
import argparse

import rosbag2_py
from rclpy.serialization import deserialize_message
from rosidl_runtime_py.utilities import get_message


def read_messages(input_bag: str):
    reader = rosbag2_py.SequentialReader()
    reader.open(
        rosbag2_py.StorageOptions(uri=input_bag, storage_id="mcap"),
        rosbag2_py.ConverterOptions(
            input_serialization_format="cdr", output_serialization_format="cdr"
        ),
    )

    topic_types = reader.get_all_topics_and_types()

    def typename(topic_name):
        for topic_type in topic_types:
            if topic_type.name == topic_name:
                return topic_type.type
        raise ValueError(f"topic {topic_name} not in bag")

    while reader.has_next():
        topic, data, timestamp = reader.read_next()
        msg_type = get_message(typename(topic))
        msg = deserialize_message(data, msg_type)
        yield topic, msg, timestamp
    del reader


def main():
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "input", help="input bag path (folder or filepath) to read from"
    )

    args = parser.parse_args()
    for topic, msg, timestamp in read_messages(args.input):
        print(f"{topic} ({type(msg).__name__}) [{timestamp}]: '{msg.data}'")


if __name__ == "__main__":
    main()