Images

BagCrawler

class mil_ros_tools.BagCrawler(bag_path: str)[source]

Crawls a bag file and displays each image message of a camera through OpenCV.

bag_path

The path to the bag file.

Type

str

bag

The bag object representing a bag file at the file location.

Type

rosbag.Bag

convert(msg: Image)[source]

Converts an Image message to a cv2 compatible image.

Parameters

msg (sensor_msgs.msg.Image) – The Image message.

crawl(topic: Optional[str] = None, is_image: bool = False, max_msgs: float = inf)[source]

Crawls a bag file and returns a generator yielding each message in the file. Additionally, a tqdm progress bar is displayed.

Parameters
  • topic (Optional[str]) –

    ???

  • is_image (bool) –

    ???

  • max_msgs (float) – The maximum number of messages to return.

Yields

genpy.Message – A message in the bag file.

property image_info_topics: List[str]

Warning

This function may not function properly. If the method works, please remove this message.

Finds all topics that use the sensor_msgs/CameraInfo type and exist on a specific camera.

Parameters

cam (str) – The camera to find image topics on. Should either be ‘left’ or ‘right’.

property image_topics: List[str]

Warning

This function may not function properly. If the method works, please remove this message.

Finds all topics that use the sensor_msgs/Image type and exist on a specific camera.

Parameters

cam (str) – The camera to find image topics on. Should either be ‘left’ or ‘right’.

CvDebug

class mil_tools.CvDebug(nh: Optional[NodeHandle] = None, w: int = 1000, h: int = 800, total: int = 8, win_name: str = 'debug', wait: bool = True)[source]

Class that contains methods that assist with debugging with images.

width

The width of the debug image

Type

int

height

The height of the debug image

Type

int

nh

The node handle for the image stream. If None, then images are displayed through the OpenCV GUI.

Type

Optional[NodeHandle]

total

???

Type

int

hor_num

???

Type

float

max_width

???

Type

float

max_height

???

Type

float

wait

Whether or not to wait after showing the image. Set in the constructor.

Type

bool

win_name

The name of the OpenCV GUI image display window. Defaults to "debug".

Type

str

Initialize the Debug class.

Parameters
  • nh (NodeHandle) – The node handle for the image stream

  • w (int) – The width of the image that smaller images are added to

  • h (int) – The height of the image that smaller images are added to

  • win_name (str) – the name of the window that is shown in opencv

  • wait (bool) – whether or not to wait after showing the image

add_image(img, name: str, wait: int = 33, topic: str = 'image') None[source]

Add an image to show to either with a topic or using cv2.imshow().

Parameters
  • name (str) – A unique key name for the image, use the same name if you want to switch out this image for another.

  • wait (int) – The amount of wait time for the imshow image.

  • topic (str) – The name of the topic to publish the image to.

Image_Publisher

Attributes
Methods
class mil_tools.Image_Publisher(topic: str, encoding: str = 'bgr8', queue_size: int = 1)[source]

Publishes OpenCV image mats directly to a ROS topic, avoiding the need for continual conversion.

bridge

The ROS bridge to OpenCV. Created upon instantiation.

Type

CvBridge

encoding

The encoding of the images. Supplied upon creation. Defaults to bgr8.

Type

str

im_pub

The ROS publisher responsible for publishing images to a ROS topic. The topic name and queue size are supplied through the constructor.

Type

rospy.Publisher

publish(cv_image: ndarray)[source]

Publishes an OpenCV image mat to the ROS topic. CvBridgeError exceptions are caught and logged.

Image_Subscriber

class mil_tools.Image_Subscriber(topic, callback=None, encoding='bgr8', queue_size=1)[source]

Calls callback on each image every time a new image is published on a certain topic. This behaves like a conventional subscriber, except handling the additional image conversion.

encoding

The encoding used to convert between OpenCV and ROS image types.

Type

str

camera_info

The information about the camera related to the topic.

Type

CameraInfo

last_image_header

The header of the last image received.

Type

Header

last_image_time

The time of the last image received.

Type

genpy.Time

im_sub

The subscriber to the image topic. The topic name and queue size are received through the constructor.

Type

rospy.Subscriber

info_sub

The subscriber to the camera info topic. The topic name is derived from the root of the supplied topic and the queue size is derived from the constructor.

Type

rospy.Subscriber

bridge

The bridge between OpenCV and ROS.

Type

CvBridge

callback

The callback function to call upon receiving each image.

Type

Callable

convert(data: Image)[source]

Serves as the callback method to the image topic subscriber. Receives data in the form of Image messages, and stores the time and header of the last image received.

Upon receiving the image, this method converts the image to be in the OpenCV format and calls the class’ callback() method.

Parameters

data (Image) – The message type representing an image in ROS.

info_cb(msg: CameraInfo)[source]

Serves as the callback to a ROS subscriber subscribed to a camera info topic.

This callback only executes once, as calling this method unregisters the subscriber.

Parameters

msg (CameraInfo) – The message containing information about the camera.

wait_for_camera_info(timeout: int = 10)[source]

