#!/usr/bin/env python3
import asyncio
from typing import Any, Awaitable, Callable, Optional

from axros import NodeHandle
from geometry_msgs.msg import PointStamped, Vector3Stamped
from std_srvs.srv import SetBool, SetBoolRequest, Trigger, TriggerRequest
class TxHydrophonesClient:
    """
    TxROS abstraction for interacting with the nodes in this package.

    Attributes:
        dir_callback (Optional[Callable]): The method that is called when a ping
            is received.
    """

    def __init__(self, nh: NodeHandle):
        """
        Construct a client.

        Args:
            nh (NodeHandle): The TxROS node handler.
        """
        # Set first so heading_cb never observes a missing attribute.
        self.dir_callback: Optional[Callable] = None

        # NOTE(review): the arguments of this subscribe call were lost in the
        # extracted source. The message type and callback follow from
        # heading_cb's signature; the topic name is an assumption - confirm it
        # against the publishing node.
        self._direction_sub = nh.subscribe(
            "/hydrophones/direction",
            Vector3Stamped,
            callback=self.heading_cb,
        )
        self._position_sub = nh.subscribe("/hydrophones/position", PointStamped)
        self._enable_srv = nh.get_service_client("/multilateration/enable", SetBool)
        self._reset_srv = nh.get_service_client("/multilateration/reset", Trigger)

    def get_direction(self) -> asyncio.Future:
        """
        Get the next processed direction to the pinger.

        Returns:
            asyncio.Future: A Future object which can be used to get the next
            message of the pinger's direction.
        """
        return self._direction_sub.get_next_message()

    def get_position(self) -> asyncio.Future:
        """
        Get the next processed position of the pinger.

        Returns:
            asyncio.Future: A Future object which can be used to get the next
            message of the pinger's position.
        """
        return self._position_sub.get_next_message()

    def get_last_position(self) -> PointStamped:
        """
        Get the last processed position of the pinger.

        Returns:
            PointStamped: A cached value of the last position of the pinger.
        """
        return self._position_sub.get_last_message()

    def enable(self) -> Awaitable[Any]:
        """
        Enable listening to pings for position estimation.

        Sends ``SetBoolRequest(data=True)`` to the ``/multilateration/enable``
        service.

        Returns:
            Awaitable[Any]: The result of invoking the service client. This is
            presumably an awaitable that resolves to the service response -
            confirm against the axros service client API.
        """
        return self._enable_srv(SetBoolRequest(data=True))

    def reset(self) -> Awaitable[Any]:
        """
        Reset the position estimation of the pinger.

        Sends an empty ``TriggerRequest`` to the ``/multilateration/reset``
        service.

        Returns:
            Awaitable[Any]: The result of invoking the service client. This is
            presumably an awaitable that resolves to the service response -
            confirm against the axros service client API.
        """
        return self._reset_srv(TriggerRequest())

    def disable(self) -> Awaitable[Any]:
        """
        Disable listening to pings for position estimation.

        Sends ``SetBoolRequest(data=False)`` to the ``/multilateration/enable``
        service.

        Returns:
            Awaitable[Any]: The result of invoking the service client. This is
            presumably an awaitable that resolves to the service response -
            confirm against the axros service client API.
        """
        return self._enable_srv(SetBoolRequest(data=False))

    def heading_cb(self, heading_msg: Vector3Stamped) -> None:
        """
        Callback for pings received.

        Forwards the message to ``dir_callback`` when one has been registered;
        otherwise the message is ignored.

        Args:
            heading_msg (Vector3Stamped): The passed-in message representing the
                heading towards the pinger.
        """
        callback = self.dir_callback
        if callback is not None:
            callback(heading_msg)

    def set_callback(self, cb: Callable) -> None:
        """
        Set a callback for when a ping is received.

        Args:
            cb (Callable): The callback to set. The callback should have a single
                parameter that takes a Vector3Stamped message representing the
                heading towards the pinger.
        """
        self.dir_callback = cb