Source code for mil_usb_to_can.sub8.board

#!/usr/bin/env python3
from threading import Lock

# from mil_tools import hexify
from typing import Optional

import serial

from .simulation import SimulatedUSBtoCAN
from .utils import (  # causes error if relative import used - GH-731
    CommandPacket,
    ReceivePacket,
)


[docs]class USBtoCANBoard: """ ROS-independent wrapper which provides an interface to connect to the USB to CAN board via a serial (or simulated serial) device. Provides thread-safe functionality. Attributes: lock (threading.Lock): The thread lock. ser (Union[:class:`SimulatedUSBtoCAN`, :class:`serial.Serial`]): The serial connection. """ def __init__(self, port: str, baud: int = 9600, simulated: bool = False, **kwargs): """ Args: port (str): Path to serial device, such as ``/dev/ttyUSB0``. baud (int): Baud rate of serial device to connect to. Defaults to 9600. simulated (bool): If True, use a simulated serial device rather than a real device. Defaults to ``False``. """ self.lock = Lock() if simulated: self.ser = SimulatedUSBtoCAN(**kwargs) else: self.ser = serial.Serial(port=port, baudrate=baud, timeout=0.1, **kwargs) self.ser.flushOutput() self.ser.flushInput()
[docs] def read_packet(self) -> Optional[ReceivePacket]: """ Read a packet from the board, if available. Returns a :class:`ReceivePacket` instance if one was successfully read, or ``None`` if the in buffer is empty. Returns: Optional[:class:`ReceivePacket`]: The packet, if found, otherwise ``None``. """ # TODO Does this actually return ReceivePacket? Appears it might only be # able to return Packet. with self.lock: if self.ser.in_waiting == 0: return None return ReceivePacket.read_packet(self.ser)
[docs] def send_data(self, data: bytes, can_id: int = 0) -> None: """ Sends data to a CAN device using the thread lock. Writes using the :meth:`write` method of the :attr:`.ser` attribute. Args: device_id (int): CAN device ID to send data to. data (bytes): Data (represented as bytes) to send to the device. Raises: PayloadTooLargeException: The payload is larger than 8 bytes. """ p = CommandPacket.create_send_packet(data, can_id=can_id) with self.lock: p_bytes = bytes(p) # print 'SENDING ', hexify(p_bytes) self.ser.write(p_bytes)