API Reference¶
Exception Hierarchy¶
Exceptions¶
- class axros.AxrosException(message: str, node_handle: NodeHandle)[source]¶
Exception related to axros. Inherits from Exception.
- node_handle¶
The node handle which caused the exception.
- Type
- class axros.NotSetup(resource: Resource, node_handle: NodeHandle)[source]¶
Indicates that a resource (such as an axros.NodeHandle) was not set up, or was previously shut down and not set up again. To solve this issue, you will likely need to call the class's setup() method.
Inherits from AxrosException.
- class axros.AlreadySetup(resource: Resource, node_handle: NodeHandle)[source]¶
Indicates that a resource (such as an axros.NodeHandle) was already set up, but setup() was called again.
Inherits from AxrosException.
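The two lifecycle errors above can be pictured with a small stand-in class. This is only a sketch of the described behaviour, not axros code: `NotSetupSketch`, `AlreadySetupSketch`, and `GuardedResource` are hypothetical names, and the real exceptions also carry the resource and node handle.

```python
class NotSetupSketch(Exception):
    """Stand-in for the 'not set up' error described above."""


class AlreadySetupSketch(Exception):
    """Stand-in for the 'already set up' error described above."""


class GuardedResource:
    """Hypothetical resource that enforces the setup/shutdown order."""

    def __init__(self):
        self._running = False

    def setup(self):
        # A second setup() without a shutdown() in between is rejected.
        if self._running:
            raise AlreadySetupSketch("setup() was called twice")
        self._running = True

    def shutdown(self):
        self._running = False

    def use(self):
        # Any use before setup(), or after shutdown(), is rejected.
        if not self._running:
            raise NotSetupSketch("call setup() before using the resource")
        return "ok"
```

Calling `use()` on a fresh `GuardedResource` raises the first error, and calling `setup()` twice in a row raises the second.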
- class axros.XMLRPCException(exception: Exception, request_body: bytes, node_handle: NodeHandle)[source]¶
Represents an error that occurred in the XMLRPC communication. Inherits from AxrosException.
- str(x)
Pretty-prints the error status code and message.
- repr(x)
Pretty-prints the error status code and message. Equivalent to str(x).
- class axros.ROSMasterError(message: str, code: int, node_handle: NodeHandle)[source]¶
Represents an exception that occurred when attempting to communicate with the ROS Master Server. Unlike ROSMasterFailure, this indicates that the request completed, but the returned value is not usable. Similar to a bad request HTTP error.
Inherits from AxrosException.
- class axros.ROSMasterFailure(message: str, code: int, node_handle: NodeHandle)[source]¶
Represents a failure that occurred after a node sent an outgoing XMLRPC request to ROS Master. Unlike ROSMasterError, this indicates that the request failed in progress, and did not complete. This may cause side effects visible in other ROS services.
Inherits from AxrosException.
Utility Functions¶
- async axros.wrap_timeout(fut: Awaitable[T], duration: float | genpy.Duration, *, cancel=True) T [source]¶
Wraps a given future in a timeout-based future. This can be used to ensure that selected asyncio.Future objects complete on time. Futures that do not complete on time can optionally be cancelled.
For an equivalent method that does not raise an exception, see axros.wrap_time_notice().
- Parameters
fut (asyncio.Future | coroutine) – The future object to time out.
duration (float | genpy.Duration) – The duration after which to time out.
cancel (bool) – Keyword-only argument designating whether to cancel the future when the task times out.
- Raises
asyncio.TimeoutError – The task did not complete on time.
- Returns
The data returned from the awaitable.
- Return type
Any
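The timeout behaviour described above can be approximated with the standard library alone. The following is a sketch under assumptions, not the axros implementation: `wrap_timeout_sketch` is a hypothetical name, and it accepts plain seconds only (no genpy.Duration).

```python
import asyncio


async def wrap_timeout_sketch(awaitable, duration, *, cancel=True):
    """Await `awaitable`, raising asyncio.TimeoutError after `duration` seconds."""
    task = asyncio.ensure_future(awaitable)
    if cancel:
        # wait_for cancels the task itself when the timeout expires.
        return await asyncio.wait_for(task, timeout=duration)
    # shield() absorbs the cancellation, so the task keeps running after the timeout.
    return await asyncio.wait_for(asyncio.shield(task), timeout=duration)


async def demo():
    slow = asyncio.ensure_future(asyncio.sleep(0.2, result="done"))
    try:
        await wrap_timeout_sketch(slow, 0.01, cancel=False)
    except asyncio.TimeoutError:
        timed_out = True
    else:
        timed_out = False
    # The task was left alone, so it can still be awaited to completion.
    return timed_out, await slow


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With `cancel=False` the caller still sees the timeout error, but the underlying task survives and can be awaited later; with the default `cancel=True` the task is cancelled.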
- async axros.wrap_time_notice(fut: Awaitable[T], duration: float | genpy.Duration, description: str) T [source]¶
Prints a message if a future is taking longer than the noted duration.
>>> await axros.wrap_time_notice(asyncio.sleep(3), 2, "Example task")
Example task is taking a while... # printed at 2 seconds
Example task succeeded! # printed at 3 seconds
>>> await axros.wrap_time_notice(asyncio.sleep(1), 2, "Example task")
# nothing is printed
- Parameters
fut (asyncio.Future) – The future object.
duration (float | genpy.Duration) – The duration to wait before printing a message.
description (str) – The description to print.
- Returns
The result from the awaitable.
- Return type
Any
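One way to get the "notice, but no exception" behaviour described above is `asyncio.wait` with a timeout. This is a sketch, not the axros source: `time_notice_sketch` and its `notify` parameter are hypothetical and exist only so the messages can be captured, and the message wording is copied from the example above.

```python
import asyncio


async def time_notice_sketch(awaitable, duration, description, notify=print):
    """Await `awaitable`, reporting if it takes longer than `duration` seconds."""
    task = asyncio.ensure_future(awaitable)
    # asyncio.wait returns on timeout without cancelling the task or raising.
    done, _pending = await asyncio.wait({task}, timeout=duration)
    if done:
        return task.result()
    notify(f"{description} is taking a while...")
    result = await task
    notify(f"{description} succeeded!")
    return result
```

Unlike a timeout wrapper, the awaitable is never cancelled here; the notice is purely informational.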
NodeHandle¶
- cls NodeHandle.from_argv
- cls NodeHandle.from_argv_with_remaining
- def advertise
- def advertise_service
- async delete_param
- def get_name
- async get_param
- async get_param_names
- def get_service_client
- def get_time
- async has_param
- def is_running
- def is_shutdown
- async lookup_node
- def resolve_name
- def resolve_name_without_remapping
- async search_param
- async set_param
- async setup
- async shutdown
- async sleep
- async sleep_until
- def subscribe
- class axros.NodeHandle(namespace: str, name: str, addr: str, master_uri: str, remappings: dict[str, str])[source]¶
Class representing a Pythonic handle for a ROS node, similar to the C++ concept of a node handle. This class provides asynchronous communication with several ROS features, including publishers, subscribers, and services through helper methods.
This node must be constructed using the await keyword because it spins up resources upon initialization. As a result, you cannot set up this node using a specialized method.
You must shut down the node using shutdown(). Otherwise, you can use an asynchronous context manager.
Warning
You cannot spawn two nodes with the same name. Attempts to do so will result in the node which already exists being killed to make way for the new node you are initializing. If the other node is shut down, it will be shut down safely.
- async with x:
On enter, the node handle is set up through setup(). Upon leaving the context block, the node handle is shut down through shutdown().
- shutdown_callbacks¶
The callbacks that are executed when a shutdown is requested for the node. This allows the node to call the shutdown coroutines of other classes, such as axros.Publisher.shutdown() and axros.Subscriber.shutdown().
- Type
set[Callable[[], Coroutine[Any, Any, Any]]]
- master_proxy¶
The proxy to the ROS Master URI. This is used by the node for all communication.
- Type
- tcpros_handlers¶
The handlers for incoming TCPROS connections. This allows the node to route incoming connections to the appropriate class.
- xmlrpc_handlers¶
The handlers for incoming XMLRPC connections. Acts similarly to tcpros_handlers.
- Parameters
namespace (str) – The namespace to put the node under. The namespace should not end with a /. For example, nsexample/test.
name (str) – The name of the node itself. The name will be appended behind the namespace.
addr (str) – The address of the ROS master server, typically specified as an IPv4 address, and often, localhost.
master_uri (str) – The URI linking to the ROS master server.
remappings (dict[str, str]) – Any remappings to use in the setup() method.
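The context-manager lifecycle and the shutdown callbacks described above can be modelled in a few lines. This is a hedged sketch with a hypothetical `ResourceSketch` class; it shows the pattern only and makes no claim about how axros orders or awaits its callbacks.

```python
import asyncio


class ResourceSketch:
    """Hypothetical stand-in for a node handle; not the axros class."""

    def __init__(self):
        self._running = False
        # Coroutine functions awaited when a shutdown is requested.
        self.shutdown_callbacks = set()

    def is_running(self):
        return self._running

    async def setup(self):
        self._running = True

    async def shutdown(self):
        # Copy first: callbacks may remove themselves while we iterate.
        for callback in list(self.shutdown_callbacks):
            await callback()
        self._running = False

    async def __aenter__(self):
        await self.setup()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self.shutdown()


async def demo():
    events = []

    async def close_publisher():
        events.append("publisher closed")

    async with ResourceSketch() as handle:
        handle.shutdown_callbacks.add(close_publisher)
        events.append(f"running={handle.is_running()}")
    events.append(f"running={handle.is_running()}")
    return events


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Leaving the `async with` block runs every registered callback before the resource reports that it has stopped, which is why dependent objects get a chance to clean up first.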
- advertise(name: str, message_type: type[M], *, latching: bool = False) publisher.Publisher[M] [source]¶
Creates a publisher using this node handle. The arguments and keyword arguments passed to this method are passed into the constructor of the publisher; check there for information on what arguments should be passed in.
Note that the publisher can only be used when it is set up through Publisher.setup().
- Parameters
name (str) – The name of the topic to publish.
message_type (Type[genpy.Message]) – The message type that will be published on the topic.
latching (bool) – Enables latching on the publisher. This ensures that all new connections are immediately sent the most recently sent message when they connect to the publisher.
- Raises
NotSetup – The node is not running. The node likely needs to be setup().
- Returns
The given publisher.
- Return type
- advertise_service(name: str, service_message: ServiceMessage[Request, Reply], callback: Callable[[Request], Awaitable[Reply]]) Service[Request, Reply] [source]¶
Creates a service using this node handle. The arguments and keyword arguments passed to this method are passed into the constructor of the service; check there for information on what arguments can be passed in.
- Parameters
name (str) – The name to use for the service.
service_message (type[genpy.Message]) – A ROS service class to use with the service. The callback method used by the class will receive the request class associated with the service, and is expected to return the response class associated with this class.
callback (Callable[[genpy.Message], Awaitable[genpy.Message]]) – An asynchronous callback to process all incoming service requests. The returned message type should be the reply type associated with the service.
- Raises
NotSetup – The node is not running. The node likely needs to be setup().
- Returns
The given service.
- Return type
- async delete_param(key: str) int [source]¶
Deletes the specified parameter from the parameter server.
- Parameters
key (str) – The parameter to delete from the parameter server.
- Raises
NotSetup – The node is not running. The node likely needs to be setup().
ROSMasterError – The specified key does not exist.
TypeError – The key provided is not a string.
- Returns
The result of the delete operation. According to ROS documentation, this value can be ignored.
- Return type
- classmethod from_argv(*args, **kwargs) NodeHandle [source]¶
Equivalent to
from_argv_with_remaining()
.
- classmethod from_argv_with_remaining(default_name: str, argv: list[str] | None = None, anonymous: bool = False, *, always_default_name=False) tuple[NodeHandle, list[str]] [source]¶
Constructs a handle from an argument vector. Attempts to find any missing values for node construction from environment variables and the argument vector.
- Parameters
default_name (str) – The default name of the node.
anonymous (bool) – Whether to make the node anonymous. Appends a random number to the end of the node name.
always_default_name (bool) – Whether to always use the default name of the node, avoiding remappings. Commonly used in testing or complex environments where stronger node control is desired. Defaults to False.
- async get_param(key: str, desired_type: type[X]) X [source]¶
- async get_param(key: str, desired_type: None = None) Optional[Union[bool, int, float, str, list, tuple, dict]]
Gets a parameter value from the ROS parameter server. If the key requested is a namespace, then a dictionary representing all keys below the given namespace is returned.
- Parameters
key (str) – The name of the parameter to retrieve from the server.
- Raises
- Returns
The value of the parameter with the given name.
- Return type
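To picture what "a dictionary representing all keys below the given namespace" looks like, the sketch below nests a flat set of parameter names. The parameter names and the `nest_parameters` helper are made up for illustration; the parameter server does this work itself, and this is not axros code.

```python
def nest_parameters(flat, namespace):
    """Collect every key under `namespace` into a nested dictionary."""
    prefix = namespace.rstrip("/") + "/"
    tree = {}
    for key, value in flat.items():
        if not key.startswith(prefix):
            continue
        # Split the remainder of the key into parent namespaces and a leaf name.
        *parents, leaf = key[len(prefix):].split("/")
        node = tree
        for part in parents:
            node = node.setdefault(part, {})
        node[leaf] = value
    return tree


# Hypothetical parameters, as they might be named on a parameter server.
params = {
    "/camera/rate": 30,
    "/camera/lens/focal_length": 4.2,
    "/robot_name": "example",
}

print(nest_parameters(params, "/camera"))
```

Requesting the hypothetical `/camera` namespace would therefore yield one dictionary with a nested `lens` entry rather than a single value.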
- async get_param_names() list[str] [source]¶
Gets the names of all parameters in the ROS parameter server.
- get_service_client(name: str, service_type: S) ServiceClient[S] [source]¶
Creates a service client using this node handle. This class can interface with services by calling them with a particular request message and receiving a response message.
This class is a generic and supports being associated with the service message used by the client and service in conjunction.
- get_time() Time [source]¶
Returns the time used by the node handle. If simulated time is requested to be used, then the current simulated time is returned. Otherwise, the current real time is returned.
- Returns
The current time used by the node handle.
- Return type
genpy.Time
- async has_param(key: str) bool [source]¶
Checks whether the ROS parameter server has a parameter with the specified name.
- is_running() bool [source]¶
- Returns
Whether the node handle is running. This is True after the node is setup() and becomes False when the node is shutdown(). Will always be the opposite of is_shutdown().
- Return type
- is_shutdown() bool [source]¶
- Returns
Whether the node handle is not running. This is True by default; calling setup() on the node will make this False. Will always be the opposite of is_running().
- Return type
- async lookup_node(name: str) str | None [source]¶
Looks up the URI of a node based on its name. If it does not exist, None is returned.
- resolve_name(name: str) str [source]¶
Resolves the name of the node handle and updates the name attribute with this resolved name.
- resolve_name_without_remapping(name: str) str [source]¶
Resolves the true name of a given node path.
- async search_param(key: str) str [source]¶
Searches for a parameter on the ROS parameter server, and returns the first key found that matches the search term.
For more information on how searching is done to find a matching key, view the ROS wiki documentation on this method.
- async set_param(key: str, value: Optional[Union[bool, int, float, str, list, tuple, dict]]) int [source]¶
Sets a parameter and value in the ROS parameter server.
Note
Tuples are stored as lists in the parameter server. All contents remain the same, and the contents of the tuple are stored in the same order.
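The tuple-to-list behaviour in the note follows from XML-RPC having a single array type. A quick round trip through Python's standard `xmlrpc.client` shows it; the method name is included only to form a complete request body, and a real setParam call carries additional arguments.

```python
import xmlrpc.client

value = ("a", 1, 2.5)
# Marshal the value the way an XML-RPC request body would carry it.
payload = xmlrpc.client.dumps((value,), methodname="setParam")
# Unmarshal it again, as the receiving side would.
(decoded,), method = xmlrpc.client.loads(payload)
print(type(decoded).__name__, decoded)
```

The order and contents are preserved, but the value that comes back is a list, so code that reads the parameter later should not rely on it being a tuple.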
- Parameters
key (str) – The parameter to set in the parameter server.
value (
axros.XMLRPCLegalType
) – The value to set for the given parameter.
- Raises
- Returns
The result of setting the parameter. According to the ROS documentation, this value can be ignored.
- Return type
- async setup()[source]¶
Sets up the node handle. This must be called before the node is used.
- Raises
AssertionError – A condition was not met with the variables supplied to the node in construction.
- async shutdown(*, shutdown_servers: bool = True) None [source]¶
Shuts down all active connections. This should always be called before the loop is closed, or else multiple instances of
ResourceWarning
will be emitted.
- async sleep(duration: float | int | genpy.Duration) None [source]¶
Sleeps for a specified duration of time.
- Parameters
duration (Union[float, int, genpy.Duration]) – The amount of time to sleep for.
- Raises
TypeError – The argument passed to duration was not an allowed type.
ValueError – The requested sleep length was negative.
- async sleep_until(future_time: float | int | genpy.Time) None [source]¶
Sleeps until the specified time is reached.
- subscribe(name: str, message_type: type[M], callback: Callable[[M], None] = <function NodeHandle.<lambda>>) subscriber.Subscriber[M] [source]¶
Creates a subscriber using this node handle. The arguments and keyword arguments passed to this method are passed into the constructor of the subscriber; check there for information on what arguments can be passed in.
Note that the subscriber can only be used when it is set up through Subscriber.setup().
- Parameters
name (str) – The name of the topic to subscribe to.
message_type (type[genpy.Message]) – The message class shared by the topic.
callback (Callable[[genpy.Message], genpy.Message | None]) – The callback to use with the subscriber. The callback should receive an instance of the message shared by the topic and return None.
- Raises
NotSetup – The node is not running. The node likely needs to be setup().
- Returns
The constructed subscriber.
- Return type
Subscriber¶
- def get_connections
- def get_last_message
- def get_last_message_time
- def get_next_message
- def is_running
- def recently_read
- async setup
- async shutdown
- class axros.Subscriber(node_handle: NodeHandle, name: str, message_type: type[M], callback: Callable[[M], None] = <function Subscriber.<lambda>>)[source]¶
A subscriber in the axros suite. This class should usually be made through axros.NodeHandle.subscribe().
- async with x:
On entering the block, the subscriber is setup(); upon leaving the block, the subscriber is shutdown().
- Parameters
node_handle (NodeHandle) – The node handle used to communicate with the ROS master server.
name (str) – The name of the topic to subscribe to.
message_type (Type[genpy.Message]) – The message class shared by the topic.
callback (Callable[[genpy.Message], genpy.Message | None]) – The callback to use with the subscriber. The callback should receive an instance of the message shared by the topic and return None.
Note
If you are subscribing to a publisher publishing messages at a very fast rate, you should always use a callback over get_next_message().
- get_connections() list[str] [source]¶
Returns the callerid of each connected client. If a connection does not provide an ID, then the value may be None.
- Returns
A list of the caller IDs of all nodes connected to the subscriber.
- Return type
List[str]
- get_last_message() M | None [source]¶
Gets the last received message. This may be None if no message has been received.
- Returns
The last received message, or None.
- Return type
Optional[genpy.Message]
- get_last_message_time() genpy.Time | None [source]¶
Gets the time associated with the last received message. May be None if no message has been received yet.
- Returns
The time of the last received message, or None.
- Return type
Optional[genpy.Time]
- get_next_message() asyncio.Future[M] [source]¶
Returns a Future which will contain the next message received by the subscriber.
Warning
This method should not be used to reliably obtain messages from publishers publishing at very high rates. Because of the overhead in setting up futures, this method will likely cause some messages to get lost.
Instead, you should use a callback with the subscriber - all messages are passed through the callback.
- Raises
NotSetup – The subscriber was never setup() or was previously shutdown().
- Returns
A future which will eventually contain the next message published on the topic.
- Return type
asyncio.Future[genpy.Message]
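Why a future-per-message consumer can miss messages while a callback does not is easy to reproduce in a toy model. The sketch below is an in-memory stand-in (`SubscriberSketch` and `deliver` are hypothetical names, not the axros API) that delivers a burst of three messages before the consumer has a chance to ask for another future.

```python
import asyncio


class SubscriberSketch:
    """Hypothetical in-memory model; not the axros Subscriber."""

    def __init__(self, callback=lambda msg: None):
        self._callback = callback
        self._waiters = []

    def get_next_message(self):
        future = asyncio.get_running_loop().create_future()
        self._waiters.append(future)
        return future

    def deliver(self, msg):
        # Every message reaches the callback...
        self._callback(msg)
        # ...but only futures that already exist can receive it.
        waiters, self._waiters = self._waiters, []
        for future in waiters:
            if not future.done():
                future.set_result(msg)


async def demo():
    seen_by_callback = []
    sub = SubscriberSketch(seen_by_callback.append)
    seen_by_future = []

    async def consumer():
        while True:
            seen_by_future.append(await sub.get_next_message())

    task = asyncio.ensure_future(consumer())
    await asyncio.sleep(0)  # let the consumer request its first future
    for msg in ("m1", "m2", "m3"):  # a burst with no chance to re-subscribe
        sub.deliver(msg)
    await asyncio.sleep(0)  # let the consumer process what it received
    task.cancel()
    return seen_by_callback, seen_by_future


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In this model the callback records all three messages, while the awaiting consumer only records the first, because no future was pending when the second and third arrived.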
- recently_read(*, seconds=1) bool [source]¶
Whether the subscriber has recently read messages.
Using this method is helpful when the subscriber may not be fast enough to keep up with all messages published. This method can be used to determine when the subscriber has finished reading queued messages. This is helpful at the end of programs, when a publisher has stopped publishing, but the subscriber has not yet received all messages.
>>> while sub.recently_read():
...     print("Subscriber is still reading messages...")
...     await asyncio.sleep(0.5)
>>> print("Subscriber has finished catching up on messages!")
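A check like this only needs the timestamp of the most recent read. The sketch below is an assumption about how such a check could work, not the axros implementation: `ReadTracker` is a hypothetical helper, and both the behaviour before any message (returning False) and the strict comparison are guesses.

```python
import time


class ReadTracker:
    """Hypothetical helper that remembers when the last message was read."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._last_read = None

    def mark_read(self):
        # Called whenever a message has been read.
        self._last_read = self._clock()

    def recently_read(self, *, seconds=1):
        # Nothing has been read yet, so nothing was read recently.
        if self._last_read is None:
            return False
        return self._clock() - self._last_read < seconds
```

The clock is injectable so the behaviour can be exercised without real waiting; by default it uses `time.monotonic`, which is unaffected by system clock changes.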
- async setup() None [source]¶
Sets up the subscriber by registering the subscriber with ROS and listing handlers supported by the topic.
The subscriber must be setup before use.
- async shutdown()[source]¶
Shuts the subscriber down. All operations scheduled by the subscriber are immediately cancelled.
- Raises
ResourceWarning – The subscriber is already not running.
Publisher¶
- def get_connections
- def is_running
- def publish
- async setup
- async shutdown
- class axros.Publisher(node_handle: NodeHandle, name: str, message_type: type[M], latching: bool = False)[source]¶
A Publisher in the axros suite. Allows for the publishing of messages onto a certain ROS topic. This class should usually be made through axros.NodeHandle.advertise().
Before use, the publisher must be setup() and after use, it must be shutdown().
- async with x:
On entering the block, the publisher is setup(); upon leaving the block, the publisher is shutdown().
- Parameters
node_handle (NodeHandle) – The node handle to associate with this publisher. Used to communicate with the ROS master server.
name (str) – The name of the publisher topic.
message_type (Type[genpy.Message]) – The message type that will be published on the topic.
latching (bool) – Enables latching on the publisher. This ensures that all new connections are immediately sent the most recently sent message when they connect to the publisher.
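The effect of latching can be shown with a small in-memory model. This is an illustrative sketch only: `LatchingPublisherSketch`, its `connect` method, and the list-based inboxes are hypothetical and do not reflect how axros handles connections.

```python
class LatchingPublisherSketch:
    """Hypothetical model of a publisher with optional latching."""

    def __init__(self, latching=False):
        self.latching = latching
        self._last_message = None
        self._has_published = False
        self._connections = []

    def connect(self, inbox):
        """Register a subscriber's inbox (a plain list in this model)."""
        self._connections.append(inbox)
        # A latching publisher replays its most recent message immediately.
        if self.latching and self._has_published:
            inbox.append(self._last_message)

    def publish(self, msg):
        self._last_message = msg
        self._has_published = True
        for inbox in self._connections:
            inbox.append(msg)


early, late = [], []
pub = LatchingPublisherSketch(latching=True)
pub.connect(early)
pub.publish("a")
pub.publish("b")
pub.connect(late)  # joins after two messages were already sent
pub.publish("c")
print(early, late)
```

The late subscriber still receives the most recent message from before it connected; without latching it would only see messages published afterwards.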
- publish(msg: M) None [source]¶
Publishes a message onto the topic. The message is serialized and sent to all connections connected to the publisher.
- Parameters
msg (genpy.Message) – The ROS message to send to all connected clients.
- Raises
TypeError – The message type was invalid.
NotSetup – The publisher was not setup() or was previously shutdown().
- async setup() None [source]¶
Sets up the publisher by registering the name of the publisher with ROS and registering callbacks associated with the publisher with the parent node handle.
This should always be called before the publisher is ever used.
- async shutdown()[source]¶
Shuts the publisher down. All operations scheduled by the publisher are cancelled.
- Raises
ResourceWarning – The publisher is not running. It may need to be
setup()
.
Service¶
- def is_running
- async setup
- async shutdown
- class axros.Service(node_handle: NodeHandle, name: str, service_type: types.ServiceMessage[Request, Reply], callback: Callable[[Request], Awaitable[Reply]])[source]¶
A service in the axros suite. Handles incoming requests through a user-supplied asynchronous callback function, which is expected to return a response message.
This class completes the server aspect of the server-client relationship in ROS. The client class is the axros.ServiceClient class, which can be used to call services.
- async with x:
On entering the block, the service is setup(); upon leaving the block, the service is shutdown().
- Parameters
node_handle (NodeHandle) – The node handle to use in conjunction with the service.
name (str) – The name to use for the service.
service_type (ServiceType) – A ROS service class to use with the service. The callback method used by the class will receive the request class associated with the service, and is expected to return the response class associated with this class.
callback (Callable[[genpy.Message], Awaitable[genpy.Message]]) – An asynchronous callback to process all incoming service requests. The returned message type should be the reply type associated with the service.
- is_running() bool [source]¶
- Returns
Whether the service is running; i.e., able to accept requests.
- Return type
ServiceClient¶
- async wait_for_service
- class axros.ServiceClient(node_handle: NodeHandle, name: str, service_type: S)[source]¶
A client connected to a service in axros. This is the client class of the client-server relationship in ROS; the server is the axros.Service class.
- await x(request_class)
Makes a request to the service using an instance of the request_class request type. This operation returns the result sent by the topic through the master server. Any errors will raise an instance of axros.ServiceError.
- Parameters
node_handle (NodeHandle) – The node handle serving as the client to the service.
name (str) – The name of the service to connect to.
AsyncServerProxy¶
- class axros.AsyncServerProxy(uri: str, node_handle: NodeHandle, *, headers: Iterable[tuple[str, str]] | None = None, context: Any | None = None)[source]¶
Asynchronous server proxy to a specific server URI using aiohttp.ClientSession. This class is used in conjunction with axros.ROSMasterProxy.
- transport¶
The transports used to facilitate requests.
- Type
AsyncioTransport
- session¶
The client session used in the communication.
- Parameters
uri (str) – The URI used to connect to ROS Master. This can often be obtained through os.environ["ROS_MASTER_URI"].
node_handle (axros.NodeHandle) – The node handle starting the asynchronous proxy.
headers (Iterable[tuple[str, str]]) – The specific headers to use for connection.
context (Any | None) – The context for the asynchronous session.
ROSMasterProxy¶
- class axros.ROSMasterProxy(proxy: AsyncServerProxy, caller_id: str)[source]¶
The axros proxy to the ROS master server. This class does not provide a general suite of discrete methods for interacting with the master server; rather, all method requests are forwarded to the server through the class's __getattr__ method.
This class implements all operations related to communication with ROS through its [Master API](https://wiki.ros.org/ROS/Master_API). Generally, this class is used on behalf of a node or process to make outbound requests to other ROS services. It is not used to receive connections from other ROS services.
Generally, this class should not be used in application-level code; rather, helper methods should be called from classes such as axros.NodeHandle or axros.Subscriber. However, this class may be extended for new applications of low-level code.
>>> import os
>>> import aiohttp
>>> session = aiohttp.ClientSession()
>>> proxy = AsyncServerProxy(os.environ["ROS_MASTER_URI"], session)
>>> master = ROSMasterProxy(proxy, "")
>>> print(await master.hasParam("/always_true"))
True
>>> await session.close()
- Parameters
proxy (AsyncServerProxy) – The proxy to a general XMLRPC server.
caller_id (str) – The caller ID making the following requests.
- x.attr
Calls the master server with the attribute name representing the name of the method to call. The value is returned in a Deferred object. If the status code is not 1, then axros.Error is raised.
- Raises
XMLRPCException – General error in communication. This could be due to a multitude of reasons.
ROSMasterError – The status code returned by the ROS master server was -1, indicating that the request completed, but the associated return value is not usable.
ROSMasterFailure – The status code returned by the ROS master server was 0, indicating that the request did not successfully complete.
- Parameters
proxy (axros.AsyncServerProxy) – The proxy representing an XMLRPC connection to the ROS master server.
caller_id (str) – The ID of the caller in the proxy.
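The status-code handling described above can be sketched as a small unwrapping function. It assumes the response arrives as a (code, message, value) triple, and it uses local stand-in exception classes (`MasterErrorSketch`, `MasterFailureSketch`) rather than the axros ones, so treat it as an illustration of the mapping and not as the library's code.

```python
class MasterErrorSketch(Exception):
    """Stand-in: the request completed, but the value is not usable (-1)."""


class MasterFailureSketch(Exception):
    """Stand-in: the request did not complete successfully (0)."""


def unwrap_master_response(response):
    """Return the value of a (code, message, value) triple, or raise."""
    code, message, value = response
    if code == 1:
        return value
    if code == -1:
        raise MasterErrorSketch(message)
    if code == 0:
        raise MasterFailureSketch(message)
    # Anything else falls outside the three codes described above.
    raise ValueError(f"unexpected status code: {code!r}")


print(unwrap_master_response((1, "parameter found", True)))
```

Keeping the two non-success codes as separate exception types lets callers tell "completed but unusable" apart from "did not complete", which matters when a failed request may have left side effects.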
Goal¶
- def status_msg
- class axros.Goal(goal_msg: ActionGoal[Goal], status: int = 0, status_text: str = '')[source]¶
Implementation of a goal object in the axros adaptation of the Simple Action Server.
- x == y
Determines equality between two goals by comparing their ids. If either side is not an actionlib_msgs.msg.GoalStatus, then the result is False.
- Parameters
goal (GoalStatus) – The original goal message which constructs this class
status (uint8) – An enum representing the status of
status_text (str) – A string representing the status of the goal
GoalManager¶
- def cancel
- def forget
- def get_feedback
- def get_result
- class axros.GoalManager(action_client: ActionClient[Goal, Feedback, Result], goal: Goal)[source]¶
Manages the interactions between a specific goal and an action client.
This class is not meant to be constructed by client objects; rather it is instantiated by the ActionClient to talk to the SimpleActionServer.
- Parameters
action_client (ActionClient) – The axros action client to use to manage the goal.
goal (Goal) – The axros goal to manage.
- cancel() None [source]¶
Publishes a message to the action client requesting the goal to be cancelled.
- forget() None [source]¶
Leaves the manager running, but requests for the action client to stop work on the specific goal.
- get_feedback() asyncio.Future[types.ActionFeedback[types.Feedback]] [source]¶
Gets the feedback from all feedback
asyncio.Future
objects.
- get_result() asyncio.Future[types.ActionResult[types.Result]] [source]¶
Gets the result of the goal from the manager.
- Returns
???
- Return type
Any
SimpleActionServer¶
- def is_active
- def is_cancel_requested
- def is_new_goal_available
- def is_preempt_requested
- def is_running
- def publish_feedback
- def set_aborted
- def set_preempted
- def set_succeeded
- async setup
- async shutdown
- def start
- def stop
- class axros.SimpleActionServer(node_handle: NodeHandle, name: str, action_type: type[types.Action[types.Goal, types.Feedback, types.Result]], goal_cb: Callable[[], None] | None = None, preempt_cb: Callable[[], None] | None = None)[source]¶
A simplified implementation of an action server. At any given time, it can have at most a current goal and a next goal. If a new goal arrives while one is already queued to be next, the goal with the greater timestamp will bump the other.
>>> # Construct a SimpleActionServer with a node handle, topic namespace, and type
>>> serv = SimpleActionServer(nh, '/test_action', turtle_actionlib.msg.ShapeAction)
>>> # The server must be started before any goals can be accepted
>>> serv.start()
>>> # To accept a goal
>>> while not serv.is_new_goal_available(): # Loop until there is a new goal
...     await nh.sleep(0.1)
>>> goal = serv.accept_new_goal()
>>> # To publish feedback
>>> serv.publish_feedback(ShapeFeedback())
>>> # To accept a preempt (a new goal attempted to replace the current one)
>>> if serv.is_preempt_requested():
...     goal = serv.accept_new_goal() # Automatically cancels old goal
>>> # To finish a goal
>>> serv.set_succeeded(text='Wahoo!', result=ShapeResult(apothem=1))
>>> # or
>>> serv.set_aborted(text='Something went wrong!')
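The "one current goal, one next goal" rule can be modelled without ROS. The sketch below is a guess at the bookkeeping, with hypothetical names (`GoalSketch`, `GoalSlots`, `offer`); what the real server does with a bumped goal is not stated above, so here it is simply returned to the caller.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class GoalSketch:
    name: str
    stamp: float  # arrival timestamp, in seconds


class GoalSlots:
    """Hypothetical model holding at most a current and a next goal."""

    def __init__(self):
        self.current: Optional[GoalSketch] = None
        self.next: Optional[GoalSketch] = None

    def offer(self, goal):
        """Queue `goal`; return whichever goal lost the 'next' slot, if any."""
        if self.next is None:
            self.next = goal
            return None
        if goal.stamp > self.next.stamp:
            # The newer goal bumps the one that was waiting.
            bumped, self.next = self.next, goal
            return bumped
        return goal

    def is_new_goal_available(self):
        return self.next is not None

    def accept_new_goal(self):
        # Promote the waiting goal and free the 'next' slot.
        self.current, self.next = self.next, None
        return self.current
```

For example, offering goals stamped 2.0 and then 1.5 while another goal is running leaves the 2.0 goal queued, since the greater timestamp wins the slot.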
- async with x:
On entering the block, setup() is called; upon exit, shutdown() is called.
- is_cancel_requested() bool [source]¶
- Returns
Whether a goal is currently active and a cancel has been requested.
- Return type
- is_new_goal_available() bool [source]¶
- Returns
Whether the next goal is defined (that is, not None).
- Return type
- is_preempt_requested() bool [source]¶
- Returns
Whether the goal has been requested to be cancelled, and there is both a goal currently running and a goal scheduled to be run shortly.
- Return type
- is_running() bool [source]¶
- Returns
Whether the simple action server is running. Set to True when setup() is called; set to False when shutdown() is called. This is independent of whether start() or stop() is called.
- Return type
- publish_feedback(feedback: types.Feedback | None = None) None [source]¶
Publishes a feedback message onto the feedback topic.
- Parameters
feedback (Optional[genpy.Message]) – The optional feedback message to add to the sent message.
- set_aborted(result: types.Result | None = None, text: str = '') None [source]¶
Sets the status of the current goal to aborted.
- Parameters
result (Optional[genpy.Message]) – The message to attach in the result.
text (str) – The text to set in the result. Defaults to an empty string.
- set_preempted(result: types.Result | None = None, text: str = '') None [source]¶
Sets the status of the current goal to preempted.
- Parameters
result (Optional[genpy.Message]) – The message to attach in the result.
text (str) – The text to set in the result. Defaults to an empty string.
- set_succeeded(result: types.Result | None = None, text: str = '') None [source]¶
Sets the status of the current goal to be succeeded.
- Parameters
result (Optional[genpy.Message]) – The message to attach in the result.
text (str) – The text to set in the result. Defaults to an empty string.
- async setup() None [source]¶
Sets up the action server. This must be called before the action server can be used.
ActionClient¶
- def cancel_all_goals
- def cancel_goals_at_and_before_time
- def send_goal
- async setup
- async shutdown
- async wait_for_server
- class axros.ActionClient(node_handle: NodeHandle, name: str, action_type: type[types.Action[types.Goal, types.Feedback, types.Result]])[source]¶
Representation of an action client in axros. This works in conjunction with the SimpleActionServer by sending the server goals to execute. In response, the client receives feedback updates and the final result of the goal, which it can handle by itself.
- Parameters
node_handle (axros.NodeHandle) – Node handle used to power the action client.
name (str) – The name of the action client.
action_type (type[genpy.Message]) – The action message type used by the action client and server.
- cancel_all_goals() None [source]¶
Sends a message to the mission cancellation topic requesting all goals to be cancelled.
- Raises
NotSetup – The action client has not been setup(), or was previously shutdown().
- cancel_goals_at_and_before_time(time: Time) None [source]¶
Cancel all goals scheduled at and before a specific time.
- Parameters
time – The time to reference when selecting which goals to cancel.
- send_goal(goal: Goal) GoalManager[Goal, Feedback, Result] [source]¶
Sends a goal to a goal manager. The goal manager is responsible for the communication between the action client and server; it assists in this process by maintaining individual
asyncio.Future
objects.- Raises
NotSetup – The action client has not been setup(), or was previously shutdown().
- Returns
The manager of the goal.
- Return type
- async setup()[source]¶
Sets up the action client. This must be called before the action client can be used.
Transform¶
- class axros.Transform(p: list[float], q: list[float])[source]¶
Represents a tf transform in the axros suite.
- x + y
Adds the components of each transform together.
- x - y
Subtracts the components between each transform.
- x * y
Multiplies the components of each transform together.
- str(x)
Prints the constructor of the class in a string-formatted fashion.
- repr(x)
Prints the constructor of the class in a string-formatted fashion. Equivalent to
str(x)
.
- classmethod from_Pose_message(msg: Pose)[source]¶
Constructs a transform from a
geometry_msgs.msg.Pose
message.
- classmethod from_Transform_message(msg: Transform)[source]¶
Constructs a transform from a
geometry_msgs.msg.Transform
message.
TransformBroadcaster¶
- class axros.TransformBroadcaster(node_handle: NodeHandle)[source]¶
Broadcasts transforms onto a topic.
- Parameters
node_handle (NodeHandle) – The node handle used to communicate with the ROS master server.
- send_transform(transform: TransformStamped)[source]¶
Sends a stamped Transform message onto the publisher.
- Parameters
transform (TransformStamped) – The transform to publish onto the topic.
- Raises
TypeError – The transform class was not of the TransformStamped class.
TransformListener¶
- async shutdown