Source code for mil_ros_tools.bag_crawler

#!/usr/bin/env python3
"""
This file wis written by the team at UF MIL for the 2016 robosub competition.
"""
from typing import List, Optional

import cv2
import rosbag
import tqdm
from cv_bridge import CvBridge
from sensor_msgs.msg import Image


[docs]class BagCrawler: """ Crawls a bag file and displays each image message of a camera through OpenCV. Attributes: bag_path (str): The path to the bag file. bag (rosbag.Bag): The bag object representing a bag file at the file location. """ def __init__(self, bag_path: str): self.bag_path = bag_path self.bag = rosbag.Bag(self.bag_path) self.bridge = CvBridge()
[docs] def convert(self, msg: Image): """ Converts an Image message to a cv2 compatible image. Args: msg (sensor_msgs.msg.Image): The Image message. """ img = self.bridge.imgmsg_to_cv2(msg, desired_encoding="bgr8") return img
[docs] def crawl( self, topic: Optional[str] = None, is_image: bool = False, max_msgs: float = float("inf"), ): """ Crawls a bag file and returns a generator yielding each message in the file. Additionally, a tqdm progress bar is displayed. Args: topic (Optional[str]): ??? is_image (bool): ??? max_msgs (float): The maximum number of messages to return. Yields: genpy.Message: A message in the bag file. """ num_seen = 0 num_msgs = 0 progress_bar = tqdm.tqdm(total=self.bag.get_message_count()) if is_image: topic = self.image_topics[0] for msg_topic, msg, _ in self.bag.read_messages(): progress_bar.update(num_msgs) num_msgs += 1 if msg_topic != topic: continue if num_seen > max_msgs: break num_seen += 1 yield msg progress_bar.close()
@property def image_topics(self, cam: str = "right") -> List[str]: """ .. warning:: This function may not function properly. If the method works, please remove this message. Finds all topics that use the sensor_msgs/Image type and exist on a specific camera. Args: cam (str): The camera to find image topics on. Should either be 'left' or 'right'. """ all_topics = list(self.bag.get_type_and_topic_info()[1].keys()) all_types = list(self.bag.get_type_and_topic_info()[1].values()) topics = [ all_topics[k] for k, _ in enumerate(all_topics) if (all_types[k][0] == "sensor_msgs/Image") ] if cam == "right": topics = [topics[i] for i, t in enumerate(topics) if "right" in t] if cam == "left": topics = [topics[i] for i, t in enumerate(topics) if "left" in t] return topics @property def image_info_topics(self, cam: str = "right") -> List[str]: """ .. warning:: This function may not function properly. If the method works, please remove this message. Finds all topics that use the sensor_msgs/CameraInfo type and exist on a specific camera. Args: cam (str): The camera to find image topics on. Should either be 'left' or 'right'. """ all_topics = list(self.bag.get_type_and_topic_info()[1].keys()) all_types = list(self.bag.get_type_and_topic_info()[1].values()) topics = [ all_topics[k] for k, _ in enumerate(all_topics) if (all_types[k][0] == "sensor_msgs/CameraInfo") ] if cam == "right": topics = [topics[i] for i, t in enumerate(topics) if "right" in t] if cam == "left": topics = [topics[i] for i, t in enumerate(topics) if "left" in t] return topics
if __name__ == "__main__": bag = "test.bag" bc = BagCrawler(bag) for image in bc.crawl(topic=bc.image_topics[0]): cv2.imshow("current_image", image) cv2.waitKey(3)