Source code for mil_ros_tools.msg_helpers

from typing import List, Tuple

import geometry_msgs.msg as geometry_msgs
import nav_msgs.msg as nav_msgs
import numpy as np
import rospy
import std_msgs.msg as std_msgs
from mil_msgs.msg import Point2D, PoseTwist
from tf import transformations


def rosmsg_to_numpy(rosmsg, keys=None):
    """
    ===============
    NOTE: This method should be removed. Please see issue #469 on the GitHub
    repository.
    ===============
    Convert an arbitrary ROS msg to a numpy array
    With no additional arguments, it will by default handle:
        Point2D, Point3D, Vector3D, Quaternion and any lists of these (like Polygon)

    Ex:
        quat = Quaternion(1.0, 0.0, 0.0, 0.0)
        quat is not a vector, you have quat.x, quat.y,... and you can't do math on that

        But wait, there's hope!
            rosmsg_to_numpy(quat) -> array([1.0, 0.0, 0.0, 0.0])

        Yielding a vector, which you can do math on!

        Further, imagine a bounding box message, BB, with properties BB.x, BB.h, BB.y, and BB.w

            rosmsg_to_numpy(BB, ['x', 'h', 'y', 'w']) -> array([BB.x, BB.h, BB.y, BB.w])

            or...
            rosmsg_to_numpy(some_Pose2D, ['x', 'y', 'yaw']) = array([x, y, yaw])

    Note:
        - This function is designed to handle the most common use cases (vectors, points and quaternions)
            without requiring any additional arguments.
    """

    # Recurse for lists like geometry_msgs/Polygon, Pointclou
    if isinstance(rosmsg, list):
        output_array = []
        for item in rosmsg:
            output_array.append(rosmsg_to_numpy(item, keys=keys))
        return np.array(output_array).astype(np.float32)

    if keys is None:
        keys = ["x", "y", "z", "w"]
        output_array = []
        for key in keys:
            # This is not necessarily the fastest way to do this
            if hasattr(rosmsg, key):
                output_array.append(getattr(rosmsg, key))
            else:
                break
        assert (
            len(output_array) != 0
        ), f"Input type {type(rosmsg).__name__} has none of these attributes {keys}."
        return np.array(output_array).astype(np.float32)

    else:
        output_array = np.zeros(len(keys), np.float32)
        for n, key in enumerate(keys):
            output_array[n] = getattr(rosmsg, key)

        return output_array


