Source code for nodes.clicked_point_recorder

#!/usr/bin/env python3

import csv
import datetime
from typing import Dict, Union

import rospy
from geometry_msgs.msg import PointStamped


[docs]class ClickedPointRecorder: """ Listens to RVIZ clicked points, storing points in a CSV file. Support for running through `rosrun`. """ def __init__(self): self.points = [] self.point_sub = rospy.Subscriber("/clicked_point", PointStamped, self.point_cb)
[docs] def point_to_dict(self, point: PointStamped) -> Dict[str, Union[str, int, float]]: """ Returns a dictionary representing a point, where the values for the dict are obtained from the PointStamped message accepted. Args: point (PointStamped): The point message to accept. Returns: Dict[str, Union[str, int, float]]: Dictionary representing point and stamp. """ return { "seq": point.header.seq, "secs": point.header.stamp.secs, "nsecs": point.header.stamp.nsecs, "frame_id": point.header.frame_id, "x": point.point.x, "y": point.point.y, "z": point.point.z, }
[docs] def write_file(self) -> None: """ Writes the points to a CSV file. """ time = datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S") filename = f"clickedpoints{time}.csv" with open(filename, "wx") as csvfile: fieldnames = ["seq", "secs", "nsecs", "frame_id", "x", "y", "z"] writer = csv.DictWriter(csvfile, fieldnames=fieldnames) writer.writeheader() for p in self.points: d = self.point_to_dict(p) writer.writerow(d) rospy.loginfo(f"Writing points to {filename}")
[docs] def point_cb(self, point: PointStamped) -> None: """ Serves as a callback to the point subscriber. Accepts PointStamped messages and adds the messages to the internal points array. Args: point (PointStamped): The message input to the callback. """ rospy.loginfo(f"Received new point: {point}") self.points.append(point)
if __name__ == "__main__": rospy.init_node("clicked_point_recorder") recorder = ClickedPointRecorder() def shutdown_cb(): recorder.write_file() rospy.on_shutdown(shutdown_cb) rospy.spin()