Source code for mcap_ros2.decoder

"""Decoder class for decoding ROS2 messages from MCAP files."""

import warnings
from typing import Any, Callable, Dict, Optional

from mcap.decoder import DecoderFactory as McapDecoderFactory
from mcap.exceptions import McapError
from mcap.records import Message, Schema
from mcap.well_known import MessageEncoding, SchemaEncoding

from ._dynamic import DecodedMessage, DecoderFunction, generate_dynamic


class McapROS2DecodeError(McapError):
    """Signals that an MCAP message record could not be decoded as a ROS2 message."""
class DecoderFactory(McapDecoderFactory):
    """Supplies an :py:class:`~mcap.reader.McapReader` with decoders for CDR-encoded messages.

    A valid `ros2msg` schema is required in order to decode a message. Schemas
    written in IDL are not currently supported.
    """

    def __init__(self) -> None:
        # Decoder functions already generated, keyed by schema ID.
        self._decoders: Dict[int, DecoderFunction] = {}

    def decoder_for(
        self, message_encoding: str, schema: Optional[Schema]
    ) -> Optional[Callable[[bytes], DecodedMessage]]:
        """Return a decoder for messages described by ``schema``.

        :param message_encoding: encoding of the message payload.
        :param schema: schema record describing the message, if there is one.
        :returns: a callable turning message bytes into a decoded message, or
            ``None`` when the message encoding is not CDR, no schema was
            given, or the schema encoding is not ROS2.
        :raises McapROS2DecodeError: if the generated type dictionary has no
            entry for ``schema.name``.
        """
        # Guard clauses, tested in the same order as one combined condition.
        if message_encoding != MessageEncoding.CDR:
            return None
        if schema is None:
            return None
        if schema.encoding != SchemaEncoding.ROS2:
            return None

        cached = self._decoders.get(schema.id)
        if cached is not None:
            return cached

        definition_text = schema.data.decode()
        generated = generate_dynamic(  # type: ignore
            schema.name, definition_text
        )
        if schema.name not in generated:
            raise McapROS2DecodeError(f'schema parsing failed for "{schema.name}"')

        fresh = generated[schema.name]
        # Remember the decoder so later lookups for this schema ID reuse it.
        self._decoders[schema.id] = fresh
        return fresh
class Decoder:
    """Decodes ROS 2 messages.

    .. deprecated:: 0.5.0
      Use :py:class:`~mcap_ros2.decoder.DecoderFactory` with
      :py:class:`~mcap.reader.McapReader` instead.
    """

    def __init__(self) -> None:
        """Emit a deprecation warning and create the backing decoder factory."""
        warnings.warn(
            "The `mcap_ros2.decoder.Decoder` class is deprecated. "
            "For similar functionality, instantiate the `mcap.reader.McapReader` "
            "with a `mcap_ros2.decoder.DecoderFactory` instance.",
            DeprecationWarning,
            # Attribute the warning to the code constructing Decoder rather
            # than to this line inside the library.
            stacklevel=2,
        )
        self._decoder_factory = DecoderFactory()

    def decode(self, schema: Schema, message: Message) -> Any:
        """Decode ``message`` using the definition carried by ``schema``.

        :param schema: schema record describing the message type.
        :param message: message record whose ``data`` payload is decoded.
        :returns: the value produced by the decoder for ``message.data``.
        :raises McapROS2DecodeError: if no decoder is available for
            ``schema`` (no schema given, or its encoding is not ROS2), or if
            the schema text cannot be turned into a decoder.
        """
        decoder = self._decoder_factory.decoder_for(MessageEncoding.CDR, schema)
        if decoder is None:
            # Raised explicitly rather than asserted: asserts vanish under
            # ``python -O``, which would turn this into an opaque TypeError.
            if schema is None:
                raise McapROS2DecodeError("cannot decode message: no schema provided")
            raise McapROS2DecodeError(
                f'cannot decode message: schema "{schema.name}" has unsupported '
                f'encoding "{schema.encoding}"'
            )
        return decoder(message.data)