Source code for mil_passive_sonar.tx_interface

#!/usr/bin/env python3
import asyncio
from typing import Callable

from axros import NodeHandle
from geometry_msgs.msg import PointStamped, Vector3Stamped
from std_srvs.srv import SetBool, SetBoolRequest, Trigger, TriggerRequest


[docs]class TxHydrophonesClient: """ TxROS abstraction for interacting with the nodes in this package. Attributes: dir_callback (Optional[Callable]): The method that is called when a ping is received. """ def __init__(self, nh: NodeHandle): """ Construct a client. Args: nh (NodeHandle): The TxROS node handler. """ self._direction_sub = nh.subscribe( "/hydrophones/direction", Vector3Stamped, callback=self.heading_cb, ) self._enable_srv = nh.get_service_client("/multilateration/enable", SetBool) self._reset_srv = nh.get_service_client("/multilateration/reset", Trigger) self._position_sub = nh.subscribe("/hydrophones/position", PointStamped) self.dir_callback = None
[docs] def get_direction(self) -> asyncio.Future: """ Get the next processed direction to the pinger. Returns: asycio.Future: A Future object which can be used to get the next message of the pinger's direction. """ return self._direction_sub.get_next_message()
[docs] def get_position(self) -> asyncio.Future: """ Get the next processed position of the pinger. Returns: asyncio.Future: A Future object which can be used to get the next message of the pinger's position. """ return self._position_sub.get_next_message()
[docs] def get_last_position(self) -> PointStamped: """ Get the last processed position of the pinger. Returns: PointStamped: A cached value of the last position of the pinger. """ return self._position_sub.get_last_message()
[docs] def enable(self) -> None: """ Enable listening to pings for position estimation. """ return self._enable_srv(SetBoolRequest(data=True))
[docs] def reset(self) -> None: """ Reset the position estimation of the pinger. """ return self._reset_srv(TriggerRequest())
[docs] def disable(self) -> None: """ Disable listening to pings for position estimation. """ return self._enable_srv(SetBoolRequest(data=False))
[docs] def heading_cb(self, heading_msg: Vector3Stamped): """ Callback for pings received. Args: heading_msg (Vector3Stamped): The passed-in message representing the heading. """ if self.dir_callback is not None: self.dir_callback(heading_msg)
[docs] def set_callback(self, cb: Callable): """ Set a callback for when a ping is received. Args: cb (Callable): The callback to set. The callback should have a single parameter that takes a Vector3Stamped message representing the heading towards the pinger. """ self.dir_callback = cb