Source code for electrical_protocol.packet

from __future__ import annotations

import struct
from dataclasses import dataclass, fields
from enum import Enum
from functools import lru_cache
from typing import ClassVar, TypeVar, get_type_hints

SYNC_CHAR_1 = 0x37
SYNC_CHAR_2 = 0x01

_packet_registry: dict[int, dict[int, type[Packet]]] = {}


PacketSelf = TypeVar("PacketSelf", bound="Packet")


def hexify(data: bytes) -> str:
    return ":".join(f"{c:02x}" for c in data)


@lru_cache(maxsize=None)
def get_cache_hints(cls):
    return get_type_hints(cls)


[docs]class ChecksumException(OSError): """ An invalid checksum appeared. """ def __init__( self, packet: type[Packet], received: tuple[int, int], expected: tuple[int, int], ): """ Attributes: packet (Type[:class:`~.Packet`]): The packet with the invalid checksum. received (Tuple[int, int]): The received Fletcher's checksum. expected (Tuple[int, int]): The expected Fletcher's checksum. """ super().__init__( f"Invalid checksum in packet of type {packet.__qualname__}: received {received}, expected {expected}", )
[docs]@dataclass class Packet: """ Represents one packet that can be sent or received by a serial device. This class is able to handle packaging unique data values into a :class:`bytes` object for sending over a data stream. This class should be overridden to implement unique packet payloads. Note that this class supports three subclass arguments to assign unique message IDs, subclass IDs, and payload formats. Note that all subclasses must be decorated with :meth:`dataclasses.dataclass`. If any class members are annotated with a subclass of :class:`enum.Enum`, the class will always make an attempt to convert the raw data value to an instance of the enum before constructing the rest of the values in the class. .. code-block:: python from dataclasses import dataclass @dataclass class ExamplePacket(Packet, class_id = 0x02, subclass_id = 0x01, payload_format = "BHHf"): example_char: int example_short: int example_short_two: int example_float: float .. container:: operations .. describe:: bytes(x) Returns a :class:`bytes` object representing the data of the packet in the specified packet format. Arguments: class_id (int): The message ID. Can be between 0 and 255. subclass_id (int): The message subclass ID. Can be between 0 and 255. payload_format (str): The format for the payload. This determines how the individual payload is assembled. Each character in the format string represents the position of one class variable. The class variables are assembled in the order they are defined in. """ class_id: ClassVar[int] subclass_id: ClassVar[int] payload_format: ClassVar[str] = "" def __init_subclass__( cls, class_id: int, subclass_id: int, payload_format: str = "", ): cls.class_id = class_id cls.subclass_id = subclass_id cls.payload_format = payload_format packets = [p for mid in _packet_registry.values() for p in mid.values()] for packet in packets: if packet.class_id == class_id and packet.subclass_id == subclass_id: raise ValueError( f"Cannot reuse class_id 0x{class_id:0x} and subclass_id 0x{subclass_id}, already used by {packet.__qualname__}", ) _packet_registry.setdefault(class_id, {})[subclass_id] = cls def __post_init__(self): for name, field_type in get_cache_hints(self.__class__).items(): if ( name not in [ "class_id", "subclass_id", "payload_format", ] and not isinstance(self.__dict__[name], field_type) and issubclass(field_type, Enum) ): setattr(self, name, field_type(self.__dict__[name])) if self.payload_format and not self.payload_format.startswith( ("<", ">", "=", "!"), ): raise ValueError( "The payload format does not start with a standard size character: ('<', '>', '!', '=').", ) available_chars: dict[type, list[str]] = { bool: ["c", "b", "B", "?"], int: ["b", "B", "h", "H", "i", "I", "l", "L", "q", "Q"], float: ["f", "d"], } stripped_format = self.payload_format.lstrip("<>=!@") for i, field in enumerate(fields(self)): if field.type not in available_chars: continue chars = available_chars[field.type] if i >= len(stripped_format): raise ValueError( f"The payload format for the packet is too short to support all dataclass fields; expected: {len(fields(self))}, found: {len(self.payload_format)}.", ) represented_char = stripped_format[i] if represented_char not in chars: raise ValueError( f"The type of {field.name} in the payload format is '{represented_char}', which does not correspond to its dataclass type of {field.type}.", ) @classmethod def _calculate_checksum(cls, data: bytes) -> tuple[int, int]: """ Used to calculate the Fletcher's checksum for a series of bytes. When calculating the checksum for a new packet, the start bytes/sync characters should not be included. """ sum1, sum2 = 0, 0 for byte in data: sum1 = (sum1 + byte) % 255 sum2 = (sum2 + sum1) % 255 return sum1, sum2 def __bytes__(self): payload = struct.pack(self.payload_format, *self.__dict__.values()) data = struct.pack( f"<BBBBH{len(payload)}s", SYNC_CHAR_1, SYNC_CHAR_2, self.class_id, self.subclass_id, len(payload), payload, ) checksum = self._calculate_checksum(data[2:]) return data + struct.pack("<BB", *checksum) def __len__(self) -> int: return self.__class__._expected_len() @classmethod def _expected_len(cls) -> int: # We cannot use one calcsize since payload_format should start with a standard size character return struct.calcsize("<BBBBHBB") + struct.calcsize(cls.payload_format)
[docs] @classmethod def from_bytes( cls: type[PacketSelf], packed: bytes, trim: bool = True, ) -> PacketSelf: """ Constructs a packet from a packed packet in a :class:`bytes` object. If a packet is found with the corresponding message and subclass ID, then an instance (or subclass) of that packet class will be returned. Arguments: packed (bytes): The packed packet to unpack. trim (bool): If True, only the required number of bytes will be used to construct the packet. Otherwise, the entire packet will be used. Default: `True`. Raises: ChecksumException: The checksum is invalid. LookupError: No packet with the specified class and subclass IDs exist. Returns: An instance of the appropriate packet subclass. """ class_id = packed[2] subclass_id = packed[3] if class_id in _packet_registry and subclass_id in _packet_registry[class_id]: subclass = _packet_registry[class_id][subclass_id] if trim: packed = packed[: subclass._expected_len()] if len(packed) < subclass._expected_len(): raise ValueError( f"Packet is too short to be a valid packet. (provided len: {len(packed)}, expected len: {cls._expected_len()})", ) payload = packed[6:-2] if struct.unpack("<BB", packed[-2:]) != cls._calculate_checksum( packed[2:-2], ): raise ChecksumException( subclass, struct.unpack("<BB", packed[-2:]), cls._calculate_checksum(packed[2:-2]), ) unpacked = struct.unpack(subclass.payload_format, payload) packet = subclass(*unpacked) if not isinstance(packet, cls): raise RuntimeError( f"Attempted to resolve packet of type {cls.__qualname__}, but found {packet.__class__.__qualname__} for bytes: {hexify(packed)}", ) return packet raise LookupError( f"Attempted to reconstruct packet with class_id 0x{class_id:02x} and subclass_id 0x{subclass_id:02x}, but no packet with IDs was found.", )
[docs]@dataclass class AckPacket(Packet, class_id=0x00, subclass_id=0x01, payload_format=""): """ Common acknowledgment packet. """
[docs]@dataclass class NackPacket(Packet, class_id=0x00, subclass_id=0x00, payload_format=""): """ Common not-acknowledged packet. """