Working with Protobuf Messages¶
Reader Example¶
import sys
from mcap_protobuf.decoder import DecoderFactory
from mcap.reader import make_reader
def main():
with open(sys.argv[1], "rb") as f:
reader = make_reader(f, decoder_factories=[DecoderFactory()])
for schema, channel, message, proto_msg in reader.iter_decoded_messages():
print(f"{channel.topic} {schema.name} [{message.log_time}]: {proto_msg}")
if __name__ == "__main__":
main()
Writer Example¶
import sys
from complex_message_pb2 import ComplexMessage
from mcap_protobuf.writer import Writer
from simple_message_pb2 import SimpleMessage
def main():
with open(sys.argv[1], "wb") as f, Writer(f) as mcap_writer:
for i in range(1, 11):
mcap_writer.write_message(
topic="/simple_messages",
message=SimpleMessage(data=f"Hello MCAP protobuf world #{i}!"),
log_time=i * 1000,
publish_time=i * 1000,
)
complex_message = ComplexMessage(
fieldA=f"Field A {i}", fieldB=f"Field B {i}"
)
mcap_writer.write_message(
topic="/complex_messages",
message=complex_message,
log_time=i * 1000,
publish_time=i * 1000,
)
if __name__ == "__main__":
main()