Working with ROS 2 Messages¶
These examples demonstrate writing ROS2 messages to an MCAP recording and reading messages without a ROS2 installation. However, you will need to supply the correct datatype and concatenated message definition when writing. When importing message definitions in Python from ROS2, you can use message._type and message.__class__._full_text.
Alternatively, you can use the rosbag2_py library along with the MCAP Storage Plugin to interact with MCAP files in ROS2 packages.
Writing Messages¶
"""This example demonstrates creating a ROS2 MCAP file without a ROS2 environment."""
import sys
from mcap_ros2.writer import Writer as McapWriter
# In a ROS2 environment, you can access the schema name (datatype) of a message
# instance using `message._type` and the schema text (message definition) using
# `message.__class__._full_text`. We hardcode these values here to avoid
# requiring a ROS2 environment
SCHEMA_NAME = "tf2_msgs/TFMessage"
SCHEMA_TEXT = """\
geometry_msgs/TransformStamped[] transforms
================================================================================
MSG: geometry_msgs/TransformStamped
std_msgs/Header header
string child_frame_id # the frame id of the child frame
Transform transform
================================================================================
MSG: std_msgs/Header
builtin_interfaces/Time stamp
string frame_id
================================================================================
MSG: geometry_msgs/Transform
Vector3 translation
Quaternion rotation
================================================================================
MSG: geometry_msgs/Vector3
float64 x
float64 y
float64 z
================================================================================
MSG: geometry_msgs/Quaternion
float64 x
float64 y
float64 z
float64 w"""
with open(sys.argv[1], "wb") as f:
writer = McapWriter(f)
schema = writer.register_msgdef(SCHEMA_NAME, SCHEMA_TEXT)
for i in range(0, 10):
writer.write_message(
topic="/tf",
schema=schema,
message={
"transforms": [
{
"header": {
"stamp": {"sec": 0, "nanosec": i},
"frame_id": "parent_frame",
},
"child_frame_id": "child_frame",
"transform": {
"translation": {"x": 1.0, "y": 2.0, "z": 3.0},
"rotation": {"x": 0.0, "y": 0.0, "z": 0.0, "w": 1.0},
},
}
]
},
log_time=i,
publish_time=i,
sequence=i,
)
writer.finish()
Reading Messages¶
import sys
from mcap_ros2.decoder import DecoderFactory
from mcap.reader import make_reader
def main():
with open(sys.argv[1], "rb") as f:
reader = make_reader(f, decoder_factories=[DecoderFactory()])
for schema, channel, message, ros_msg in reader.iter_decoded_messages():
print(f"{channel.topic} {schema.name} [{message.log_time}]: {ros_msg}")
if __name__ == "__main__":
main()