Source code for mil_poi.async_interface

#!/usr/bin/env python3
from __future__ import annotations

import asyncio

import numpy as np
from axros import NodeHandle
from mil_ros_tools.msg_helpers import rosmsg_to_numpy

from mil_poi.msg import POIArray


[docs]class AsyncPOIClient: """ Client for getting the positions of POIs in a POI server. """ # TODO: add service interfaces for adding / moving / deleting POI def __init__(self, nh: NodeHandle): """ Args: nh (axros.NodeHandle): The node handle to use. """ self.last_msg = None self.futures = {} self._poi_sub = nh.subscribe("/points_of_interest", POIArray, callback=self._cb)
[docs] async def setup(self) -> None: """ Sets up the client. Must be called before using the client. """ await self._poi_sub.setup()
[docs] async def shutdown(self) -> None: """ Shuts the client down. Should be called before deconstructing the class. """ await self._poi_sub.shutdown()
def _cb(self, msg): """ Internal callback on new points_of_interest updates """ self.last_msg = msg for poi in self.last_msg.pois: if poi.name not in self.futures: continue position, orientation = rosmsg_to_numpy(poi.pose.position), rosmsg_to_numpy( poi.pose.orientation, ) futures = self.futures.pop(poi.name) while len(futures): futures.pop().set_result((position, orientation))
[docs] async def get( self, name: str, only_fresh: bool = False, ) -> tuple[np.ndarray, np.ndarray]: """ Get the position of POI in the global frame as a 3x1 numpy array. Args: name (str): The name of the POI. only_fresh (bool): If the POI is already known, wait for a fresh message before returning. """ if self.last_msg is not None and not only_fresh: for poi in self.last_msg.pois: if poi.name == name: position, orientation = rosmsg_to_numpy( poi.pose.position, ), rosmsg_to_numpy(poi.pose.orientation) return (position, orientation) res = asyncio.Future() if name in self.futures: self.futures[name].append(res) else: self.futures[name] = [res] poi = await res return poi