from __future__ import annotations
import asyncio
from io import BytesIO
from typing import TYPE_CHECKING, Callable, Generic, Protocol, Type, TypeVar
import genpy
from . import rosxmlrpc, tcpros, types, util
if TYPE_CHECKING:
from .nodehandle import NodeHandle
S = TypeVar("S", bound=types.ServiceMessage)
class ServiceType(Protocol):
_type: str
_md5sum: str
_request_class: genpy.Message
_response_class: genpy.Message
[docs]class ServiceError(Exception):
"""
Represents an error with a service client in axros.
Inherits from :class:`Exception`.
.. container:: operations
.. describe:: str(x)
Pretty-prints ``ServiceError`` name with the given error message.
.. describe:: repr(x)
Pretty-prints ``ServiceError`` name with the given error message.
Equivalent to ``str(x)``.
"""
def __init__(self, message: str):
self._message = message
def __str__(self):
return f"ServiceError({self._message!r})"
__repr__ = __str__
[docs]class ServiceClient(Generic[S]):
"""
A client connected to a service in axros. This is the client class of the
client-server relationship in ROS; the server is the :class:`axros.Service`
class.
.. container:: operations
.. describe:: await x(request_class)
Makes a request to the service using an instance of the ``request_class``
request type. This operation returns the result sent by the topic
through the master server. Any errors will raise an
instance of :class:`axros.ServiceError`.
"""
def __init__(self, node_handle: NodeHandle, name: str, service_type: S):
"""
Args:
node_handle (NodeHandle): The node handle serving as the client to the service.
name (str): The name of the service to connect to.
"""
self._node_handle = node_handle
self._name = self._node_handle.resolve_name(name)
self._type = service_type
def __str__(self) -> str:
return (
f"<axros.ServiceClient at 0x{id(self):0x}, "
f"name={self._name} "
f"service_type={self._type} "
f"node_handle={self._node_handle}>"
)
__repr__ = __str__
async def __call__(self, req: genpy.Message):
if req.__class__ != self._type._request_class:
raise TypeError(
f"Expected {self._type._request_class} message type when calling {self._name} service, but got {req.__class__} instead."
)
service_url = await self._node_handle.master_proxy.lookup_service(self._name)
protocol, rest = service_url.split("://", 1)
host, port_str = rest.rsplit(":", 1)
port = int(port_str)
assert protocol == "rosrpc"
loop = asyncio.get_event_loop()
reader, writer = await asyncio.open_connection(host, port)
try:
tcpros.send_string(
tcpros.serialize_dict(
dict(
callerid=self._node_handle._name,
service=self._name,
md5sum=self._type._md5sum,
type=self._type._type,
)
),
writer,
)
tcpros.deserialize_dict(await tcpros.receive_string(reader))
# request could be sent before header is received to reduce latency...
x = BytesIO()
self._type._request_class.serialize(req, x)
data = x.getvalue()
tcpros.send_string(data, writer)
result = await tcpros.receive_byte(reader)
data = await tcpros.receive_string(reader)
if result: # success
return self._type._response_class().deserialize(data)
else:
raise ServiceError(data.decode())
finally:
writer.close()
await writer.wait_closed()
[docs] async def wait_for_service(self):
"""
Waits for the service to appear. Checks to see if the service has appeared
10 times per second.
"""
while True:
try:
await self._node_handle.master_proxy.lookup_service(self._name)
except (rosxmlrpc.XMLRPCException, rosxmlrpc.ROSMasterError):
await util.wall_sleep(0.1) # XXX bad
continue
else:
return