[docs]def pose_to_numpy(pose: geometry_msgs.Pose) -> Tuple[float, float]: """ Turns a :class:`Pose` message into a tuple of position and orientation. .. warning :: This method relies on a method (:meth:`mil_ros_tools.rosmsg_to_numpy`) which will be deprecated in the future. This method will need to be updated. Args: pose (Pose): The pose message. Returns: Tuple[float, float]: The tuple of position and orientation. """ # TODO Add unit tests position = rosmsg_to_numpy(pose.position) orientation = rosmsg_to_numpy(pose.orientation) return position, orientation
[docs]def twist_to_numpy(twist: geometry_msgs.Twist): """ Turns a :class:`Twist` message into a tuple of linear and angular speeds. .. warning :: This method relies on a method (:meth:`mil_ros_tools.rosmsg_to_numpy`) which will be deprecated in the future. This method will need to be updated. Args: twist (Twist): The twist message. Returns: Tuple[List[float], List[float]]: The tuple of linear and angular speeds. """ # TODO Add unit tests linear = rosmsg_to_numpy(twist.linear) angular = rosmsg_to_numpy(twist.angular) return linear, angular
[docs]def posetwist_to_numpy(posetwist: PoseTwist): """ Turns a :class:`PoseTwist` message into pose position, pose orientation, twist linear, and twist angular. .. warning :: This method relies on a method (:meth:`mil_ros_tools.rosmsg_to_numpy`) which will be deprecated in the future. This method will need to be updated. Args: twist (PoseTwist): The pose + twist message. Returns: Tuple[Tuple[np.ndarray, np.ndarray], Tuple[np.ndarray, np.ndarray]]: The tuple of linear and angular speeds. """ pose = pose_to_numpy(posetwist.pose) twist = twist_to_numpy(posetwist.twist) return pose, twist
[docs]def odometry_to_numpy(odom: nav_msgs.Odometry): """ Turns a :class:`Odometry` message into its pose, twist, pose covariance, and twist covariance. .. warning :: This method relies on a method (:meth:`mil_ros_tools.rosmsg_to_numpy`) which will be deprecated in the future. This method will need to be updated. Args: odom (Odometry): The odometry message. Returns: Tuple[Tuple[np.ndarray, np.ndarray], Tuple[np.ndarray, np.ndarray], np.ndarray, np.ndarray]: The tuple of pose, twist, pose covariance, and twist covariance. """ # TODO Add unit tests pose = pose_to_numpy(odom.pose.pose) pose_covariance = np.array(odom.pose.covariance).reshape(6, 6) twist = twist_to_numpy(odom.twist.twist) twist_covariance = np.array(odom.twist.covariance).reshape(6, 6) return pose, twist, pose_covariance, twist_covariance
[docs]def wrench_to_numpy(wrench: geometry_msgs.Wrench): """ Turns a :class:`Wrench` message into its force and torque, represented as numpy arrays. .. warning :: This method relies on a method (:meth:`mil_ros_tools.rosmsg_to_numpy`) which will be deprecated in the future. This method will need to be updated. Args: wrench (Wrench): The wrench message. Returns: Tuple[np.ndarray, np.ndarray]: The tuple of force and torque. """ force = rosmsg_to_numpy(wrench.force) torque = rosmsg_to_numpy(wrench.torque) return force, torque
[docs]def numpy_to_point(vector: np.ndarray) -> geometry_msgs.Point: """ Turns a List[:class:`float`] into a :class:`Point` message. Args: vector (np.ndarray): The vector to convert Returns: geometry_msgs.Point: The constructed message. """ np_vector = np.array(vector) if np_vector.size == 2: np_vector = np.append(np_vector, 0) # Assume user is give 2d point return geometry_msgs.Point(*np_vector)
[docs]def numpy_to_point2d(vector: np.ndarray) -> Point2D: """ Turns a :class:`np.ndarray` into a :class:`Point2D` message. Args: vector (np.ndarray): The vector to convert. Should have two values, the first of which represents x and the other y. Returns: Point2D: The constructed message. """ np_vector = np.array(vector) return Point2D(*np_vector)
[docs]def numpy_to_quaternion(np_quaternion: np.ndarray) -> geometry_msgs.Quaternion: """ Turns a List[:class:`float`] into a :class:`Quaternion` message. Args: np_quaternion (np.ndarray): The vector to convert. Should have four values, representing ``x``, ``y``, ``z``, and ``w``. Returns: Quaternion: The constructed message. """ return geometry_msgs.Quaternion(*np_quaternion)
[docs]def numpy_to_twist( linear_vel: np.ndarray, angular_vel: np.ndarray, ) -> geometry_msgs.Twist: """ Turns two :class:`np.ndarray`s into a :class:`Twist` message. Args: linear_vel (np.ndarray): The vector to convert. Values should represent the individual components of ``x``, ``y``, and ``z``. angular_vel (np.ndarray): The vector to convert. Values should represent the individual components of ``x``, ``y``, and ``z``. Returns: Twist: The constructed message. """ # TODO Add unit tests return geometry_msgs.Twist( linear=geometry_msgs.Vector3(*linear_vel), angular=geometry_msgs.Vector3(*angular_vel), )
[docs]def numpy_to_wrench(forcetorque: np.ndarray): """ Turns a np.ndarray into a :class:`Wrench` message. Args: forcetorque (np.ndarray): The vector to convert. Values should represent the individual components of ``force.x``, ``force.y``, ``force.z``, ``torque.x``, ``torque.y``, and ``torque.z``. Returns: Wrench: The constructed message. """ return geometry_msgs.Wrench( force=geometry_msgs.Vector3(*forcetorque[:3]), torque=geometry_msgs.Vector3(*forcetorque[3:]), )
[docs]def numpy_matrix_to_quaternion(np_matrix: np.ndarray): """ Given a 3x3 rotation matrix, convert to a quaternion, and return the quaternion as a ROS message. Args: np_matrix (np.ndarray): The 3x3 rotation matrix. Returns: Quaternion: The constructed message. """ assert np_matrix.shape == (3, 3), "Must submit 3x3 rotation matrix" pose_mat = np.eye(4) pose_mat[:3, :3] = np_matrix np_quaternion = transformations.quaternion_from_matrix(pose_mat) return geometry_msgs.Quaternion(*np_quaternion)
[docs]def numpy_pair_to_pose( np_translation: np.ndarray, np_rotation_matrix: np.ndarray, ) -> geometry_msgs.Pose: """ Convert a rotation matrix and point pair to a Pose message. Args: np_translation (np.ndarray): An array of 2 or 3 points representing the object's position. np_rotation_matrix (np.ndarray): A 3x3 rotation matrix. Returns: geometry_msgs.Pose: The Pose message with the position and orientation. """ orientation = numpy_matrix_to_quaternion(np_rotation_matrix) position = numpy_to_point(np_translation) return geometry_msgs.Pose(position=position, orientation=orientation)
[docs]def numpy_quat_pair_to_pose( np_translation: np.ndarray, np_quaternion: np.ndarray, ) -> geometry_msgs.Pose: """ Convert a quaternion array and point array pair to a Pose message. Args: np_translation (np.ndarray): An array of 2 or 3 points representing the object's position. np_quaternion (np.ndarray): An array with 4 points, representing ``x``, ``y``, ``z``, and ``w`` of a quaternion describing the object. Returns: geometry_msgs.Pose: The Pose message with the position and orientation. """ orientation = numpy_to_quaternion(np_quaternion) position = numpy_to_point(np_translation) return geometry_msgs.Pose(position=position, orientation=orientation)
[docs]def numpy_to_points(points: List[np.ndarray]) -> List[geometry_msgs.Point]: """ Convert a list of :class:`np.ndarray`s into :class:`Point` messages. Args: points (List[np.ndarray]): The points that will be converted to messages. Returns: List[Point]: The resulting point messages. """ ret = [] for point in points: ret.append(numpy_to_point(point)) return ret
[docs]def numpy_to_polygon(polygon: List[np.ndarray]) -> geometry_msgs.Polygon: """ Convert a list of :class:`np.ndarray`s (representing :class:`Point`s) into a :class:`Polygon` message. Args: points (List[np.ndarray]): The points that will be added to the polygon. Returns: Polygon: The resulting message. """ points = numpy_to_points(polygon) return geometry_msgs.Polygon(points=points)
[docs]def numpy_to_vector3(vec: np.ndarray) -> geometry_msgs.Vector3: """ Convert a :class:`np.ndarray` of :class:`float`s with length 3 into a :class:`Vector3` message. Args: vec (np.ndarray): A numpy array with ``x``, ``y``, and ``z``. Returns: Vector3: The Vector3 message. """ assert len(vec) == 3 return geometry_msgs.Vector3(*vec)
[docs]def numpy_to_pose2D(pose: np.ndarray) -> geometry_msgs.Pose2D: """ Convert a :class:`np.ndarray` of :class:`float`s with length 3 into a :class:`Pose2D` message. Args: pose (np.ndarray): A numpy array with ``x``, ``y``, and ``theta``. Returns: Pose2D: The Pose2D message. """ return geometry_msgs.Pose2D(*pose)
[docs]def numpy_to_colorRGBA(color: np.ndarray) -> std_msgs.ColorRGBA: """ Convert a :class:`np.ndarray` of :class:`float`s with length 4 into a :class:`ColorRGBA` message. Args: color (np.ndarray): A numpy array with ``r``, ``g``, ``b``, and ``a``. Returns: ColorRGBA: The ColorRGBA message. """ return std_msgs.ColorRGBA(*color)
[docs]def make_header(frame="/body", stamp=None) -> std_msgs.Header: """ Creates a message header. Args: frame (str): The name of the frame to attach to the header. Defaults to ``/body``. stamp (Optional[rospy.rostime.Time]): The timestamp to attach to the header of the message. If ``None``, then the current time is used. Returns: Header: The constructed header. """ if stamp is None: try: stamp = rospy.Time.now() except rospy.ROSInitException: stamp = rospy.Time(0) header = std_msgs.Header(stamp=stamp, frame_id=frame) return header
[docs]def make_wrench_stamped(force: List[float], torque: List[float], frame: str = "/body"): """ Makes a WrenchStamped message. Args: force (List[float]): An array representing the force components. The list should contain ``force.x``, ``force.y``, ``force.z``. torque (List[float]): An array representing the torque components. The list should contain ``torque.x``, ``torque.y``, and ``torque.z``. frame (str): The frame to attach to the header. Defaults to ``/body``. Returns: WrenchStamped: The constructed message. """ wrench = geometry_msgs.WrenchStamped( header=make_header(frame), wrench=geometry_msgs.Wrench( force=geometry_msgs.Vector3(*force), torque=geometry_msgs.Vector3(*torque), ), ) return wrench
[docs]def make_pose_stamped( position: List[float], orientation: List[float], frame: str = "/body", ) -> geometry_msgs.WrenchStamped: """ Makes a PoseStamped message. Args: position (List[float]): An array representing the force components. The list should contain ``position.x``, ``position.y``, ``position.z``. orientation (List[float]): An array representing the torque components. The list should contain ``orientation.x``, ``orientation.y``, ``orientation.z``, and ``orientation.w``. frame (str): The frame to attach to the header. Defaults to ``/body``. Returns: WrenchStamped: The constructed message. """ wrench = geometry_msgs.WrenchStamped( header=make_header(frame), pose=geometry_msgs.Pose( force=geometry_msgs.Vector3(*position), orientation=geometry_msgs.Quaternion(*orientation), ), ) return wrench
def odom_sub(topic, callback): def wrapped_callback(*args): msg = args[-1] callback(odometry_to_numpy(msg)) return rospy.Subscriber(topic, nav_msgs.Odometry, wrapped_callback, queue_size=1) def ros_to_np_3D(msg): xyz_array = np.array([msg.x, msg.y, msg.z]) return xyz_array