Source code for mil_ros_tools.bag_crawler

#!/usr/bin/env python3
This file was written by the team at UF MIL for the 2016 RoboSub competition.
from typing import List, Optional

import cv2
import rosbag
import tqdm
from cv_bridge import CvBridge
from sensor_msgs.msg import Image

class BagCrawler:
    """
    Crawls a bag file and displays each image message of a camera through OpenCV.

    Attributes:
        bag_path (str): The path to the bag file.
        bag (rosbag.Bag): The bag object representing a bag file at the file
            location.
        bridge (cv_bridge.CvBridge): The bridge used by :meth:`convert` to turn
            Image messages into OpenCV images.
    """

    def __init__(self, bag_path: str):
        """
        Opens the bag file and prepares the image converter.

        Args:
            bag_path (str): The path to the bag file to open.
        """
        self.bag_path = bag_path
        self.bag = rosbag.Bag(self.bag_path)
        self.bridge = CvBridge()

    def convert(self, msg: Image):
        """
        Converts an Image message to a cv2 compatible image.

        Args:
            msg (sensor_msgs.msg.Image): The Image message.

        Returns:
            The result of ``CvBridge.imgmsg_to_cv2`` with the ``bgr8`` encoding
            requested.
        """
        return self.bridge.imgmsg_to_cv2(msg, desired_encoding="bgr8")

    def crawl(
        self,
        topic: Optional[str] = None,
        is_image: bool = False,
        max_msgs: float = float("inf"),
    ):
        """
        Crawls a bag file and returns a generator yielding each message in the
        file. Additionally, a tqdm progress bar is displayed.

        Args:
            topic (Optional[str]): Only messages published on this topic are
                yielded. If ``None`` (and ``is_image`` is false), messages from
                every topic are yielded.
            is_image (bool): If true, ``topic`` is ignored and replaced by the
                first entry of :attr:`image_topics`.
            max_msgs (float): The maximum number of messages to return.

        Yields:
            genpy.Message: A message in the bag file.

        Raises:
            IndexError: If ``is_image`` is true and :attr:`image_topics` is
                empty.
        """
        # Resolve the topic before creating the progress bar so that a failed
        # lookup does not leave a dangling bar behind.
        if is_image:
            topic = self.image_topics[0]

        num_seen = 0
        progress_bar = tqdm.tqdm(total=self.bag.get_message_count())
        try:
            for msg_topic, msg, _ in self.bag.read_messages():
                # Stop as soon as the quota is filled; '>=' so that at most
                # max_msgs messages (not max_msgs + 1) are ever yielded.
                if num_seen >= max_msgs:
                    break
                # tqdm.update() takes an increment, not the running total.
                progress_bar.update(1)
                if topic is not None and msg_topic != topic:
                    continue
                num_seen += 1
                yield msg
        finally:
            # Also runs when the consumer abandons the generator early.
            progress_bar.close()

    def _topics_of_type(self, msg_type: str, cam: str) -> List[str]:
        """
        Lists the bag's topics whose message type is ``msg_type``.

        Args:
            msg_type (str): The message type name to match, e.g.
                ``sensor_msgs/Image``.
            cam (str): If 'left' or 'right', only topics whose name contains
                that word are kept. Any other value disables this filter.

        Returns:
            List[str]: The matching topic names.
        """
        # Index 1 of the returned pair maps topic name -> topic info, whose
        # first field is the message type name.
        topic_info = self.bag.get_type_and_topic_info()[1]
        topics = [name for name, info in topic_info.items() if info[0] == msg_type]
        if cam in ("left", "right"):
            topics = [name for name in topics if cam in name]
        return topics

    @property
    def image_topics(self, cam: str = "right") -> List[str]:
        """
        .. warning::

            This function may not function properly. If the method works,
            please remove this message.

        Finds all topics that use the sensor_msgs/Image type and exist on a
        specific camera.

        Args:
            cam (str): The camera to find image topics on. Should either be
                'left' or 'right'. Because this is a property, the argument
                cannot be supplied by callers and always takes its default.

        Returns:
            List[str]: The matching topic names.
        """
        return self._topics_of_type("sensor_msgs/Image", cam)

    @property
    def image_info_topics(self, cam: str = "right") -> List[str]:
        """
        .. warning::

            This function may not function properly. If the method works,
            please remove this message.

        Finds all topics that use the sensor_msgs/CameraInfo type and exist on
        a specific camera.

        Args:
            cam (str): The camera to find image topics on. Should either be
                'left' or 'right'. Because this is a property, the argument
                cannot be supplied by callers and always takes its default.

        Returns:
            List[str]: The matching topic names.
        """
        return self._topics_of_type("sensor_msgs/CameraInfo", cam)
if __name__ == "__main__":
    # Demo: play back the first right-camera image topic of a bag in an
    # OpenCV window.
    bag = "test.bag"
    bc = BagCrawler(bag)

    image_topics = bc.image_topics
    if not image_topics:
        raise SystemExit(f"No right-camera sensor_msgs/Image topics found in {bag}")

    for image_msg in bc.crawl(topic=image_topics[0]):
        # crawl() yields raw Image messages; cv2.imshow needs an OpenCV image,
        # so convert each message first.
        cv2.imshow("current_image", bc.convert(image_msg))
        cv2.waitKey(3)

    cv2.destroyAllWindows()