Blocks until camera info has been received for a number of seconds.

Parameters

timeout (int) – The amount of seconds to wait for camera info. Defaults to 10 seconds.

Raises

Exception – No camera info was found after the timeout had finished.

wait_for_camera_model(**kwargs)[source]

Waits for the camera model information.

Returns

The camera model.

Return type

PinholeCameraModel

StereoImageSubscriber

class mil_tools.StereoImageSubscriber(left_image_topic: str, right_image_topic: str, callback: Optional[Callable] = None, slop: Optional[Callable] = None, encoding: str = 'bgr8', queue_size: int = 10)[source]

Abstraction to subscribe to two image topics (ex: left and right camera) and receive callbacks for synchronized images (already converted to numpy arrays). Also contains a helper function to block until the camera info messages for both cameras are received.

bridge

The bridge between ROS and OpenCV. Created when the class is instantiated.

Type

CvBridge

encoding

The encoding between ROS and OpenCV.

Type

str

callback

The callback function which executes each time a new left and right image is received. The method should accept a parameter for the left image and a parameter for the right image, both of which are Image types.

Type

Callable

camera_info_left

The camera info for the left image. Upon instantiation, set to None and updated only after receiving camera info data.

Type

Optional[CameraInfo]

camera_info_right

The camera info for the right image. Upon instantiation, set to None and updated only after receiving camera info data.

Type

Optional[CameraInfo]

last_image_left

The last image received from the left camera. This value is set to None upon class instantiation and should be updated by StereoImageSubscriber.callback.

Type

Optional[Image]

last_image_right

The last image received from the right camera. This value is set to None upon class instantiation and should be updated by StereoImageSubscriber.callback.

Type

Optional[Image]

last_image_left_time

The timestamp when the last left image was received. Set to None upon class instantiation.

Type

Optional[genpy.Time]

last_image_right_time

The timestamp when the last left image was received. Set to None upon class instantiation.

Type

Optional[genpy.Time]

Construct a StereoImageSubscriber

left_image_topic (str): ROS topic to subscribe for the left camera.

For example, /camera/front/left/image_rect_color.

right_image_topic (str): ROS topic to subscribe to for the right

camera. For example, /camera/front/right/image_rect_color.

callback (Optional[Callable]): Function with signature foo(left_img, right_img)

to call when a synchronized pair is ready. If left as None, the latest synced images are stored as self.last_image_left and self.last_image_right.

slop (Optional[int]): Maximum time in seconds between left and right

images to be considered synced. If left as None, will only consider synced if left and right images have exact same header time.

encoding (str): String to pass to CvBridge to encode ROS image message

to numpy array

queue_size (int): Integer, the number of images to store in a buffer

for each camera to find synced images.

wait_for_camera_info(timeout: int = 10, unregister: bool = True) Tuple[CameraInfo, CameraInfo][source]

Blocks until camera info has been received.

Parameters
  • timeout (int) – Time in seconds to wait before throwing exception if camera info is not received

  • unregister (bool) – Whether to unsubscribe from the camera info topics after receiving the first message so that camera_info_left and camera_info_right will not be updated.

Returns

A tuple containing the camera info for the left and right cameras, respectively.

Return type

Tuple[CameraInfo, CameraInfo]

Raises

Exception – if camera info for both cameras is not received within timeout

Plotter

Attributes
class mil_tools.Plotter(topic_name: str, width: int = 20, height: int = 20, dpi: int = 150)[source]

Publishes several plots of 2D data over rostopic via Image messages.

Basic Usage Example:

>>> #  Import
>>> from mil_ros_tools import Plotter
>>> import numpy as np
>>> import rospy
>>> #  ROS node
>>> rospy.init_node('my_node')
>>> #  Create
>>> my_plotter = Plotter('my_plotter_topic')
>>> #  Publish some plots
>>> titles = ['random data', 'sin wave']
>>> data_size = 100
>>> y1 = np.random.rand(data_size)
>>> x1 = np.arange(0, y1.shape[0])
>>> x2 = np.linspace(0, 5, data_size)
>>> y2 = np.sin(x2)
>>> plots = np.vstack((x1, y1, x2, y2))
>>> my_plotter.publish_plots(plots, titles)

For another usage example see mil_passive_sonar triggering.

pub

The ROS publisher node. The topic name is passed in upon construction and the queue size is 1.

Type

rospy.Publisher

thread

The thread used to publish data and plots on.

Type

threading.Thread

enable_disable(req: SetBoolRequest) SetBoolResponse[source]

Serves as a callback for the service responsible for enabling and disabling the plotter.

Parameters

req (SetBoolRequest) – The message received by the service.

Returns

The response sent back by the service.

Return type

SetBoolResponse

is_go() bool[source]

Whether to run the plotter. True if the plotter is enabled, there is more than one connection connected to the publisher and the thread is None.

Returns

Whether to run the plotter.

Return type

bool

publish_plots(plots: np.ndarray, titles: list[str] | None = None, v_lines: list | None = None)[source]

Starts as new thread to publish the data on a plot.