"""
This module provides functions which help to ensure that resources are available
when needed.
"""
import time
from typing import Any, Optional
import rospy
import rostest
[docs]def wait_for_param(
param_name: str,
timeout: Optional[float] = None,
poll_rate: float = 0.1,
) -> Optional[Any]:
"""
Blocking wait for a parameter named to exist. Polls at the frequency of poll_rate.
Once the parameter exists, return get and return it.
This function intentionally leaves failure logging duties to the developer.
Args:
param_name (str): The name of the parameter to watch.
timeout (Optional[float]): The number of seconds to wait for the
parameter to exist.
poll_rate (float): The Hz rate to poll at.
Returns:
Optional[Any]: If found, the value of the parameter. Returns ``None`` if
the parameter never came to exist.
"""
start_time = time.time()
rate = rospy.Rate(poll_rate)
while not rospy.is_shutdown():
# Check if the parameter now exists
if rospy.has_param(param_name):
return rospy.get_param(param_name)
# If we exceed a defined timeout, return None
if timeout is not None and time.time() - start_time > timeout:
return None
# Continue to poll at poll_rate
rate.sleep()
[docs]def wait_for_subscriber(node_name: str, topic: str, timeout: float = 5.0) -> bool:
"""
Blocks until a node with the name node_name subscribes to a topic. Useful
in integration tests.
Args:
node_name (str): The node name to check the subscription status of. If
a local node name, then the name is resolved to be the global name.
topic (str): The topic to check whether the node is subscribed to. If a
local topic name, then the name is resolved to be the global name.
timeout (float): The amount of time to wait (in seconds) to attempt to
estalish a connection.
Returns:
bool: Whether the node with the given name has subscribed to the given topic.
"""
end_time = time.time() + timeout
# Wait for time-out or ros-shutdown
while (time.time() < end_time) and (not rospy.is_shutdown()):
subscribed = rostest.is_subscriber(
rospy.resolve_name(topic),
rospy.resolve_name(node_name),
)
# Success scenario: node subscribes
if subscribed:
break
time.sleep(0.1)
# Could do this with a while/else
# But chose to explicitly check
success = rostest.is_subscriber(
rospy.resolve_name(topic),
rospy.resolve_name(node_name),
)
return success
[docs]def wait_for_service(
service,
warn_time: float = 1.0,
warn_msg: str = "Waiting for service..",
timeout: Optional[float] = None,
) -> None:
"""
A fancy extension of wait for service that will warn with a message if it is taking a while.
Args:
warn_time (float): float in seconds, how long to wait before logging warn_msg
warn_msg (str): msg logged with rospy.logwarn if warn_time passes without service connected
timeout (Optional[float]): overall timeout. If None, does nothing. If a float,
will raise exception if many TOTAL seconds has passed without connecting.
"""
try:
service.wait_for_service(warn_time)
except rospy.ROSException:
if timeout is not None:
timeout = timeout - warn_time
rospy.logwarn(warn_msg)
service.wait_for_service(timeout)