Source code for mil_passive_sonar.streamed_bandpass

from typing import Optional

import numpy as np
import scipy
from scipy.signal import fftconvolve


class StreamedBandpass:
    """
    Class for bandpass filtering sequential batches of data.

    Takes care of the overlap-adding that needs to be done when filtering
    sequential batches of data individually: the tail of each batch's full
    convolution is held back and added onto the head of the next batch.

    Attributes:
        lower (float): The lower bound of the desired frequency band, in the
            same units as :attr:`.rate`.
        upper (float): The upper bound of the desired frequency band, in the
            same units as :attr:`.rate`.
        trans_width (float): Width of the transition band placed on each side
            of the passband (from ``lower - trans_width`` to ``lower`` and
            from ``upper`` to ``upper + trans_width``).
        size (int): Number of filter taps (the ``order`` constructor
            argument). Higher is better, but slower to compute.
        rate (Optional[float]): The rate at which data is sampled.
        h (Optional[np.ndarray]): The FIR filter coefficients produced by the
            Remez exchange algorithm. ``None`` until the filter is made.
        from_prev (Optional[np.ndarray]): The last ``size - 1`` rows of the
            previous batch's full convolution, waiting to be added onto the
            next batch. ``None`` before the first batch.
    """

    def __init__(
        self,
        lower: float,
        upper: float,
        trans_width: float,
        order: int,
        rate: Optional[float] = None,
    ):
        """
        Constructs the class using the variables provided. Checks if the class
        is ready to make a filter, and if so, the filter is constructed.

        Args:
            lower (float): The lower bound of the desired frequency band.
            upper (float): The upper bound of the desired frequency band.
            trans_width (float): Width of the transition band on each side of
                the passband.
            order (int): Number of filter taps. Must be an integer, since it
                is also used to slice the filtered output. Higher is better,
                but slower to compute.
            rate (Optional[float]): The rate at which data is sampled. May be
                omitted and assigned to :attr:`.rate` later.
        """
        self.from_prev = None
        self.size = order
        self.h = None
        self.lower = lower
        self.upper = upper
        self.trans_width = trans_width
        self.rate = rate
        if self.is_ready_to_make_filter():
            self.make_filter()

    def is_ready_to_make_filter(self) -> bool:
        """
        Checks whether the filter is ready to be made.

        Checks that :attr:`.lower`, :attr:`.upper`, :attr:`.trans_width`,
        :attr:`.rate` and :attr:`.size` are all not ``None``.

        Returns:
            bool: Whether or not the filter can be made.
        """
        required = (
            self.lower,
            self.upper,
            self.trans_width,
            self.rate,
            self.size,
        )
        return all(value is not None for value in required)

    def make_filter(self) -> None:
        """
        Makes the data filter and stores its coefficients in :attr:`.h`.

        Raises:
            Exception: Some necessary parameters of the class are ``None``.
        """
        if not self.is_ready_to_make_filter():
            raise Exception(
                "Some parameters have not been filled in yet: ",
                self.__dict__,
            )
        # Stopband, transition, passband, transition, stopband up to Nyquist.
        band_edges = [
            0,
            self.lower - self.trans_width,
            self.lower,
            self.upper,
            self.upper + self.trans_width,
            self.rate / 2,
        ]
        # ``fs`` replaces the long-deprecated ``Hz`` keyword, which newer SciPy
        # releases no longer accept.
        self.h = scipy.signal.remez(
            self.size,
            band_edges,
            [0, 1, 0],
            fs=self.rate,
        )

    def convolve(self, data: np.ndarray) -> np.ndarray:
        """
        Does the filtering.

        If the filter has not been made yet (for example because
        :attr:`.rate` was assigned after construction), it is made first.

        Args:
            data (np.ndarray): Numpy array of shape ``(samples, channels)``.

        Returns:
            np.ndarray: The resulting filtered data, with as many rows as
            ``data``.

        Raises:
            Exception: The filter has not been made and some necessary
                parameters of the class are still ``None``.
        """
        if self.h is None:
            # Raises the descriptive "parameters not filled in" error instead
            # of an opaque failure from inside fftconvolve.
            self.make_filter()

        # A full convolution is ``size - 1`` rows longer than the input; those
        # extra rows belong to the next batch.
        overlap = self.size - 1
        filtered_data = np.apply_along_axis(
            lambda column: fftconvolve(column, self.h, "full"),
            0,
            data,
        )
        if overlap == 0:
            # Single-tap filter: nothing spills into the next batch, and the
            # ``[:-0]`` slice below would wrongly return an empty array.
            return filtered_data

        if self.from_prev is not None:
            filtered_data[:overlap] += self.from_prev
        # Copy so the retained tail does not keep the whole batch alive.
        self.from_prev = filtered_data[-overlap:].copy()
        return filtered_data[:-overlap]