Source code for mil_passive_sonar.scripts.ping_locator

#!/usr/bin/env python3
import numpy as np
import rospy
from geometry_msgs.msg import Vector3Stamped
from mil_passive_sonar import util
from mil_passive_sonar.msg import Triggered
from mil_ros_tools import Plotter, interweave
from mil_tools import numpy_to_vector3
from rospy.numpy_msg import numpy_msg
from scipy.signal import correlate
from std_srvs.srv import SetBool, SetBoolRequest, SetBoolResponse


[docs]class PingLocator: """ Subscribers to ``/pings``, and publishes vector to the pinger on ``/direction``. Creates a node named ``ping_locator``. """ def __init__(self): rospy.init_node("ping_locator") self.sub = rospy.Subscriber("pings", numpy_msg(Triggered), self.ping_cb) self.pub = rospy.Publisher("direction", Vector3Stamped, queue_size=10) rospy.Service("~enable", SetBool, self.enable) self.enabled = True self.debug = Plotter("~cross_correlation_debug") self.debug_samples = Plotter("~samples_debug") self.dist_h = rospy.get_param("dist_h") self.v_sound = rospy.get_param("v_sound")
[docs] def enable(self, req: SetBoolRequest) -> SetBoolResponse: """ Sets the locator to enabled. Doesn't immediately do anything. Args: req (SetBoolRequest): The service request. Returns: SetBoolResponse: The service response. """ self.enabled = req.data res = SetBoolResponse() res.success = True return res
[docs] def ping_cb(self, msg) -> None: """ Callback called with each new ping. With each call, creates a :class:`~geometry_msgs.msg._Vector3Stamped.Vector3Stamped` message containing the direction of the pinger. Args: msg (Triggered): The received message, with numpy formatting. """ data = msg.hydrophone_samples.data data.resize((msg.hydrophone_samples.samples, msg.hydrophone_samples.channels)) rate = msg.hydrophone_samples.sample_rate cross_corr = np.array( [ correlate(data[:, 0], data[:, i], mode="same") for i in range(0, data.shape[1]) ], ).transpose() total_time = float(cross_corr.shape[0]) / rate time = np.linspace(total_time / -2, total_time / 2, cross_corr.shape[0]) deltas = np.zeros((4,)) maxes = np.array( [np.argmax(cross_corr[:, i]) for i in range(cross_corr.shape[1])], ) if (maxes == cross_corr.shape[0] - 1).any() or (maxes == 0).any(): rospy.logerr( "/hydrophones/ping_locator lack of features on one of the channels", ) return deltas = np.array(time[maxes]) deltas -= deltas[0] plots = interweave(time, cross_corr.transpose()) titles = ["h0 cross h%d" % i for i in range(data.shape[0])] self.debug.publish_plots(plots, titles, time[maxes]) plots = interweave(time, data.transpose()) titles = ["h%d" % i for i in range(data.shape[0])] vlines = np.array( [-1 * deltas[i] + msg.trigger_time for i in range(deltas.shape[0])], ) self.debug_samples.publish_plots(plots, titles, vlines) try: vec = util.calculate_dir_pinger(deltas, self.dist_h, self.v_sound) except Exception as e: rospy.logwarn( f"/hydrophones/ping_locator could not calculate pinger direction {e}", ) return v = Vector3Stamped() v.header = msg.header v.header.frame_id = "hydrophones" v.vector = numpy_to_vector3(vec) self.pub.publish(v)
if __name__ == "__main__": a = PingLocator() rospy.spin()