#!/usr/bin/env python3
import importlib
import inspect
from typing import Union

import rospy
from ros_alarms import Alarm, HandlerBase
from ros_alarms_msgs.msg import Alarm as AlarmMsg
from ros_alarms_msgs.srv import (
    AlarmGet,
    AlarmGetRequest,
    AlarmGetResponse,
    AlarmSet,
    AlarmSetRequest,
)
class AlarmServer:
    """
    Server responsible for maintaining the status of alarms between all processes.

    Attributes:
        alarms (Dict[str, ros_alarms.Alarm]): A map from the alarm names to the Alarm objects.
        handlers (Dict[str, HandlerBase]): A map from the alarm names to their appropriate
            handlers.
        meta_alarms (Dict[str, Callable]): A map from the meta alarm names to the predicate
            callables used to decide the state of each meta alarm. Each predicate is
            called as ``predicate(meta_alarm, sub_alarms_dict)``.
    """

    def __init__(self):
        # Maps alarm name to Alarm objects
        self.alarms = {}
        # Handler classes for overwriting default alarm functionality
        self.handlers = {}
        # Maps meta alarm names to predicate Handler functions
        self.meta_alarms = {}

        known_alarms = rospy.get_param("/known_alarms", [])
        rospy.loginfo(f"Expecting at most the following alarms: {known_alarms}")

        self._alarm_pub = rospy.Publisher(
            "/alarm/updates",
            AlarmMsg,
            latch=True,
            queue_size=100,
        )

        # Meta alarms are created first so that handlers loaded afterwards can
        # replace the default predicate of a meta alarm with their own.
        self._create_meta_alarms()
        self._create_alarm_handlers()

        # Outside interface to the alarm system. Usually you don't want to
        # interface with these directly.
        rospy.Service("/alarm/set", AlarmSet, self._on_set_alarm)
        rospy.Service("/alarm/get", AlarmGet, self._on_get_alarm)

    def set_alarm(self, alarm: Union[Alarm, AlarmMsg]) -> bool:
        """
        Sets or updates the alarm.

        If a handler is registered for the alarm, its ``on_set`` method is called
        first; a return value of exactly ``False`` aborts the operation. Otherwise
        the alarm is stored (or the stored alarm is updated) and a message
        representing the alarm is published to the alarm publishing topic.

        Args:
            alarm (Union[ros_alarms.Alarm, ros_alarms.msg.Alarm]): The alarm to
                set in the server.

        Returns:
            bool: Whether the operation succeeded.
        """
        name = alarm.alarm_name

        handler = self.handlers.get(name)
        # Only an explicit False vetoes the update; None (no return) is accepted.
        if handler is not None and handler.on_set(alarm) is False:
            return False

        if name in self.alarms:
            self.alarms[name].update(alarm)
        else:
            # NOTE(review): an Alarm instance is also passed through from_msg
            # here; confirm Alarm.from_msg accepts Alarm objects as well as
            # messages.
            self.alarms[name] = Alarm.from_msg(alarm)

        # The publisher expects the message type, so convert Alarm objects.
        if isinstance(alarm, Alarm):
            alarm = alarm.as_msg()
        self._alarm_pub.publish(alarm)
        return True

    def _on_set_alarm(self, srv: AlarmSetRequest) -> bool:
        """
        Service callback for ``/alarm/set``: forwards the requested alarm to
        :meth:`set_alarm`.
        """
        # NOTE(review): the result of set_alarm is discarded, so the service
        # reports True even when a handler rejected the update. Left as-is in
        # case callers rely on it; confirm whether it should be propagated.
        self.set_alarm(srv.alarm)
        return True

    def _on_get_alarm(self, srv: AlarmGetRequest) -> AlarmGetResponse:
        """
        Either returns the requested alarm if it exists or a blank alarm.
        """
        rospy.logdebug(f"Got request for alarm: {srv.alarm_name}")
        alarm = self.alarms.get(srv.alarm_name)
        if alarm is None:
            # Unknown alarm: answer with a blank one rather than failing.
            alarm = Alarm.blank(srv.alarm_name)
        return alarm.as_srv_resp()

    def make_tagged_alarm(self, name: str) -> Alarm:
        """
        Makes a blank alarm with the node_name of the alarm_server so that users
        know it is the initial state.

        Args:
            name (str): The name of the alarm to create.

        Returns:
            ros_alarms.Alarm: The blank alarm, tagged with ``alarm_server``.
        """
        alarm = Alarm.blank(name)
        alarm.node_name = "alarm_server"
        return alarm

    @staticmethod
    def _default_meta_predicate(meta, alarms) -> bool:
        """
        If no predicate for a meta-alarm is provided, then the meta-alarm will be raised
        if any of the child alarms are raised.

        Args:
            meta: The meta alarm itself (unused by the default predicate).
            alarms: A map from child alarm names to the child alarm objects.

        Returns:
            bool: True if at least one child alarm is raised, False otherwise
            (including when there are no child alarms).
        """
        # Inspect each child's raised flag; testing the (name, alarm) items
        # themselves would always be truthy for a non-empty dict.
        return any(alarm.raised for alarm in alarms.values())

    def _handle_meta_alarm(self, meta_alarm, sub_alarms):
        """
        Calls the meta_predicate callback for an alarm handler when one of its sub alarms has changed.
        Then, updates the status of the parent alarm, if necessary.

        Args:
            meta_alarm: The name of the meta alarm to re-evaluate.
            sub_alarms: The names of the child alarms of the meta alarm.
        """
        alarms = {
            name: alarm for name, alarm in self.alarms.items() if name in sub_alarms
        }
        meta = self.alarms[meta_alarm]

        # Check the predicate, this should return either an alarm object or a boolean for if should be raised
        res = self.meta_alarms[meta_alarm](meta, alarms)

        if isinstance(res, Alarm):
            # If it is an alarm instance send it out as is
            alarm = res
            alarm.alarm_name = meta_alarm  # Ensure alarm name is correct
        elif isinstance(res, bool):
            # If it is a boolean, only update if it changes the raised status
            if res == meta.raised:
                return
            alarm = meta.as_msg()
            alarm.raised = res
            if alarm.raised:  # If it is raised, set problem description
                alarm.problem_description = "Raised by meta alarm"
        else:
            rospy.logwarn(
                f"Meta alarm callback for {meta_alarm} failed to return an Alarm or boolean",
            )
            return

        self.set_alarm(alarm)

    def _create_alarm_handlers(self):
        """
        Alarm handlers are classes imported by the alarm server and run code upon a change of state
        of their respective alarms.

        Handlers should be in a python module (directory with an __init__.py) and in the python path.
        They will be loaded from the module specified with the ~handler_module param to the alarm server.
        """
        # If the param exists, load it here
        handler_module = rospy.get_param("~handler_module", None)
        if handler_module is None:
            return

        # Give handlers access to alarm server
        HandlerBase._init(self)

        # Import the module where the handlers are stored
        alarm_handlers = importlib.import_module(handler_module)

        # Every HandlerBase subclass in the module that declares an alarm_name,
        # excluding the base class itself.
        handler_classes = [
            cls
            for name, cls in inspect.getmembers(alarm_handlers, inspect.isclass)
            if issubclass(cls, HandlerBase)
            and hasattr(cls, "alarm_name")
            and name != "HandlerBase"
        ]

        for handler in handler_classes:
            # Have to instantiate so the class exists
            h = handler()
            alarm_name = handler.alarm_name

            # Set initial state if necessary (could have already been added while creating metas)
            if hasattr(h, "initial_alarm"):
                if alarm_name in self.alarms:
                    # Update even if already added to server
                    self.alarms[alarm_name].update(h.initial_alarm)
                else:
                    self.alarms[alarm_name] = h.initial_alarm
            elif alarm_name not in self.alarms:
                # Add default initial if not there already
                self.alarms[alarm_name] = self.make_tagged_alarm(alarm_name)

            # If a handler exists for a meta alarm, we need to save the predicate
            if alarm_name in self.meta_alarms:
                self.meta_alarms[alarm_name] = h.meta_predicate

            self.handlers[alarm_name] = h
            rospy.loginfo(f"Loaded handler: {h.alarm_name}")

    def _create_meta_alarms(self, namespace="meta_alarms/"):
        """Adds meta alarms to the alarm server

        Meta alarms are special in that they are not directly raised or cleared but are instead triggered
        by a change of state of their child alarms.

        The /meta_alarms parameter defines the structure of a meta alarm. It has the following structure:
            {meta_alarm_name : [list of child alarm names], ...}

        Users can also provide more complex triggering mechanisms by providing an alarm handler class with
        a 'meta_predicate' method.

        Args:
            namespace: The name of the parameter the meta alarm structure is read from.
        """
        meta_alarms_dict = rospy.get_param(namespace, {})
        for meta, alarms in meta_alarms_dict.items():
            # Add the meta alarm
            if meta not in self.alarms:
                self.alarms[meta] = self.make_tagged_alarm(meta)

            # Start with the default predicate; a handler may replace it later.
            self.meta_alarms[meta] = self._default_meta_predicate

            # Bind the current loop values as defaults so that each callback
            # keeps its own meta alarm name and child list.
            def cb(alarm, meta_name=meta, sub_alarms=alarms):
                return self._handle_meta_alarm(meta_name, sub_alarms)

            for alarm in alarms:
                if alarm not in self.alarms:
                    self.alarms[alarm] = self.make_tagged_alarm(alarm)
                self.alarms[alarm].add_callback(cb)
if __name__ == "__main__":
    # Script entry point: initialise the "alarm_server" node, build the server
    # (which registers its publisher and services), then hand control to
    # rospy.spin().
    rospy.init_node("alarm_server")
    server = AlarmServer()
    rospy.spin()