Source code for mcap_protobuf.decoder
import warnings
from collections import Counter
from typing import Any, Callable, Dict, Optional, Type
from google.protobuf.descriptor_pb2 import FileDescriptorProto, FileDescriptorSet
from google.protobuf.descriptor_pool import DescriptorPool
from google.protobuf.message_factory import GetMessageClassesForFiles
from mcap.decoder import DecoderFactory as McapDecoderFactory
from mcap.exceptions import McapError
from mcap.records import Message, Schema
from mcap.well_known import MessageEncoding, SchemaEncoding
class McapProtobufDecodeError(McapError):
    """Error signalling that a Message record could not be decoded as Protobuf.

    Derives from :py:class:`~mcap.exceptions.McapError`, so handlers written for
    the base error also catch this one.
    """
class DecoderFactory(McapDecoderFactory):
    """Provides functionality to an :py:class:`~mcap.reader.McapReader` to decode protobuf
    messages. Requires valid `protobuf` schemas to decode messages.

    The message class for a schema is built from the schema's serialized
    ``FileDescriptorSet`` the first time that schema is requested, then cached.
    """

    def __init__(self) -> None:
        super().__init__()
        # Generated message classes, keyed by ``Schema.id``.
        # NOTE(review): the key is the schema ID alone, so this presumably relies on
        # IDs being unique across everything decoded through one factory -- confirm.
        self._types: Dict[int, Type[Any]] = {}

    def decoder_for(
        self, message_encoding: str, schema: Optional[Schema]
    ) -> Optional[Callable[[bytes], Any]]:
        """Return a callable that parses message bytes into a Protobuf message.

        :param message_encoding: the message encoding to find a decoder for.
        :param schema: the schema describing the messages, if there is one.
        :returns: a function taking the raw message bytes and returning the parsed
            message object, or ``None`` if the message encoding is not Protobuf,
            no schema was given, or the schema encoding is not Protobuf.
        :raises McapProtobufDecodeError: if the schema's ``FileDescriptorSet`` lists
            the same file name more than once, or does not define ``schema.name``.
            This is a subclass of ``McapError``.
        """
        if (
            message_encoding != MessageEncoding.Protobuf
            or schema is None
            or schema.encoding != SchemaEncoding.Protobuf
        ):
            return None

        generated = self._types.get(schema.id)
        if generated is None:
            generated = self._build_message_class(schema)
            # Only cache after a successful build so a failing schema is retried.
            self._types[schema.id] = generated

        def decoder(data: bytes) -> Any:
            proto_msg = generated()
            proto_msg.ParseFromString(data)
            return proto_msg

        return decoder

    @staticmethod
    def _build_message_class(schema: Schema) -> Type[Any]:
        """Build the message class named ``schema.name`` from ``schema.data``."""
        fds = FileDescriptorSet.FromString(schema.data)

        # ``most_common(1)`` yields the most frequent file name (or nothing for an
        # empty set); if even that one is unique, there are no duplicates.
        for name, count in Counter(fd.name for fd in fds.file).most_common(1):
            if count > 1:
                raise McapProtobufDecodeError(
                    f"FileDescriptorSet contains {count} file descriptors for {name}"
                )

        pool = DescriptorPool()
        pending = {fd.name: fd for fd in fds.file}

        def _add(fd: FileDescriptorProto) -> None:
            # Add dependencies before the file that imports them. Each dependency
            # is popped from ``pending`` before recursing, so it is added only once.
            for dependency in fd.dependency:
                if dependency in pending:
                    _add(pending.pop(dependency))
            pool.Add(fd)

        while pending:
            _add(pending.popitem()[1])

        messages = GetMessageClassesForFiles([fd.name for fd in fds.file], pool)
        klass = messages.get(schema.name)
        if klass is None:
            raise McapProtobufDecodeError(
                f"FileDescriptorSet for type {schema.name} is missing that schema"
            )
        return klass
class Decoder:
    """Decodes Protobuf messages.

    .. deprecated:: 0.3.0
      Use :py:class:`~mcap_protobuf.decoder.DecoderFactory` with :py:class:`~mcap.reader.McapReader`
      instead.
    """

    def __init__(self) -> None:
        """Emit a ``DeprecationWarning`` and create the underlying decoder factory."""
        warnings.warn(
            "The `mcap_protobuf.decoder.Decoder` class is deprecated.\n"
            "For similar functionality, instantiate the `mcap.reader.McapReader` with a\n"
            "`mcap_protobuf.decoder.DecoderFactory` instance.",
            DeprecationWarning,
            # Attribute the warning to the code that instantiates Decoder rather
            # than to this module, so it is reported at the caller's location.
            stacklevel=2,
        )
        self._decoder_factory = DecoderFactory()

    def decode(self, schema: Schema, message: Message) -> Any:
        """Decode ``message.data`` using the message class described by ``schema``.

        :param schema: the schema record describing the message.
        :param message: the message record whose ``data`` is parsed.
        :returns: the parsed Protobuf message object.
        :raises McapProtobufDecodeError: if no Protobuf decoder can be constructed
            for ``schema``.
        """
        decoder = self._decoder_factory.decoder_for(MessageEncoding.Protobuf, schema)
        # Raise explicitly instead of asserting: asserts are stripped under ``-O``.
        if decoder is None:
            raise McapProtobufDecodeError("failed to construct a Protobuf decoder")
        return decoder(message.data)