Source code for mil_usb_to_can.sub9.packet

from __future__ import annotations

import struct
from dataclasses import dataclass
from enum import Enum
from functools import lru_cache
from typing import ClassVar, TypeVar, get_type_hints

SYNC_CHAR_1 = 0x37
SYNC_CHAR_2 = 0x01

_packet_registry: dict[int, dict[int, type[Packet]]] = {}


PacketSelf = TypeVar("PacketSelf", bound="Packet")


def hexify(data: bytes) -> str:
    return ":".join(f"{c:02x}" for c in data)


@lru_cache(maxsize=None)
def get_cache_hints(cls):
    return get_type_hints(cls)


[docs]class ChecksumException(OSError): """ An invalid checksum appeared. """ def __init__( self, packet: type[Packet], received: tuple[int, int], expected: tuple[int, int], ): """ Attributes: packet (Type[:class:`~.Packet`]): The packet with the invalid checksum. received (Tuple[int, int]): The received Fletcher's checksum. expected (Tuple[int, int]): The expected Fletcher's checksum. """ super().__init__( f"Invalid checksum in packet of type {packet.__qualname__}: received {received}, expected {expected}", )
[docs]@dataclass class Packet: """ Represents one packet sent or received by a device handle communicating to/from the USB to CAN board. This class is able to handle packaging unique data values into a :class:`bytes` object for sending over a data stream. This class should be overridden to implement unique packet payloads. Note that this class supports three subclass arguments to assign unique message IDs, subclass IDs, and payload formats. Note that all subclasses must be decorated with :meth:`dataclasses.dataclass`. If any class members are annotated with a subclass of :class:`enum.Enum`, the class will always make an attempt to convert the raw data value to an instance of the enum before constructing the rest of the values in the class. .. code-block:: python from dataclasses import dataclass @dataclass class ExamplePacket(Packet, msg_id = 0x02, subclass_id = 0x01, payload_format = "BHHf"): example_char: int example_short: int example_short_two: int example_float: float .. container:: operations .. describe:: bytes(x) Returns a :class:`bytes` object representing the data of the packet in the specified packet format. Arguments: msg_id (int): The message ID. Can be between 0 and 255. subclass_id (int): The message subclass ID. Can be between 0 and 255. payload_format (str): The format for the payload. This determines how the individual payload is assembled. Each character in the format string represents the position of one class variable. The class variables are assembled in the order they are defined in. """ msg_id: ClassVar[int] subclass_id: ClassVar[int] payload_format: ClassVar[str] def __init_subclass__(cls, msg_id: int, subclass_id: int, payload_format: str = ""): cls.msg_id = msg_id cls.subclass_id = subclass_id cls.payload_format = payload_format packets = [p for mid in _packet_registry.values() for p in mid.values()] for packet in packets: if packet.msg_id == msg_id and packet.subclass_id == subclass_id: raise ValueError( f"Cannot reuse msg_id 0x{msg_id:0x} and subclass_id 0x{subclass_id}, already used by {packet.__qualname__}", ) _packet_registry.setdefault(msg_id, {})[subclass_id] = cls def __post_init__(self): for name, field_type in get_cache_hints(self.__class__).items(): if ( name not in [ "msg_id", "subclass_id", "payload_format", ] and not isinstance(self.__dict__[name], field_type) and issubclass(field_type, Enum) ): setattr(self, name, field_type(self.__dict__[name])) @classmethod def _calculate_checksum(cls, data: bytes) -> tuple[int, int]: sum1, sum2 = 0, 0 for byte in data: sum1 = (sum1 + byte) % 255 sum2 = (sum2 + sum1) % 255 return sum1, sum2 def __bytes__(self): payload = struct.pack(self.payload_format, *self.__dict__.values()) data = struct.pack( f"<BBBBH{len(payload)}s", SYNC_CHAR_1, SYNC_CHAR_2, self.msg_id, self.subclass_id, len(payload), payload, ) checksum = self._calculate_checksum(data[2:]) return data + struct.pack("<BB", *checksum)
[docs] @classmethod def from_bytes(cls: type[PacketSelf], packed: bytes) -> PacketSelf: """ Constructs a packet from a packed packet in a :class:`bytes` object. If a packet is found with the corresponding message and subclass ID, then an instance (or subclass) of that packet class will be returned. Raises: ChecksumException: The checksum is invalid. LookupError: No packet with the specified class and subclass IDs exist. Returns: An instance of the appropriate packet subclass. """ msg_id = packed[2] subclass_id = packed[3] if msg_id in _packet_registry and subclass_id in _packet_registry[msg_id]: subclass = _packet_registry[msg_id][subclass_id] payload = packed[6:-2] if struct.unpack("<BB", packed[-2:]) != cls._calculate_checksum( packed[2:-2], ): raise ChecksumException( subclass, struct.unpack("<BB", packed[-2:]), cls._calculate_checksum(packed[2:-2]), ) unpacked = struct.unpack(subclass.payload_format, payload) packet = subclass(*unpacked) if not isinstance(packet, cls): raise RuntimeError( f"Attempted to resolve packet of type {cls.__qualname__}, but found {packet.__class__.__qualname__} for bytes: {hexify(payload)}", ) return packet raise LookupError( f"Attempted to reconstruct packet with msg_id 0x{msg_id:02x} and subclass_id 0x{subclass_id:02x}, but no packet with IDs was found.", )
[docs]@dataclass class AckPacket(Packet, msg_id=0x00, subclass_id=0x01, payload_format=""): """ Common acknowledgment packet. Should only be found in response operations. """
[docs]@dataclass class NackPacket(Packet, msg_id=0x00, subclass_id=0x00, payload_format=""): """ Common not-acknowledged packet. Should only be found in response operations. """