API Reference

Exception Hierarchy

Exceptions

class axros.AxrosException(message: str, node_handle: NodeHandle)[source]

Exception related to axros. Inherits from Exception.

node_handle

The node handle which caused the exception.

Type

axros.NodeHandle

class axros.NotSetup(resource: Resource, node_handle: NodeHandle)[source]

Indicates that a resource (such as an axros.NodeHandle) was not set up, or was previously shut down and not set up again. To solve this issue, you will likely need to call the class’ setup() method.

Inherits from AxrosException.

class axros.AlreadySetup(resource: Resource, node_handle: NodeHandle)[source]

Indicates that a resource (such as an axros.NodeHandle) was already set up, but setup() was called again.

Inherits from AxrosException.
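
The setup/shutdown lifecycle behind NotSetup and AlreadySetup can be illustrated with a minimal, standalone sketch. The Resource class and the simplified exception classes below are hypothetical stand-ins written for this example; they are not the axros implementations and do not take the resource and node_handle arguments of the real exceptions.

```python
import asyncio


class NotSetup(Exception):
    """Stand-in for axros.NotSetup (hypothetical, simplified signature)."""


class AlreadySetup(Exception):
    """Stand-in for axros.AlreadySetup (hypothetical, simplified signature)."""


class Resource:
    """Hypothetical resource that follows a setup()/shutdown() lifecycle."""

    def __init__(self) -> None:
        self._running = False

    async def setup(self) -> None:
        # A second setup() without an intervening shutdown() is an error.
        if self._running:
            raise AlreadySetup("setup() was called twice")
        self._running = True

    async def shutdown(self) -> None:
        self._running = False

    def use(self) -> str:
        # Any operation on a resource that is not running is refused.
        if not self._running:
            raise NotSetup("call setup() first")
        return "ok"


async def demo() -> list[str]:
    events: list[str] = []
    res = Resource()
    try:
        res.use()
    except NotSetup:
        events.append("NotSetup")
    await res.setup()
    events.append(res.use())
    try:
        await res.setup()
    except AlreadySetup:
        events.append("AlreadySetup")
    await res.shutdown()
    return events
```

Running asyncio.run(demo()) walks through the three situations in order: use before setup, normal use, and a repeated setup.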

class axros.XMLRPCException(exception: Exception, request_body: bytes, node_handle: NodeHandle)[source]

Represents an error that occurred in the XMLRPC communication. Inherits from AxrosException.

str(x)

Pretty-prints the error status code and message.

repr(x)

Pretty-prints the error status code and message. Equivalent to str(x).

exception

The exception caught in the error.

Type

Exception

request_body

The request body of the request which caused the exception.

Type

bytes

class axros.ROSMasterError(message: str, code: int, node_handle: NodeHandle)[source]

Represents an exception that occurred when attempting to communicate with the ROS Master Server. Unlike ROSMasterFailure, this indicates that the request completed, but the returned value is not usable. Similar to a bad request HTTP error.

Inherits from AxrosException.

ros_message

The message from ROS explaining the exception.

Type

str

class axros.ROSMasterFailure(message: str, code: int, node_handle: NodeHandle)[source]

Represents a failure that occurred after a node sent an outgoing XMLRPC request to ROS Master. Unlike ROSMasterError, this indicates that the request failed in progress, and did not complete. This may cause side effects visible in other ROS services.

Inherits from AxrosException.

ros_message

The message from ROS explaining the exception.

Type

str

class axros.ServiceError(message: str)[source]

Represents an error with a service client in axros.

Inherits from Exception.

str(x)

Pretty-prints ServiceError name with the given error message.

repr(x)

Pretty-prints ServiceError name with the given error message. Equivalent to str(x).

class axros.TooPastError[source]

The user asked for a transform that will never be known because it’s from before the start of the history buffer.

Inherits from Exception.

Utility Functions

async axros.wrap_timeout(fut: Awaitable[T], duration: float | genpy.Duration, *, cancel=True) T[source]

Wraps a given future in a timeout-based future. This can be used to ensure that select asyncio.Future objects complete on time. Futures that do not complete on time can optionally be cancelled.

For an equivalent method that does not raise an exception, see axros.wrap_time_notice().

Parameters
  • fut (asyncio.Future | coroutine) – The future object to timeout.

  • duration (float | genpy.Duration) – The duration to timeout.

  • cancel (bool) – Keyword-only argument designating whether to cancel the future when the task times out.

Raises

asyncio.TimeoutError – The task did not complete on time.

Returns

The data returned from the awaitable.

Return type

Any
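
The documented behaviour can be approximated with the standard library alone. The sketch below is an assumption about how such a wrapper might be built, not the axros source: wrap_timeout_sketch is a hypothetical name, it accepts only float durations, and it relies on asyncio.wait_for() and asyncio.shield().

```python
import asyncio


async def wrap_timeout_sketch(fut, duration: float, *, cancel: bool = True):
    """Hypothetical stand-in that mirrors the documented behaviour."""
    task = asyncio.ensure_future(fut)
    if cancel:
        # wait_for() cancels the task itself when the timeout expires.
        return await asyncio.wait_for(task, timeout=duration)
    # shield() keeps the inner task alive when wait_for() gives up on it.
    return await asyncio.wait_for(asyncio.shield(task), timeout=duration)


async def demo() -> str:
    try:
        await wrap_timeout_sketch(asyncio.sleep(1), 0.01)
    except asyncio.TimeoutError:
        return "timed out"
    return "finished"
```

With cancel=False the caller still receives asyncio.TimeoutError, but the wrapped task keeps running and can be awaited again later.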

async axros.wrap_time_notice(fut: Awaitable[T], duration: float | genpy.Duration, description: str) T[source]

Prints a message if a future is taking longer than the noted duration.

>>> await axros.wrap_time_notice(asyncio.sleep(3), 2, "Example task")
Example task is taking a while... # printed at 2 seconds
Example task succeeded! # printed at 3 seconds
>>> await axros.wrap_time_notice(asyncio.sleep(1), 2, "Example task") # nothing is printed
Parameters
  • fut (asyncio.Future) – The future object.

  • duration (float | genpy.Duration) – The duration to wait before printing a message.

  • description (str) – The description to print.

Returns

The result from the awaitable.

Return type

Any
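
A rough standard-library approximation of this behaviour is sketched below. The function name wrap_time_notice_sketch is hypothetical, only float durations are handled, and the printed wording is copied from the example above rather than from the axros source.

```python
import asyncio


async def wrap_time_notice_sketch(fut, duration: float, description: str):
    """Hypothetical stand-in: print a notice if ``fut`` is slow, never raise."""
    task = asyncio.ensure_future(fut)
    # Wait up to ``duration`` seconds without cancelling the task on timeout.
    done, _pending = await asyncio.wait({task}, timeout=duration)
    if done:
        return task.result()
    print(f"{description} is taking a while...")
    result = await task
    print(f"{description} succeeded!")
    return result
```

Unlike a timeout wrapper, this never interrupts the awaited work; it only reports that the work is slow.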

async axros.wall_sleep(duration: genpy.Duration | float) None[source]

Sleeps for a specified duration using asyncio.sleep().

Parameters

duration (genpy.Duration | float) – The amount of time to sleep for.

axros.XMLRPCLegalType

alias of Optional[Union[bool, int, float, str, list, tuple, dict]]

NodeHandle

class axros.NodeHandle(namespace: str, name: str, addr: str, master_uri: str, remappings: dict[str, str])[source]

Class representing a Pythonic handle for a ROS node, similar to the C++ concept of a node handle. This class provides asynchronous communication with several ROS features, including publishers, subscribers, and services through helper methods.

This node must be constructed using the await keyword because it spins up resources upon initialization. As a result, you cannot set up this node using a specialized method.

You must shutdown the node using shutdown(). Otherwise, you can use an asynchronous context manager.

Warning

You cannot spawn two nodes with the same name. Attempts to do so will result in the node which already exists being killed to make way for the new node you are initializing. If the existing node is shut down this way, it is shut down safely.

async with x:

On entering the block, the node handle is set up through setup(). Upon leaving the block, the node handle is shut down through shutdown().

master_uri

The master URI used by the handle to communicate with ROS.

Type

str

shutdown_callbacks

The callbacks that are executed when a shutdown is requested for the node. This allows the node to call the shutdown coroutines of other classes, such as axros.Publisher.shutdown() and axros.Subscriber.shutdown().

Type

set[Callable[[], Coroutine[Any, Any, Any]]]

master_proxy

The proxy to the ROS Master URI. This is used by the node for all communication.

Type

ROSMasterProxy

xmlrpc_server_uri

The XMLRPC URI for the node.

Type

str

tcpros_handlers

The handlers for incoming TCPROS connections. This allows the node to route incoming connections to the appropriate class.

Type

dict[tuple[str, str], list[types.TCPROSProtocol]]

xmlrpc_handlers

The handlers for incoming XMLRPC connections. Acts similar to tcpros_handlers.

Type

dict[tuple[str, str], Callable[[Any], tuple[int, str, Any]]]

Parameters
  • namespace (str) – The namespace to put the node under. The namespace should not end with a /. For example, nsexample/test.

  • name (str) – The name of the node itself. The name will be appended behind the namespace.

  • addr (str) – The address of the ROS master server, typically specified as an IPv4 address, and often, localhost.

  • master_uri (str) – The URI linking to the ROS master server.

  • remappings (dict[str, str]) – Any remappings to use in the setup() method.

advertise(name: str, message_type: type[M], *, latching: bool = False) publisher.Publisher[M][source]

Creates a publisher using this node handle. The arguments and keyword arguments passed to this method are passed into the constructor of the publisher; check there for information on what arguments should be passed in.

Note that the publisher can only be used when it is setup through Publisher.setup().

Parameters
  • name (str) – The name of the topic to publish.

  • message_type (Type[genpy.Message]) – The message type that will be published on the topic.

  • latching (bool) – Enables latching on the publisher. This ensures that all new connections are immediately sent the most recently sent message when they connect to the publisher.

Raises

NotSetup – The node is not running. The node likely needs to be setup().

Returns

The given publisher.

Return type

axros.Publisher
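
A hedged usage sketch of the node handle and publisher lifecycle follows. It uses only calls documented on this page (from_argv(), async with, advertise(), publish(), sleep()); the node name, topic name, and the choice of std_msgs.msg.String are assumptions made for illustration, and the code needs a running ROS master to do anything useful.

```python
import asyncio


async def publish_once(text: str = "hello") -> None:
    # Imported lazily so this module loads even without a ROS environment.
    import axros
    from std_msgs.msg import String

    nh = axros.NodeHandle.from_argv("example_talker")  # hypothetical node name
    async with nh:  # setup() on enter, shutdown() on exit
        pub = nh.advertise("/example_chatter", String)  # hypothetical topic
        async with pub:  # the publisher needs its own setup()
            pub.publish(String(data=text))
            # Give connected subscribers a moment before shutting down.
            await nh.sleep(0.5)


if __name__ == "__main__":
    asyncio.run(publish_once())
```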

advertise_service(name: str, service_message: ServiceMessage[Request, Reply], callback: Callable[[Request], Awaitable[Reply]]) Service[Request, Reply][source]

Creates a service using this node handle. The arguments and keyword arguments passed to this method are passed into the constructor of the service; check there for information on what arguments can be passed in.

Parameters
  • name (str) – The name to use for the service.

  • service_message (type[genpy.Message]) – A ROS service class to use with the service. The callback method used by the class will receive the request class associated with the service, and is expected to return the response class associated with this class.

  • callback (Callable[[genpy.Message], Awaitable[genpy.Message]]) – An asynchronous callback to process all incoming service requests. The returned message type should be the reply type associated with the service.

Raises

NotSetup – The node is not running. The node likely needs to be setup().

Returns

The given service.

Return type

axros.Service
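
The sketch below shows one way a service and a client might be wired together using the methods documented on this page. The node name, the service name, and the use of std_srvs.srv.SetBool are assumptions for illustration only, and a ROS master must be running for the coroutine to complete.

```python
import asyncio


async def run_service_demo() -> bool:
    # Lazy imports: these packages are only present in a ROS environment.
    import axros
    from std_srvs.srv import SetBool, SetBoolRequest, SetBoolResponse

    async def handle(request: SetBoolRequest) -> SetBoolResponse:
        # The callback receives the request class and returns the reply class.
        return SetBoolResponse(success=request.data, message="handled")

    nh = axros.NodeHandle.from_argv("example_service_node")  # hypothetical name
    async with nh:
        service = nh.advertise_service("/example_toggle", SetBool, handle)
        async with service:  # setup() on enter, shutdown() on exit
            client = nh.get_service_client("/example_toggle", SetBool)
            await client.wait_for_service()
            response = await client(SetBoolRequest(data=True))
            return response.success


if __name__ == "__main__":
    print(asyncio.run(run_service_demo()))
```

Note that the main service class (SetBool) is passed to both advertise_service() and get_service_client(), while the request and response classes appear only inside the callback and the call.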

async delete_param(key: str) int[source]

Deletes the specified parameter from the parameter server.

Parameters

key (str) – The parameter to delete from the parameter server.

Raises
Returns

The result of the delete operation. According to ROS documentation, this value can be ignored.

Return type

int

classmethod from_argv(*args, **kwargs) NodeHandle[source]

Equivalent to from_argv_with_remaining().

classmethod from_argv_with_remaining(default_name: str, argv: list[str] | None = None, anonymous: bool = False, *, always_default_name=False) tuple[NodeHandle, list[str]][source]

Constructs a handle from an argument vector. Attempts to find any missing values for node construction from environment variables and the argument vector.

Parameters
  • default_name (str) – The default name of the node.

  • argv (list[str] | None) – The argument vector to use.

  • anonymous (bool) – Whether to make the node anonymous. Appends a random number to the end of the node name.

  • always_default_name (bool) – Whether to always use the default name of the node, avoiding remappings. Commonly used in testing or complex environments where stronger node control is desired. Defaults to False.

get_name() str[source]
Returns

The name of the node handle.

Return type

str

async get_param(key: str, desired_type: type[X]) X[source]
async get_param(key: str, desired_type: None = None) Optional[Union[bool, int, float, str, list, tuple, dict]]

Gets a parameter value from the ROS parameter server. If the key requested is a namespace, then a dictionary representing all keys below the given namespace is returned.

Parameters

key (str) – The name of the parameter to retrieve from the server.

Raises
  • axros.ROSMasterException – The parameter is not set.

  • NotSetup – The node is not running.

  • TypeError – The key provided is not a string.

Returns

The value of the parameter with the given name.

Return type

axros.XMLRPCLegalType

async get_param_names() list[str][source]

Gets the names of all parameters in the ROS parameter server.

Raises

NotSetup – The node is not running. The node likely needs to be setup().

Returns

The names of all parameters in the server.

Return type

List[str]

get_service_client(name: str, service_type: S) ServiceClient[S][source]

Creates a service client using this node handle. This class can interface with services by calling them with a particular request message and receiving a response message.

This class is a generic and supports being associated with the service message used by the client and service in conjunction.

Parameters
  • name (str) – The name of the service client.

  • service_type (type[genpy.Message]) – The message representing the service. This should not be the request or response message class, but should be the main message class.

Raises

NotSetup – The node is not running. The node likely needs to be setup().

Returns

The constructed service client.

Return type

axros.ServiceClient

get_time() Time[source]

Returns the time used by the node handle. If simulated time is requested to be used, then the current simulated time is returned. Otherwise, the current real time is returned.

Returns

The current time used by the node handle.

Return type

genpy.Time

async has_param(key: str) bool[source]

Checks whether the ROS parameter server has a parameter with the specified name.

Parameters

key (str) – The name of the parameter to check the status of.

Raises
  • NotSetup – The node is not running. The node likely needs to be setup().

  • TypeError – The key provided is not a string.

Returns

Whether the parameter server has the specified key.

Return type

bool

is_running() bool[source]
Returns

Whether the node handle is running. This is True after the node is setup() and becomes False when the node is shutdown(). Will always be opposite of is_shutdown().

Return type

bool

is_shutdown() bool[source]
Returns

Whether the node handle is not running. This is True by default; calling setup() on the node makes it False. Will always be the opposite of is_running().

Return type

bool

async lookup_node(name: str) str | None[source]

Looks up the URI of a node based on its name. If it does not exist, None is returned.

Parameters

name (str) – The name of the node to lookup.

Returns

The URI of the node, if it exists.

Return type

Optional[str]

resolve_name(name: str) str[source]

Resolves the name of the node handle and updates the name attribute with this resolved name.

resolve_name_without_remapping(name: str) str[source]

Resolves the true name of a given node path.

Parameters

name (str) – The path to resolve.

Returns

The resolved path.

Return type

str

async search_param(key: str) str[source]

Searches for a parameter on the ROS parameter server, and returns the first key found that matches the search term.

For more information on how searching is done to find a matching key, view the ROS wiki documentation on this method.

Parameters

key (str) – The search key to use to find parameters.

Raises
  • NotSetup – The node is not running. The node likely needs to be setup().

  • TypeError – The key provided is not a string.

Returns

The name of the first key found.

Return type

str

async set_param(key: str, value: Optional[Union[bool, int, float, str, list, tuple, dict]]) int[source]

Sets a parameter and value in the ROS parameter server.

Note

Tuples are stored as lists in the parameter server. All contents remain the same, and the contents of the tuple are stored in the same order.

Parameters
  • key (str) – The parameter to set in the parameter server.

  • value (axros.XMLRPCLegalType) – The value to set for the given parameter.

Raises
  • NotSetup – The node is not running. The node likely needs to be setup().

  • TypeError – The key provided is not a string.

Returns

The result of setting the parameter. According to the ROS documentation, this value can be ignored.

Return type

int
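
The tuple-to-list behaviour noted above follows from XML-RPC itself, which has a single array type. This can be observed with the standard library's xmlrpc.client module, independently of axros; the helper name xmlrpc_round_trip is made up for this example.

```python
import xmlrpc.client


def xmlrpc_round_trip(value):
    """Encode a value as a single XML-RPC parameter and decode it again."""
    payload = xmlrpc.client.dumps((value,))
    params, _method = xmlrpc.client.loads(payload)
    return params[0]


# A tuple goes in, a list with the same contents and order comes back.
print(xmlrpc_round_trip((1, "a", 2.5)))
```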

async setup()[source]

Sets up the node handle. This must be called before the node is used.

Raises

AssertionError – A condition was not met with the variables supplied to the node in construction.

async shutdown(*, shutdown_servers: bool = True) None[source]

Shuts down all active connections. This should always be called before the loop is closed, or else multiple instances of ResourceWarning will be emitted.

async sleep(duration: float | int | genpy.Duration) None[source]

Sleeps for a specified duration of time.

Parameters

duration (Union[float, int, genpy.Duration]) – The amount of time to sleep for.

Raises
  • TypeError – The argument passed to duration was not an allowed type.

  • ValueError – The requested sleep length was negative.

async sleep_until(future_time: float | int | genpy.Time) None[source]

Sleeps until a specified time.

Parameters

future_time (Union[float, int, genpy.Time]) – The time to sleep until. If a float or integer is used, then the generated time is constructed through the from_sec method of genpy.Time.

Raises

TypeError – The type of argument passed to future_time is not an acceptable type.

subscribe(name: str, message_type: type[M], callback: Callable[[M], None] = <function NodeHandle.<lambda>>) subscriber.Subscriber[M][source]

Creates a subscriber using this node handle. The arguments and keyword arguments passed to this method are passed into the constructor of the subscriber; check there for information on what arguments can be passed in.

Note that the subscriber can only be used when it is setup through Subscriber.setup().

Parameters
  • name (str) – The name of the topic to subscribe to.

  • message_type (type[genpy.Message]) – The message class shared by the topic.

  • callback (Callable[[genpy.Message], genpy.Message | None]) – The callback to use with the subscriber. The callback should receive an instance of the message shared by the topic and return None.

Raises

NotSetup – The node is not running. The node likely needs to be setup().

Returns

The constructed subscriber.

Return type

axros.Subscriber

Subscriber

class axros.Subscriber(node_handle: NodeHandle, name: str, message_type: type[M], callback: Callable[[M], None] = <function Subscriber.<lambda>>)[source]

A subscriber in the axros suite. This class should usually be made through axros.NodeHandle.subscribe().

async with x:

On entering the block, the subscriber is setup(); upon leaving the block, the subscriber is shutdown().

Parameters
  • node_handle (NodeHandle) – The node handle used to communicate with the ROS master server.

  • name (str) – The name of the topic to subscribe to.

  • message_type (Type[genpy.Message]) – The message class shared by the topic.

  • callback (Callable[[genpy.Message], genpy.Message | None]) – The callback to use with the subscriber. The callback should receive an instance of the message shared by the topic and return None.

Note

If you are subscribing to a publisher publishing messages at a very fast rate, you should always use a callback over get_next_message().

get_connections() list[str][source]

Returns the callerid of each connected client. If a connection does not provide an ID, then the value may be None.

Returns

A list of the caller IDs of all nodes connected to the subscriber.

Return type

List[str]

get_last_message() M | None[source]

Gets the last received message. This may be None if no message has been received.

Returns

The last sent message, or None.

Return type

Optional[genpy.Message]

get_last_message_time() genpy.Time | None[source]

Gets the time associated with the last received message. May be None if no message has been received yet.

Returns

The time of the last received message, or None.

Return type

Optional[genpy.Time]

get_next_message() asyncio.Future[M][source]

Returns a Future which will contain the next message received by the subscriber.

Warning

This method should not be used to reliably obtain messages from publishers publishing at very high rates. Because of the overhead in setting up futures, this method will likely cause some messages to get lost.

Instead, you should use a callback with the subscriber - all messages are passed through the callback.

Raises

NotSetup – The subscriber was never setup() or was previously shutdown().

Returns

A future which will eventually contain the next message published on the topic.

Return type

asyncio.Future[genpy.Message]
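
Why futures can miss messages while a callback sees all of them can be shown with a small in-memory model. TinySubscriber below is a hypothetical stand-in written for this example and shares no code with axros.Subscriber; it only mimics the idea that each future is resolved by the next message delivered after it was requested.

```python
import asyncio
from typing import Callable, Optional


class TinySubscriber:
    """Hypothetical in-memory model of a subscriber; not the axros class."""

    def __init__(self, callback: Optional[Callable[[int], None]] = None) -> None:
        self._callback = callback
        self._waiters: list[asyncio.Future] = []

    def get_next_message(self) -> "asyncio.Future[int]":
        fut = asyncio.get_running_loop().create_future()
        self._waiters.append(fut)
        return fut

    def deliver(self, msg: int) -> None:
        # The callback sees every message.
        if self._callback is not None:
            self._callback(msg)
        # Only futures requested before this message arrived are resolved.
        waiters, self._waiters = self._waiters, []
        for fut in waiters:
            if not fut.done():
                fut.set_result(msg)


async def demo() -> tuple[list[int], list[int]]:
    via_callback: list[int] = []
    via_future: list[int] = []
    sub = TinySubscriber(callback=via_callback.append)

    async def reader() -> None:
        while True:
            via_future.append(await sub.get_next_message())

    task = asyncio.create_task(reader())
    await asyncio.sleep(0)  # let the reader register its first future
    for burst_start in (0, 3):
        # Three messages arrive before the reader gets a chance to run again.
        for msg in range(burst_start, burst_start + 3):
            sub.deliver(msg)
        await asyncio.sleep(0)
    task.cancel()
    return via_callback, via_future
```

In this model the callback list receives all six messages, while the future-based reader only sees the first message of each burst.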

is_running() bool[source]
Returns

Whether the subscriber is running.

Return type

bool

recently_read(*, seconds=1) bool[source]

Whether the subscriber has recently read messages.

Using this method is helpful when the subscriber may not be fast enough to keep up with all messages published. This method can be used to determine when the subscriber has finished reading queued messages. This is helpful at the end of programs, when a publisher has stopped publishing, but the subscriber has not yet received all messages.

>>> while sub.recently_read():
...     print("Subscriber is still reading messages...")
...     await asyncio.sleep(0.5)
>>> print("Subscriber has finished catching up on messages!")
Parameters

seconds (float) – The number of seconds to wait after receiving a message before claiming that the subscriber is no longer actively reading messages. Defaults to 1 second.

Returns

Whether the subscriber has recently read messages.

Return type

bool
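
The check described above amounts to comparing the time of the last read against a window. A minimal sketch of that logic, assuming a monotonic clock and a hypothetical ReadTracker helper (the actual axros bookkeeping may differ):

```python
import time
from typing import Optional


class ReadTracker:
    """Hypothetical helper mirroring the documented recently_read() logic."""

    def __init__(self) -> None:
        self._last_read: Optional[float] = None

    def mark_read(self) -> None:
        # Called whenever a message is read from the connection.
        self._last_read = time.monotonic()

    def recently_read(self, *, seconds: float = 1) -> bool:
        # Nothing has been read yet, so nothing was read recently.
        if self._last_read is None:
            return False
        return time.monotonic() - self._last_read < seconds
```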

async setup() None[source]

Sets up the subscriber by registering the subscriber with ROS and listing handlers supported by the topic.

The subscriber must be setup before use.

async shutdown()[source]

Shuts the subscriber down. All operations scheduled by the subscriber are immediately cancelled.

Raises

ResourceWarning – The subscriber is already not running.

Publisher

class axros.Publisher(node_handle: NodeHandle, name: str, message_type: type[M], latching: bool = False)[source]

A Publisher in the axros suite. Allows for the publishing of messages onto a certain ROS topic. This class should usually be made through axros.NodeHandle.advertise().

Before use, the publisher must be setup() and after use, it must be shutdown().

async with x:

On entering the block, the publisher is setup(); upon leaving the block, the publisher is shutdown().

message_type

The type of message being published on the topic.

Type

type[genpy.Message]

Parameters
  • node_handle (NodeHandle) – The node handle to associate with this publisher. Used to communicate with the ROS master server.

  • name (str) – The name of the publisher topic.

  • message_type (Type[genpy.Message]) – The message type that will be published on the topic.

  • latching (bool) – Enables latching on the publisher. This ensures that all new connections are immediately sent the most recently sent message when they connect to the publisher.
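
Latching can be pictured with a small in-memory model: the publisher remembers its most recent message and replays it to every new connection. LatchingTopic below is a hypothetical illustration and does not reflect how axros.Publisher stores or sends messages internally.

```python
from typing import Callable, Generic, Optional, TypeVar

M = TypeVar("M")


class LatchingTopic(Generic[M]):
    """Hypothetical in-memory model of a (optionally latching) publisher."""

    def __init__(self, latching: bool = False) -> None:
        self.latching = latching
        self._last: Optional[M] = None
        self._has_last = False
        self._connections: list[Callable[[M], None]] = []

    def connect(self, deliver: Callable[[M], None]) -> None:
        self._connections.append(deliver)
        # A latching topic replays the most recent message to newcomers.
        if self.latching and self._has_last:
            deliver(self._last)

    def publish(self, msg: M) -> None:
        self._last, self._has_last = msg, True
        for deliver in self._connections:
            deliver(msg)
```

A connection made after publish() receives the stored message only when latching is enabled; without latching it waits for the next publish().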

get_connections()[source]

Gets connections to the publisher.

Returns

The list of nodes connected to the publisher.

Return type

list[str]

is_running() bool[source]
Returns

Whether the publisher is running. True when the publisher is setup() and False otherwise (including when the node handle or publisher is shutdown).

Return type

bool

publish(msg: M) None[source]

Publishes a message onto the topic. The message is serialized and sent to all connections connected to the publisher.

Parameters

msg (genpy.Message) – The ROS message to send to all connected clients.

Raises
async setup() None[source]

Sets up the publisher by registering the name of the publisher with ROS and registering callbacks associated with the publisher with the parent node handle.

This should always be called before the publisher is ever used.

async shutdown()[source]

Shuts the publisher down. All operations scheduled by the publisher are cancelled.

Raises

ResourceWarning – The publisher is not running. It may need to be setup().

Service

class axros.Service(node_handle: NodeHandle, name: str, service_type: types.ServiceMessage[Request, Reply], callback: Callable[[Request], Awaitable[Reply]])[source]

A service in the axros suite. Handles incoming requests through a user-supplied asynchronous callback function, which is expected to return a response message.

This class completes the server aspect of the server-client relationship in ROS. The client class is the axros.ServiceClient class - this class can be used to call services.

async with x:

On entering the block, the service is setup(); upon leaving the block, the service is shutdown().

Parameters
  • node_handle (NodeHandle) – The node handle to use in conjunction with the service.

  • name (str) – The name to use for the service.

  • service_type (ServiceType) – A ROS service class to use with the service. The callback method used by the class will receive the request class associated with the service, and is expected to return the response class associated with this class.

  • callback (Callable[[genpy.Message], Awaitable[genpy.Message]]) – An asynchronous callback to process all incoming service requests. The returned message type should be the reply type associated with the service.

is_running() bool[source]
Returns

Whether the service is running; i.e., able to accept requests.

Return type

bool

async setup() None[source]

Sets up the service to be able to receive incoming connections.

This must be called before the service can be used.

async shutdown() None[source]

Shuts the service down. Cancels all operations currently scheduled to be completed by the service.

ServiceClient

class axros.ServiceClient(node_handle: NodeHandle, name: str, service_type: S)[source]

A client connected to a service in axros. This is the client class of the client-server relationship in ROS; the server is the axros.Service class.

await x(request_class)

Makes a request to the service using an instance of the request_class request type. This operation returns the result sent by the topic through the master server. Any errors will raise an instance of axros.ServiceError.

Parameters
  • node_handle (NodeHandle) – The node handle serving as the client to the service.

  • name (str) – The name of the service to connect to.

async wait_for_service()[source]

Waits for the service to appear. Checks to see if the service has appeared 10 times per second.
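
The polling pattern described here can be sketched generically. The function below is a hypothetical illustration: the exists callable stands in for whatever lookup axros performs against the master, and the default of 10 checks per second is taken from the description above.

```python
import asyncio
from typing import Awaitable, Callable


async def wait_until_available(
    exists: Callable[[], Awaitable[bool]], *, poll_hz: float = 10.0
) -> int:
    """Poll ``exists`` until it returns True; return the number of checks made."""
    checks = 0
    while True:
        checks += 1
        if await exists():
            return checks
        # Sleep for one polling period before checking again.
        await asyncio.sleep(1.0 / poll_hz)
```

Because the loop has no upper bound, callers that need a deadline would wrap it in a timeout.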

AsyncServerProxy

class axros.AsyncServerProxy(uri: str, node_handle: NodeHandle, *, headers: Iterable[tuple[str, str]] | None = None, context: Any | None = None)[source]

Asynchronous server proxy to a specific server URI using aiohttp.ClientSession. This class is used in conjunction with axros.ROSMasterProxy.

transport

The transport used to facilitate requests.

Type

AsyncioTransport

session

The client session used in the communication.

Type

aiohttp.ClientSession

uri

The URI used to connect to ROS master.

Type

str

Parameters
  • uri (str) – The URI used to connect to ROS Master. This can often be obtained through os.environ["ROS_MASTER_URI"].

  • node_handle (axros.NodeHandle) – The node handle starting the asynchronous proxy.

  • headers (Iterable[tuple[str, str]]) – The specific headers to use for connection.

  • context (Any | None) – The context for the asynchronous session.

ROSMasterProxy

class axros.ROSMasterProxy(proxy: AsyncServerProxy, caller_id: str)[source]

The axros proxy to the ROS master server. This class does not provide a general suite of discrete methods for interacting with the master server; rather, all method requests are forwarded to the server through the class’ __getattr__ method.

This class implements all operations related to communication with ROS through its [Master API](https://wiki.ros.org/ROS/Master_API). Generally, this class is used on behalf of a node or process to make outbound requests to other ROS services. It is not used to receive connections from other ROS services.

Generally, this class should not be used in application-level code; rather, helper methods should be called from classes such as axros.NodeHandle or axros.Subscriber. However, this class may be extended for new applications of low-level code.

>>> import os
>>> import aiohttp
>>> from axros import AsyncServerProxy, ROSMasterProxy
>>> session = aiohttp.ClientSession()
>>> proxy = AsyncServerProxy(os.environ["ROS_MASTER_URI"], session)
>>> master = ROSMasterProxy(proxy, "")
>>> print(await master.hasParam("/always_true"))
True
>>> await session.close()
Parameters
  • proxy (AsyncServerProxy) – The proxy to a general XMLRPC server.

  • caller_id (str) – The caller ID making the following requests.

x.attr

Calls the master server with the attribute name representing the name of the method to call. The value is returned in a Deferred object. If the status code is not 1, then axros.Error is raised.

Raises
  • XMLRPCException – General error in communication. This could be due to a multitude of reasons.

  • ROSMasterError – The status code returned by the ROS master server was -1, indicating that the request completed, but the associated return value is not usable.

  • ROSMasterFailure – The status code returned by the ROS master server was 0, indicating that the request did not successfully complete.
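
The attribute-forwarding idea and the status-code handling described above can be sketched without a ROS master. Everything below is a hypothetical illustration: ForwardingProxy, fake_master, and the simplified exception classes are stand-ins written for this example, and the (code, message, value) triple follows the status codes listed above (1 for success, -1 for an error, 0 for a failure).

```python
import asyncio


class ROSMasterError(Exception):
    """Stand-in for axros.ROSMasterError (simplified, hypothetical signature)."""


class ROSMasterFailure(Exception):
    """Stand-in for axros.ROSMasterFailure (simplified, hypothetical signature)."""


class ForwardingProxy:
    """Hypothetical proxy that turns attribute access into remote calls."""

    def __init__(self, transport, caller_id: str) -> None:
        self._transport = transport  # async callable returning a status triple
        self._caller_id = caller_id

    def __getattr__(self, method: str):
        # Only reached for names that are not real attributes of the proxy.
        async def remote_call(*args):
            code, message, value = await self._transport(
                method, self._caller_id, *args
            )
            if code == -1:
                raise ROSMasterError(message)
            if code == 0:
                raise ROSMasterFailure(message)
            return value

        return remote_call


async def fake_master(method: str, caller_id: str, *args):
    """Fake transport returning (status code, status message, value)."""
    if method == "hasParam":
        return 1, "ok", args[0] == "/always_true"
    return -1, f"unknown method {method}", None
```

With this model, asyncio.run(ForwardingProxy(fake_master, "/example").hasParam("/always_true")) resolves to True, while any other method name raises the stand-in ROSMasterError.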

Parameters
  • proxy (axros.AsyncServerProxy) – The proxy representing an XMLRPC connection to the ROS master server.

  • caller_id (str) – The ID of the caller in the proxy.

Goal

class axros.Goal(goal_msg: ActionGoal[Goal], status: int = 0, status_text: str = '')[source]

Implementation of a goal object in the axros adaptation of the Simple Action Server.

x == y

Determines equality between two goals by comparing their ids. If either side is not an actionlib_msgs.msg.GoalStatus, then the result is False.

Parameters
  • goal (GoalStatus) – The original goal message which constructs this class

  • status (uint8) – An enum representing the status of the goal

  • status_text (str) – A string representing the status of the goal

status_msg() GoalStatus[source]

Constructs a GoalStatus message from the Goal class.

Returns

The constructed message.

Return type

GoalStatus

GoalManager

class axros.GoalManager(action_client: ActionClient[Goal, Feedback, Result], goal: Goal)[source]

Manages the interactions between a specific goal and an action client.

This class is not meant to be constructed by client objects; rather it is instantiated by the ActionClient to talk to the SimpleActionServer.

Parameters
  • action_client (ActionClient) – The axros action client to use to manage the goal.

  • goal (Goal) – The axros goal to manage.

cancel() None[source]

Publishes a message to the action client requesting the goal to be cancelled.

forget() None[source]

Leaves the manager running, but requests for the action client to stop work on the specific goal.

get_feedback() asyncio.Future[types.ActionFeedback[types.Feedback]][source]

Gets the feedback from all feedback asyncio.Future objects.

get_result() asyncio.Future[types.ActionResult[types.Result]][source]

Gets the result of the goal from the manager.

Returns

???

Return type

Any

SimpleActionServer

class axros.SimpleActionServer(node_handle: NodeHandle, name: str, action_type: type[types.Action[types.Goal, types.Feedback, types.Result]], goal_cb: Callable[[], None] | None = None, preempt_cb: Callable[[], None] | None = None)[source]

A simplified implementation of an action server. At any given time, the server can have at most a current goal and a next goal. If a new goal arrives while another is already queued to be next, the goal with the greater timestamp will bump the other.

>>> # Construct a SimpleActionServer with a node handle, topic namespace, and type
>>> serv = SimpleActionServer(nh, '/test_action', turtle_actionlib.msg.ShapeAction)
>>> # The server must be started before any goals can be accepted
>>> serv.start()
>>> # To accept a goal
>>> while not serv.is_new_goal_available():  # Loop until there is a new goal
...     await nh.sleep(0.1)
>>> goal = serv.accept_new_goal()
>>> # To publish feedback
>>> serv.publish_feedback(ShapeFeedback())
>>> # To accept a preempt (a new goal attempted to replace the current one)
>>> if serv.is_preempt_requested():
...     goal = serv.accept_new_goal()  # Automatically cancels old goal
>>> # To finish a goal
>>> serv.set_succeeded(text='Wahoo!', result=ShapeResult(apothem=1))
>>> # or
>>> serv.set_aborted(text='Something went wrong!')
async with x:

On entering the block, setup() is called; upon exit, shutdown() is called.

is_active() bool[source]
Returns

Returns whether there is an active goal running.

Return type

bool

is_cancel_requested() bool[source]
Returns

Whether a goal is currently active and a cancel has been requested.

Return type

bool

is_new_goal_available() bool[source]
Returns

Whether the next goal is defined, or not None.

Return type

bool

is_preempt_requested() bool[source]
Returns

Whether the goal has been requested to be cancelled, and there is both a goal currently running and a goal scheduled to be run shortly.

Return type

bool

is_running() bool[source]
Returns

Whether the simple action server is running. Set to True when setup() is called; set to False when shutdown() is called. This is independent of whether start() or stop() is called.

Return type

bool

publish_feedback(feedback: types.Feedback | None = None) None[source]

Publishes a feedback message onto the feedback topic.

Parameters

feedback (Optional[genpy.Message]) – The optional feedback message to add to the sent message.

set_aborted(result: types.Result | None = None, text: str = '') None[source]

Sets the status of the current goal to aborted.

Parameters
  • result (Optional[genpy.Message]) – The message to attach in the result.

  • text (str) – The text to set in the result. Defaults to an empty string.

set_preempted(result: types.Result | None = None, text: str = '') None[source]

Sets the status of the current goal to preempted.

Parameters
  • result (Optional[genpy.Message]) – The message to attach in the result.

  • text (str) – The text to set in the result. Defaults to an empty string.

set_succeeded(result: types.Result | None = None, text: str = '') None[source]

Sets the status of the current goal to succeeded.

Parameters
  • result (Optional[genpy.Message]) – The message to attach in the result.

  • text (str) – The text to set in the result. Defaults to an empty string.
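The three setters above each move the current goal into a terminal state. A minimal sketch of that mapping follows, assuming the server reports the standard actionlib_msgs/GoalStatus codes; the `TERMINAL_STATUS` table and `finish()` helper are hypothetical and exist only to make the relationship concrete.

```python
from enum import IntEnum


class GoalStatus(IntEnum):
    """Subset of the actionlib_msgs/GoalStatus status codes."""
    PENDING = 0
    ACTIVE = 1
    PREEMPTED = 2
    SUCCEEDED = 3
    ABORTED = 4


# Hypothetical lookup from setter name to the terminal status it reports.
TERMINAL_STATUS = {
    "set_preempted": GoalStatus.PREEMPTED,
    "set_succeeded": GoalStatus.SUCCEEDED,
    "set_aborted": GoalStatus.ABORTED,
}


def finish(setter: str, text: str = "") -> tuple:
    """Return the (status, text) pair a given setter would report."""
    return TERMINAL_STATUS[setter], text


if __name__ == "__main__":
    print(finish("set_aborted", "Something went wrong!"))
```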

async setup() None[source]

Sets up the action server. This must be called before the action server can be used.

async shutdown() None[source]

Shuts the action server down. This must be called before the program ends.

start() None[source]

Starts the status loop for the server.

stop() None[source]

Stops the status loop for the server, and clears all running goals and all goals scheduled to be run.

ActionClient

class axros.ActionClient(node_handle: NodeHandle, name: str, action_type: type[types.Action[types.Goal, types.Feedback, types.Result]])[source]

Representation of an action client in axros. This works in conjunction with the SimpleActionServer by sending the server goals to execute. In response, the client receives feedback updates and the final result of the goal, which it can handle by itself.

Parameters
  • node_handle (axros.NodeHandle) – Node handle used to power the action client.

  • name (str) – The name of the action client.

  • action_type (type[genpy.Message]) – The action message type used by the action client and server.

cancel_all_goals() None[source]

Sends a message to the mission cancellation topic requesting all goals to be cancelled.

Raises

NotSetup – The action client has not been setup(), or was previously shutdown().

cancel_goals_at_and_before_time(time: Time) None[source]

Cancel all goals scheduled at and before a specific time.

Parameters

time – The time to reference when selecting which goals to cancel.
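The "at and before" wording implies an inclusive comparison against the reference time. The sketch below shows that selection rule on plain data. The `goals_to_cancel()` helper is hypothetical, and floats stand in for ROS time stamps; the real method sends a cancellation message rather than filtering a local table.

```python
def goals_to_cancel(goal_stamps: dict, cutoff: float) -> list:
    """Return the ids of goals stamped at or before the cutoff, sorted by id."""
    return sorted(
        goal_id for goal_id, stamp in goal_stamps.items() if stamp <= cutoff
    )


if __name__ == "__main__":
    stamps = {"a": 1.0, "b": 2.0, "c": 3.0}
    print(goals_to_cancel(stamps, 2.0))
```

A goal stamped exactly at the reference time is included, which is the difference between this method and a strict "before" filter.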

send_goal(goal: Goal) GoalManager[Goal, Feedback, Result][source]

Sends a goal to a goal manager. The goal manager is responsible for the communication between the action client and server; it assists in this process by maintaining individual asyncio.Future objects.

Raises

NotSetup – The action client has not been setup(), or was previously shutdown().

Returns

The manager of the goal.

Return type

GoalManager
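To illustrate what "maintaining individual asyncio.Future objects" can look like, here is a minimal sketch of a manager that resolves a future when a result arrives. `ToyGoalManager`, `on_feedback()`, `on_result()`, and `get_result()` are hypothetical names for this example; they are not taken from the axros GoalManager, whose interface is not shown in this section.

```python
import asyncio
from typing import Any


class ToyGoalManager:
    """Hypothetical manager tracking one goal's result with a Future."""

    def __init__(self) -> None:
        # Must be constructed while an event loop is running.
        self._result = asyncio.get_running_loop().create_future()
        self.feedback = []

    def on_feedback(self, message: Any) -> None:
        # Feedback may arrive many times before the result does.
        self.feedback.append(message)

    def on_result(self, message: Any) -> None:
        # A future can only be resolved once, so ignore duplicates.
        if not self._result.done():
            self._result.set_result(message)

    async def get_result(self) -> Any:
        return await self._result


async def goal_demo() -> tuple:
    manager = ToyGoalManager()
    loop = asyncio.get_running_loop()
    # Simulate the server: feedback first, then the final result.
    loop.call_soon(manager.on_feedback, "halfway")
    loop.call_later(0.01, manager.on_result, "done")
    result = await manager.get_result()
    return manager.feedback, result


if __name__ == "__main__":
    print(asyncio.run(goal_demo()))
```

The caller simply awaits the result while the callbacks run on the event loop, so no polling is needed on the client side.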

async setup()[source]

Sets up the action client. This must be called before the action client can be used.

async shutdown()[source]

Shuts down the action client. This should always be called when the action client is no longer needed.

async wait_for_server()[source]

Waits for a server connection. When at least one connection is received, the function terminates.

Transform

class axros.Transform(p: list[float], q: list[float])[source]

Represents a tf transform in the axros suite.

x + y

Adds the components of each transform together.

x - y

Subtracts the components between each transform.

x * y

Multiplies the components of each transform together.

str(x)

Prints the constructor of the class in a string-formatted fashion.

repr(x)

Prints the constructor of the class in a string-formatted fashion. Equivalent to str(x).

classmethod from_Pose_message(msg: Pose)[source]

Constructs a transform from a geometry_msgs.msg.Pose message.

classmethod from_Transform_message(msg: Transform)[source]

Constructs a transform from a geometry_msgs.msg.Transform message.

classmethod identity()[source]

Creates an identity transform. The x, y, and z coordinates are all set to zero. The quaternion is set to [0, 0, 0, 1].

inverse() Transform[source]

Constructs an inverse transform.

Returns

The inverse transform.

Return type

Transform
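For readers unfamiliar with inverting a rigid transform, the following sketch works through the math in plain Python. It assumes the quaternion is a unit quaternion ordered [x, y, z, w] (consistent with the identity quaternion [0, 0, 0, 1] above) and that the transform maps a point x to rotate(q, x) + p. The helper names are hypothetical, and this is not necessarily how axros.Transform computes its inverse.

```python
import math


def cross(a, b):
    """Cross product of two 3-vectors."""
    return [
        a[1] * b[2] - a[2] * b[1],
        a[2] * b[0] - a[0] * b[2],
        a[0] * b[1] - a[1] * b[0],
    ]


def rotate(q, v):
    """Rotate vector v by the unit quaternion q = [x, y, z, w]."""
    u, w = q[:3], q[3]
    t = [2.0 * c for c in cross(u, v)]
    ut = cross(u, t)
    return [v[i] + w * t[i] + ut[i] for i in range(3)]


def apply(p, q, x):
    """Apply the transform (p, q) to point x: rotate, then translate."""
    return [r + t for r, t in zip(rotate(q, x), p)]


def invert(p, q):
    """Inverse of the transform x -> rotate(q, x) + p."""
    q_inv = [-q[0], -q[1], -q[2], q[3]]  # conjugate of a unit quaternion
    p_inv = [-c for c in rotate(q_inv, p)]  # undo translation in the rotated frame
    return p_inv, q_inv


if __name__ == "__main__":
    # A 90 degree rotation about z followed by a translation.
    q = [0.0, 0.0, math.sin(math.pi / 4), math.cos(math.pi / 4)]
    p = [1.0, 2.0, 3.0]
    moved = apply(p, q, [1.0, 0.0, 0.0])
    p_inv, q_inv = invert(p, q)
    print(moved, apply(p_inv, q_inv, moved))
```

Applying a transform and then its inverse returns the original point up to floating-point error, which is a convenient check when debugging frame conversions.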

TransformBroadcaster

class axros.TransformBroadcaster(node_handle: NodeHandle)[source]

Broadcasts transforms onto a topic.

Parameters

node_handle (NodeHandle) – The node handle used to communicate with the ROS master server.

send_transform(transform: TransformStamped)[source]

Sends a stamped Transform message onto the publisher.

Parameters

transform (TransformStamped) – The transform to publish onto the topic.

Raises

TypeError – The transform class was not of the TransformStamped class.

TransformListener

class axros.TransformListener(node_handle: NodeHandle, history_length: genpy.Duration = genpy.Duration[10000000000])[source]
async shutdown() None[source]

Shuts the transform listener down.