Source code for mcap.data_stream

import struct
import zlib
from io import BytesIO
from typing import IO, Optional

from .exceptions import EndOfFile
from .opcode import Opcode


[docs]class ReadDataStream: def __init__(self, stream: IO[bytes], calculate_crc: bool = False): self._count = 0 self._stream = stream self._crc: Optional[int] = None if calculate_crc: self._crc = 0 @property def count(self) -> int: return self._count
[docs] def read(self, length: int) -> bytes: if length == 0: return b"" data = self._stream.read(length) self._count += len(data) if self._crc is not None: self._crc = zlib.crc32(data, self._crc) if data == b"": raise EndOfFile() return data
[docs] def checksum(self) -> int: if self._crc is not None: return self._crc else: raise RuntimeError("requested checksum where calculate_crc == false")
[docs] def read1(self) -> int: [value] = struct.unpack("<B", self.read(1)) return value
[docs] def read2(self) -> int: [value] = struct.unpack("<H", self.read(2)) return value
[docs] def read4(self) -> int: [value] = struct.unpack("<I", self.read(4)) return value
[docs] def read8(self) -> int: [value] = struct.unpack("<Q", self.read(8)) return value
[docs] def read_prefixed_string(self) -> str: length = self.read4() return str(self.read(length), "utf-8")
[docs]class RecordBuilder: def __init__(self) -> None: self._buffer = BytesIO() @property def count(self) -> int: return self._buffer.tell()
[docs] def start_record(self, opcode: Opcode): self._record_start_offset = self._buffer.tell() self._buffer.write(struct.pack("<BQ", opcode, 0)) # placeholder size
[docs] def finish_record(self): pos = self._buffer.tell() length = pos - self._record_start_offset - 9 self._buffer.seek(self._record_start_offset + 1) self._buffer.write(struct.pack("<Q", length)) self._buffer.seek(pos)
[docs] def end(self): buf = self._buffer.getvalue() self._buffer.close() self._buffer = BytesIO() return buf
[docs] def write(self, data: bytes): self._buffer.write(data)
[docs] def write_prefixed_string(self, value: str): bytes = value.encode() self.write4(len(bytes)) self.write(bytes)
[docs] def write1(self, value: int): self.write(struct.pack("<B", value))
[docs] def write2(self, value: int): self.write(struct.pack("<H", value))
[docs] def write4(self, value: int): self.write(struct.pack("<I", value))
[docs] def write8(self, value: int): self.write(struct.pack("<Q", value))