from __future__ import annotations
import asyncio
import functools
import os
import random
import socket
import sys
import time
import traceback
import warnings
from types import TracebackType
from typing import Any, Awaitable, Callable, Coroutine, TypeVar, overload
import aiohttp
import genpy
import yaml
from aiohttp import web
from aiohttp_xmlrpc import handler as xmlrpc_handler
from roscpp.srv import (
GetLoggers,
GetLoggersRequest,
GetLoggersResponse,
SetLoggerLevel,
SetLoggerLevelRequest,
SetLoggerLevelResponse,
)
from rosgraph_msgs.msg import Clock
from . import (
exceptions,
publisher,
rosxmlrpc,
service,
serviceclient,
subscriber,
tcpros,
types,
util,
)
M = TypeVar("M", bound=types.Message)
Request = TypeVar("Request", bound=types.Message)
Reply = TypeVar("Reply", bound=types.Message)
S = TypeVar("S", bound=types.ServiceMessage)
X = TypeVar("X", bound=rosxmlrpc.XMLRPCLegalType)
class _XMLRPCSlave(xmlrpc_handler.XMLRPCView):
"""
This class implements the ROS slave API for a node handle created with axros.
Each method beginning with "rpc_" can be called by another process, such as
another ROS node or ROS itself, through XMLRPC. The name following "rpc_" is
the XMLRPC method name needed to execute the function.
For more information on the ROS Slave API, please see http://wiki.ros.org/ROS/Slave_API.
"""
# pylint: disable=invalid-name,no-self-use,missing-function-docstring
# invalid-name: method name must begin with rpc and use camel case
# no-self-use: this is okay; these methods must be a part of this class
# to respond to incoming XMLRPC requests
# missing-function-docstring: below methods are documented in the Slave API
# linked above
node_handle: NodeHandle
background_tasks: set[asyncio.Future]
def __init__(self, request: web.Request, *, node_handle: NodeHandle):
self.node_handle = node_handle
self.background_tasks = set()
super().__init__(request)
def __getattr__(self, attr: str):
if attr.startswith("xmlrpc_") or attr.startswith("rpc_"):
print(attr, "not implemented")
raise AttributeError(attr)
def rpc_getBusStats(self, _) -> tuple[int, str, list[list[Any]]]:
return 1, "success", [[], [], []] # XXX
def rpc_getBusInfo(self, _) -> tuple[int, str, list[Any]]:
# list of (connectionId, destinationId, direction, transport, topic, connected)
return 1, "success", [] # XXX
def rpc_getMasterUri(self, _) -> tuple[int, str, str]:
return 1, "success", self.node_handle.master_uri
async def rpc_shutdown(self, _, msg: str = ""):
print(
f"Shutdown of {self.node_handle.get_name()} node requested. Reason: {msg}"
)
try:
await util.wrap_timeout(
self.node_handle.shutdown(shutdown_servers=False), 3
)
except asyncio.TimeoutError:
pass
async def _kill_soon():
await self.node_handle.shutdown_servers()
print(f"Shut down of {self.node_handle.get_name()} node complete.")
sys.exit(0)
task = asyncio.create_task(_kill_soon())
# Get exception so the loop doesn't print it out later when the program
# has completed.
task.add_done_callback(lambda t: t.exception())
task.add_done_callback(self.background_tasks.remove)
self.background_tasks.add(task)
return (1, "success", False)
def rpc_getPid(self, _) -> tuple[int, str, int]:
return 1, "success", os.getpid()
def rpc_getSubscriptions(self, _) -> tuple[int, str, list[Any]]:
# This does not appear to be of importance - ROS appears to track this itself
return 1, "success", []
def rpc_getPublications(self, _) -> tuple[int, str, list[Any]]:
# This does not appear to be of importance - ROS appears to track this itself
return 1, "success", []
def rpc_paramUpdate(self, parameter_key, parameter_value) -> tuple[int, str, bool]:
# This behavior is not utilized by any axros-constructed nodes
del parameter_key, parameter_value
return 1, "success", False
def rpc_publisherUpdate(self, _, topic: str, publishers):
handlers = self.node_handle.xmlrpc_handlers.get(("publisherUpdate", topic))
if not handlers:
return 1, "success", True
for handler in handlers:
handler(publishers)
return 1, "success", True
def rpc_requestTopic(self, caller_id, topic: str, protocols):
handlers = self.node_handle.xmlrpc_handlers.get(("requestTopic", topic))
if not handlers:
return -1, f"Not a publisher of [{topic}]", []
for handler in handlers:
handler(protocols)
return (
1,
"ready on " + self.node_handle._tcpros_server_uri,
[
"TCPROS",
self.node_handle._tcpros_server_addr[0],
self.node_handle._tcpros_server_addr[1],
],
)
[docs]class NodeHandle:
"""
Class representing a Pythonic handle for a ROS node, similar to the C++
concept of a node handle. This class provides asynchronous communication with
several ROS features, including publishers, subscribers, and services through
helper methods.
This node must be constructeed using the `await` keyword because it spins up
resources upon initialization. As a result, you cannot setup this node using a specialized
method.
You must shutdown the node using :meth:`~.shutdown`. Otherwise, you can use an
asynchronous context manager.
.. warning::
You cannot spawn two nodes with the same name. Attempts to do so will result
in the node which already exists being killed to make way for the new node
you are initiailizing. If the other node is shutdown, it will be shutdown
safely.
.. container:: operations
.. describe:: async with x:
On enter, the node handle is setup through :meth:`~.setup`. Upon leaving
the context block, the nodehandle is :meth:`~.shutdown`.
Attributes:
master_uri (str): The master URI used by the handle to communicate with ROS.
shutdown_callbacks (set[Callable[[], Coroutine[Any, Any, Any]]]): The callbacks
that are executed when the a shutdown is requested for the node. This
allows the node to call the shutdown coroutines of other classes, such
as :meth:`axros.Publisher.shutdown` and :meth:`axros.Subscriber.shutdown`.
master_proxy (ROSMasterProxy): The proxy to the ROS Master URI. This is used
by the node for all communication.
xmlrpc_server_uri (str): The XMLRPC URI for the node.
tcpros_handlers (dict[tuple[str, str], list[types.TCPROSProtocol]]): The handlers
for incoming TCPROS connections. This allows the node to route incoming
connections to the appropriate class.
xmlrpc_handlers (dict[tuple[str, str], Callable[[Any], tuple[int, str, Any]]]): The
handlers for incoming XMLRPC connections. Acts similar to :attr:`~.tcpros_handlers`.
"""
# pylint: disable=too-many-public-methods
# too-many-public-methods: while we could move methods out of this class,
# that would reduce the simplicity of the class
# as well as its similarity to ROS
master_uri: str
_ns: str
_name: str
_addr: str
shutdown_callbacks: set[Callable[[], Coroutine[Any, Any, Any]]]
_aiohttp_session: aiohttp.ClientSession
_tcpros_server_uri: str
_is_running: bool
master_proxy: rosxmlrpc.ROSMasterProxy
xmlrpc_server_uri: str
tcpros_handlers: dict[tuple[str, str], list[types.TCPROSProtocol]]
xmlrpc_handlers: dict[tuple[str, str], list[Callable[[Any], tuple[int, str, Any]]]]
_tcpros_server: asyncio.AbstractServer
_tcpros_server_addr: tuple[str, int]
_use_sim_time: bool
_clock_sub: subscriber.Subscriber[Clock]
_remappings: dict[str, str]
_sim_time: genpy.Time
_xmlrpc_runner: web.AppRunner
_xmlrpc_site: web.TCPSite
get_log_serv: service.Service
set_log_serv: service.Service
[docs] @classmethod
def from_argv_with_remaining(
cls,
default_name: str,
argv: list[str] | None = None,
anonymous: bool = False,
*,
always_default_name=False,
) -> tuple[NodeHandle, list[str]]:
"""
Constructs a handle from an argument vector. Attempts to find any missing
values for node constructionf from environment variables and the argument
vector.
Args:
default_name (str): The default name of the node.
argv (list[str] | None): The argument vector to use.
anonymous (bool): Whether to make the node anonymous. Appends a random
number to the end of the node name.
always_default_name (bool): Whether to always use the default name of
the node, avoiding remappings. Commonly used in testing or complex
environments where stronger node control is desired. Defaults to ``True``.
"""
if not argv:
argv = sys.argv
res = [argv[0]]
remappings = {}
for arg in argv[1:]:
if ":=" in arg:
before, after = arg.split(":=")
assert before not in remappings
remappings[before] = after
else:
res.append(arg)
name = default_name
if anonymous:
name = f"{name}_{random.randrange(2**62):016x}"
if "__name" in remappings and not always_default_name:
name = remappings["__name"]
addr = socket.gethostname() # could be a bit smarter
addr = os.environ.get("ROS_IP", addr)
addr = os.environ.get("ROS_HOSTNAME", addr)
addr = remappings.get("__ip", addr)
addr = remappings.get("__hostname", addr)
master_uri = None
master_uri = os.environ.get("ROS_MASTER_URI", master_uri)
master_uri = remappings.get("__master", master_uri)
if master_uri is None:
raise ValueError(
"Either ROS_MASTER_URI variable or __master has to "
"be provided for node to connect to ROS."
)
namespace = ""
namespace = os.environ.get("ROS_NAMESPACE", namespace)
namespace = remappings.get("__ns", namespace)
return (
cls(
namespace=namespace,
name=name,
addr=addr,
master_uri=master_uri,
remappings=remappings,
)
), res
[docs] @classmethod
def from_argv(cls, *args, **kwargs) -> NodeHandle:
"""
Equivalent to :meth:`~.from_argv_with_remaining`.
"""
return (cls.from_argv_with_remaining(*args, **kwargs))[0]
def __init__(
self,
namespace: str,
name: str,
addr: str,
master_uri: str,
remappings: dict[str, str],
):
"""
Args:
namespace (str): The namespace to put the node under. The namespace should
not end with a `/`. For example, `nsexample/test`.
name (str): The name of the node itself. The name will be appended behind
the namespace.
addr (str): The address of the ROS master server, typically specified
as an IPv4 address, and often, ``localhost``.
master_uri (str): The URI linking to the ROS master server.
remappings (dict[str, str]): Any remappings to use in the :meth:`~.setup` method.
"""
self._ns = namespace
self._name = name
self._addr = addr
self.master_uri = master_uri
self._remappings = remappings
self._is_running = False
self._is_setting_up = False
def __str__(self) -> str:
return (
f"<axros.NodeHandle at 0x{id(self):0x}, "
f"name={self._name} "
f"running={self.is_running()} "
f"tcpros_uri={getattr(self, '_tcpros_server_uri', None)} "
f"xmlrpc_uri={getattr(self, 'xmlrpc_server_uri', None)}>"
)
__repr__ = __str__
async def _setup_xmlrpc_server(self):
"""
Sets up the XMLRPC server for the node.
"""
self.xmlrpc_handlers = {}
# Start up an HTTP server to handle incoming XMLRPC requests - below is
# an alternative to the blocking web.run_app method that is sometimes
# used with aiohttp.
app = web.Application()
async def handle_xmlrpc(request: web.Request):
slave = _XMLRPCSlave(request, node_handle=self)
return await slave.post()
app.router.add_route("*", "/", handle_xmlrpc)
self._xmlrpc_runner = web.AppRunner(app)
await self._xmlrpc_runner.setup()
self._xmlrpc_site = web.TCPSite(self._xmlrpc_runner, host=self._addr, port=0)
await self._xmlrpc_site.start()
port = self._xmlrpc_site._server.sockets[0].getsockname()[1] # type: ignore
self.xmlrpc_server_uri = f"http://{self._addr}:{port}/"
async def _setup_tcpros_server(self):
self.tcpros_handlers = {}
self._tcpros_server = await asyncio.start_server(
functools.partial(tcpros.callback, self.tcpros_handlers),
family=socket.AF_INET,
host=self._addr,
port=0,
) # Port 0 lets the host assign the port for us
tcpros_server_port = self._tcpros_server.sockets[0].getsockname()[1]
self._tcpros_server_addr = self._addr, tcpros_server_port
self._tcpros_server_uri = f"rosrpc://{self._addr}:{tcpros_server_port}"
[docs] async def setup(self):
"""
Sets up the node handle. This must be called before the node is used.
Raises:
AssertionError: A condition was not met with the variables supplied
to the node in construction.
"""
if self.is_running():
raise exceptions.AlreadySetup(self, self)
if self._ns:
assert self._ns[0] == "/"
assert not self._ns.endswith("/")
self._ns = self._ns # valid values: '', '/a', '/a/b'
assert "/" not in self._name
self._name = self._ns + "/" + self._name
self.shutdown_callbacks = set()
self._addr = self._addr
self._is_setting_up = True
self._aiohttp_session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10))
self.master_proxy = rosxmlrpc.ROSMasterProxy(
rosxmlrpc.AsyncServerProxy(self.master_uri, self),
self._name,
)
await self._setup_xmlrpc_server()
await self._setup_tcpros_server()
while True:
try:
other_node_uri = await self.master_proxy.lookupNode(self._name)
except rosxmlrpc.ROSMasterError:
break # assume that error means unknown node
except Exception as _:
traceback.print_exc()
await util.wall_sleep(1) # pause so we don't retry immediately
else:
other_node_proxy = rosxmlrpc.AsyncServerProxy(other_node_uri, self)
other_node_proxy = rosxmlrpc.ROSMasterProxy(
other_node_proxy, self._name
)
try:
await util.wrap_timeout(
other_node_proxy.shutdown("new node registered with same name"),
3,
)
except ConnectionRefusedError:
pass
except Exception:
traceback.print_exc()
break
try:
self._use_sim_time = bool(await self.get_param("/use_sim_time"))
except rosxmlrpc.ROSMasterError: # assume that error means not found
self._use_sim_time = False
if self._use_sim_time:
def got_clock(msg: Clock):
self._sim_time = msg.clock
self._clock_sub = self.subscribe("/clock", Clock, got_clock)
await self._clock_sub.setup()
# make sure self._sim_time gets set before we continue
await util.wrap_time_notice(
self._clock_sub.get_next_message(),
1,
"getting simulated time from /clock",
)
for k, v in self._remappings.items():
if k.startswith("_") and not k.startswith("__"):
await self.set_param(self.resolve_name("~" + k[1:]), yaml.safe_load(v))
async def get_loggers_response(_: GetLoggersRequest) -> GetLoggersResponse:
return GetLoggersResponse()
async def set_loggers_response(
_: SetLoggerLevelRequest,
) -> SetLoggerLevelResponse:
return SetLoggerLevelResponse()
self.get_log_serv = self.advertise_service(
"~get_loggers", GetLoggers, get_loggers_response
)
self.set_log_serv = self.advertise_service(
"~set_logger_level", SetLoggerLevel, set_loggers_response
)
await asyncio.gather(self.get_log_serv.setup(), self.set_log_serv.setup())
self.shutdown_callbacks.update(
[self.get_log_serv.shutdown, self.set_log_serv.shutdown]
)
self._is_running = True
self._is_setting_up = False
return self
async def __aenter__(self) -> NodeHandle:
await self.setup()
return self
async def __aexit__(
self, exc_type: type[Exception], exc_value: Exception, traceback: TracebackType
):
await self.shutdown()
def __del__(self):
if self._is_running:
warnings.simplefilter("always", ResourceWarning)
warnings.warn(
f"shutdown() was never called for node '{self._name}'",
ResourceWarning,
)
warnings.simplefilter("default", ResourceWarning)
[docs] async def shutdown(self, *, shutdown_servers: bool = True) -> None:
"""
Shuts down all active connections. This should always be called before
the loop is closed, or else multiple instances of :class:`ResourceWarning`
will be emitted.
"""
if not self._is_running:
warnings.simplefilter("always", ResourceWarning)
warnings.warn(
f"The {self._name} node is not currently running. It may have been shutdown previously or never started.",
ResourceWarning,
)
warnings.simplefilter("default", ResourceWarning)
return
if self._is_running:
return await self._real_shutdown(shutdown_servers)
async def shutdown_servers(self):
if not self._aiohttp_session.closed:
self._tcpros_server.close()
await self._tcpros_server.wait_closed()
await self._xmlrpc_runner.cleanup()
await self._aiohttp_session.close()
async def _real_shutdown(self, shutdown_servers: bool):
self._is_running = False
count, total_coros = 1, len(self.shutdown_callbacks)
while self.shutdown_callbacks:
self.shutdown_callbacks, old = set(), self.shutdown_callbacks
old_coros = [func() for func in old]
for coro in old_coros:
try:
await coro
count += 1
except:
traceback.print_exc()
# Close the open aiohttp connection used for XMLRPC
# Clean this up LAST - other services/publishers/subscribers may still
# be using this session
if shutdown_servers:
await self.shutdown_servers()
[docs] def get_name(self) -> str:
"""
Returns:
str: The name of the node handle.
"""
return self._name
[docs] def get_time(self) -> genpy.Time:
"""
Returns the time used by the node handle. If simulated time is requested
to be used, then the current simulated time is returned. Otherwise, the current
real time is returned.
Returns:
genpy.Time: The current time used by the node handle.
"""
if self._use_sim_time:
return self._sim_time
else:
return genpy.Time(time.time())
[docs] async def sleep_until(self, future_time: float | int | genpy.Time) -> None:
"""
Sleeps for a specified amount of time.
Args:
future_time (Union[float, int, genpy.Time]): The amount of time to sleep until.
If a float or integer is used, then the generated time is constructed
through the ``from_sec`` method of ``genpy.Time``.
Raises:
TypeError: The type of argument passed to ``future_time`` is not an acceptable
type.
"""
if isinstance(future_time, (float, int)):
future_time = genpy.Time.from_sec(future_time)
elif not isinstance(future_time, genpy.Time):
raise TypeError("expected float or genpy.Time")
if self._use_sim_time:
while self._sim_time < future_time:
await self._clock_sub.get_next_message()
else:
while True:
current_time = self.get_time()
if current_time >= future_time:
return
await util.wall_sleep((future_time - current_time).to_sec())
[docs] async def sleep(self, duration: float | int | genpy.Duration) -> None:
"""
Sleeps for a specified duration of time.
Args:
duration (Union[float, int, genpy.Duration]): The amount of time to sleep
for.
Raises:
TypeError: The argument passed to ``duration`` was not an allowed type.
ValueError: The request sleep length was non-negative.
"""
if isinstance(duration, (float, int)):
if duration < 0:
raise ValueError("The sleep length must be non-negative.")
duration = genpy.Duration.from_sec(duration)
elif not isinstance(duration, genpy.Duration):
raise TypeError("expected float or genpy.Duration")
return await self.sleep_until(self.get_time() + duration)
[docs] def resolve_name_without_remapping(self, name: str) -> str:
"""
Resolves the true name of a given node path.
Args:
name (str): The path to resolve.
Returns:
str: The resolved path.
"""
if name.startswith("/"):
return name
if name.startswith("~"):
return self._name + "/" + name[1:]
return self._ns + "/" + name
[docs] def resolve_name(self, name: str) -> str:
"""
Resolves the name of the node handle and updates the name attribute with this
resolved name.
"""
name = self.resolve_name_without_remapping(name)
for before_unresolved, after_unresolved in self._remappings.items():
before = self.resolve_name_without_remapping(before_unresolved)
after = self.resolve_name_without_remapping(after_unresolved)
if name == before or name.startswith(before + "/"):
return after + name[len(before) :]
return name
[docs] def is_running(self) -> bool:
"""
Returns:
bool: Whether the node handle is running. This is ``True`` after the
node is :meth:`~.setup` and becomes ``False`` when the node is :meth:`~.shutdown`.
Will always be opposite of :meth:`~.is_shutdown`.
"""
return self._is_running
[docs] def is_shutdown(self) -> bool:
"""
Returns:
bool: Whether the node handle is not running. This is ``True`` by default,
calling :meth:`~.setup` on the node will make this ``False``. Will always
be opposite of :meth:`~.is_running`.
"""
return not self._is_running
[docs] def advertise_service(
self,
name: str,
service_message: types.ServiceMessage[Request, Reply],
callback: Callable[[Request], Awaitable[Reply]],
) -> service.Service[Request, Reply]:
"""
Creates a service using this node handle. The arguments and keyword
arguments passed to this method are passed into the constructor of the service;
check there for information on what arguments can be passed in.
Args:
name (str): The name to use for the service.
service_message (type[genpy.Message]): A ROS service class to use with the service.
The callback method used by the class will receive the request
class associated with the service, and is expected to return the
response class associated with this class.
callback (Callable[[genpy.Message], Awaitable[genpy.Message]]): An asynchronous callback
to process all incoming service requests. The returned message type
should be the reply type associated with the service.
Raises:
NotSetup: The node is not running. The node likely needs to be :meth:`~.setup`.
Returns:
axros.Service: The given service.
"""
if not self._is_running and not self._is_setting_up:
raise exceptions.NotSetup(self, self)
return service.Service(self, name, service_message, callback)
[docs] def get_service_client(
self, name: str, service_type: S
) -> serviceclient.ServiceClient[S]:
"""
Creates a service client using this node handle. This class can interface
with services by calling them with a particular request message and receiving
a response message.
This class is a generic and supports being associated with the service message
used by the client and service in conjunction.
Args:
name (str): The name of the service client.
service_type (type[genpy.Message]): The message representing the service. This
should not be the request or response message class, but should be
the main message class.
Raises:
NotSetup: The node is not running. The node likely needs to be :meth:`~.setup`.
Returns:
axros.ServiceClient: The constructed service client.
"""
if not self._is_running and not self._is_setting_up:
raise exceptions.NotSetup(self, self)
return serviceclient.ServiceClient(self, name, service_type)
[docs] def subscribe(
self,
name: str,
message_type: type[M],
callback: Callable[[M], None] = lambda _: None,
) -> subscriber.Subscriber[M]:
"""
Creates a subscriber using this node handle. The arguments and keyword
arguments passed to this method are passed into the constructor of the subscriber;
check there for information on what arguments can be passed in.
Note that the subscriber can only be used when it is setup through :meth:`Subscriber.setup`.
Args:
name (str): The name of the topic to subscribe to.
message_type (type[genpy.Message]): The message class shared by the topic.
callback (Callable[[genpy.Message], genpy.Message | None]): The callback to use with
the subscriber. The callback should receive an instance of the message
shared by the topic and return None.
Raises:
NotSetup: The node is not running. The node likely needs to be :meth:`~.setup`.
Returns:
axros.Subscriber: The constructed subscriber.
"""
if not self._is_running and not self._is_setting_up:
raise exceptions.NotSetup(self, self)
return subscriber.Subscriber(self, name, message_type, callback)
[docs] def advertise(
self, name: str, message_type: type[M], *, latching: bool = False
) -> publisher.Publisher[M]:
"""
Creates a publisher using this node handle. The arguments and keyword
arguments passed to this method are passed into the constructor of the publisher;
check there for information on what arguments should be passed in.
Note that the publisher can only be used when it is setup through :meth:`Publisher.setup`.
Args:
name (str): The name of the topic to publish.
message_type (Type[genpy.Message]): The message type that will be published
on the topic.
latching (bool): Enables latching on the publisher. This ensures that all
new connections are immediately sent the most recently sent message
when they connect to the publisher.
Raises:
NotSetup: The node is not running. The node likely needs to be :meth:`~.setup`.
Returns:
axros.Publisher: The given publisher.
"""
if not self._is_running and not self._is_setting_up:
raise exceptions.NotSetup(self, self)
return publisher.Publisher(self, name, message_type, latching)
@overload
async def get_param(self, key: str, desired_type: type[X]) -> X:
...
@overload
async def get_param(
self, key: str, desired_type: None = ...
) -> rosxmlrpc.XMLRPCLegalType:
...
[docs] async def get_param(
self, key: str, desired_type: type[X] | None = None
) -> rosxmlrpc.XMLRPCLegalType:
"""
Gets a parameter value from the ROS parameter server. If the key requested
is a namespace, then a dictionary representing all keys below the given
namespace is returned.
Args:
key (str): The name of the parameter to retrieve from the server.
Raises:
axros.ROSMasterException: The parameter is not set.
NotSetup: The node is not running.
TypeError: The key provided is not a string.
Returns:
:class:`axros.XMLRPCLegalType`: The value of the parameter with the given
name.
"""
if not self._is_running and not self._is_setting_up:
raise exceptions.NotSetup(self, self)
if not isinstance(key, str):
raise TypeError(
f"The '{key}' key used to get an item from the parameter server must be a string, not {key.__class__.__name__}"
)
res = await self.master_proxy.getParam(key)
if desired_type and not isinstance(res, desired_type):
raise TypeError(
f"The parameter '{key}' has type {res.__class__.__name__}, not {desired_type.__name__}.",
)
return res
[docs] async def has_param(self, key: str) -> bool:
"""
Checks whether the ROS parameter server has a parameter with the specified
name.
Args:
key (str): The name of the parameter to check the status of.
Raises:
NotSetup: The node is not running. The node likely needs to be :meth:`~.setup`.
TypeError: The key provided is not a string.
Returns:
bool: Whether the parameter server has the specified key.
"""
if not self._is_running and not self._is_setting_up:
raise exceptions.NotSetup(self, self)
if not isinstance(key, str):
raise TypeError(
f"The '{key}' key used to verify an item exists in the parameter server must be a string, not {key.__class__.__name__}"
)
return await self.master_proxy.hasParam(key)
[docs] async def delete_param(self, key: str) -> int:
"""
Deletes the specified parameter from the parameter server.
Args:
key (str): The parameter to delete from the parameter server.
Raises:
NotSetup: The node is not running. The node likely needs to be :meth:`~.setup`.
ROSMasterError: The specified key does not exist.
TypeError: The key provided is not a string.
Returns:
int: The result of the delete operation. According to ROS documentation,
this value can be ignored.
"""
if not self._is_running and not self._is_setting_up:
raise exceptions.NotSetup(self, self)
if not isinstance(key, str):
raise TypeError(
f"The '{key}' key used to delete an item from the parameter server must be a string, not {key.__class__.__name__}"
)
return await self.master_proxy.deleteParam(key)
[docs] async def set_param(self, key: str, value: rosxmlrpc.XMLRPCLegalType) -> int:
"""
Sets a parameter and value in the ROS parameter server.
.. note::
Tuples are stored as lists in the parameter server. All contents remain
the same, and the contents of the tuple are stored in the same order.
Args:
key (str): The parameter to set in the parameter server.
value (:class:`axros.XMLRPCLegalType`): The value to set for the given parameter.
Raises:
NotSetup: The node is not running. The node likely needs to be :meth:`~.setup`.
TypeError: The key provided is not a string.
Returns:
int: The result of setting the parameter. According to the ROS documentation,
this value can be ignored.
"""
if not self._is_running and not self._is_setting_up:
raise exceptions.NotSetup(self, self)
if not isinstance(key, str):
raise TypeError(
f"The '{key}' key used to set an item in the parameter server must be a string, not {key.__class__.__name__}"
)
return await self.master_proxy.setParam(key, value)
[docs] async def search_param(self, key: str) -> str:
"""
Searches for a parameter on the ROS parameter server, and returns the first
key found that matches the search term.
For more information on how searching is done to find a matching key, view
the `ROS wiki documentation on this method <http://wiki.ros.org/ROS/Parameter%20Server%20API>`_.
Args:
key (str): The search key to use to find parameters.
Raises:
NotSetup: The node is not running. The node likely needs to be :meth:`~.setup`.
TypeError: The key provided is not a string.
Returns:
str: The name of the first key found.
"""
if not self._is_running and not self._is_setting_up:
raise exceptions.NotSetup(self, self)
if not isinstance(key, str):
raise TypeError(
f"The '{key}' key used to search for an item from the parameter server must be a string, not {key.__class__.__name__}"
)
return await self.master_proxy.searchParam(key)
[docs] async def get_param_names(self) -> list[str]:
"""
Gets the names of all parameters in the ROS parameter server.
Raises:
NotSetup: The node is not running. The node likely needs to be :meth:`~.setup`.
Returns:
List[str]: The names of all parameters in the server.
"""
if not self._is_running and not self._is_setting_up:
raise exceptions.NotSetup(self, self)
return await self.master_proxy.getParamNames()
[docs] async def lookup_node(self, name: str) -> str | None:
"""
Looks up the URI of a node based on its name. If it does not exist,
`None` is returned.
Args:
name (str): The name of the node to lookup.
Returns:
Optional[str]: The URI of the node, if it exists.
"""
try:
return await self.master_proxy.lookupNode(name)
except rosxmlrpc.ROSMasterError:
return None