Source code for mil_pneumatic_actuator.board

#!/usr/bin/env python3
import threading
from typing import Optional

import serial
from mil_ros_tools import thread_lock

from .constants import Constants
from .simulated_board import SimulatedPnuematicActuatorBoard

lock = threading.Lock()


[docs]class PnuematicActuatorDriverError(Exception): """ The base exception class for all exceptions/errors related to the Pneumatic Actuator Board. Inherits from :class:`Exception`. """ def __init__(self, message): super().__init__("Actuator board: " + message)
[docs]class PnuematicActuatorDriverChecksumError(PnuematicActuatorDriverError): """ Exception representing an invalid checksum. Inherits from :class:`PnuematicActuatorDriverError`. """ def __init__(self, checksum_is, checksum_should_be): message = f"Invalid checksum. Recievied {hex(checksum_is)}, should be {hex(checksum_should_be)}" super().__init__(message)
[docs]class PnuematicActuatorDriverResponseError(PnuematicActuatorDriverError): """ Exception representing an invalid response. Inherits from :class:`PnuematicActuatorDriverError`. """ def __init__(self, received, expected): message = ( f"Unexpected response. Expected {hex(received)}, received {hex(expected)}" ) super().__init__(message)
[docs]class PnuematicActuatorTimeoutError(PnuematicActuatorDriverError): """ Exception representing a serial timeout experienced by the board. Inherits from :class:`PnuematicActuatorDriverError`. """ def __init__(self): message = "Serial timeout" super().__init__(message)
[docs]class PnuematicActuatorDriver: """ Allows high level ROS code to interface with Daniel's pneumatics board. For the dropper and grabber systems, call service with ``True`` or ``False`` to open or close. For the shooter system, sending a ``True`` signal will pulse the valve. Further information on the board's communication protocol can be found in the design documentation. """ # TODO: Add a function to try and reconnect to the serial port if we lose connection. def __init__(self, port: str, baud: int = 9600, simulated: bool = False): """ Args: port (str): The nname of the board to establish a serial connection to. baud (int): The baud rate to establish the serial connection at. simulated (bool): Whether to use a simulated actuator board class or an interface to the physical board. """ if simulated: self.ser = SimulatedPnuematicActuatorBoard() else: self.ser = serial.Serial(port=port, baudrate=baud, timeout=2.0) self.ser.flushInput() @classmethod def _verify_checksum(cls, byte: int, checksum: int) -> None: """ Verifies that two checksums are equivalent. If they are not equivalent, then an exception is raised. Otherwise, ``None`` is implicitly returned. """ if not Constants.verify_checksum(byte, checksum): raise PnuematicActuatorDriverChecksumError( checksum, Constants.create_checksum(byte), ) def _get_response(self) -> int: """ Internal method to return only the desired data sent by the actuator board. Returns: int: The response sent by the board. """ data = self.ser.read(2) if len(data) != 2: raise PnuematicActuatorTimeoutError response = Constants.deserialize_packet(data) data = response[0] chksum = response[1] self._verify_checksum(data, chksum) return data @thread_lock(lock) def _send_request(self, byte: int, expected_response: Optional[int] = None) -> int: """ Internal method which sends some data and compares it to the expected response (if desired) before returning it to the caller. """ data = Constants.serialize_packet(byte) self.ser.write(data) response = self._get_response() if expected_response is not None and expected_response != response: raise PnuematicActuatorDriverResponseError(response, expected_response) return response
[docs] def open_port(self, port: int) -> int: """ Opens a particular port. Args: port (int): The port to open. Raises: PnuematicActuatorDriverResponseError: The expected response from the board was not received. PnuematicActuatorDriverChecksumError: The checksum expected and the checksum received were not the same. The board may be malfunctioning or the communication with the board may be disrupted. Returns: int: The response from the board, which is frequently a standard hexadecimal value indicating that the board opened the valve at the desired port. This value is equal to :attr:`mil_pneumatic_actuator.Constants.OPEN_RESPONSE`. """ byte = Constants.OPEN_REQUEST_BASE + port return self._send_request(byte, Constants.OPEN_RESPONSE)
[docs] def close_port(self, port: int) -> int: """ Closes a particular port. Args: port (int): The port to close. Raises: PnuematicActuatorDriverResponseError: The expected response from the board was not received. PnuematicActuatorDriverChecksumError: The checksum expected and the checksum received were not the same. The board may be malfunctioning or the communication with the board may be disrupted. Returns: int: The response from the board, which is frequently a standard hexadecimal value indicating that the board closed the valve at the desired port. This value is equal to :attr:`mil_pneumatic_actuator.Constants.CLOSE_RESPONSE`. """ byte = Constants.CLOSE_REQUEST_BASE + port return self._send_request(byte, Constants.CLOSE_RESPONSE)
[docs] def set_port(self, port: int, do_open: bool) -> int: """ Sets the data at a particular port to opened/closed. This does not depend on the internal state of the port when called, ie, if you request a port to be closed, and the port is already closed, then the response is sent anyways. Args: port (int): The specific port to open/close. do_open (bool): Whether to open the port. If ``False``, then the port is closed. Raises: PnuematicActuatorDriverResponseError: The expected response from the board was not received. PnuematicActuatorDriverChecksumError: The checksum expected and the checksum received were not the same. The board may be malfunctioning or the communication with the board may be disrupted. """ if do_open: return self.open_port(port) else: return self.close_port(port)
[docs] def get_port(self, port: int) -> int: """ Reads the data at a specific port. Args: port (int): The port to read from. Raises: PnuematicActuatorDriverResponseError: The expected response from the board was not received. PnuematicActuatorDriverChecksumError: The checksum expected and the checksum received were not the same. The board may be malfunctioning or the communication with the board may be disrupted. Returns: int: The response served by the board. """ byte = Constants.READ_REQUEST_BASE + port return self._send_request(byte)
[docs] def ping(self) -> int: """ Sends a ping message to the board and returns the response. Raises: PnuematicActuatorDriverResponseError: The expected response from the board was not received. PnuematicActuatorDriverChecksumError: The checksum expected and the checksum received were not the same. The board may be malfunctioning or the communication with the board may be disrupted. Returns: int: The response from the board. """ return self._send_request(Constants.PING_REQUEST, Constants.PING_RESPONSE)