Source code for axros.action

from __future__ import annotations

import asyncio
import random
import traceback
import warnings
from types import TracebackType
from typing import TYPE_CHECKING, Any, Callable, Generic

import genpy
from actionlib_msgs.msg import GoalID, GoalStatus, GoalStatusArray
from axros.exceptions import AlreadySetup, NotSetup
from std_msgs.msg import Header

from . import types, util

if TYPE_CHECKING:
    from .nodehandle import NodeHandle


[docs]class GoalManager(Generic[types.Goal, types.Feedback, types.Result]): """ Manages the interactions between a specific goal and an action client. This class is not meant to be constructed by client objects; rather it is instantiated by the :class:`~.ActionClient` to talk to the :class:`~.SimpleActionServer`. """ _action_client: ActionClient[types.Goal, types.Feedback, types.Result] _goal: types.Goal _goal_id: str _feedback_futs: list[asyncio.Future[types.Feedback]] def __init__( self, action_client: ActionClient[types.Goal, types.Feedback, types.Result], goal: types.Goal, ): """ Args: action_client (ActionClient): The axros action client to use to manage the goal. goal (Goal): The axros goal to manage. """ self._action_client = action_client self._goal = goal self._goal_id = f"{random.randrange(2**64):016x}" assert self._goal_id not in self._action_client._goal_managers self._action_client._goal_managers[self._goal_id] = self self._feedback_futs = [] self._result_fut = asyncio.Future() self._think_thread = asyncio.create_task(self._think()) async def _think(self) -> None: try: now = self._action_client._node_handle.get_time() await self._action_client.wait_for_server() self._action_client._goal_pub.publish( self._action_client._goal_type( header=Header( stamp=now, ), goal_id=GoalID( stamp=now, id=self._goal_id, ), goal=self._goal, ) ) except: traceback.print_exc() def __str__(self) -> str: return f"<axros.GoalManager at 0x{id(self):0x}, action_client={self._action_client} goal={self._goal}>" def _status_callback(self, status): del status def _result_callback(self, status, result: types.Result): del status self.forget() if not self._result_fut.done(): self._result_fut.set_result(result) def _feedback_callback(self, status, feedback: types.Feedback): del status old, self._feedback_futs = self._feedback_futs, [] for fut in old: fut.set_result(feedback)
[docs] def get_result(self) -> asyncio.Future[types.ActionResult[types.Result]]: """ Gets the result of the goal from the manager. Returns: Any: ??? """ return self._result_fut
[docs] def get_feedback(self) -> asyncio.Future[types.ActionFeedback[types.Feedback]]: """ Gets the feedback from all feedback :class:`asyncio.Future` objects. """ fut = asyncio.Future() self._feedback_futs.append(fut) return fut
[docs] def cancel(self) -> None: """ Publishes a message to the action client requesting the goal to be cancelled. """ self._action_client._cancel_pub.publish( GoalID( stamp=genpy.Time(0, 0), id=self._goal_id, ) )
[docs] def forget(self) -> None: """ Leaves the manager running, but requests for the action client to stop work on the specific goal. """ if self._goal_id in self._action_client._goal_managers: del self._action_client._goal_managers[self._goal_id]
[docs]class Goal(Generic[types.Goal]): """ Implementation of a goal object in the axros adaptation of the Simple Action Server. .. container:: operations .. describe:: x == y Determines equality between two goals by comparing their ids. If either side is not a :class:`actionlib_msgs.msg.GoalStatus`, then the result is ``False``. Parameters: goal (GoalStatus): The original goal message which constructs this class status (uint8): An enum representing the status of status_text (str): A string representing the status of the goal """ goal: types.ActionGoal[types.Goal] | None # This needs to be defined goal_id: GoalID status: int status_text: str def __init__( self, goal_msg: types.ActionGoal[types.Goal], status: int = GoalStatus.PENDING, status_text: str = "", ): if goal_msg.goal_id.id == "": self.goal = None self.goal = goal_msg self.status = status self.status_text = status_text self.goal_id = self.goal.goal_id def __eq__(self, rhs: Any): # assert isinstance(self.goal, GoalStatus), f"Value was {type(self.goal)}: {self.goal}" if isinstance(self.goal, GoalStatus) and isinstance(rhs.goal, GoalStatus): return self.goal.goal_id.id == rhs.goal.goal_id.id return False def __str__(self) -> str: return f"<axros.Goal at 0x{id(self):0x}, goal={self.goal} status={self.status} status_text={self.status_text}>"
[docs] def status_msg(self) -> GoalStatus: """ Constructs a GoalStatus message from the Goal class. Returns: GoalStatus: The constructed message. """ msg = GoalStatus() # Type checking # assert isinstance(self.goal, GoalStatus), f"Value was {type(self.goal)}: {self.goal}" # Assemble message msg.goal_id = self.goal.goal_id if self.goal else GoalID() msg.status = self.status msg.text = self.status_text return msg
[docs]class SimpleActionServer(Generic[types.Goal, types.Feedback, types.Result]): """ A simplified implementation of an action server. At a given time, can only at most have a current goal and a next goal. If new goals arrive with one already queued to be next, the goal with the greater timestamp will bump to other. .. code-block:: python >>> # Construct a SimpleActionServer with a node handle, topic namespace, and type >>> serv = SimpleActionServer(nh, '/test_action', turtle_actionlib.msg.ShapeAction) >>> # The server must be started before any goals can be accepted >>> serv.start() >>> # To accept a goal >>> while not self.is_new_goal_available(): # Loop until there is a new goal ... await nh.sleep(0.1) >>> goal = serv.accept_new_goal() >>> # To publish feedback >>> serv.publish_feedback(ShapeFeedback()) >>> # To accept a preempt (a new goal attempted to replace the current one) >>> if self.is_preempt_requested(): ... goal = serv.accept_new_goal() # Automatically cancels old goal >>> # To finish a goal >>> serv.set_succeeded(text='Wahoo!', result=ShapeResult(apothem=1)) >>> # or >>> serv.set_aborted(text='Something went wrong!') .. container:: operations .. describe:: async with x: On entering the block, :meth:`~.setup` is called; upon exit, :meth:`~.shutdown` is called. """ # TODO: # - implement optional callbacks for new goals # - ensure headers are correct for each message goal: Goal[types.Goal] | None next_goal: Goal[types.Goal] | None def __init__( self, node_handle: NodeHandle, name: str, action_type: type[types.Action[types.Goal, types.Feedback, types.Result]], goal_cb: Callable[[], None] | None = None, preempt_cb: Callable[[], None] | None = None, ): self.started = False self._node_handle = node_handle self._name = name self.goal = None self.next_goal = None self.cancel_requested = False self.register_goal_callback(goal_cb) self.register_preempt_callback(preempt_cb) self.goal_cb = None self.preempt_cb = None self.status_frequency = 5.0 self._goal_type = type(action_type().action_goal) self._result_type = type(action_type().action_result) self._feedback_type = type(action_type().action_feedback) self._status_pub = self._node_handle.advertise( self._name + "/status", GoalStatusArray ) self._result_pub = self._node_handle.advertise( self._name + "/result", self._result_type ) self._feedback_pub = self._node_handle.advertise( self._name + "/feedback", self._feedback_type ) self._goal_sub = self._node_handle.subscribe( self._name + "/goal", self._goal_type, self._goal_cb ) self._cancel_sub = self._node_handle.subscribe( self._name + "/cancel", GoalID, self._cancel_cb ) self._is_running = False
[docs] async def setup(self) -> None: """ Sets up the action server. This must be called before the action server can be used. """ if self.is_running(): raise AlreadySetup(self, self._node_handle) await asyncio.gather( self._status_pub.setup(), self._result_pub.setup(), self._feedback_pub.setup(), self._goal_sub.setup(), self._cancel_sub.setup(), ) self._is_running = True
[docs] async def shutdown(self) -> None: """ Shuts the action server down. This must be called before the program is ended. """ await asyncio.gather( self._status_pub.shutdown(), self._result_pub.shutdown(), self._feedback_pub.shutdown(), self._goal_sub.shutdown(), self._cancel_sub.shutdown(), ) self._is_running = False
[docs] def is_running(self) -> bool: """ Returns: bool: Whether the simple action server is running. Set to ``True`` when :meth:`~.setup` is called; set to ``False`` when :meth:`~.shutdown` is called. This is independent of whether :meth:`~.start` or :meth:`~.stop` is called. """ return self._is_running
def __del__(self): # Shutdown can also be achieved by just shutting down each pub/sub subs_pubs_running = ( self._status_pub.is_running() or self._result_pub.is_running() or self._feedback_pub.is_running() or self._goal_sub.is_running() or self._cancel_sub.is_running() ) if subs_pubs_running and self._is_running: warnings.simplefilter("always", ResourceWarning) warnings.warn( f"The '{self._name}' action server was never shutdown(). This may cause issues with this instance of ROS - please fix the errors and completely restart ROS.", ResourceWarning, ) warnings.simplefilter("default", ResourceWarning) async def __aenter__( self, ) -> SimpleActionServer[types.Goal, types.Feedback, types.Result]: await self.setup() return self async def __aexit__( self, exc_type: type[Exception], exc_value: Exception, traceback: TracebackType ): await self.shutdown() def __str__(self) -> str: return f"<axros.SimpleActionServer at 0x{id(self):0x}, name='{self._name}' running={self.is_running()} started={self.started} goal={self.goal} node_handle={self._node_handle}>" def register_goal_callback(self, func: Callable[[], None] | None) -> None: self.goal_cb = func def _process_goal_callback(self): if self.goal_cb: self.goal_cb() def _process_preempt_callback(self): if self.preempt_cb: self.preempt_cb() def register_preempt_callback(self, func: Callable[[], None] | None) -> None: self.preempt_cb = func
[docs] def start(self) -> None: """ Starts the status loop for the server. """ if not self.is_running(): raise NotSetup(self, self._node_handle) self.started = True self._status_loop_future = asyncio.create_task(self._status_loop())
[docs] def stop(self) -> None: """ Stops the status loop for the server, and clears all running goals and all goals scheduled to be run. """ if not self.is_running(): raise NotSetup(self, self._node_handle) self.goal = None self.next_goal = None self.started = False
def accept_new_goal(self) -> types.Goal | None: if not self.is_running(): raise NotSetup(self, self._node_handle) if not self.started: print( "SIMPLE ACTION SERVER: attempted to accept_new_goal without being started" ) return None if not self.next_goal: print( "SIMPLE ACTION SERVER: attempted to accept_new_goal when no new goal is available" ) if self.goal: self.set_preempted(text="New goal accepted in simple action server") self.goal = self.next_goal assert self.goal is not None self.cancel_requested = False self.next_goal = None self.goal.status = GoalStatus.ACTIVE if self.goal else GoalStatus.SUCCEEDED self._publish_status() assert self.goal.goal is not None return self.goal.goal.goal
[docs] def is_new_goal_available(self) -> bool: """ Returns: bool: Whether the next goal is defined, or not ``None``. """ if not self.is_running(): raise NotSetup(self, self._node_handle) return self.next_goal is not None
[docs] def is_preempt_requested(self) -> bool: """ Returns: bool: Whether the goal has been requested to be cancelled, and there is both a goal currently running and a goal scheduled to be run shortly. """ if not self.is_running(): raise NotSetup(self, self._node_handle) return bool(self.goal) if self.next_goal else self.is_cancel_requested()
[docs] def is_cancel_requested(self) -> bool: """ Returns: bool: Whether a goal is currently active and a cancel has been requested. """ if not self.is_running(): raise NotSetup(self, self._node_handle) return self.goal is not None and self.cancel_requested
[docs] def is_active(self) -> bool: """ Returns: bool: Returns whether there is an active goal running. """ if not self.is_running(): raise NotSetup(self, self._node_handle) return self.goal is not None
def _set_result( self, status: int = GoalStatus.SUCCEEDED, text: str = "", result: types.Result | None = None, ) -> None: if not self.started: print("SimpleActionServer: attempted to set_succeeded before starting") return if not self.goal: print("SimpleActionServer: attempted to set_succeeded without a goal") return self.goal.status = status self.goal.status_text = text self._publish_status() result_msg = self._result_type() if result: result_msg.result = result result_msg.status = self.goal.status_msg() self._result_pub.publish(result_msg) self.goal = None
[docs] def set_succeeded(self, result: types.Result | None = None, text: str = "") -> None: """ Sets the status of the current goal to be succeeded. Args: result (Optional[genpy.Message]): The message to attach in the result. text (str): The text to set in the result. Defaults to an empty string. """ if not self.is_running(): raise NotSetup(self, self._node_handle) self._set_result(status=GoalStatus.SUCCEEDED, text=text, result=result)
[docs] def set_aborted(self, result: types.Result | None = None, text: str = "") -> None: """ Sets the status of the current goal to aborted. Args: result (Optional[genpy.Message]): The message to attach in the result. text (str): The text to set in the result. Defaults to an empty string. """ if not self.is_running(): raise NotSetup(self, self._node_handle) self._set_result(status=GoalStatus.ABORTED, text=text, result=result)
[docs] def set_preempted(self, result: types.Result | None = None, text: str = "") -> None: """ Sets the status of the current goal to preempted. Args: result (Optional[genpy.Message]): The message to attach in the result. text (str): The text to set in the result. Defaults to an empty string. """ if not self.is_running(): raise NotSetup(self, self._node_handle) self._set_result(status=GoalStatus.PREEMPTED, text=text, result=result)
[docs] def publish_feedback(self, feedback: types.Feedback | None = None) -> None: """ Publishes a feedback message onto the feedback topic. Args: feedback (Optional[genpy.Message]): The optional feedback message to add to the sent message. """ if not self.is_running(): raise NotSetup(self, self._node_handle) if not self.started: print("SimpleActionServer: attempted to publish_feedback before starting") return if not self.goal: print("SimpleActionServer: attempted to publish_feedback without a goal") return feedback_msg = self._feedback_type() feedback_msg.status = self.goal.status_msg() if feedback: feedback_msg.feedback = feedback self._feedback_pub.publish(feedback_msg)
def _publish_status(self, goal=None) -> None: msg = GoalStatusArray() if self.goal: msg.status_list.append(self.goal.status_msg()) if self.next_goal: msg.status_list.append(self.next_goal.status_msg()) if goal: msg.status_list.append(goal.status_msg()) self._status_pub.publish(msg) async def _status_loop(self): while self.started: self._publish_status() await self._node_handle.sleep(1.0 / self.status_frequency) def _cancel_cb(self, msg: GoalID): cancel_current = False cancel_next = False if msg.stamp != genpy.Time(): if self.next_goal and self.next_goal.goal_id.stamp < msg.stamp: cancel_next = True if self.goal and self.goal.goal_id.stamp < msg.stamp: cancel_current = True if msg.id == "" and msg.stamp == genpy.Time(): cancel_current = self.goal is not None cancel_next = self.next_goal is not None elif msg.id != "": if self.goal and msg.id == self.goal.goal_id.id: cancel_current = True if self.next_goal and msg.id == self.next_goal.goal_id.id: cancel_next = True if cancel_next: assert self.next_goal is not None self.next_goal.status = GoalStatus.RECALLED self.next_goal.status_text = "Goal canceled" result_msg = self._result_type() result_msg.status = self.next_goal.status_msg() self._result_pub.publish(result_msg) self._publish_status() self.next_goal = None if cancel_current and not self.cancel_requested: self.cancel_requested = True self._process_preempt_callback() def _goal_cb(self, msg: types.ActionGoal[types.Goal]) -> None: if not self.started: return None new_goal = Goal(msg) if new_goal.goal is None: # If goal field is empty, invalid goal return None # Throw out duplicate goals if (self.goal and new_goal == self.goal) or ( self.next_goal and new_goal == self.next_goal ): return None now = self._node_handle.get_time() if ( new_goal.goal.goal_id.stamp == genpy.Time() ): # If time is not set, replace with current time new_goal.goal.goal_id.stamp = now if ( self.next_goal is not None and self.next_goal.goal is not None ): # If another goal is queued, handle conflict # If next goal is later, rejct new goal if new_goal.goal.goal_id.stamp < self.next_goal.goal.goal_id.stamp: new_goal.status = GoalStatus.REJECTED new_goal.status_text = "canceled because another goal was received by the simple action server" result_msg = self._result_type() result_msg.header.stamp = now result_msg.status = new_goal.status_msg() self._result_pub.publish(result_msg) self._publish_status(goal=new_goal) return None else: # New goal is later so reject current next_goal self.next_goal.status = GoalStatus.REJECTED self.next_goal.status_text = "Goal bumped by newer goal" result_msg = self._result_type() result_msg.header.stamp = now result_msg.status = self.next_goal.status_msg() self._result_pub.publish(result_msg) self._publish_status() self.next_goal = new_goal self._publish_status() if self.goal: self._process_preempt_callback() else: self._process_goal_callback()
[docs]class ActionClient(Generic[types.Goal, types.Feedback, types.Result]): """ Representation of an action client in axros. This works in conjunction with the :class:`~.SimpleActionServer` by sending the servers goals to execute. In response, the client receives feedback updates and the final result of the goal, which it can handle by itself. """ def __init__( self, node_handle: NodeHandle, name: str, action_type: type[types.Action[types.Goal, types.Feedback, types.Result]] ): """ Args: node_handle (axros.NodeHandle): Node handle used to power the action client. name (str): The name of the action client. action_type (type[genpy.Message]): The action message type used by the action client and server. """ self._node_handle = node_handle self._name = name self._type = action_type self._goal_type = type(action_type().action_goal) self._result_type = type(action_type().action_result) self._feedback_type = type(action_type().action_feedback) self._goal_managers: dict[str, GoalManager] = {} self._goal_pub = self._node_handle.advertise( self._name + "/goal", self._goal_type ) self._cancel_pub = self._node_handle.advertise(self._name + "/cancel", GoalID) self._status_sub = self._node_handle.subscribe( self._name + "/status", GoalStatusArray, self._status_callback ) self._result_sub = self._node_handle.subscribe( self._name + "/result", self._result_type, self._result_callback ) self._feedback_sub = self._node_handle.subscribe( self._name + "/feedback", self._feedback_type, self._feedback_callback ) self._is_running = False
[docs] async def setup(self): """ Sets up the action client. This must be called before the action client can be used. """ if self.is_running(): raise AlreadySetup(self, self._node_handle) await asyncio.gather( self._goal_pub.setup(), self._cancel_pub.setup(), self._status_sub.setup(), self._result_sub.setup(), self._feedback_sub.setup(), ) self._is_running = True
[docs] async def shutdown(self): """ Shuts down the action client. This should always be called when the action client is no longer needed. """ await asyncio.gather( self._goal_pub.shutdown(), self._cancel_pub.shutdown(), self._status_sub.shutdown(), self._result_sub.shutdown(), self._feedback_sub.shutdown(), ) self._is_running = False
def is_running(self) -> bool: return self._is_running def __str__(self) -> str: return f"<axros.ActionClient at 0x{id(self):0x}, name='{self._name}' running={self.is_running()} node_handle={self._node_handle}>" def _status_callback(self, msg: GoalStatusArray) -> None: for status in msg.status_list: if status.goal_id.id in self._goal_managers: manager = self._goal_managers[status.goal_id.id] manager._status_callback(status.status) def _result_callback(self, msg: types.ActionResult[types.Result]) -> None: if msg.status.goal_id.id in self._goal_managers: manager = self._goal_managers[msg.status.goal_id.id] manager._result_callback(msg.status.status, msg.result) def _feedback_callback(self, msg: types.ActionFeedback[types.Feedback]): if msg.status.goal_id.id in self._goal_managers: manager = self._goal_managers[msg.status.goal_id.id] manager._feedback_callback(msg.status.status, msg.feedback)
[docs] def send_goal(self, goal: types.Goal) -> GoalManager[types.Goal, types.Feedback, types.Result]: """ Sends a goal to a goal manager. The goal manager is responsible for the communication between the action client and server; it assists in this process by maintaining individual :class:`asyncio.Future` objects. Raises: NotSetup: The action client has not been :meth:`~.setup`, or was previously :meth:`~.shutdown`. Returns: GoalManager: The manager of the goal. """ if not self.is_running(): raise NotSetup(self, self._node_handle) return GoalManager(self, goal)
[docs] def cancel_all_goals(self) -> None: """ Sends a message to the mission cancellation topic requesting all goals to be cancelled. Raises: NotSetup: The action client has not been :meth:`~.setup`, or was previously :meth:`~.shutdown`. """ if not self.is_running(): raise NotSetup(self, self._node_handle) self._cancel_pub.publish( GoalID( stamp=genpy.Time(0, 0), id="", ) )
[docs] def cancel_goals_at_and_before_time(self, time: genpy.rostime.Time) -> None: """ Cancel all goals scheduled at and before a specific time. Args: time: The time to reference when selecting which goals to cancel. """ if not self.is_running(): raise NotSetup(self, self._node_handle) self._cancel_pub.publish( GoalID( stamp=time, id="", ) )
[docs] async def wait_for_server(self): """ Waits for a server connection. When at least one connection is received, the function terminates. """ if not self.is_running(): raise NotSetup(self, self._node_handle) while not ( set(self._goal_pub.get_connections()) & set(self._cancel_pub.get_connections()) & set(self._status_sub.get_connections()) & set(self._result_sub.get_connections()) & set(self._feedback_sub.get_connections()) ): await util.wall_sleep(0.1) # XXX bad bad bad