Source code for mcap_ros2.writer

import time
from io import BufferedWriter
from typing import IO, Any, Dict, Optional, Union

import mcap
from mcap.exceptions import McapError
from mcap.records import Schema
from mcap.well_known import SchemaEncoding
from mcap.writer import CompressionType
from mcap.writer import Writer as McapWriter

from . import __version__
from ._dynamic import EncoderFunction, serialize_dynamic


class McapROS2WriteError(McapError):
    """Error raised when a ROS2 message cannot be CDR-encoded with its schema.

    Within this module it is raised by ``Writer.write_message`` when the
    supplied schema is not ROS2-encoded, or when parsing the schema does not
    produce an encoder for the schema's own name.
    """
def _library_identifier():
    """Return the library string passed to the MCAP writer on start.

    The string names this package's version followed by the version of the
    ``mcap`` package. When ``mcap`` exposes no ``__version__`` attribute the
    placeholder ``"<=0.0.10"`` is reported instead.
    """
    try:
        mcap_version = mcap.__version__
    except AttributeError:
        # No version attribute on the installed mcap package.
        mcap_version = "<=0.0.10"
    return f"mcap-ros2-support {__version__}; mcap {mcap_version}"
class Writer:
    """Write ROS2 messages to an MCAP stream.

    Wraps an ``mcap.writer.Writer`` started with the ``"ros2"`` profile.
    Message encoders are cached per schema id and channels are cached per
    topic. The instance can be used as a context manager, in which case
    :meth:`finish` is called on exit.
    """

    def __init__(
        self,
        output: Union[str, IO[Any], BufferedWriter],
        chunk_size: int = 1024 * 1024,
        compression: CompressionType = CompressionType.ZSTD,
        enable_crcs: bool = True,
    ):
        """
        Create the underlying MCAP writer and start it.

        All four arguments are forwarded unchanged to ``mcap.writer.Writer``.

        :param output: Destination for the MCAP data.
        :param chunk_size: Forwarded as ``chunk_size``.
        :param compression: Forwarded as ``compression``.
        :param enable_crcs: Forwarded as ``enable_crcs``.
        """
        # schema id -> encoder function, filled lazily by write_message().
        self._encoders: Dict[int, EncoderFunction] = {}
        # topic -> channel id, filled lazily by write_message().
        self._channel_ids: Dict[str, int] = {}

        self._writer = McapWriter(
            output=output,
            chunk_size=chunk_size,
            compression=compression,
            enable_crcs=enable_crcs,
        )
        self._writer.start(profile="ros2", library=_library_identifier())
        self._finished = False

    def finish(self):
        """
        Finish writing to the MCAP stream.

        This must be called before the stream is closed. Only the first call
        reaches the underlying writer; later calls do nothing.
        """
        if self._finished:
            return
        self._writer.finish()
        self._finished = True

    def register_msgdef(self, datatype: str, msgdef_text: str) -> Schema:
        """
        Register a ROS2 message definition as a schema.

        :param datatype: Name under which the schema is registered.
        :param msgdef_text: Message definition text; it is encoded to bytes
            with ``str.encode()`` before being registered.
        :return: A ``Schema`` carrying the id returned by the underlying
            writer, for use with :meth:`write_message`.
        """
        encoded_msgdef = msgdef_text.encode()
        return Schema(
            id=self._writer.register_schema(
                datatype, SchemaEncoding.ROS2, encoded_msgdef
            ),
            name=datatype,
            encoding=SchemaEncoding.ROS2,
            data=encoded_msgdef,
        )

    def write_message(
        self,
        topic: str,
        schema: Schema,
        message: Any,
        log_time: Optional[int] = None,
        publish_time: Optional[int] = None,
        sequence: int = 0,
    ):
        """
        Write a ROS2 Message record, automatically registering a channel as needed.

        A channel is registered the first time a topic is seen, using the
        schema passed on that call. Later calls with the same topic reuse that
        channel whichever schema they pass.

        :param topic: The topic of the message.
        :param schema: The schema describing ``message``. Its encoder is built
            once per schema id and then reused.
        :param message: The message to write.
        :param log_time: The time at which the message was logged as a nanosecond UNIX
            timestamp. Will default to the current time if not specified.
        :param publish_time: The time at which the message was published as a nanosecond
            UNIX timestamp. Will default to ``log_time`` if not specified.
        :param sequence: An optional sequence number.
        :raises McapROS2WriteError: If the schema encoding is not ROS2, or if
            parsing the schema yields no encoder for ``schema.name``.
        """
        encode = self._encoders.get(schema.id)
        if encode is None:
            if schema.encoding != SchemaEncoding.ROS2:
                raise McapROS2WriteError(
                    f'can\'t parse schema with encoding "{schema.encoding}"'
                )
            encoders_by_type = serialize_dynamic(  # type: ignore
                schema.name, schema.data.decode()
            )
            if schema.name not in encoders_by_type:
                raise McapROS2WriteError(f'schema parsing failed for "{schema.name}"')
            encode = encoders_by_type[schema.name]
            self._encoders[schema.id] = encode

        # The channel lookup happens before encoding, so a new topic is
        # registered even if encoding the message then fails.
        channel_id = self._channel_ids.get(topic)
        if channel_id is None:
            channel_id = self._writer.register_channel(
                topic=topic,
                message_encoding="cdr",
                schema_id=schema.id,
            )
            self._channel_ids[topic] = channel_id

        payload = encode(message)

        logged_at = time.time_ns() if log_time is None else log_time
        published_at = logged_at if publish_time is None else publish_time

        self._writer.add_message(
            channel_id=channel_id,
            log_time=logged_at,
            publish_time=published_at,
            sequence=sequence,
            data=payload,
        )

    def __enter__(self):
        """Return this writer so it can be bound by a ``with`` statement."""
        return self

    def __exit__(self, exc_: Any, exc_type_: Any, tb_: Any):
        """
        Call :meth:`finish` when the ``with`` block exits.

        The three arguments are ignored. Python passes them positionally as
        exception type, value and traceback, so the parameter names here do
        not match that order. Nothing is returned, so an exception raised in
        the block is not suppressed.
        """
        self.finish()