Source code for mil_usb_to_can.sub8.sub8_driver

#!/usr/bin/python3
import importlib
from typing import TYPE_CHECKING, Any, Dict, Generator, Optional, Tuple

import rospy
from serial import SerialException

from mil_usb_to_can.sub8.board import (
    USBtoCANBoard,
)

# relative import causes import error with rosrun - GH-731
from mil_usb_to_can.sub8.utils import USB2CANException

if TYPE_CHECKING:
    from .device import CANDeviceHandle


[docs]class USBtoCANDriver: """ ROS Driver which implements the USB to CAN board. Allow users to specify a dictionary of device handle classes to be loaded at runtime to handle communication with specific devices. Attributes: board (USBtoCANBoard): The board the driver is implementing. handles (dict[int, CANDeviceHandle]): The handles served by the driver. Each key represents a unique device ID, and each corresponding value represents an instance of a child class inheriting from :class:`CANDeviceHandle`. Upon initialization, each class is constructed after being parsed from dynamic reconfigure. timer (rospy.Timer): The timer controlling when buffers are processed. """ def __init__(self): port = rospy.get_param("~port", "/dev/tty0") baud = rospy.get_param("~baudrate", 115200) can_id = rospy.get_param("~can_id", 0) simulation = rospy.get_param("/is_simulation", False) # If simulation mode, load simulated devices if simulation: rospy.logwarn( "CAN2USB driver in simulation! Will not talk to real hardware.", ) devices = dict( list( self.parse_module_dictionary(rospy.get_param("~simulated_devices")), ), ) self.board = USBtoCANBoard( port=port, baud=baud, simulated=simulation, devices=devices, can_id=can_id, ) else: self.board = USBtoCANBoard(port=port, baud=baud, simulated=simulation) # Add device handles from the modules specified in ROS params self.handles: Dict[int, CANDeviceHandle] = { device_id: cls(self, device_id) for device_id, cls in self.parse_module_dictionary( rospy.get_param("~device_handles"), ) } self.timer = rospy.Timer(rospy.Duration(1.0 / 20.0), self.process_in_buffer)
[docs] def read_packet(self) -> bool: """ Attempt to read a packet from the board. If the packet has an appropriate device handler, then the packet is passed to the ``on_data`` method of that handler. Returns: bool: The success in reading a packet. """ try: packet = self.board.read_packet() except (SerialException, USB2CANException) as e: rospy.logerr(f"Error reading packet: {e}") return False if packet is None: return False if packet.device in self.handles: self.handles[packet.device].on_data(packet.data) else: rospy.logwarn( f"Message received for device {packet.device}, but no handle registered", ) return True
[docs] def process_in_buffer(self, *args) -> None: """ Read all available packets in the board's in-buffer. """ while self.read_packet(): pass
[docs] def send_data(self, *args, **kwargs) -> Optional[Exception]: """ Sends data using the :meth:`USBtoCANBoard.send_data` method. Returns: Optional[Exception]: If data was sent successfully, nothing is returned. Otherwise, the exception that was raised in sending is returned. """ try: self.board.send_data(*args, **kwargs) return None except (SerialException, USB2CANException) as e: rospy.logerr(f"Error writing packet: {e}") return e
[docs] @staticmethod def parse_module_dictionary( d: Dict[str, Any], ) -> Generator[Tuple[int, Any], None, None]: """ Generator to load classes from module strings specified in a dictionary. Imports all found classes. Yields: Generator[Tuple[int, Any], None, None]: Yields tuples containing the device ID and the associated class. """ for device_id, module_name in d.items(): device_id = int(device_id) # Split module from class name module_name, cls = module_name.rsplit(".", 1) # import module module = importlib.import_module(module_name) # Yield a tuple (device_id, imported_class) yield device_id, getattr(module, cls)
if __name__ == "__main__": rospy.init_node("usb_to_can_driver") driver = USBtoCANDriver() rospy.spin()