from __future__ import annotations
import abc
import contextlib
import threading
from typing import Any, Generic, TypeVar, Union, cast, get_args, get_origin
import rospy
import serial
from .packet import SYNC_CHAR_1, Packet
SendPackets = TypeVar("SendPackets", bound=Packet)
RecvPackets = TypeVar("RecvPackets", bound=Packet)
[docs]class ROSSerialDevice(Generic[SendPackets, RecvPackets]):
"""
Represents a generic serial device, which is expected to be the main component
of an individual ROS node.
Attributes:
port (Optional[str]): The port used for the serial connection, if provided.
baudrate (Optional[int]): The baudrate to use with the device, if provided.
device (Optional[serial.Serial]): The serial class used to communicate with
the device.
rate (float): The reading rate of the device, in Hertz. Set to `20` by default.
"""
device: serial.Serial | None
_recv_T: Any
_send_T: Any
def is_connected(self) -> bool:
return self.device is not None
def is_open(self) -> bool:
return bool(self.device) and self.device.is_open
def __init__(self, port: str | None, baudrate: int | None) -> None:
"""
Arguments:
port (Optional[str]): The serial port to connect to. If ``None``, connection
will not be established on initialization; rather, the user can use
:meth:`~.connect` to connect later.
baudrate (Optional[int]): The baudrate to connect with. If ``None`` and
a port is specified, then 115200 is assumed.
"""
self.port = port
self.baudrate = baudrate
if port:
self.device = serial.Serial(port, baudrate or 115200, timeout=0.1)
if not self.device.is_open:
self.device.open()
self.device.reset_input_buffer()
self.device.reset_output_buffer()
else:
self.device = None
self.lock = threading.Lock()
self.rate = 20.0
self.timer = rospy.Timer(rospy.Duration(1.0 / self.rate), self._process_buffer) # type: ignore
def __init_subclass__(cls) -> None:
# this is a super hack lol :P
# cred: https://stackoverflow.com/a/71720366
cls._send_T = get_args(cls.__orig_bases__[0])[0] # type: ignore
cls._recv_T = get_args(cls.__orig_bases__[0])[1] # type: ignore
[docs] def connect(self, port: str, baudrate: int) -> None:
"""
Connects to the port with the given baudrate. If the device is already
connected, the input and output buffers will be flushed.
Arguments:
port (str): The serial port to connect to.
baudrate (int): The baudrate to connect with.
"""
self.port = port
self.baudrate = baudrate
self.device = serial.Serial(port, baudrate, timeout=0.1)
if not self.device.is_open:
self.device.open()
self.device.reset_input_buffer()
self.device.reset_output_buffer()
[docs] def close(self) -> None:
"""
Closes the serial device.
"""
rospy.loginfo("Closing serial device...")
if not self.device:
raise RuntimeError("Device is not connected.")
else:
# TODO: Find a better way to deal with these os errors
with contextlib.suppress(OSError):
if self.device.in_waiting:
rospy.logwarn(
"Shutting down device, but packets were left in buffer...",
)
self.device.close()
[docs] def write(self, data: bytes) -> None:
"""
Writes a series of raw bytes to the device. This method should rarely be
used; using :meth:`~.send_packet` is preferred because of the guarantees
it provides through the packet class.
Arguments:
data (bytes): The data to send.
"""
if not self.device:
raise RuntimeError("Device is not connected.")
self.device.write(data)
[docs] def send_packet(self, packet: SendPackets) -> None:
"""
Sends a given packet to the device.
Arguments:
packet (:class:`~.Packet`): The packet to send.
"""
with self.lock:
self.write(bytes(packet))
def _read_from_stream(self) -> bytes:
# Read until SOF is encourntered in case buffer contains the end of a previous packet
if not self.device:
raise RuntimeError("Device is not connected.")
sof = None
for _ in range(10):
sof = self.device.read(1)
if not len(sof):
continue
sof_int = int.from_bytes(sof, byteorder="big")
if sof_int == SYNC_CHAR_1:
break
if not isinstance(sof, bytes):
raise TimeoutError("No SOF received in one second.")
sof_int = int.from_bytes(sof, byteorder="big")
if sof_int != SYNC_CHAR_1:
rospy.logerr("Where da start char at?")
data = sof
# Read sync char 2, msg ID, subclass ID
data += self.device.read(3)
length = self.device.read(2) # read payload length
data += length
data += self.device.read(
int.from_bytes(length, byteorder="little") + 2,
) # read data and checksum
return data
def _correct_type(self, provided: Any) -> bool:
# either:
# 1. RecvPackets is a Packet --> check isinstance on the type var
# 2. RecvPackets is a Union of Packets --> check isinstance on all
if get_origin(self._recv_T) is Union:
return isinstance(provided, get_args(self._recv_T))
else:
return isinstance(provided, self._recv_T)
[docs] def adjust_read_rate(self, rate: float) -> None:
"""
Sets the reading rate to a specified amount.
Arguments:
rate (float): The reading speed to use, in hz.
"""
self.timer.shutdown()
self.rate = min(rate, 1_000)
self.timer = rospy.Timer(rospy.Duration(1.0 / rate), self._process_buffer) # type: ignore
[docs] def scale_read_rate(self, scale: float) -> None:
"""
Scales the reading rate of the device handle by some factor.
Arguments:
scale (float): The amount to scale the reading rate by.
"""
self.adjust_read_rate(self.rate * scale)
def _read_packet(self) -> bool:
if not self.device:
raise RuntimeError("Device is not connected.")
try:
with self.lock:
if not self.is_open() or self.device.in_waiting == 0:
return False
if self.device.in_waiting > 200:
rospy.logwarn_throttle(
0.5,
"Packets are coming in much quicker than expected, upping rate...",
)
self.scale_read_rate(1 + self.device.in_waiting / 1000)
packed_packet = self._read_from_stream()
assert isinstance(packed_packet, bytes)
packet = Packet.from_bytes(packed_packet)
except serial.SerialException as e:
rospy.logerr(f"Error reading packet: {e}")
return False
except OSError:
rospy.logerr_throttle(1, "Cannot read from serial device.")
return False
if not self._correct_type(packet):
rospy.logerr(
f"Received unexpected packet: {packet} (expected: {self._recv_T})",
)
return False
packet = cast(RecvPackets, packet)
self.on_packet_received(packet)
return True
def _process_buffer(self, _: rospy.timer.TimerEvent) -> None:
if not self.is_open():
return
try:
self._read_packet()
except Exception as e:
rospy.logerr(f"Error reading recent packet: {e}")
import traceback
traceback.print_exc()
[docs] @abc.abstractmethod
def on_packet_received(self, packet: RecvPackets) -> None:
"""
Abstract method to be implemented by subclasses for handling packets
sent by the physical electrical board.
Arguments:
packet (:class:`~.Packet`): The packet that is received.
"""
pass