from __future__ import annotations
import json
import traceback
from typing import Callable
import rospy
from ros_alarms_msgs.msg import Alarm as AlarmMsg
from ros_alarms_msgs.srv import (
AlarmGet,
AlarmGetRequest,
AlarmGetResponse,
AlarmSet,
AlarmSetRequest,
)
def parse_json_str(json_str: str) -> dict | str:
parameters = ""
try:
parameters = "" if json_str == "" else json.loads(json_str)
except ValueError:
# User passed in a non JSON string
parameters = {}
parameters["data"] = json_str
finally:
return parameters
def _check_for_alarm(alarm_name, nowarn=False):
if (
not nowarn
and rospy.has_param("/known_alarms")
and alarm_name not in rospy.get_param("/known_alarms")
):
msg = "'{}' is not in the list of known alarms (as defined in the /known_alarms rosparam)"
rospy.logwarn(msg.format(alarm_name))
def _check_for_valid_name(alarm_name, nowarn=False):
if nowarn:
return
assert (
alarm_name.isalnum() or "_" in alarm_name or "-" in alarm_name
), f"Alarm name '{alarm_name}' is not valid!"
def _make_callback_error_string(alarm_name, backtrace=""):
"""Creates a string with the name of the alarm, the stacktrace, and an exception"""
err_msg = "A callback for the alarm: {} threw an error!\n{}"
return err_msg.format(alarm_name, backtrace)
def wait_for_service(
service,
warn_time=1.0,
warn_msg="Waiting for service..",
timeout=None,
):
"""
A fancy extension of wait for service that will warn with a message if it is taking a while.
@param warn_time: float in seconds, how long to wait before logging warn_msg
@param warn_msg: msg logged with rospy.logwarn if warn_time passes without service connected
@param timeout: overall timeout. If None, does nothing. If a float, will raise exception
if many TOTAL seconds has passed without connecting
Copied from https://github.com/uf-mil/mil_common/blob/master/utils/mil_tools/mil_ros_tools/init_helpers.py
to avoid dependency
"""
try:
service.wait_for_service(warn_time)
except rospy.ROSException:
if timeout is not None:
timeout = timeout - warn_time
rospy.logwarn(warn_msg)
service.wait_for_service(timeout)
[docs]class Alarm:
"""
Pythonic representation of a ROS alarm.
This class should not be constructed by clients (excluding internal ``ros_alarms``
code, of course). This class is primarily used by the alarm server node to
manage data about a series of alarms. However, this class may be passed
into callbacks registered to particular alarms.
.. container:: operations
.. describe:: str(x)
Pretty prints the contents of the alarm. Equivalent to ``repr(x)``.
.. describe:: repr(x)
Pretty prints the contents of the alarm. Equivalent to ``str(x)``.
Attributes:
alarm_name (str): The name of this specific alarm.
raised (bool): Whether the alarm has been raised.
node_name (str): The name of the node attached to the alarm. Defaults to
``unknown``.
problem_description (str): A description of the problem related to the alarm.
Defaults to an empty string.
parameters (dict): The JSON parameters associated with the alarm. Defaults
to an empty dict.
severity (int): The severity associated with a raised alarm. Defaults to ``0``.
stamp (rospy.Time): The time of the most recent update. Defaults to now if
ROS has been initialized, otherwise zero.
raised_cbs (List[Tuple[int, Callable]): A list of callbacks to execute
when the alarm is raised. Each callback should be associated with
the severity required to execute the callback through a tuple.
cleared_cbs (List[Tuple[int, Callable]): A list of callbacks to execute
when the alarm is cleared. Each callback should be associated with
the severity required to execute the callback through a tuple.
"""
def __init__(
self,
alarm_name: str,
raised: bool,
node_name: str = "unknown",
problem_description: str = "",
parameters: dict = {},
severity: int = 0,
):
self.alarm_name = alarm_name
self.raised = raised
self.node_name = node_name
self.problem_description = problem_description
self.parameters = parameters
self.severity = severity
try:
self.stamp = rospy.Time.now()
except rospy.ROSInitException:
self.stamp = rospy.Time(0)
# Callbacks to run if the alarm is cleared or raised formatted as follows:
# [(severity_required, cb1), (severity_required, cb2), ...]
self.raised_cbs = []
self.cleared_cbs = []
def __repr__(self):
msg = self.as_msg()
msg.parameters = parse_json_str(msg.parameters)
return str(msg)
__str__ = __repr__
[docs] @classmethod
def blank(cls, name: str) -> Alarm:
"""
Generate a general blank alarm that is cleared with a low severity.
Args:
name (str): The name for the blank alarm.
Returns:
ros_alarms.Alarm: The constructed alarm.
"""
return cls(name, raised=False, severity=0)
[docs] @classmethod
def from_msg(cls, msg: AlarmMsg) -> Alarm:
"""
Generate an alarm object from an Alarm message.
Args:
msg (AlarmMsg): The message to generate the object from.
Returns:
Alarm: The constructed alarm.
"""
node_name = "unknown" if msg.node_name == "" else msg.node_name
parameters = parse_json_str(msg.parameters)
return cls(
msg.alarm_name,
msg.raised,
node_name,
msg.problem_description,
parameters,
msg.severity,
)
def _severity_cb_check(self, severity):
if isinstance(severity, (tuple, list)):
return severity[0] <= self.severity <= severity[1]
# Not a tuple, just an int. The severities should match
return self.severity == severity
[docs] def add_callback(
self,
funct: Callable,
call_when_raised: bool = True,
call_when_cleared: bool = True,
severity_required: tuple[int, int] = (0, 5),
):
"""
Adds a callback function to the alarm.
If the callback is set to be called when the alarm has been raised, and
the alarm is already raised, then the callback is immediately called.
Similarly, if the callback is set to run when the alarm is cleared, and
the alarm is already cleared, then the callback is immediately ran. In
both cases, the callback is registered like any other callback.
Exceptions in callbacks are swallowed and are printed out through a
rospy logging statement.
Args:
funct (Callable): The callback to add.
call_when_raised (bool): Whether to call the callback when the alarm
is raised. Defaults to ``True``.
call_when_cleared (bool): Whether to call the callback when the alarm
is cleared. Defaults to ``True``.
severity_required (Tuple[int, int]): The severity required to run the
callback. The tuple represents the minimum and maximum severities
under which to execute the callback. Defaults to ``(0, 5)``.
"""
if call_when_raised:
self.raised_cbs.append((severity_required, funct))
if self.raised and self._severity_cb_check(severity_required):
# Try to run the callback, absorbing any errors
try:
funct(self)
except Exception:
rospy.logwarn(
_make_callback_error_string(
self.alarm_name,
traceback.format_exc(),
),
)
if call_when_cleared:
self.cleared_cbs.append(((0, 5), funct))
if not self.raised and self._severity_cb_check(severity_required):
# Try to run the callback, absorbing any errors
try:
funct(self)
except Exception:
rospy.logwarn(
_make_callback_error_string(
self.alarm_name,
traceback.format_exc(),
),
)
[docs] def update(self, srv: Alarm):
"""
Updates this alarm with a new AlarmSet request. Also will call any
required callbacks.
Args:
srv (ros_alarms.Alarm): The request to set the alarm.
"""
self.stamp = rospy.Time.now()
node_name = "unknown" if srv.node_name == "" else srv.node_name
parameters = parse_json_str(srv.parameters)
# Update all possible parameters
self.raised = srv.raised
self.node_name = node_name
self.problem_description = srv.problem_description
self.parameters = parameters
self.severity = srv.severity
rospy.loginfo(
"Updating alarm: {}, {}.".format(
self.alarm_name,
"raised" if self.raised else "cleared",
),
)
# Run the callbacks for that alarm
cb_list = self.raised_cbs if srv.raised else self.cleared_cbs
for severity, cb in cb_list:
# If the cb severity is not valid for this alarm's severity, skip it
if srv.raised and not self._severity_cb_check(severity):
continue
# Try to run the callback, absorbing any errors
try:
cb(self)
except Exception:
rospy.logwarn(
_make_callback_error_string(
self.alarm_name,
traceback.format_exc(),
),
)
[docs] def as_msg(self) -> AlarmMsg:
"""
Get this alarm as an Alarm message.
Returns:
AlarmMsg: The constructed message.
"""
alarm = AlarmMsg()
alarm.alarm_name = self.alarm_name
alarm.raised = self.raised
alarm.node_name = self.node_name
alarm.problem_description = self.problem_description
alarm.parameters = json.dumps(self.parameters)
alarm.severity = self.severity
return alarm
[docs] def as_srv_resp(self) -> AlarmGetResponse:
"""
Get this alarm as an AlarmGet response.
Returns:
AlarmGetResponse: The constructed service response.
"""
resp = AlarmGetResponse()
resp.header.stamp = self.stamp
resp.alarm = self.as_msg()
return resp
[docs]class AlarmBroadcaster:
"""
Broadcasts information about an alarm, and allows for the raising and clearing
of the alarm.
"""
def __init__(self, name: str, node_name: str | None = None, nowarn: bool = False):
"""
Args:
name (str): The name of the related alarm.
node_name (Optional[str]): The name of the node alarm.
nowarn (bool): Whether to disable warning for the class' operations.
"""
self._alarm_name = name
_check_for_valid_name(self._alarm_name, nowarn)
_check_for_alarm(self._alarm_name, nowarn)
self._node_name = rospy.get_name() if node_name is None else node_name
self._alarm_set = rospy.ServiceProxy("/alarm/set", AlarmSet)
[docs] def wait_for_server(self, timeout=None):
"""
Wait for node to connect to alarm server. Waits timeout seconds (or forever
if ``timeout`` is ``None``) to connect then fetches the current alarm
and calls callbacks as needed.
.. warning::
User should always call this method before calling other methods.
Args:
timeout (Optional[float]): The amount of seconds to wait before timing
out.
"""
if timeout is not None and timeout < 1.5:
self._alarm_set.wait_for_service(timeout=timeout)
else:
wait_for_service(
self._alarm_set,
warn_time=1.0,
warn_msg="Waiting for alarm server..",
timeout=timeout,
)
rospy.logdebug("alarm server connected")
def _generate_request(
self,
raised,
node_name=None,
problem_description="",
parameters={},
severity=0,
):
request = AlarmSetRequest()
request.alarm.alarm_name = self._alarm_name
request.alarm.node_name = (
node_name if node_name is not None else self._node_name
)
request.alarm.raised = raised
request.alarm.problem_description = problem_description
request.alarm.parameters = json.dumps(parameters)
request.alarm.severity = severity
return request
[docs] def raise_alarm(self, **kwargs):
"""
Raises the alarm.
Args:
kwargs: The associated keyword arguments, as described below.
Keyword Arguments:
node_name (str): String that holds the name of the node making the
request.
problem_description (str): String with a description of the problem
(defaults to empty string).
parameters (dict): JSON dumpable dictionary with optional parameters
that describe the alarm.
severity (int): Integer in [0, 5] that indicates the severity of
the alarm. (5 is most severe)
Returns:
bool: Whether the alarm was successfully raised.
"""
try:
return self._alarm_set(self._generate_request(True, **kwargs)).succeed
except rospy.service.ServiceException:
rospy.logerr("No alarm sever found! Can't raise alarm.")
return False
[docs] def clear_alarm(self, **kwargs):
"""
Clears the alarm.
Args:
kwargs: The associated keyword arguments, as described below.
Keyword Arguments:
node_name (str): String that holds the name of the node making the
request.
problem_description (str): String with a description of the problem
(defaults to empty string).
parameters (dict): JSON dumpable dictionary with optional parameters
that describe the alarm.
severity (int): Integer in [0, 5] that indicates the severity of
the alarm. (5 is most severe)
Returns:
bool: Whether the alarm was successfully cleared.
"""
try:
return self._alarm_set(self._generate_request(False, **kwargs)).succeed
except rospy.service.ServiceException:
rospy.logerr("No alarm sever found! Can't clear alarm.")
return False
[docs]class AlarmListener:
"""
Listens to an alarm.
"""
def __init__(
self,
name: str,
callback_funct: Callable | None = None,
nowarn: bool = False,
**kwargs,
):
"""
Args:
name (str): The alarm name.
callback_funct (Optional[Callable]): The callback function.
nowarn (bool): Whether to enable warnings about the class. Defaults to ``False``.
"""
self._alarm_name = name
self._last_alarm = None
_check_for_valid_name(self._alarm_name, nowarn)
_check_for_alarm(self._alarm_name, nowarn)
self._alarm_get = rospy.ServiceProxy("/alarm/get", AlarmGet)
# Data used to trigger callbacks
self._raised_cbs = [] # [(severity_for_cb1, cb1), (severity_for_cb2, cb2), ...]
self._cleared_cbs = []
rospy.Subscriber("/alarm/updates", AlarmMsg, self._alarm_update)
if callback_funct is not None:
self.add_callback(callback_funct, **kwargs)
[docs] def wait_for_server(self, timeout: float | None = None):
"""
Wait for node to connect to alarm server. Waits timeout seconds (or forever
if ``timeout`` is ``None``) to connect then fetches the current alarm
and calls callbacks as needed.
.. warning::
User should always call this method before calling other methods.
Args:
timeout (Optional[float]): The amount of seconds to wait before timing
out.
"""
if timeout is not None and timeout < 1.5:
self._alarm_get.wait_for_service(timeout=timeout)
else:
wait_for_service(
self._alarm_get,
warn_time=1.0,
warn_msg="Waiting for alarm server..",
timeout=timeout,
)
rospy.logdebug("alarm server connected")
self.get_alarm() # Now that we have service, update callbacks
[docs] def is_raised(self, fetch: bool = True):
"""
Returns whether this alarm is raised or not.
Args:
fetch (bool): Whether to fetch the alarm from the server. If ``False``,
then uses the most recently cached alarm. Defaults to ``True``.
Returns:
bool: Whether the alarm is raised.
"""
alarm = self.get_alarm(fetch=fetch)
if alarm is None:
return False
return alarm.raised
[docs] def is_cleared(self, fetch: bool = True) -> bool:
"""
Returns whether this alarm is cleared or not.
Args:
fetch (bool): Whether to fetch the alarm from the server. If ``False``,
then uses the most recently cached alarm. Defaults to ``True``.
Returns:
bool: Whether the alarm is cleared.
"""
return not self.is_raised(fetch=fetch)
[docs] def get_alarm(self, fetch: bool = True) -> AlarmGetResponse | None:
"""
Returns the alarm message.
Also worth noting, the alarm this method returns has it's ``parameters`` field
converted to a dictionary.
Args:
fetch (bool): Whether to fetch the alarm from the server. If ``False``,
then uses the most recently cached alarm. Defaults to ``True``.
Returns:
AlarmGetResponse: The response message, with its updated ``parameters`` field.
"""
if not fetch:
return self._last_alarm
try:
resp = self._alarm_get(AlarmGetRequest(alarm_name=self._alarm_name))
except rospy.service.ServiceException:
rospy.logerr("No alarm sever found!")
return self._last_alarm
self._alarm_update(resp.alarm)
return self._last_alarm
def _severity_cb_check(self, severity):
# In case _last alarm hasn't been declared yet
if self._last_alarm is None:
return False
if isinstance(severity, (tuple, list)):
return severity[0] <= self._last_alarm.severity <= severity[1]
# Not a tuple or list, just an int. The severities should match
return self._last_alarm.severity == severity
[docs] def add_callback(
self,
funct: Callable,
call_when_raised: bool = True,
call_when_cleared: bool = True,
severity_required: tuple[int, int] = (0, 5),
):
"""
Adds a callback function to the alarm.
Args:
funct (Callable): The callback to add.
call_when_raised (bool): Whether to call the callback when the alarm
is raised. Defaults to ``True``.
call_when_cleared (bool): Whether to call the callback when the alarm
is cleared. Defaults to ``True``.
severity_required (Tuple[int, int]): The severity required to run the
callback. The tuple represents the minimum and maximum severities
under which to execute the callback. Defaults to ``(0, 5)``.
"""
alarm = self._last_alarm
if call_when_raised:
self._raised_cbs.append((severity_required, funct))
if (
alarm is not None
and alarm.raised
and self._severity_cb_check(severity_required)
):
# Try to run the callback, absorbing any errors
try:
alarm.parameters = parse_json_str(alarm.parameters)
funct(alarm)
except Exception:
rospy.logwarn(
_make_callback_error_string(
self._alarm_name,
traceback.format_exc(),
),
)
if call_when_cleared:
self._cleared_cbs.append(((0, 5), funct)) # Clear callbacks always run
if alarm is not None and not alarm.raised:
# Try to run the callback, absorbing any errors
try:
alarm.parameters = parse_json_str(alarm.parameters)
funct(alarm)
except Exception:
rospy.logwarn(
_make_callback_error_string(
self._alarm_name,
traceback.format_exc(),
),
)
[docs] def clear_callbacks(self):
"""
Clears all callbacks.
"""
self._raised_cbs = []
self._cleared_cbs = []
def _alarm_update(self, alarm: AlarmMsg):
if alarm.alarm_name != self._alarm_name:
return
alarm.parameters = parse_json_str(alarm.parameters)
self._last_alarm = alarm
cb_list = self._raised_cbs if alarm.raised else self._cleared_cbs
for severity, cb in cb_list:
# If the cb severity is not valid for this alarm's severity, skip it
if not self._severity_cb_check(severity):
continue
# Try to run the callback, absorbing any errors
try:
cb(alarm)
except Exception:
rospy.logerr(
_make_callback_error_string(
self._alarm_name,
traceback.format_exc(),
),
)
[docs]class HeartbeatMonitor(AlarmBroadcaster):
"""
Used to trigger an alarm if a message on the topic ``topic_name`` isn't published
at least every ``prd`` seconds.
An alarm won't be triggered if no messages are initially received.
All of this class' methods and attributes are internal.
"""
def __init__(
self,
alarm_name: str,
topic_name: str,
msg_class,
prd: float = 0.2,
predicate: Callable | None = None,
nowarn: bool = False,
**kwargs,
):
self._predicate = predicate if predicate is not None else lambda *args: True
self._last_msg_time = None
self._prd = rospy.Duration(prd)
self._dropped = False
super().__init__(alarm_name, nowarn=nowarn, **kwargs)
rospy.Subscriber(topic_name, msg_class, self._got_msg)
rospy.Timer(rospy.Duration(prd / 2), self._check_for_message)
def _got_msg(self, msg):
# If the predicate passes, store the message time
if self._predicate(msg):
self._last_msg_time = rospy.Time.now()
# If it's dropped, clear the alarm and reset the dropped status
if self._dropped:
self.clear_alarm()
self._dropped = False
def _check_for_message(self, *args):
if self._last_msg_time is None:
return
if rospy.Time.now() - self._last_msg_time > self._prd and not self._dropped:
self.raise_alarm()
self._dropped = True