Source code for subjugator_gazebo_tools.bag_manager

import os
import time

import rosbag
import rospy
import yaml


[docs]class BagManager: def __init__(self, nh, diag_dir): self.nh = nh self.diag_dir = diag_dir self.make_dict() self.buffer_array_size = int(self.pre_cache_time / self.time_step) self.cache_index = 0 self.cache = [None] * self.buffer_array_size self.dumping = False
[docs] async def start_caching(self): """ Caches bags using big dicts. .. code-block:: python3 {topic_name_1: subscriber_1, topic_name_2: subscriber_2, ...} Every 'time_step' seconds it will save all of the subscribers' most recent message. This will continue and will fill a cache array that will store 'pre_cache_time' seconds worth of messages before erasing the oldest messages. """ # Used to avoid adding copies of the same message. last_timestamps = dict(self.cache_dict) while True: await self.nh.sleep(self.time_step) if self.dumping: await self.nh.sleep(1) # Add to cache msgs = [] for key in self.cache_dict: msg = self.cache_dict[key].get_last_message() msg_time = await self.cache_dict[key].get_last_message_time() if msg is not None: if (self.nh.get_time() > rospy.Time.from_sec(30)) and ( msg_time < self.nh.get_time() - rospy.Duration(30) ): # This fixes a negative time exception if we are within the first 30 seconds of time. continue if msg_time == last_timestamps[key]: continue last_timestamps[key] = msg_time msgs.append([key, (await msg), (await msg_time)]) else: print(f"There's a problem caching {key}") self.write_to_cache(msgs) self.dump()
[docs] async def make_dict(self): """ Generate caching dictionary from a yaml file. """ with open(self.diag_dir + "messages_to_bag.yaml") as f: messages_to_bag = yaml.safe_load(f) # Set bagging parameters self.time_step = messages_to_bag["PARAMS"]["time_step"] self.pre_cache_time = messages_to_bag["PARAMS"]["pre_cache_time"] self.post_cache_time = messages_to_bag["PARAMS"]["post_cache_time"] self.cache_dict = {} msgs = messages_to_bag["MESSAGES"] for msg in msgs.values(): # Get message information msg_topic = msg["message_topic"] msg_type = msg["message_type"] msg_name = msg["message_name"] # Import the message exec(f"from {msg_type}.msg import {msg_name}") # Create subscriber and add to dictionary self.cache_dict[msg_topic] = self.nh.subscribe(msg_topic, eval(msg_name)) await self.cache_dict[msg_topic]
[docs] async def dump(self): """ Save cached bags and save the next 'post_cache_time' seconds. """ self.dumping = True # Make bag directory if it doesn't exist directory = os.path.join(self.diag_dir, "bags") if not os.path.exists(directory): os.makedirs(directory) bag = rosbag.Bag(os.path.join(directory, str(int(time.time())) + ".bag"), "w") gen = self.read_from_cache() last_timestamps = dict(self.cache_dict) print("Saving post-fail data. Do not exit.") for i in range(int(self.post_cache_time / self.time_step)): for key in self.cache_dict: msg = self.cache_dict[key].get_last_message() msg_time = self.cache_dict[key].get_last_message_time() if msg is not None: if msg_time < self.nh.get_time() - rospy.Duration(0.5): # If the message is too old, forget about it. continue # Lang -- this doesn't work if msg_time == last_timestamps[key]: continue # last_timestamps[key] = msg_time bag.write(key, (await msg), t=(await msg_time)) # msgs.append([key, (await msg), (await msg_time)]) else: print(f"There's a problem recording {key}") await self.nh.sleep(self.time_step) print("Saving pre-fail data. Do not exit.") # We've written the post crash stuff now let's write the pre-crash data. for i in range(self.buffer_array_size): msgs = next(gen) if msgs is not None: for msg in msgs: bag.write(msg[0], msg[1], t=msg[2]) bag.close() self.dumping = False
def write_to_cache(self, data): self.cache[self.cache_index] = data self.cache_index += 1 if self.cache_index >= self.buffer_array_size: self.cache_index = 0 def read_from_cache(self): index = self.cache_index while True: yield self.cache[index] index += 1 if index >= self.buffer_array_size: index = 0