Source code for mil_missions_core.chain_with_timeout

import asyncio
import json
from typing import Any, Dict, List, Type

from twisted.python import failure

from .exceptions import ParametersException, SubmissionException, TimeoutException


[docs]def MakeChainWithTimeout(base: Type): """ Generate a ``ChainWithTimeout`` mission with the BaseMission specified. Used by individual robotics platforms to reuse this example mission. For documentation on the ``ChainWithTimeout`` class, you will need to view the source code. Args: base (Type): A base class to use when constructing the ``ChainWithTimeout`` class. Returns: ChainWithTimeout: The constructed class. """ class ChainWithTimeout(base): """ Example of a mission which runs an arbitrary number of other missions in a linear order. This is the mission used by the rqt plugin under the "Chained Missions" section. """ async def run_submission_with_timeout( self, mission: str, timeout: float, parameters: str, ): """ Runs a child mission, throwing an exception if more time than specified in timeout passes before the mission finishes executing. Args: mission (str): The name of the mission to run. timeout (float): The amount of time to wait before throwing a :class:`mil_missions_core.TimeoutException`. parameters (str): The parameters to send to the mission. Raises: TimeoutException: The timeout expired and the mission did not complete. Exception: The name of the mission provided was not found. Returns: ??? """ submission = self.run_submission(mission, parameters) if timeout == 0: # Timeout of zero means no timeout result = await submission return result timeout_fut = self.nh.sleep(timeout) result, index = await asyncio.wait( [submission, timeout_fut], return_when=asyncio.FIRST_COMPLETED, ) if index == 0: timeout_fut.cancel() return result if index == 1: submission.cancel() raise TimeoutException(timeout) @classmethod def decode_parameters(cls, parameters: str) -> Dict[Any, Any]: """ Goes through list of missions to chain and fills in missing attributes, like timeout with defaults. If something is invalid, raise an exception. The parameters are primarily parsed using :meth:`json.loads`. Args: parameters (str): The parameters to decode. Raises: ParametersException: An issue was found with the parameters. The :attr:`~mil_missions_core.ParametersException.msg` explains what issue was found. Returns: Dict[Any, Any]: The parsed parameters. """ parameters = json.loads(parameters) if not isinstance(parameters, dict): raise ParametersException("must be a dictionary") if "missions" not in parameters: raise ParametersException('must have "missions" list') if not isinstance(parameters["missions"], list): raise ParametersException('"missions" attribute must be a list') for mission in parameters["missions"]: if "mission" not in mission: raise Exception('invalid parameters, missing attribute "mission"') if not cls.has_mission(mission["mission"]): raise Exception( 'mission "{}" not available'.format(mission["mission"]), ) if "parameters" not in mission: mission["parameters"] = "" try: mission["parameters"] = cls.get_mission( mission["mission"], ).decode_parameters(mission["parameters"]) except Exception as e: raise ParametersException( "Invalid parameters for {}: {}".format( mission["mission"], str(e), ), ) if "timeout" not in mission: mission["timeout"] = 0 if "required" not in mission: mission["required"] = True return parameters async def run(self, parameters: Dict[str, List]): """ Runs a list of child missions specified in the parameters with optional timeouts. Args: parameters (Dict[str, List]): A dictionary containing at least a ``"missions"`` key which is equal to a list of mission objects that can be ran. """ for mission in parameters["missions"]: # Run each child mission linearly def cb(final): """ Called when a submission finishes. If it succeeded, print the result in feedback. If it failed or timedout, print the failure and stop the whole chain if that mission is required. """ if isinstance(final, failure.Failure): self.send_feedback( "{} FAILED: {}".format( mission["mission"], final.getErrorMessage(), ), ) if mission[ "required" ]: # Fail whole chain if a required mission times out or fails raise SubmissionException( mission["mission"], final.getErrorMessage(), ) else: self.send_feedback( "{} SUCCEEDED: {}".format(mission["mission"], final), ) print("NO FAIL BRO") fut = self.run_submission_with_timeout( mission["mission"], mission["timeout"], mission["parameters"], ) fut.add_done_callback(cb) await fut self.send_feedback("Done with all") return "All missions complete or skipped." return ChainWithTimeout