Source code for axros.util

"""
Utility functions which make transforms and asynchronous programming easier.

All non-private methods can be used throughout client and application code.
"""
from __future__ import annotations

import asyncio
import contextlib
from typing import Awaitable, TypeVar

import genpy

T = TypeVar("T")


[docs]async def wall_sleep(duration: genpy.Duration | float) -> None: """ Sleeps for a specified duration using :func:`asyncio.sleep`. Args: duration (genpy.Duration | :class:`float`): The amount of time to sleep for. """ if isinstance(duration, genpy.Duration): duration = duration.to_sec() elif not isinstance(duration, (float, int)): raise TypeError("expected float or genpy.Duration") await asyncio.sleep(duration)
[docs]async def wrap_timeout( fut: Awaitable[T], duration: float | genpy.Duration, *, cancel=True ) -> T: """ Wraps a given future in a timeout-based future. This can be used to ensure that select :class:`asyncio.Future` objects complete on time. Futures who do not complete on time can be optionally cancelled. For an equivalent method that does not raise an exception, see :meth:`axros.wrap_time_notice`. Args: fut (:class:`asyncio.Future` | :class:`coroutine`): The future object to timeout. duration (:class:`float` | genpy.Duration): The duration to timeout. cancel (:class:`bool`): Keyword-only argument designating whether to cancel the future when the task times out. Raises: asyncio.TimeoutError: The task did not complete on time. Returns: Any: The data returned from the awaitable. """ if isinstance(duration, genpy.Duration): timeout = duration.to_sec() else: timeout = float(duration) if cancel: return await asyncio.wait_for(fut, timeout) return await asyncio.wait_for(asyncio.shield(fut), timeout)
async def _print_message_helper(duration: float, description: str) -> bool: try: await asyncio.sleep(duration) print(f"{description} is taking a while...") return True except asyncio.CancelledError: return False
[docs]async def wrap_time_notice( fut: Awaitable[T], duration: float | genpy.Duration, description: str ) -> T: """ Prints a message if a future is taking longer than the noted duration. .. code-block:: python >>> await axros.wrap_time_notice(asyncio.sleep(3), 2, "Example task") Example task is taking a while... # printed at 2 seconds Example task succeeded! # printed at 3 seconds >>> await axros.wrap_time_notice(asyncio.sleep(1), 2, "Example task") # nothing is printed Args: fut (:class:`asyncio.Future`): The future object. duration (:class:`float` | genpy.Duration): The duration to wait before printing a message. description (:class:`str`): The description to print. Returns: Any: The result from the awaitable. """ if isinstance(duration, genpy.Duration): timeout = duration.to_sec() else: timeout = float(duration) task = asyncio.create_task(_print_message_helper(timeout, description)) result = await fut task.cancel() with contextlib.suppress(asyncio.CancelledError): if await task: print(f"{description} succeeded!") return result