Source code for subjugator_vision_tools.marker_occ_grid

#!/usr/bin/env python3

import cv2
import numpy as np
import rospy
import tf
from geometry_msgs.msg import Point, Pose, Pose2D, Quaternion
from image_geometry import PinholeCameraModel
from mil_ros_tools import msg_helpers
from nav_msgs.msg import MapMetaData, OccupancyGrid
from std_msgs.msg import Header
from std_srvs.srv import Empty, EmptyResponse
from subjugator_msgs.srv import SearchPose


def unit_vector(vect):
    return vect / np.linalg.norm(vect)


def make_2D_rotation(angle):
    c, s = np.cos(angle), np.sin(angle)
    return np.array([[c, -s], [s, c]], dtype=np.float32)


[docs]class OccGridUtils: """ Contains functions for dealing with occupancy grids as well as storing and publishing them. All distance measured in meters. """ def __init__( self, res, width, height, starting_pose, topic_name: str = "/search_grid", ): self.meta_data = MapMetaData() # Resolution is m/cell. Width is X, height is Y. self.meta_data.resolution = res # rospy.get_param("map_resolution") self.meta_data.width = width # rospy.get_param("map_width") self.meta_data.height = height # rospy.get_param("map_height") # Starting Position self.mid_x = -starting_pose.x self.mid_y = -starting_pose.y p = Point(x=-starting_pose.x * res, y=-starting_pose.y * res, z=0) q = Quaternion( *tf.transformations.quaternion_from_euler(0, 0, starting_pose.theta), ) self.meta_data.origin = Pose(position=p, orientation=q) rospy.Service("/reset_occ_grid", Empty, self.reset_grid) # Create array of -1's of the correct size self.occ_grid = np.zeros((self.meta_data.height, self.meta_data.width)) - 1 self.searched = np.zeros((self.meta_data.height, self.meta_data.width)) self.markers = np.zeros((self.meta_data.height, self.meta_data.width)) self.occ_grid_pub = rospy.Publisher(topic_name, OccupancyGrid, queue_size=1) self.searcher = Searcher(self.searched, res, [self.mid_x, self.mid_y])
[docs] def add_circle(self, center, radius): """ Adds a circle to the grid. Also used to project the camera's view onto the grid because it is rotationally intolerant. """ center_offset = np.array(center) / self.meta_data.resolution - np.array( [self.mid_x, self.mid_y], ) try: radius = int(radius / self.meta_data.resolution) except Exception: radius = 0 cv2.circle(self.searched, tuple(center_offset.astype(np.int32)), radius, 1, -1)
[docs] def found_marker(self, pose_2d): """ Used to mark found markers. It doesn't matter that this isn't perfect, we just need to know that something is there. """ TRENCH_LENGTH = 1.22 / self.meta_data.resolution # cells (4 ft) TRENCH_WIDTH = 0.1524 / self.meta_data.resolution # cells (6 inches) center = np.array( [pose_2d.x, pose_2d.y], ) # The negative all depends on how the center is returned rotation = -pose_2d.theta center_offset = center / self.meta_data.resolution - np.array( [self.mid_x, self.mid_y], ) rot_top_point = np.dot( np.array([TRENCH_LENGTH, 0]) / 2, make_2D_rotation(rotation), ).astype(np.int32) rot_bottom_point = np.dot( -np.array([TRENCH_LENGTH, 0]) / 2, make_2D_rotation(rotation), ).astype(np.int32) pos_top_point = np.int0(rot_top_point + center_offset) pos_bottom_point = np.int0(rot_bottom_point + center_offset) cv2.line( self.markers, tuple(pos_top_point), tuple(pos_bottom_point), 101, int(TRENCH_WIDTH), )
[docs] def publish_grid(self): """ Take the occupancy grid and send it out over ros with timestamps and whatnot. """ t = rospy.Time.now() header = Header(stamp=t, frame_id="map") # Populate occ grid msg occ_msg = OccupancyGrid() occ_msg.header = header occ_msg.info = self.meta_data # Make sure values don't go out of range occ_grid = self.searched + self.markers - 1 occ_msg.data = np.clip(occ_grid.flatten(), -1, 100) self.occ_grid_pub.publish(occ_msg)
[docs] def reset_grid(self, srv): """ Resets occupancy grid. I'm using a random service since I don't really care about getting or returning information here. """ # Create array of -1's of the correct size rospy.loginfo("Resetting Grid.") self.occ_grid = np.zeros((self.meta_data.height, self.meta_data.width)) - 1 self.searched = np.zeros((self.meta_data.height, self.meta_data.width)) self.markers = np.zeros((self.meta_data.height, self.meta_data.width)) return EmptyResponse()
[docs]class Searcher: """ Intended to provide a service that will return a pose to go to in order to search for a missing marker. Not sure how this will be implemented in its entirety. """ def __init__(self, searched_area, grid_res, position_offset): self.searched_area = searched_area self.grid_res = grid_res self.position_offset = position_offset self.poly_generator = self.polygon_generator() self.max_searches = 12 self.current_search = 0 rospy.Service("/next_search_pose", SearchPose, self.return_pose)
[docs] def return_pose(self, srv): """ Returns the next pose to go to in order to try and find the marker. Only searches in a circle around our current location (could be extended to searching farther if we need to). This will skip points that have already been searched. """ if srv.reset_search: self.current_search = 0 self.poly_generator = self.polygon_generator() return [0, False, srv.intial_position] # We search at 1.5 * r so that there is some overlay in the search fields. np_pose = msg_helpers.pose_to_numpy(srv.intial_position) rot_mat = make_2D_rotation( tf.transformations.euler_from_quaternion(np_pose[1])[2], ) coor = ( np.append( np.dot(rot_mat, next(self.poly_generator)) * srv.search_radius * 1.75, 0, ) + np_pose[0] ) # if self.current_search > self.max_searches: # rospy.logwarn("Searched {} times. Marker not found.".format(self.max_searches)) # return [0, False, srv.intial_position] self.current_search += 1 rospy.loginfo(f"Search number: {self.current_search}") # Should be variable based on height maybe? pose = msg_helpers.numpy_quat_pair_to_pose(coor, np.array([0, 0, 0, 1])) return [self.check_searched(coor[:2], srv.search_radius), True, pose]
[docs] def check_searched(self, search_center, search_radius): """ Mask out a circle of the searched area around the point, then find the area of the grid in that mask. If the area is above some threshold assume we have searched this area and do not need to search it again. """ center_offset = np.array(search_center) / self.grid_res - self.position_offset # Create mask circle_mask = np.zeros(self.searched_area.shape) radius = int(search_radius / self.grid_res) cv2.circle(circle_mask, tuple(center_offset.astype(np.int32)), radius, 1, -1) masked_search = self.searched_area * circle_mask # If there has already been a marker found on the grid in our search area go there. if np.max(masked_search) > 5: return 0 # If the center is off of the grid, skip that spot. if len(circle_mask[circle_mask > 0.5]) / (np.pi * radius**2) < 0.3: return 1 return len(masked_search[masked_search > 0.5]) / len( circle_mask[circle_mask > 0.5], )
[docs] def polygon_generator(self, n=12): """ Using a generator here to allow for easy expansion in the future. """ # TODO: Add difference search patterns. theta = np.radians(360 / n) rot_mat = make_2D_rotation(theta) # Start by going directly right point = np.array([0, -1]) yield point count = 0 while True: point = np.dot(rot_mat, point) if count == n - 1: point[1] -= 1 count = 0 else: count += 1 yield point
class MarkerOccGrid(OccGridUtils): """ Handles updating occupancy grid when new data comes in. TODO: Upon call can return some path to go to in order to find them. """ def __init__( self, image_sub, grid_res, grid_width, grid_height, grid_starting_pose, ): super(self.__class__, self).__init__( res=grid_res, width=grid_width, height=grid_height, starting_pose=grid_starting_pose, ) self.tf_listener = tf.TransformListener() self.cam = PinholeCameraModel() self.camera_info = image_sub.wait_for_camera_info() if self.camera_info is None: # Maybe raise an alarm here. rospy.logerr("I don't know what to do without my camera info.") self.cam.fromCameraInfo(self.camera_info) def update_grid(self, timestamp): """ Takes marker information to update occupacy grid. """ x_y_position, height = self.get_tf(timestamp) self.add_circle(x_y_position, self.calculate_visual_radius(height)) self.publish_grid() def add_marker(self, marker, timestamp): """ Find the actual 3d pose of the marker and fill in the occupancy grid for that pose. This works by: 1. Calculate unit vector between marker point and the image center in the image frame. 2. Use height measurement to find real life distance (m) between center point and marker center. 3. Use unit vec and magnitude to find dx and dy in meters. 3. Pass info to OccGridUtils. """ if marker[0] is None: return x_y_position, height = self.get_tf(timestamp) if marker[2] < self.calculate_marker_area(height) * 0.6: # If the detected region isn't big enough dont add it. rospy.logwarn("Marker found but it is too small, not adding marker.") return # Calculate position of marker accounting for camera rotation. dir_vector = unit_vector(np.array([self.cam.cx(), self.cam.cy()] - marker[0])) trans, rot = self.tf_listener.lookupTransform("map", "/downward", timestamp) cam_rotation = tf.transformations.euler_from_quaternion(rot)[2] + np.pi / 2 dir_vector = np.dot(dir_vector, make_2D_rotation(cam_rotation)) marker_rotation = cam_rotation + marker[1] trans, rot = self.tf_listener.lookupTransform("map", "/base_link", timestamp) if (np.abs(np.array(rot)[:2]) > 0.005).any(): rospy.logwarn("We're at a weird angle, not adding marker.") magnitude = self.calculate_visual_radius(height, second_point=marker[0]) local_position = dir_vector[::-1] * magnitude position = local_position + x_y_position # print local_position # Pose on ground plane from center pose = Pose2D(x=position[0], y=position[1], theta=marker_rotation) self.found_marker(pose) # The image coordinate pose and real life pose are different. pose = Pose2D(x=position[0], y=position[1], theta=marker_rotation) # In meters with initial point at (0,0) return pose def get_tf(self, timestamp=None): """ x_y position and height in meters """ if timestamp is None: timestamp = rospy.Time() self.tf_listener.waitForTransform( "map", "/downward", timestamp, rospy.Duration(5.0), ) trans, rot = self.tf_listener.lookupTransform("map", "/downward", timestamp) x_y_position = trans[:2] self.tf_listener.waitForTransform( "/ground", "/downward", timestamp, rospy.Duration(5.0), ) trans, _ = self.tf_listener.lookupTransform("/ground", "/downward", timestamp) height = np.nan_to_num(trans[2]) return x_y_position, height def correct_height(self, measured_height, timestamp): """ Adjust the measured height from the seafloor using our orientation relative to it. We assume the floor is flat (should be valid for transdec but maybe not for pool). All the math is just solving triangles. Note: If the roll or pitch is too far off, the range could be to a point that is not planar to the floor directly under us - in which case this will fail. TODO: See if this actually can produce better results. """ trans, rot = self.tf_listener.lookupTransform("map", "/base_link", timestamp) euler_rotation = tf.transformations.euler_from_quaternion(rot) roll_offset = np.abs(np.sin(euler_rotation[0]) * measured_height) pitch_offset = np.abs(np.sin(euler_rotation[1]) * measured_height) height = np.sqrt(measured_height**2 - (roll_offset**2 + pitch_offset**2)) return height def calculate_marker_area(self, height): """ Estimate what the area of the marker should be so we don't add incorrect markers to the occupancy grid. What we really don't want is to add markers to the grid that are on the edge since the direction and center of the marker are off. """ MARKER_LENGTH = 1.22 # m MARKER_WIDTH = 0.1524 # m # Get m/px on the ground floor. m = self.calculate_visual_radius(height) px = self.cam.cy() if self.cam.cy() < self.cam.cx() else self.cam.cx() m_px = m / px marker_area_m = MARKER_WIDTH * MARKER_LENGTH marker_area_px = marker_area_m / (m_px**2) return marker_area_px def calculate_visual_radius(self, height, second_point=None): """ Draws rays to find the radius of the FOV of the camera in meters. It also can work to find the distance between two planar points some distance from the camera. """ mid_ray = np.array([0, 0, 1]) if second_point is None: if self.cam.cy() < self.cam.cx(): second_point = np.array([self.cam.cx(), 0]) else: second_point = np.array([0, self.cam.cy()]) edge_ray = unit_vector(self.cam.projectPixelTo3dRay(second_point)) # Calculate angle between vectors and use that to find r theta = np.arccos(np.dot(mid_ray, edge_ray)) return np.tan(theta) * height if __name__ == "__main__": rospy.init_node("searcher") tr = MarkerOccGrid( res=0.1, width=100, height=500, starting_pose=Pose2D(x=50, y=50, theta=0), ) rospy.spin()