Using rosbag2_py with MCAP in ROS 2
You can use the rosbag2_py library together with the MCAP storage plugin to read and write MCAP files from ROS 2 packages.
Workspace Setup
To get started, you’ll need to install the following ROS packages for your distribution:
ros-$ROS_DISTRO-ros-base
ros-$ROS_DISTRO-ros2bag
ros-$ROS_DISTRO-rosbag2-transport
ros-$ROS_DISTRO-rosbag2-storage-mcap
Writing Messages
"""script that writes ROS2 messages to MCAP using the rosbag2_py api."""
import argparse
import rosbag2_py
from rclpy.serialization import serialize_message
from std_msgs.msg import String
# Name of the single topic that write_to() registers and publishes every message on.
TOPIC_NAME = "/chatter"
def write_to(output_path: str):
    """Write ten ``std_msgs/msg/String`` messages to an MCAP bag.

    The bag is opened at ``output_path`` with the ``mcap`` storage plugin and
    CDR serialization, a single topic (``TOPIC_NAME``) is registered, and ten
    messages reading ``Chatter #0`` .. ``Chatter #9`` are written to it.

    Args:
        output_path: URI handed to the storage layer as the bag location.
    """
    bag_writer = rosbag2_py.SequentialWriter()

    storage_options = rosbag2_py.StorageOptions(uri=output_path, storage_id="mcap")
    converter_options = rosbag2_py.ConverterOptions(
        input_serialization_format="cdr", output_serialization_format="cdr"
    )
    bag_writer.open(storage_options, converter_options)

    # A topic has to be registered with the writer before messages are
    # written to it.
    chatter_topic = rosbag2_py.TopicMetadata(
        name=TOPIC_NAME, type="std_msgs/msg/String", serialization_format="cdr"
    )
    bag_writer.create_topic(chatter_topic)

    # Timestamps start at 0 and advance by 100 per message.
    # NOTE(review): rosbag2 presumably interprets these as nanoseconds - confirm.
    start_time = 0
    for i in range(10):
        payload = serialize_message(String(data=f"Chatter #{i}"))
        bag_writer.write(TOPIC_NAME, payload, start_time + (i * 100))

    # Drop the only reference to the writer; presumably this is what flushes
    # and closes the bag, since no explicit close call is used here.
    del bag_writer
def main():
    """Read the output location from the command line and write the bag there."""
    cli = argparse.ArgumentParser(description=__doc__)
    cli.add_argument("output", help="output directory to create and write to")
    write_to(cli.parse_args().output)


# Only run the command-line entry point when executed as a script.
if __name__ == "__main__":
    main()
Reading Messages
"""script that reads ROS2 messages from an MCAP bag using the rosbag2_py API."""
import argparse
import rosbag2_py
from rclpy.serialization import deserialize_message
from rosidl_runtime_py.utilities import get_message
def read_messages(input_bag: str):
    """Yield every message in an MCAP bag as a deserialized ROS 2 message.

    The bag is opened with the ``mcap`` storage plugin and CDR serialization.
    Messages are produced lazily, in the order the sequential reader returns
    them.

    Args:
        input_bag: URI handed to the storage layer as the bag location.

    Yields:
        ``(topic, msg, timestamp)`` tuples, where ``topic`` is the topic name,
        ``msg`` is the deserialized message object and ``timestamp`` is the
        value stored alongside the message in the bag.

    Raises:
        ValueError: If a message is read from a topic that does not appear in
            the bag's list of topics and types.
    """
    reader = rosbag2_py.SequentialReader()
    try:
        reader.open(
            rosbag2_py.StorageOptions(uri=input_bag, storage_id="mcap"),
            rosbag2_py.ConverterOptions(
                input_serialization_format="cdr", output_serialization_format="cdr"
            ),
        )

        # Index topic name -> type name once instead of scanning the topic
        # list for every message. setdefault keeps the first entry for a
        # name, matching a front-to-back linear search.
        type_names = {}
        for topic_type in reader.get_all_topics_and_types():
            type_names.setdefault(topic_type.name, topic_type.type)

        # Message classes are resolved on first use and then reused, so each
        # type is looked up only once per bag rather than once per message.
        message_classes = {}

        def message_class(topic_name):
            msg_type = message_classes.get(topic_name)
            if msg_type is None:
                if topic_name not in type_names:
                    raise ValueError(f"topic {topic_name} not in bag")
                msg_type = get_message(type_names[topic_name])
                message_classes[topic_name] = msg_type
            return msg_type

        while reader.has_next():
            topic, data, timestamp = reader.read_next()
            msg = deserialize_message(data, message_class(topic))
            yield topic, msg, timestamp
    finally:
        # Release the reader even when reading fails or the consumer stops
        # iterating early, not only after the bag has been fully consumed.
        del reader
def main():
    """Read the bag named on the command line and print one line per message."""
    cli = argparse.ArgumentParser(description=__doc__)
    cli.add_argument("input", help="input bag path (folder or filepath) to read from")
    bag_path = cli.parse_args().input

    for topic, msg, timestamp in read_messages(bag_path):
        # Accessing msg.data means every message type in the bag is expected
        # to expose a `data` field.
        line = f"{topic} ({type(msg).__name__}) [{timestamp}]: '{msg.data}'"
        print(line)


# Only run the command-line entry point when executed as a script.
if __name__ == "__main__":
    main()