Source code for sub_actuator_board.simulation

#!/usr/bin/python3
from __future__ import annotations

import random

import rospy
from electrical_protocol import AckPacket
from mil_usb_to_can.sub9 import SimulatedCANDeviceHandle

from .packets import (
    ActuatorPollRequestPacket,
    ActuatorPollResponsePacket,
    ActuatorSetPacket,
)


[docs]class ActuatorBoardSimulation(SimulatedCANDeviceHandle): """ Simulator for the communication of the actuator board. Attributes: status (Dict[int, bool]): The status of the valves. The keys are each of the valve IDs, and the values are the statues of whether the valves are open. """ def __init__(self, *args, **kwargs): # Tracks the status of the 12 valves self.status = {i: False for i in range(4)} super().__init__(*args, **kwargs)
[docs] def on_data(self, packet: ActuatorSetPacket | ActuatorPollRequestPacket) -> None: """ Processes data received from motherboard / other devices. For each message received, the class' status attribute is updated if the message is asking to write, otherwise a feedback message is constructed and sent back. """ # If message is writing a valve, store this change in the internal dictionary if isinstance(packet, ActuatorSetPacket): rospy.sleep(random.randrange(0, 1)) # Time to simluate opening of valves self.status[packet.address] = packet.open self.send_data(bytes(AckPacket())) # If message is a status request, send motherboard the status of the requested valve elif isinstance(packet, ActuatorPollRequestPacket): self.send_data( bytes( ActuatorPollResponsePacket( int( "".join( str(int(x)) for x in reversed(self.status.values()) ), base=2, ), ), ), )