Source code for mcap_ros1.decoder
import warnings
from typing import Any, Callable, Dict, Optional, Type
try:
# If the user has genpy on their PATH from an existing ROS1 environment, use that.
# This ensures that `isinstance(msg, genpy.Message)` succeeds on objects returned
# by decode().
from genpy import dynamic # type: ignore
except ImportError:
from ._vendor.genpy import dynamic # type: ignore
from mcap.decoder import DecoderFactory as McapDecoderFactory
from mcap.exceptions import McapError
from mcap.records import Message, Schema
from mcap.well_known import MessageEncoding, SchemaEncoding
[docs]class McapROS1DecodeError(McapError):
"""Raised if a MCAP message record cannot be decoded as a ROS1 message."""
pass
[docs]class DecoderFactory(McapDecoderFactory):
"""Provides functionality to an :py:class:`~mcap.reader.McapReader` to decode ROS 1 messages.
Requires a valid `ros1msg` schema to decode messages.
"""
def __init__(self) -> None:
self._types: Dict[int, Type[Any]] = {}
[docs] def decoder_for(
self, message_encoding: str, schema: Optional[Schema]
) -> Optional[Callable[[bytes], Any]]:
if (
message_encoding != MessageEncoding.ROS1
or schema is None
or schema.encoding != SchemaEncoding.ROS1
):
return None
generated_type = self._types.get(schema.id)
if generated_type is None:
type_dict: Dict[str, Type[Any]] = dynamic.generate_dynamic( # type: ignore
schema.name, schema.data.decode()
)
generated_type = type_dict[schema.name]
self._types[schema.id] = generated_type
def decoder(data: bytes):
ros_msg = generated_type()
ros_msg.deserialize(data)
return ros_msg
return decoder
[docs]class Decoder:
"""Decodes ROS 1 messages.
.. deprecated:: 0.7.0
Use :py:class:`~mcap_ros1.decoder.DecoderFactory` with :py:class:`~mcap.reader.McapReader`
instead.
"""
def __init__(self):
warnings.warn(
"""The :py:class:`mcap_ros1.decoder.Decoder` class is deprecated.
For similar functionality, instantiate the :py:class:`mcap.reader.McapReader` with a
:py:class:`mcap_ros1.decoder.DecoderFactory` instance.""",
DeprecationWarning,
)
self._decoder_factory = DecoderFactory()
[docs] def decode(self, schema: Schema, message: Message) -> Any:
decoder = self._decoder_factory.decoder_for(MessageEncoding.ROS1, schema)
assert decoder is not None, "failed to construct a ROS1 decoder"
return decoder(message.data)