Source code for sub_actuator_board.handle

#!/usr/bin/env python3
from __future__ import annotations

import rospy
from electrical_protocol import AckPacket
from mil_usb_to_can.sub9 import CANDeviceHandle

from sub_actuator_board.srv import (
    GetValve,
    GetValveRequest,
    GetValveResponse,
    SetValve,
    SetValveRequest,
)

from .packets import (
    ActuatorPollRequestPacket,
    ActuatorPollResponsePacket,
    ActuatorSetPacket,
)


[docs]class ActuatorBoard(CANDeviceHandle): """ Device handle for the actuator board. Because this class implements a CAN device, it inherits from the :class:`CANDeviceHandle` class. """ _recent_response: ActuatorPollResponsePacket | None def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._set_service = rospy.Service("/set_valve", SetValve, self.set_valve) self._get_service = rospy.Service("/get_valve", GetValve, self.get_valve) self._recent_response = None
[docs] def set_valve(self, req: SetValveRequest) -> dict: """ Called when the ``/set_valve`` service is requested. Creates a message to control the valve and sends it through the inherited device handle. Args: req (SetValveRequest): Request to set the valve. Returns: dict: List of information which is casted into a SetValveResponse. """ # Send board command to open or close specified valve message = ActuatorSetPacket(address=req.actuator, open=req.opened) self.send_data(message) rospy.loginfo( "Set valve {} {}".format( req.actuator, "opened" if req.opened else "closed", ), ) # Wait some time for board to process command rospy.sleep(0.01) # Request the status of the valve just commanded to ensure it worked self.send_data(ActuatorPollRequestPacket()) start = rospy.Time.now() while rospy.Time.now() - start < rospy.Duration(2): if self._recent_response is not None: break success = False if self._recent_response is not None: if not req.opened: success = not (self._recent_response.values & (1 << req.actuator)) else: success = self._recent_response.values & (1 << req.actuator) response = {"success": success} self._recent_response = None return response
def get_valve(self, req: GetValveRequest) -> GetValveResponse: message = ActuatorPollRequestPacket() self.send_data(message) start = rospy.Time.now() while rospy.Time.now() - start < rospy.Duration(10): if self._recent_response is not None: break if not self._recent_response: raise RuntimeError("No response from the board within 10 seconds.") response = GetValveResponse( opened=self._recent_response.values & (1 << req.actuator), ) self._recent_response = None return response
[docs] def on_data(self, packet: ActuatorPollResponsePacket | AckPacket) -> None: """ Process data received from board. """ if isinstance(packet, ActuatorPollResponsePacket): self._recent_response = packet