Source code for axros.subscriber

from __future__ import annotations

import asyncio
import time
import traceback
import warnings
from types import TracebackType
from typing import TYPE_CHECKING, Callable, Generic, TypeVar

import genpy

from . import exceptions, rosxmlrpc, tcpros, types, util

if TYPE_CHECKING:
    from .nodehandle import NodeHandle

M = TypeVar("M", bound=types.Message)


[docs]class Subscriber(Generic[M]): """ A subscriber in the axros suite. This class should usually be made through :meth:`axros.NodeHandle.subscribe`. .. container:: operations .. describe:: async with x: On entering the block, the subscriber is :meth:`~.setup`; upon leaving the block, the subscriber is :meth:`~.shutdown`. """ _message_futs: list[asyncio.Future] _publisher_threads: dict[str, asyncio.Task] _is_running: bool def __init__( self, node_handle: NodeHandle, name: str, message_type: type[M], callback: Callable[[M], None] = lambda message: None, ): """ Args: node_handle (NodeHandle): The node handle used to communicate with the ROS master server. name (str): The name of the topic to subscribe to. message_type (Type[genpy.Message]): The message class shared by the topic. callback (Callable[[genpy.Message], genpy.Message | None]): The callback to use with the subscriber. The callback should receive an instance of the message shared by the topic and return None. .. note:: If you are subscribing to a publisher publishing messages at a very fast rate, you should always use a callback over :meth:`~.get_next_message`. """ self._node_handle = node_handle self._name = self._node_handle.resolve_name(name) self.message_type = message_type self._callback = callback self._publisher_threads = {} self._last_message = None self._last_message_time = None self._message_futs = [] self._connections = {} self._node_handle.shutdown_callbacks.add(self.shutdown) self._is_running = False self._is_reading = {} def __str__(self) -> str: return ( f"<axros.Subscriber at 0x{id(self):0x}, " f"name={self._name} " f"running={self.is_running()} " f"message_type={self.message_type} " f"node_handle={self._node_handle}>" ) __repr__ = __str__
[docs] async def setup(self) -> None: """ Sets up the subscriber by registering the subscriber with ROS and listing handlers supported by the topic. The subscriber must be setup before use. """ if self.is_running(): raise exceptions.AlreadySetup(self, self._node_handle) if ("publisherUpdate", self._name) in self._node_handle.xmlrpc_handlers: self._node_handle.xmlrpc_handlers[("publisherUpdate", self._name)].append( self._handle_publisher_list ) else: self._node_handle.xmlrpc_handlers[("publisherUpdate", self._name)] = [ self._handle_publisher_list ] publishers = await self._node_handle.master_proxy.register_subscriber( self._name, self.message_type._type, self._node_handle.xmlrpc_server_uri, ) self._handle_publisher_list(publishers) self._is_running = True
def __del__(self): if self._is_running: warnings.simplefilter("always", ResourceWarning) warnings.warn( f"The '{self._name}' subscriber (in '{self._node_handle.get_name()}') was never shutdown(). This may cause issues with this instance of ROS - please fix the errors and completely restart ROS.", ResourceWarning, ) warnings.simplefilter("default", ResourceWarning) async def __aenter__(self) -> Subscriber: await self.setup() return self async def __aexit__( self, exc_type: type[Exception], exc_value: Exception, traceback: TracebackType ): await self.shutdown()
[docs] def is_running(self) -> bool: """ Returns: bool: Whether the subscriber is running. """ return self._is_running
[docs] async def shutdown(self): """ Shuts the subscriber down. All operations scheduled by the subscriber are immediately cancelled. Raises: ResourceWarning: The subscriber is already not running. """ if not self._is_running: warnings.simplefilter("always", ResourceWarning) warnings.warn( f"The {self._name} subscriber (in '{self._node_handle.get_name()}') is not currently running. It may have been shutdown previously or never started.", ResourceWarning, ) warnings.simplefilter("default", ResourceWarning) return for _, v in self._publisher_threads.items(): v.cancel() try: await self._node_handle.master_proxy.unregisterSubscriber( self._name, self._node_handle.xmlrpc_server_uri ) except Exception: traceback.print_exc() handlers = self._node_handle.xmlrpc_handlers["publisherUpdate", self._name] handlers.remove(self._handle_publisher_list) if not handlers: del self._node_handle.xmlrpc_handlers["publisherUpdate", self._name] self._node_handle.shutdown_callbacks.discard(self.shutdown) for _, task in self._publisher_threads.items(): task.cancel() self._handle_publisher_list([]) self._is_running = False
[docs] def get_last_message(self) -> M | None: """ Gets the last received message. This may be ``None`` if no message has been received. Returns: Optional[genpy.Message]: The last sent message, or ``None``. """ return self._last_message
[docs] def get_last_message_time(self) -> genpy.Time | None: """ Gets the time associated with the last received message. May be ``None`` if no message has been received yet. Returns: Optional[genpy.Time]: The time of the last received message, or ``None``. """ return self._last_message_time
[docs] def get_next_message(self) -> asyncio.Future[M]: """ Returns a :class:`~asyncio.Future` which will contain the next message received by the subscriber. .. warning:: This method should not be used to reliably obtain messages from publishers publishing at very high rates. Because of the overhead in setting up futures, this method will likely cause some messages to get lost. Instead, you should use a callback with the subscriber - all messages are passed through the callback. Raises: NotSetup: The subscriber was never :meth:`~.setup` or was previously :meth:`~.shutdown`. Returns: asyncio.Future[genpy.Message]: A future which will eventually contain the next message published on the topic. """ if not self.is_running(): raise exceptions.NotSetup(self, self._node_handle) res = asyncio.Future() self._message_futs.append(res) return res
[docs] def get_connections(self) -> list[str]: """ Returns the ``callerid`` of each connected client. If a connection does not provide an ID, then the value may be None. Returns: List[str]: A list of the caller IDs of all nodes connected to the subscriber. """ return list(self._connections.values())
[docs] def recently_read(self, *, seconds=1) -> bool: """ Whether the subscriber has recently read messages. Using this method is helpful when the subscriber may not be fast enough to keep up with all messages published. This method can be used to determine when the subscriber has finished reading queued messages. This is helpful at the end of programs, when a publisher has stopped publishing, but the subscriber has not yet received all messages. .. code-block:: python3 >>> while sub.recently_read(): ... print("Subscriber is still reading messages...") ... await asyncio.sleep(0.5) >>> print("Subscriber has finished catching up on messages!") Args: seconds (float): The number of seconds to wait after receiving a message before claiming that the subscriber is no longer actively reading messages. Defaults to 1 second. Returns: bool: Whether the subscriber has recently read messages. """ last_message_time = self.get_last_message_time() if last_message_time is not None: return time.time() - last_message_time.to_sec() < seconds return False
async def _publisher_thread(self, url: str) -> None: while True: try: proxy = rosxmlrpc.ROSMasterProxy( rosxmlrpc.AsyncServerProxy(url, self._node_handle), self._node_handle._name, ) value = await proxy.request_topic(self._name, [["TCPROS"]]) _, host, port = value assert isinstance(host, str) assert isinstance(port, int) # Note that this line opens a connection to the host + port combo # of the publisher - this connection will NOT be the same as the # node handle's TCP/XMLRPC servers. This will be a randomly assigned # port. reader, writer = await asyncio.open_connection(host, port) try: tcpros.send_string( tcpros.serialize_dict( dict( message_definition=self.message_type._full_text, callerid=self._node_handle._name, topic=self._name, md5sum=self.message_type._md5sum, type=self.message_type._type, ) ), writer, ) header = tcpros.deserialize_dict( await tcpros.receive_string(reader) ) self._connections[writer] = header.get("callerid", None) try: while True: data = await tcpros.receive_string(reader) msg = self.message_type().deserialize(data) try: self._callback(msg) except: traceback.print_exc() self._last_message = msg self._last_message_time = self._node_handle.get_time() old, self._message_futs = self._message_futs, [] for fut in old: fut.set_result(msg) await asyncio.sleep(0) except ( ConnectionRefusedError, BrokenPipeError, ConnectionResetError, asyncio.IncompleteReadError, ): pass finally: del self._connections[writer] except ( ConnectionRefusedError, BrokenPipeError, ConnectionResetError, asyncio.IncompleteReadError, ): pass finally: writer.close() await writer.wait_closed() except ( ConnectionRefusedError, BrokenPipeError, ConnectionResetError, asyncio.IncompleteReadError, ): pass except rosxmlrpc.XMLRPCException: print(f"Error connecting on subscriber '{self._name}' to '{url}', assuming publisher has died. Closing connection...") if url in self._publisher_threads: self._publisher_threads.pop(url) except Exception: traceback.print_exc() await util.wall_sleep( 1 ) # pause so that we don't repeatedly reconnect immediately on failure def _handle_publisher_list(self, publishers: list[str]) -> tuple[int, str, bool]: new = { k: self._publisher_threads.pop(k) if k in self._publisher_threads else asyncio.create_task(self._publisher_thread(k)) for k in publishers } for k, v in self._publisher_threads.items(): v.cancel() self._publisher_threads = new return 1, "success", False