#!/usr/bin/python3
"""
Note: The repeated use of CvBridge (instead of using make_image_msg and
get_image_msg in the classes) is intentional, to avoid the use of a global
cvbridge, and to avoid reinstantiating a CvBrige for each use.
"""
from os import path
from typing import Callable, List, Optional, Tuple
import message_filters
import numpy as np
import rospy
from cv_bridge import CvBridge, CvBridgeError
from image_geometry import PinholeCameraModel
from sensor_msgs.msg import CameraInfo, Image
from .init_helpers import wait_for_param
def get_parameter_range(parameter_root: str):
"""
Fetches the {parameter_root}/hsv_low and {parameter_root}/hsv_high parameters.
"""
low_param, high_param = parameter_root + "/hsv_low", parameter_root + "/hsv_high"
rospy.logwarn(f"Blocking -- waiting for parameters {low_param} and {high_param}")
wait_for_param(low_param)
wait_for_param(high_param)
low = rospy.get_param(low_param)
high = rospy.get_param(high_param)
rospy.loginfo(f"Got {low_param} and {high_param}")
return np.array([low, high]).transpose()
def make_image_msg(cv_image: List[float], encoding: str = "bgr8"):
"""
Take a CV image, and produce a ROS image message.
"""
bridge = CvBridge()
image_message = bridge.cv2_to_imgmsg(cv_image, encoding)
return image_message
def get_image_msg(ros_image: Image, encoding: str = "bgr8"):
"""
Take a ros image message, and return an OpenCV image.
"""
bridge = CvBridge()
cv_image = bridge.imgmsg_to_cv2(ros_image, desired_encoding=encoding)
return cv_image
[docs]class Image_Publisher:
"""
Publishes OpenCV image mats directly to a ROS topic, avoiding the need for
continual conversion.
Attributes:
bridge (CvBridge): The ROS bridge to OpenCV. Created upon instantiation.
encoding (str): The encoding of the images. Supplied upon creation.
Defaults to ``bgr8``.
im_pub (rospy.Publisher): The ROS publisher responsible for publishing
images to a ROS topic. The topic name and queue size are supplied
through the constructor.
"""
def __init__(self, topic: str, encoding: str = "bgr8", queue_size: int = 1):
self.bridge = CvBridge()
self.encoding = encoding
self.im_pub = rospy.Publisher(topic, Image, queue_size=queue_size)
def get_num_connections(self) -> int:
return self.im_pub.get_num_connections()
[docs] def publish(self, cv_image: np.ndarray):
"""
Publishes an OpenCV image mat to the ROS topic. :class:`CvBridgeError`
exceptions are caught and logged.
"""
try:
image_message = self.bridge.cv2_to_imgmsg(cv_image, self.encoding)
self.im_pub.publish(image_message)
except CvBridgeError as e:
# Intentionally absorb CvBridge Errors
rospy.logerr(str(e))
[docs]class Image_Subscriber:
"""
Calls callback on each image every time a new image is published on a certain
topic. This behaves like a conventional subscriber, except handling the
additional image conversion.
Attributes:
encoding (str): The encoding used to convert between OpenCV and ROS image
types.
camera_info (CameraInfo): The information about the camera related to the topic.
last_image_header (Header): The header of the last image received.
last_image_time (genpy.Time): The time of the last image received.
im_sub (rospy.Subscriber): The subscriber to the image topic. The topic
name and queue size are received through the constructor.
info_sub (rospy.Subscriber): The subscriber to the camera info topic.
The topic name is derived from the root of the supplied topic and the
queue size is derived from the constructor.
bridge (CvBridge): The bridge between OpenCV and ROS.
callback (Callable): The callback function to call upon receiving each
image.
"""
def __init__(self, topic, callback=None, encoding="bgr8", queue_size=1):
if callback is None:
def callback(im):
return setattr(self, "last_image", im)
self.encoding = encoding
self.camera_info = None
self.last_image_header = None
self.last_image_time = None
self.last_image = None
self.im_sub = rospy.Subscriber(
topic,
Image,
self.convert,
queue_size=queue_size,
)
root_topic, image_subtopic = path.split(rospy.remap_name(topic))
self.info_sub = rospy.Subscriber(
root_topic + "/camera_info",
CameraInfo,
self.info_cb,
queue_size=queue_size,
)
self.bridge = CvBridge()
self.callback = callback
[docs] def wait_for_camera_info(self, timeout: int = 10):
"""
Blocks until camera info has been received for a number of seconds.
Args:
timeout (int): The amount of seconds to wait for camera info.
Defaults to 10 seconds.
Raises:
Exception: No camera info was found after the timeout had finished.
"""
rospy.logwarn(
"Blocking -- waiting at most %d seconds for camera info." % timeout,
)
timeout = rospy.Duration(timeout)
rospy.sleep(0.1) # Make sure we don't have negative time
start_time = rospy.Time.now()
while (rospy.Time.now() - start_time < timeout) and (not rospy.is_shutdown()):
if self.camera_info is not None:
rospy.loginfo("Camera info found!")
return self.camera_info
rospy.sleep(0.2)
rospy.logerr("Camera info not found.")
raise Exception("Camera info not found.")
[docs] def wait_for_camera_model(self, **kwargs):
"""
Waits for the camera model information.
Returns:
PinholeCameraModel: The camera model.
"""
info_msg = self.wait_for_camera_info(**kwargs)
model = PinholeCameraModel()
model.fromCameraInfo(info_msg)
return model
[docs] def info_cb(self, msg: CameraInfo):
"""
Serves as the callback to a ROS subscriber subscribed to a camera info
topic.
This callback only executes once, as calling this method unregisters the
subscriber.
Args:
msg (CameraInfo): The message containing information about the camera.
"""
self.info_sub.unregister()
self.camera_info = msg
[docs] def convert(self, data: Image):
"""
Serves as the callback method to the image topic subscriber. Receives data
in the form of Image messages, and stores the time and header of the last
image received.
Upon receiving the image, this method converts the image to be in the
OpenCV format and calls the class' :meth:`~Image_Subscriber.callback` method.
Args:
data (Image): The message type representing an image in ROS.
"""
self.last_image_header = data.header
self.last_image_time = data.header.stamp
try:
image = self.bridge.imgmsg_to_cv2(data, desired_encoding=self.encoding)
self.callback(image)
except CvBridgeError as e:
# Intentionally absorb CvBridge Errors
rospy.logerr(e)
[docs]class StereoImageSubscriber:
"""
Abstraction to subscribe to two image topics (ex: left and right camera) and
receive callbacks for synchronized images (already converted to numpy arrays).
Also contains a helper function to block until the camera info messages for both
cameras are received.
Attributes:
bridge (CvBridge): The bridge between ROS and OpenCV. Created when the
class is instantiated.
encoding (str): The encoding between ROS and OpenCV.
callback (Callable): The callback function which executes each time a new
left and right image is received. The method should accept a parameter
for the left image and a parameter for the right image, both of which
are :class:`Image` types.
camera_info_left (Optional[CameraInfo]): The camera info for the left
image. Upon instantiation, set to ``None`` and updated only after receiving
camera info data.
camera_info_right (Optional[CameraInfo]): The camera info for the right
image. Upon instantiation, set to ``None`` and updated only after receiving
camera info data.
last_image_left (Optional[Image]): The last image received from the left
camera. This value is set to ``None`` upon class instantiation and
should be updated by :attr:`StereoImageSubscriber.callback`.
last_image_right (Optional[Image]): The last image received from the right
camera. This value is set to ``None`` upon class instantiation and
should be updated by :attr:`StereoImageSubscriber.callback`.
last_image_left_time (Optional[genpy.Time]): The timestamp when the last
left image was received. Set to ``None`` upon class instantiation.
last_image_right_time (Optional[genpy.Time]): The timestamp when the last
left image was received. Set to ``None`` upon class instantiation.
"""
def __init__(
self,
left_image_topic: str,
right_image_topic: str,
callback: Optional[Callable] = None,
slop: Optional[Callable] = None,
encoding: str = "bgr8",
queue_size: int = 10,
):
"""
Construct a StereoImageSubscriber
left_image_topic (str): ROS topic to subscribe for the left camera.
For example, ``/camera/front/left/image_rect_color``.
right_image_topic (str): ROS topic to subscribe to for the right
camera. For example, ``/camera/front/right/image_rect_color``.
callback (Optional[Callable]): Function with signature ``foo(left_img, right_img)``
to call when a synchronized pair is ready. If left as ``None``,
the latest synced images are stored as self.last_image_left
and self.last_image_right.
slop (Optional[int]): Maximum time in seconds between left and right
images to be considered synced. If left as None, will only
consider synced if left and right images have exact same header time.
encoding (str): String to pass to CvBridge to encode ROS image message
to numpy array
queue_size (int): Integer, the number of images to store in a buffer
for each camera to find synced images.
"""
if (
callback is None
): # Set default callback to just set image_left and image_right
def callback(image_left, image_right):
setattr(self, "last_image_left", image_left)
setattr(self, "last_image_right", image_right)
self.bridge = CvBridge()
self.encoding = encoding
self.callback = callback
self.camera_info_left = None
self.camera_info_right = None
self.last_image_left = None
self.last_image_time_left = None
self.last_image_right = None
self.last_image_time_right = None
# Subscribe to image and camera info topics for both cameras
root_topic_left, image_subtopic_left = path.split(left_image_topic)
self._info_sub_left = rospy.Subscriber(
root_topic_left + "/camera_info",
CameraInfo,
lambda info: setattr(self, "camera_info_left", info),
queue_size=queue_size,
)
image_sub_left = message_filters.Subscriber(left_image_topic, Image)
root_topic_right, image_subtopic_right = path.split(right_image_topic)
self._info_sub_right = rospy.Subscriber(
root_topic_right + "/camera_info",
CameraInfo,
lambda info: setattr(self, "camera_info_right", info),
queue_size=queue_size,
)
image_sub_right = message_filters.Subscriber(right_image_topic, Image)
# Use message_filters library to set up synchronized subscriber to both image topics
if slop is None:
self._image_sub = message_filters.TimeSynchronizer(
[image_sub_left, image_sub_right],
queue_size,
)
else:
self._image_sub = message_filters.ApproximateTimeSynchronizer(
[image_sub_left, image_sub_right],
queue_size,
slop,
)
self._image_sub.registerCallback(self._image_callback)
[docs] def wait_for_camera_info(
self,
timeout: int = 10,
unregister: bool = True,
) -> Tuple[CameraInfo, CameraInfo]:
"""
Blocks until camera info has been received.
Args:
timeout (int): Time in seconds to wait before throwing exception if camera info is not received
unregister (bool): Whether to unsubscribe from the camera info topics
after receiving the first message so that :attr:`~StereoImageSubscriber.camera_info_left`
and :attr:`~StereoImageSubscriber.camera_info_right` will not be updated.
Returns:
Tuple[CameraInfo, CameraInfo]: A tuple containing the camera info for the left and right cameras, respectively.
Raises:
Exception: if camera info for both cameras is not received within timeout
"""
timeout = rospy.Time.now() + rospy.Duration(timeout)
while (rospy.Time.now() < timeout) and (not rospy.is_shutdown()):
if self.camera_info_left is not None and self.camera_info_right is not None:
if unregister:
self._info_sub_left.unregister()
self._info_sub_right.unregister()
return self.camera_info_left, self.camera_info_right
rospy.sleep(0.05)
if self.camera_info_left is not None and self.camera_info_right is not None:
if unregister:
self._info_sub_left.unregister()
self._info_sub_right.unregister()
return self.camera_info_left, self.camera_info_right
raise Exception("Camera info not found.")
def _image_callback(self, left_img: Image, right_img: Image):
"""
Internal wrapper around image callback. Updates latest timestamps and
converts ROS image messages to numpy arrays (for use with OpenCV, etc)
before calling user defined callback.
Args:
left_img (Image): The synchronized image from the left camera
right_img (Image): The synchronized image from the right camera
"""
try:
self.last_image_time_left = left_img.header.stamp
self.last_image_time_right = right_img.header.stamp
img_left = self.bridge.imgmsg_to_cv2(
left_img,
desired_encoding=self.encoding,
)
img_right = self.bridge.imgmsg_to_cv2(
right_img,
desired_encoding=self.encoding,
)
self.callback(img_left, img_right)
except CvBridgeError as e:
# Intentionally absorb CvBridge Errors
rospy.logerr(e)