#!/usr/bin/env python3
from typing import Union
import numpy as np
import rospy
from mil_passive_sonar import util
from mil_passive_sonar.msg import HydrophoneSamplesStamped, Ping, Triggered
from mil_passive_sonar.streamed_bandpass import StreamedBandpass
from mil_ros_tools import Plotter, interweave
from rospy.numpy_msg import numpy_msg
from scipy.ndimage.filters import maximum_filter1d
from std_msgs.msg import Header
from std_srvs.srv import (
SetBool,
SetBoolRequest,
SetBoolResponse,
Trigger,
TriggerRequest,
TriggerResponse,
)
[docs]class HydrophoneTrigger:
"""
ROS Node meant to trigger only when a ping happens in our target frequency range.
Subscribes to raw hydrophone samples on ``/samples``.
Publishes samples (of gradient) from right around the triggering on ``/pings``
when a ping is detected and found to be in the target frequency range.
Optionally, you can publish a plot of the frequency response of the filter on `/filter_debug`
by service calling `/filter_debug_trigger`.
Attributes:
general_lower (int): The lower bound of the frequency range. Set to the
frequency of the lowest frequency pinger - 10 kHz.
general_upper (int): The upper bound of the frequency range. Set to the
frequency of the lowest frequency pinger - 10 kHz.
time (float): The amount of time that has passed since recording started,
according to the messages received.
window_time (float): The maximum number of seconds to wait for another ping.
pub (rospy.Publisher): A publisher for the ``/pings`` topic. Publishes
:class:`Triggered` messages.
sub (Optional[rospy.Subscriber]): A subscriber to the ``/samples`` topic. Receives
either a :class:`~mil_passive_sonar.msg._Ping.Ping` or
:class:`~mil_passive_sonar.msg._HydrophoneSamplesStamped.HydrophoneSamplesStamped` message,
and sends the message to :meth:`.hydrophones_cb`.
trigger_debug (Plotter): A plotter responsible for publishing debug data
about the triggering behavior of the system.
"""
def __init__(self):
# Attributes about our general frequency range (all pinger live here)
# Frequency range garunteed to be relatively quiet except for the pingers (in Hz)
self.general_lower = 15000 # lowest frequency pinger - 10 kHz
self.general_upper = 50000 # highest frequency pinger + 10 kHz
# Misc Attributes
# time (from start of samples)
self.time = 0.0
# max convolution window, the amount of time that we assume should be quiet before a ping in sec
self.window_time = 0.1
# prev data buffer
self.prev_data = None
# Ros stuff
rospy.init_node("hydrophone_trigger")
# used for old bags
msg_type = rospy.get_param("~sample_msg_type")
if msg_type == "Ping":
self.sub = rospy.Subscriber("samples", numpy_msg(Ping), self.hydrophones_cb)
elif msg_type == "HydrophoneSamplesStamped":
self.sub = rospy.Subscriber(
"samples",
numpy_msg(HydrophoneSamplesStamped),
self.hydrophones_cb,
)
else:
rospy.logfatal("sample_msg_type not supported")
return
self.pub = rospy.Publisher("pings", numpy_msg(Triggered), queue_size=1)
rospy.Service("~filter_debug_trigger", Trigger, self.filter_response)
rospy.Service("~enable", SetBool, self.enable)
rospy.Service("~reset", Trigger, self.reset)
# Debug attributes
# Debug plot publishers
self.trigger_debug = Plotter("~trigger_debug")
self.sample_at_trigger_debug = Plotter("~sample_at_trigger_debug")
self.filter_debug = Plotter("~filter_debug")
self.get_params()
def get_params(self):
self.enabled = rospy.get_param("~enable_on_launch")
# Attributes about our target frequency range
# target Frequency in Hz
self.target = rospy.get_param("~target_frequency")
# tolerance around that frequerncy in Hz
tolerance = rospy.get_param("~frequency_tolerance")
# Filter attributes
# special filter property, do not change in Hz
trans_width = 500
# Filter order for bandpass filter, (higher is better, but slower to compute)
filt_order = 6000
# reset bandpass filter
self.bandpass_filter = StreamedBandpass(
self.target - tolerance,
self.target + tolerance,
trans_width,
filt_order,
)
# Physical Properties
# min time between pings (of any kind) in sec
self.min_time_between_pings = rospy.get_param("~min_time_between_pings")
self.prev_trigger_time = -1 * self.min_time_between_pings
self.dist_h = rospy.get_param("dist_h")
self.v_sound = rospy.get_param("v_sound")
# Misc attributes
# minimum gradient of the max convolution wrt time to trigger a time of arrivals calculation
self.threshold = rospy.get_param("~threshold")
self.trigger_offset = rospy.get_param("~trigger_offset")
# how far after the triggering time to make upper bound of samples at triggering in sec
self.trigger_window_future = 1.0 * (self.dist_h / self.v_sound)
# how far before the triggering time to make lower bound of samples at triggering in sec
self.trigger_window_past = 1.0 * (self.dist_h / self.v_sound)
[docs] def reset(self, req: TriggerRequest) -> TriggerResponse:
"""
Resets the system using the :meth`.get_params` method.
Args:
req (TriggerRequest): The request received.
Returns:
TriggerResponse: Whether the reset request was successful.
"""
res = TriggerResponse()
res.success = True
self.get_params()
return res
[docs] def enable(self, req: SetBoolRequest) -> SetBoolResponse:
"""
Enables the system; can be called through a :class:`SetBoolRequest` request.
Args:
req (SetBoolRequest): The service request.
Returns:
SetBoolResponse: The response to the service request.
"""
self.enabled = req.data
res = SetBoolResponse()
res.success = True
return res
[docs] def filter_response(self, req: TriggerRequest) -> TriggerResponse:
"""
Filters the debug service. Serves as a callback to the service.
Args:
req (TriggerRequest): The trigger request.
Returns:
TriggerResponse: The response to the request.
"""
res = TriggerResponse()
if self.bandpass_filter.h is None:
res.message = "filter has not yet been created (created on the first callback after target being set)"
res.success = False
return res
x, y = util.find_freq_response(
self.bandpass_filter.h,
self.rate,
self.general_lower,
self.general_upper,
)
plots = np.vstack((x, y))
titles = ["frequency (Hz) vs Gain (dB)"]
self.filter_debug.publish_plots(plots, titles=titles)
res.success = True
return res
[docs] def hydrophones_cb(self, msg: Union[Ping, HydrophoneSamplesStamped]) -> None:
"""
The callback to the hydrophone samples topic. Can receive either a :class:`~mil_passive_sonar.msg._Ping.Ping`
or :class:`~mil_passive_sonar.msg._HydrophoneSamplesStamped.HydrophoneSamplesStamped` message.
Args:
msg (Union[Ping, HydrophoneSamplesStamped]): The message that can be
received by the callback.
"""
# Record start time of cb to make sure we are running in real time
start_cb = rospy.get_rostime()
if isinstance(msg, numpy_msg(Ping)):
msg_channels = msg.channels
msg_samples = msg.samples
msg_sample_rate = msg.sample_rate
msg_data = msg.data
elif isinstance(msg, numpy_msg(HydrophoneSamplesStamped)):
msg_channels = msg.hydrophone_samples.channels
msg_samples = msg.hydrophone_samples.samples
msg_sample_rate = msg.hydrophone_samples.sample_rate
msg_data = msg.hydrophone_samples.data
# if not enabled, go no further
if not self.enabled:
return
if self.bandpass_filter.rate is None:
self.rate = msg_sample_rate
self.window_size = int(self.window_time * self.rate)
self.bandpass_filter.rate = self.rate
self.bandpass_filter.make_filter()
# Resize the 1d array to be a 2d array of samples by channels
msg_data.resize(msg_samples, msg_channels)
# Apply a bandpass filter to the data for our general frequency range (on all channels)
new_data = self.bandpass_filter.convolve(msg_data)
# We always look at 2 messages of data at a time concatenated (0.1 sec each msg (for sylphase board))
if self.prev_data is None:
self.prev_data = new_data
return
else:
data = np.concatenate((self.prev_data, new_data))
# Time according to the passive sonar interface (assuming no missed messages)
time = np.linspace(
self.time,
self.time + data.shape[0] / float(self.rate),
data.shape[0],
)
# only use hydrophone 0 to trigger, more efficient
# do a max convolution on the data
max_convolves = np.apply_along_axis(
lambda x: maximum_filter1d(x, self.window_size, axis=0),
0,
data,
)
max_convolve = max_convolves[:, 0]
# NOTE: No need to crop because zero padding is used and we are doing a max convolution
# take the gradient of the max convolution (we are looking for steep increase = big gradient)
gradients = np.gradient(max_convolves, axis=0)
if np.max(gradients[:, 0]) >= self.threshold:
triggered_at_idx = np.min(np.where(gradients[:, 0] >= self.threshold)[0])
triggered_at_idx += int(self.trigger_offset * self.rate)
trigger_time = time[triggered_at_idx]
# if we have triggered very recently, do not trigger (echo protection)
if trigger_time - self.prev_trigger_time > self.min_time_between_pings:
self.prev_trigger_time = trigger_time
start = triggered_at_idx - int(self.rate * self.trigger_window_past)
end = triggered_at_idx + int(self.rate * self.trigger_window_future)
ping_data = gradients[start:end]
triggered_at_sample = triggered_at_idx + (self.window_size - 1) / 2
start_sample = triggered_at_sample - int(
50 * self.rate * self.trigger_window_past,
)
end_sample = triggered_at_sample + int(
50 * self.rate * self.trigger_window_future,
)
ping_samples = data[start_sample:end_sample]
try:
freq = util.find_freq(ping_samples, self.rate)
except Exception as e:
rospy.logwarn(f"/hydrophone/triggering in find_freq {e}")
freq = self.target
rospy.loginfo(f"triggered at {trigger_time:f} on {freq:f} Hz")
ping = Triggered()
ping.header = Header()
ping.header.stamp = rospy.Time.now()
ping.hydrophone_samples.channels = msg_channels
ping.hydrophone_samples.samples = end - start
ping.hydrophone_samples.sample_rate = self.rate
ping.hydrophone_samples.data = ping_data.flatten()
ping.trigger_time = -1 * (
trigger_time
- (
(
2 * trigger_time
- self.trigger_window_past
+ self.trigger_window_future
)
/ 2
)
)
self.pub.publish(ping)
trigger_time_samples = (
triggered_at_sample / float(self.rate)
) + self.time
if self.trigger_debug.is_go():
plot_data = data[:, 0]
titles = [
"time vs Gradient of Max convolve",
f"time vs max_convolve (Window = {self.window_time} sec)",
"time vs hydrophone0 data",
]
vlines = [trigger_time, trigger_time, trigger_time_samples]
plotable = np.vstack((gradients[:, 0], max_convolve, plot_data))
plots = interweave(time, plotable)
self.trigger_debug.publish_plots(plots, titles, vlines)
if self.sample_at_trigger_debug.is_go():
plot_time = time[start_sample:end_sample]
plots = interweave(plot_time, ping_samples.transpose())
titles = ["h%i" % i for i in range(4)]
vline = [trigger_time_samples]
self.sample_at_trigger_debug.publish_plots(plots, titles, vline)
self.time += msg_samples / float(self.rate)
self.prev_data = new_data
spare_time = (float(msg_samples) / self.rate) - (
rospy.get_rostime() - start_cb
).to_sec()
if spare_time < 0:
rospy.logwarn(
f"Spare Time After Callback: {spare_time}, Running slower than real time",
)
if __name__ == "__main__":
a = HydrophoneTrigger()
rospy.spin()