Source code for mcap_protobuf.writer
import time
from io import BufferedWriter
from typing import IO, Any, Dict, Optional, Tuple, Union
import mcap
from mcap.well_known import MessageEncoding
from mcap.writer import CompressionType
from mcap.writer import Writer as McapWriter
from . import __version__
from .schema import register_schema
def _library_identifier():
"""the default value written into MCAP headers by this library."""
mcap_version = getattr(mcap, "__version__", "<=0.0.10")
return f"python mcap-protobuf-support {__version__}; mcap {mcap_version}"
[docs]class Writer:
"""Writer provides a higher-level abstraction for writing Protobuf messages to an
MCAP file.
"""
def __init__(
self,
output: Union[str, IO[Any], BufferedWriter],
chunk_size: int = 1024 * 1024,
compression: CompressionType = CompressionType.ZSTD,
enable_crcs: bool = True,
):
self._writer = McapWriter(
output,
chunk_size=chunk_size,
compression=compression,
enable_crcs=enable_crcs,
)
self._schemas: Dict[str, Tuple[int, str]] = {}
self._channels: Dict[str, int] = {}
self._finished = False
self._writer.start(library=_library_identifier())
[docs] def write_message(
self,
topic: str,
message: Any,
log_time: Optional[int] = None,
publish_time: Optional[int] = None,
sequence: int = 0,
):
"""Writes a message to an MCAP file.
:param topic: the topic that this message was originally published on.
:param message: a Protobuf object to write into the MCAP.
:param log_time: unix nanosecond timestamp of when this message was written to the MCAP.
:param publish_time: unix nanosecond timestamp of when this message was originally
published.
:param sequence: an optional sequence count for messages on this topic.
"""
msg_typename: str = type(message).DESCRIPTOR.full_name
if topic in self._channels:
channel_id = self._channels[topic]
schema_id, schema_name = self._schemas[topic]
if msg_typename != schema_name:
raise ValueError(
f"topic '{topic}' has type {schema_name}, cannot write a {msg_typename}"
)
else:
schema_id = register_schema(self._writer, type(message))
self._schemas[topic] = (schema_id, msg_typename)
channel_id = self._writer.register_channel(
topic=topic,
message_encoding=MessageEncoding.Protobuf,
schema_id=schema_id,
)
self._channels[topic] = channel_id
if log_time is None:
log_time = time.time_ns()
if publish_time is None:
publish_time = time.time_ns()
self._writer.add_message(
channel_id=channel_id,
log_time=log_time,
data=message.SerializeToString(), # type: ignore
publish_time=publish_time,
sequence=sequence,
)
[docs] def finish(self):
"""Writes the index and footer to the MCAP file."""
if not self._finished:
self._writer.finish()
self._finished = True
def __enter__(self):
return self
def __exit__(self, exc_: Any, exc_type_: Any, tb_: Any):
self.finish()