#!/usr/bin/env python3
from types import ModuleType
from typing import List, Optional
import actionlib
import cv2 # for occupancy grid analysis
import numpy as np
import numpy.linalg as npl
import rospy
# Check scipy version for assume_sorted argument in interp1d
import scipy.interpolate
import tf.transformations as trns
from geometry_msgs.msg import (
Point32,
PointStamped,
PolygonStamped,
Pose,
PoseArray,
PoseStamped,
WrenchStamped,
)
from mil_tools import numpy_to_quaternion
from nav_msgs.msg import OccupancyGrid, Odometry
from navigator_path_planner import boat, car, escape, params
from navigator_path_planner.msg import MoveAction, MoveFeedback, MoveGoal, MoveResult
if int(scipy.__version__.split(".")[1]) < 16:
def interp1d(*args, **kwargs):
kwargs.pop("assume_sorted", None)
return scipy.interpolate.interp1d(*args, **kwargs)
else:
interp1d = scipy.interpolate.interp1d
# INITIALIZATIONS
[docs]class LQRRT_Node:
"""
Example of a ROS node that uses lqRRT for a big boat.
This node subscribes to the current boat state (Odometry message),
and the world-frame ogrid (OccupancyGrid message). It publishes the
REFerence trajectory that moves to the goal, as an Odometry message.
It provides an action for moving said reference to that goal (Move.action).
Attributes:
revisit_period (float): When to refresh the core of the planner.
ref_pub (rospy.Publisher): The publisher serving the lqRRT reference
topic name.
path_pub (rospy.Publisher): Publisher responsible for publishing a PoseArray
to the provided path topic.
tree_pub (rospy.Publisher): Publisher responsible for publishing a PoseArray
to the provided tree topic.
unreachable (bool): Whether the planner sees a goal as being unreachable.
Defaults to ``false``.
done (bool): Whether the planner has finished its movement. Defaults to ``false``.
move_type (MoveAction): How the boat is planning to move towards its goal.
"""
def __init__(
self,
odom_topic: str,
ref_topic: str,
move_topic: str,
path_topic: str,
tree_topic: str,
goal_topic: str,
focus_topic: str,
effort_topic: str,
ogrid_topic: str,
ogrid_threshold: str,
):
"""
Initialize with topic names and ogrid threshold as applicable.
Defaults are generated at the ROS params level.
Args:
odom_topic (str): The name of the topic supplying Odometry
messages.
ref_topic (str): The lqRRT reference topic name.
move_topic (str): The name of the topic hosting the SimpleActionServer
and topic of where to move to.
path_topic (str): The name of the lqRRT path topic name.
tree_topic (str): The name of the topic holding the lqRRT tree.
goal_topic (str): The name of the topic holding the lqRRT goal.
focus_topic (str): The name of the topic holding the lqRRT focus.
effort_topic (str): The name of the topic hosting the lqRRT effort
wrenches.
ogrid_topic (str): The name of the topic holding the occupancy grid.
ogrid_threshold (str): An occupancy grid cell which exceeds this
value will be marked as filled. The value will be converted to
a float in the class.
"""
# One-time initializations
self.revisit_period = 0.05 # s
self.ogrid = None
self.ogrid_threshold = float(ogrid_threshold)
self.state = None
self.tracking = None
self.done = True
# Lil helpers
self.rostime = lambda: rospy.Time.now().to_sec()
self.intup = lambda arr: tuple(np.array(arr, dtype=np.int64))
self.get_hood = lambda img, row, col: img[row - 1 : row + 2, col - 1 : col + 2]
# Set-up planners
self.behaviors_list = [car, boat, escape]
for behavior in self.behaviors_list:
behavior.planner.set_system(erf=self.erf)
behavior.planner.set_runtime(sys_time=self.rostime)
behavior.planner.constraints.set_feasibility_function(self.is_feasible)
# Initialize resettable stuff
self.reset()
# Subscribers
rospy.Subscriber(odom_topic, Odometry, self.odom_cb)
rospy.Subscriber(ogrid_topic, OccupancyGrid, self.ogrid_cb)
rospy.sleep(0.5)
# Publishers
self.ref_pub = rospy.Publisher(ref_topic, Odometry, queue_size=1)
self.path_pub = rospy.Publisher(path_topic, PoseArray, queue_size=3)
self.tree_pub = rospy.Publisher(tree_topic, PoseArray, queue_size=3)
self.goal_pub = rospy.Publisher(goal_topic, PoseStamped, queue_size=3)
self.focus_pub = rospy.Publisher(focus_topic, PointStamped, queue_size=3)
self.eff_pub = rospy.Publisher(effort_topic, WrenchStamped, queue_size=3)
self.sampspace_pub = rospy.Publisher(
sampspace_topic,
PolygonStamped,
queue_size=3,
)
self.guide_pub = rospy.Publisher(guide_topic, PointStamped, queue_size=3)
# Actions
self.move_server = actionlib.SimpleActionServer(
move_topic,
MoveAction,
execute_cb=self.move_cb,
auto_start=False,
)
self.move_server.start()
rospy.sleep(1)
# Timers
rospy.Timer(rospy.Duration(self.revisit_period), self.publish_ref)
rospy.Timer(rospy.Duration(self.revisit_period), self.action_check)
[docs] def reset(self) -> None:
"""
Resets variables that should definitely be cleared before a new action begins.
"""
# Internal plan
self.goal = None
self.get_ref = None
self.get_eff = None
self.x_seq = None
self.u_seq = None
self.tree = None
self.dt = params.dt
# Behavior control
self.move_type = None
self.behavior = None
self.enroute_behavior = None
self.goal_bias = None
self.sample_space = None
self.guide = None
self.speed_factor = np.array(1)
# Planning control
self.last_update_time = None
self.next_runtime = None
self.next_seed = None
self.move_count = 0
self.initial_plan_time = params.basic_duration
self.blind = False
# Issue control
self.stuck = False
self.stuck_count = 0
self.fail_count = 0
self.time_till_issue = None
self.failure_reason = ""
self.preempted = False
self.unreachable = False
self.collided = False
# Unkill all planners
self.lock_tree = False
for behavior in self.behaviors_list:
behavior.planner.unkill()
# ACTION
[docs] def move_cb(self, msg: MoveAction) -> Optional[bool]:
"""
Callback for the move action.
Args:
msg (MoveAction): The action passed by the action client.
Returns:
Optional[bool]: ???
"""
# Start clean
self.done = False
self.reset()
print("=" * 50)
# Make sure odom is publishing (well, at least once)
if self.state is None:
print("Cannot plan until odom is received!\n")
self.move_server.set_aborted(MoveResult("odom"))
self.done = True
return False
# Give desired pose to everyone who needs it
self.set_goal(self.unpack_pose(msg.goal))
# Check given move_type
if msg.move_type in [
MoveGoal.HOLD,
MoveGoal.DRIVE,
MoveGoal.DRIVE_SMOOTH,
MoveGoal.SKID,
MoveGoal.SPIRAL,
MoveGoal.BYPASS,
]:
if msg.blind:
self.blind = True
print(f"Preparing: blind {msg.move_type}")
else:
print(f"Preparing: {msg.move_type}")
self.move_type = msg.move_type
else:
print(f"Unsupported move_type: '{msg.move_type}'\n")
self.move_server.set_aborted(MoveResult("move_type"))
self.done = True
return False
# Station keeping move
if self.move_type == MoveGoal.HOLD:
self.set_goal(self.state)
self.last_update_time = self.rostime()
remain = np.copy(self.goal)
self.get_ref = lambda t: remain
print("\nDone!\n")
self.move_server.set_succeeded(MoveResult(self.failure_reason))
self.done = True
return True
# Bypass move
if self.move_type == MoveGoal.BYPASS:
if self.is_feasible(self.goal, np.zeros(3)) or self.blind:
self.last_update_time = self.rostime()
remain = np.copy(self.goal)
self.get_ref = lambda t: remain
print("\nDone!\n")
self.move_server.set_succeeded(MoveResult(self.failure_reason))
self.done = True
return True
else:
print("\nGoal infeasible! Bypass move rejected.\n")
self.move_server.set_aborted(MoveResult("occupied"))
self.done = True
return False
# Make sure we are not already in a collided state
# if not self.is_feasible(self.state, np.zeros(3)) and not self.blind:
# print("\nCan't move. Already collided.\n")
# self.move_server.set_aborted(MoveResult('collided'))
# self.done = True
# return False
# Check given focus
if self.move_type == MoveGoal.SKID:
if msg.focus.z == 0:
boat.focus = None
self.focus_pub.publish(
self.pack_pointstamped([1e6, 1e6, 1e6], rospy.Time.now()),
)
else:
boat.focus = np.array([msg.focus.x, msg.focus.y, 0])
focus_vec = boat.focus[:2] - self.goal[:2]
focus_goal = np.copy(self.goal)
focus_goal[2] = np.arctan2(focus_vec[1], focus_vec[0])
self.set_goal(focus_goal)
self.focus_pub.publish(
self.pack_pointstamped(boat.focus, rospy.Time.now()),
)
print(f"Focused on: {boat.focus[:2]}")
elif self.move_type == MoveGoal.SPIRAL:
boat.focus = np.array([msg.focus.x, msg.focus.y, msg.focus.z])
self.focus_pub.publish(self.pack_pointstamped(boat.focus, rospy.Time.now()))
if boat.focus[2] >= 0:
print(f"Focused on: {boat.focus[:2]}, counterclockwise")
if boat.focus[2] == 0:
boat.focus[2] = 1
else:
print(f"Focused on: {boat.focus[:2]}, clockwise")
else:
boat.focus = None
self.focus_pub.publish(
self.pack_pointstamped([1e6, 1e6, 1e6], rospy.Time.now()),
)
# Store the initial planning time, if specified
if msg.initial_plan_time > 0:
print(f"Initial plan time: {msg.initial_plan_time}")
self.initial_plan_time = msg.initial_plan_time
# Apply the speed factor, if specified
msg.speed_factor = np.array(msg.speed_factor).flatten()
if len(msg.speed_factor) == 0:
pass
elif len(msg.speed_factor) not in [1, 3]:
print(
"(WARNING: expected speed_factor to be a scalar or in the form [x, y, h])",
)
else:
for i, sf in enumerate(msg.speed_factor):
if np.isclose(sf, 0) or sf < 0:
msg.speed_factor[i] = 1
if not np.all(msg.speed_factor == 1):
self.speed_factor = np.copy(msg.speed_factor)
print(f"Speed_factor: {self.speed_factor}")
if np.any(self.speed_factor < 1) and self.dt > 0.05:
self.dt *= np.min(self.speed_factor)
print(f"(using smaller timestep: {self.dt} s)")
for behavior in self.behaviors_list:
behavior.planner.dt = self.dt
behavior.velmax_pos = self.speed_factor * params.velmax_pos
behavior.velmax_neg = self.speed_factor * params.velmax_neg
behavior.D_pos = params.D_pos / self.speed_factor
behavior.D_neg = params.D_neg / self.speed_factor
# Spiraling
if self.move_type == MoveGoal.SPIRAL:
if np.isclose(npl.norm(boat.focus[:2] - self.state[:2]), 0):
print("\nCan't perform a spiral of zero initial radius! Sorry.")
print("\nTerminated.\n")
self.move_server.set_aborted(MoveResult("degen"))
self.done = True
return False
spr_results = self.spiral_move(
c=boat.focus[:2],
x=self.state,
N=abs(boat.focus[2]),
direc=np.sign(boat.focus[2]),
mpr=msg.goal.position.z / (2 * np.pi),
)
x_seq_spr, T_spr, spr_success = spr_results
if len(x_seq_spr) and spr_success:
self.x_seq = x_seq_spr
self.set_goal(x_seq_spr[-1][:])
self.get_ref = interp1d(
np.arange(len(x_seq_spr)) * self.dt,
np.array(x_seq_spr),
axis=0,
assume_sorted=True,
bounds_error=False,
fill_value=x_seq_spr[-1][:],
)
self.last_update_time = self.rostime()
self.next_runtime = T_spr
self.lock_tree = True
else:
print("\nDesired spiral path is blocked!")
print("\nTerminated.\n")
self.move_server.set_aborted(MoveResult("blocked"))
self.done = True
return False
# Standard driving
elif self.move_type in [MoveGoal.DRIVE, MoveGoal.DRIVE_SMOOTH]:
# Find the heading that points to the goal
p_err = self.goal[:2] - self.state[:2]
h_goal = np.arctan2(p_err[1], p_err[0])
# If we aren't within a cone of that heading and the goal is far away, construct rotation
if (
abs(self.angle_diff(h_goal, self.state[2])) > params.pointshoot_tol
and npl.norm(p_err) > params.free_radius
and self.move_type != MoveGoal.DRIVE_SMOOTH
):
x_seq_rot, T_rot, rot_success, u_seq_rot = self.rotation_move(
self.state,
h_goal,
params.pointshoot_tol,
)
print(f"\nRotating towards goal (duration: {np.round(T_rot, 2)})")
if not rot_success:
print("\n(cannot rotate completely!)")
# Begin interpolating rotation move
self.last_update_time = self.rostime()
if len(x_seq_rot):
self.get_ref = interp1d(
np.arange(len(x_seq_rot)) * self.dt,
np.array(x_seq_rot),
axis=0,
assume_sorted=True,
bounds_error=False,
fill_value=x_seq_rot[-1][:],
)
self.get_eff = interp1d(
np.arange(len(u_seq_rot)) * self.dt,
np.array(u_seq_rot),
axis=0,
assume_sorted=True,
bounds_error=False,
fill_value=u_seq_rot[-1][:],
)
self.next_runtime = np.clip(
T_rot,
params.basic_duration,
2 * np.pi / params.velmax_pos[2],
)
self.next_seed = np.copy(x_seq_rot[-1])
else:
self.next_runtime = params.basic_duration
self.next_seed = np.copy(self.state)
else:
self.next_runtime = params.basic_duration
self.next_seed = np.copy(self.state)
if self.move_type == MoveGoal.DRIVE_SMOOTH:
self.move_type = MoveGoal.DRIVE
# Translate or look-at move
elif self.move_type == MoveGoal.SKID:
self.next_runtime = params.basic_duration
self.next_seed = np.copy(self.state)
# (debug)
if not self.lock_tree:
assert self.next_seed is not None
assert self.next_runtime is not None
# Begin tree-chaining loop
while not rospy.is_shutdown():
clean_update = self.tree_chain()
self.move_count += 1
# Print feedback
if clean_update and not (
self.preempted or self.unreachable or self.stuck or self.lock_tree
):
print(f"\nMove {self.move_count}\n----")
print(f"Behavior: {self.enroute_behavior.__name__[10:]}")
print(
f"Reached goal region: {self.enroute_behavior.planner.plan_reached_goal}",
)
print(f"Goal bias: {np.round(self.goal_bias, 2)}")
print(f"Tree size: {self.tree.size}")
print(f"Move duration: {np.round(self.next_runtime, 1)}")
# elif not self.lock_tree:
# print("\nIssue Status\n----")
# print("Stuck: {}".format(self.stuck))
# print("Collided: {}".format(self.collided))
# print("Unreachable: {}".format(self.unreachable))
# print("Preempted: {}".format(self.preempted))
# print("Tree size: {}".format(self.tree.size))
# print("Move duration: {}".format(np.round(self.next_runtime, 1)))
# Check if action goal is complete
if self.move_type == MoveGoal.SPIRAL:
self.publish_path()
if self.time_till_issue is not None:
self.unreachable = True
self.failure_reason = "blocked"
if (self.rostime() - self.last_update_time) >= T_spr:
self.done = True
elif np.all(np.abs(self.erf(self.goal, self.state)) <= params.real_tol):
remain = np.copy(self.goal)
self.get_ref = lambda t: remain
self.get_eff = lambda t: np.zeros(3)
self.done = True
if self.done:
print("\nDone!\n")
self.move_server.set_succeeded(MoveResult(self.failure_reason))
return True
# Check for abrupt termination or unreachability
if self.preempted or self.unreachable:
remain = np.copy(np.concatenate((self.state[:3], np.zeros(3))))
self.get_ref = lambda t: remain
self.get_eff = lambda t: np.zeros(3)
print("\nTerminated.\n")
self.move_server.set_aborted(MoveResult(self.failure_reason))
self.done = True
return False
# WHERE IT HAPPENS
[docs] def tree_chain(self):
"""
Plans an lqRRT and sets things up to chain along another lqRRT when
called again.
"""
# Make sure we are not currently in an update or a collided state
if self.lock_tree:
rospy.sleep(0.01)
return False
# Thread locking, lol
self.lock_tree = True
# No issue
if self.time_till_issue is None:
if (
self.next_runtime < params.basic_duration
and self.last_update_time is not None
and not self.stuck
):
self.next_runtime = params.basic_duration
self.next_seed = self.get_ref(
self.next_runtime + self.rostime() - self.last_update_time,
)
elif self.stuck:
self.next_runtime = None
self.behavior = self.select_behavior()
(
self.goal_bias,
self.sample_space,
self.guide,
self.pruning,
) = self.select_exploration()
# Distant issue
elif self.time_till_issue > 2 * params.basic_duration:
self.next_runtime = params.basic_duration
self.next_seed = self.get_ref(
self.next_runtime + self.rostime() - self.last_update_time,
)
self.behavior = self.select_behavior()
(
self.goal_bias,
self.sample_space,
self.guide,
self.pruning,
) = self.select_exploration()
# Immediate issue
else:
self.last_update_time = self.rostime()
remain = np.copy(self.state)
self.get_ref = lambda t: remain
self.get_eff = lambda t: np.zeros(3)
self.next_runtime = params.basic_duration
self.next_seed = remain
self.behavior = self.select_behavior()
(
self.goal_bias,
self.sample_space,
self.guide,
self.pruning,
) = self.select_exploration()
# Special first-move case
if self.move_count == 0:
if self.get_ref is not None:
if self.initial_plan_time > self.next_runtime:
self.next_runtime = self.initial_plan_time
else:
self.next_runtime = self.initial_plan_time
# (debug)
if self.stuck and self.time_till_issue is None:
assert self.next_runtime is None
# Update plan
clean_update = self.behavior.planner.update_plan(
x0=self.next_seed,
sample_space=self.sample_space,
goal_bias=self.goal_bias,
guide=self.guide,
pruning=self.pruning,
specific_time=self.next_runtime,
)
# Update finished properly
if clean_update:
# We might be stuck if tree is oddly small
if self.behavior.planner.tree.size == 1:
# Increase stuck count towards threshold
self.stuck_count += 1
if self.stuck_count > params.stuck_threshold:
self.fail_count += 1
if self.fail_count > params.fail_threshold:
print("\nNot making any progress.\nGoal may be unreachable.")
self.unreachable = True
self.failure_reason = "unreachable"
else:
print("\nI think we're stuck...")
self.stuck = True
self.stuck_count = 0
else:
self.stuck = False
else:
self.stuck = False
self.stuck_count = 0
self.fail_count = 0
# Cash-in new goods
self.x_seq = np.copy(self.behavior.planner.x_seq)
self.u_seq = np.copy(self.behavior.planner.u_seq)
self.tree = self.behavior.planner.tree
self.last_update_time = self.rostime()
self.get_ref = self.behavior.planner.get_state
self.get_eff = self.behavior.planner.get_effort
self.next_runtime = self.behavior.planner.T
if self.next_runtime > params.basic_duration:
self.next_runtime *= params.fudge_factor
self.next_seed = self.get_ref(self.next_runtime)
self.enroute_behavior = self.behavior
self.time_till_issue = None
# Visualizers
self.publish_tree()
self.publish_path()
self.publish_expl()
# Make sure all planners are actually unkilled
for behavior in self.behaviors_list:
behavior.planner.unkill()
# Unlocking, lol
self.lock_tree = False
return clean_update
# DECISIONS
[docs] def select_behavior(self) -> ModuleType:
"""
Chooses the behavior for the current move, and returns
the associated module containing the implementation of
that behavior.
Returns:
ModuleType: A module with the behavior implementation
Raises:
ValueError: No relevant implementation could be found
"""
# Are we stuck?
if self.stuck:
return escape
# Positional error norm of next_seed
dist = npl.norm(self.goal[:2] - self.next_seed[:2])
# Are we driving?
if self.move_type == MoveGoal.DRIVE:
# All clear?
if dist < params.free_radius:
return boat
else:
return car
# Are we skidding?
if self.move_type == MoveGoal.SKID:
return boat
# (debug)
raise ValueError("Indeterminant behavior configuration.")
[docs] def select_exploration(
self,
) -> List:
"""
Chooses the goal bias, sample space, guide point, and pruning
choice for the current move and behavior.
"""
# Escaping means maximal exploration
if self.behavior is escape:
if self.guide is None:
gs = np.copy(self.goal)
vec = self.goal[:2] - self.next_seed[:2]
dist = npl.norm(vec)
if dist < 2 * params.free_radius:
gs[:2] = vec + 2 * params.free_radius * (vec / dist)
else:
gs = np.copy(self.guide)
return (0, escape.gen_ss(self.next_seed, self.goal), gs, True)
# Analyze ogrid to find good bias and sample space buffer
if self.ogrid is not None and not self.blind and self.next_seed is not None:
# Get opencv-ready image from current ogrid (255 is occupied, 0 is clear)
occ_img = 255 * np.greater(self.ogrid, self.ogrid_threshold).astype(
np.uint8,
)
# Dilate the image
boat_pix = int(self.ogrid_cpm * params.boat_width)
boat_pix += boat_pix % 2
occ_img_dial = cv2.dilate(occ_img, np.ones((boat_pix, boat_pix), np.uint8))
# Construct the initial sample space and get bounds in pixel coordinates
ss = self.behavior.gen_ss(self.next_seed, self.goal)
pmin = self.intup(
self.ogrid_cpm * ([ss[0][0], ss[1][0]] - self.ogrid_origin),
)
pmax = self.intup(
self.ogrid_cpm * ([ss[0][1], ss[1][1]] - self.ogrid_origin),
)
# Other quantities in pixel coordinates
seed = self.intup(self.ogrid_cpm * (self.next_seed[:2] - self.ogrid_origin))
goal = self.intup(self.ogrid_cpm * (self.goal[:2] - self.ogrid_origin))
step = int(self.ogrid_cpm * params.ss_step)
# Make sure seed and goal are physically meaningful
try:
occ_img_dial[seed[1], seed[0]]
occ_img_dial[goal[1], goal[0]]
except IndexError:
print("Goal and/or seed out of bounds of occupancy grid!")
return (
0,
escape.gen_ss(self.next_seed, self.goal, 1),
np.copy(self.goal),
False,
)
# Initializations
found_entry = False
push = [0, 0, 0, 0]
npush = 0
gs = np.copy(self.goal)
last_offsets = [pmin[0], pmin[1]]
ss_img = np.copy(occ_img_dial[pmin[1] : pmax[1], pmin[0] : pmax[0]])
ss_goal = self.intup(np.subtract(goal, last_offsets))
ss_seed = self.intup(np.subtract(seed, last_offsets))
# Iteratively determine how much to push out the sample space
while np.all(np.less_equal(push, len(occ_img_dial))):
if found_entry:
break
# Find dividing boundary points
if np.any(np.equal(ss_img.shape, 0)):
print("\nOccupancy grid analysis failed... Please report this!")
return (
0.5,
self.behavior.gen_ss(self.next_seed, self.goal),
np.copy(self.goal),
False,
)
bpts = self.boundary_analysis(ss_img, ss_seed, ss_goal)
# If already connected, no expansion necessary
if bpts == "connected":
found_entry = True
break
# If impossible to connect, no sense in expanding
if bpts == "isolated":
break
# Otherwise, prepare to push ss based on boundary intercepts
push_xmin = False
push_xmax = False
push_ymin = False
push_ymax = False
# Buffered boundary imensions
row_min = 1
col_min = 1
row_max = ss_img.shape[0] - 2
col_max = ss_img.shape[1] - 2
# Classify boundary points
for row, col in bpts:
if col == col_min: # left
push_xmin = True
if row == row_min: # top left
push_ymin = True
elif row == row_max: # bottom left
push_ymax = True
elif col == col_max: # right
push_xmax = True
if row == row_min: # top right
push_ymin = True
elif row == row_max: # bottom left
push_ymax = True
elif row == row_min: # top
push_ymin = True
elif row == row_max: # bottom
push_ymax = True
# Push accordingly
if push_xmin:
push[0] += step
npush += 1
if push_xmax:
push[1] += step
npush += 1
if push_ymin:
push[2] += step
npush += 1
if push_ymax:
push[3] += step
npush += 1
# Get image cropped to sample space and offset points of interest
offset_x = (pmin[0] - push[0], pmax[0] + push[1])
offset_y = (pmin[1] - push[2], pmax[1] + push[3])
ss_img = np.copy(
occ_img_dial[
offset_y[0] : offset_y[1],
offset_x[0] : offset_x[1],
],
)
if np.any(np.equal(ss_img.shape, 0)):
print("\nOccupancy grid analysis failed... Please report this!")
return (
0.5,
self.behavior.gen_ss(self.next_seed, self.goal),
np.copy(self.goal),
False,
)
ss_goal = self.intup(np.subtract(goal, [offset_x[0], offset_y[0]]))
ss_seed = self.intup(np.subtract(seed, [offset_x[0], offset_y[0]]))
test_flood = np.copy(ss_img)
cv2.floodFill(
test_flood,
np.zeros(
(test_flood.shape[0] + 2, test_flood.shape[1] + 2),
np.uint8,
),
ss_goal,
69,
)
if test_flood[ss_seed[1], ss_seed[0]] == 69:
gs[:2] = (
np.add(
[col, row],
[last_offsets[0], last_offsets[1]],
).astype(np.float64)
/ self.ogrid_cpm
) + self.ogrid_origin
found_entry = True
break
# Used for remembering the previous sample space coordinates
last_offsets = [offset_x[0], offset_y[0]]
# If we expanded to the limit and found no entry (and goal is unoccupied), goal is infeasible
if not found_entry and self.is_feasible(self.goal, np.zeros(3)):
print("\nGoal is unreachable!")
self.unreachable = True
self.failure_reason = "unreachable"
for behavior in self.behaviors_list:
behavior.planner.kill_update()
return (
0,
escape.gen_ss(self.next_seed, self.goal, 1),
np.copy(self.goal),
False,
)
# Apply push in real coordinates
push = np.array(push, dtype=np.float64) / self.ogrid_cpm
if npush > 0:
push += params.boat_length
ss = self.behavior.gen_ss(
self.next_seed,
self.goal,
push + [params.ss_start] * 4,
)
# Select bias based on density of ogrid in sample space
if ss_img.size:
free_ratio = len(np.argwhere(ss_img == 0)) / ss_img.size
b = np.clip(free_ratio - 0.05 * npush, 0, 1)
if b < 0.9:
b -= 0.2
else:
b = 1
else:
b = 1
gs = np.copy(self.goal)
ss = self.behavior.gen_ss(self.next_seed, self.goal)
# For boating
if self.behavior is boat:
if npl.norm(self.goal[:2] - self.next_seed[:2]) < params.free_radius:
return ([1, 1, 1, 0, 0, 0], ss, gs, False)
else:
if boat.focus is None:
return ([b, b, 1, 0, 0, 1], ss, gs, True)
else:
return ([b, b, 0, 0, 0, 0], ss, gs, True)
# For car-ing
if self.behavior is car:
return ([b, b, 0, 0, 0.5, 0], ss, gs, True)
# (debug)
raise ValueError("Indeterminant behavior configuration.")
# VERIFICATIONS
[docs] def is_feasible(self, x: np.ndarray, u: np.ndarray) -> bool:
"""
Given a state x and effort u, returns whether (x, u) is feasible.
Args:
x (np.ndarray): The state to verify the feasibility of
u (np.ndarray): The corresponding effort to verify the feasibility of
Returns:
bool: Whether the state is feasible.
"""
# If there's no ogrid yet or it's a blind move, anywhere is valid
if self.ogrid is None or self.blind:
return True
# Body to world
c, s = np.cos(x[2]), np.sin(x[2])
R = np.array([[c, -s], [s, c]])
# Vehicle points in world frame
points = x[:2] + R.dot(params.vps).T
# Check for collision
indices = (self.ogrid_cpm * (points - self.ogrid_origin)).astype(np.int64)
try:
grid_values = self.ogrid[indices[:, 1], indices[:, 0]]
except IndexError:
return False
# Greater than threshold is a hit
return np.all(grid_values < self.ogrid_threshold)
[docs] def reevaluate_plan(self) -> None:
"""
Iterates through the current plan re-checking for feasibility using
the newest ogrid data, looking for a clear path if we are escaping,
and checking that the goal is still feasible.
"""
# Make sure a plan exists and that we aren't currently fixing it
last_update_time = self.last_update_time
x_seq = np.copy(self.x_seq)
if (
last_update_time is None
or not np.any(x_seq)
or self.done
or self.time_till_issue is not None
):
return
# Timesteps since last update
iters_passed = int((self.rostime() - last_update_time) / self.dt)
# Make sure that the goal pose is still feasible
if (
not self.is_feasible(self.goal, np.zeros(3))
and self.move_type != MoveGoal.SPIRAL
):
print("\nThe given goal is occupied!\nGoing nearby instead.")
self.time_till_issue = np.inf
self.failure_reason = "occupied"
p_err = self.goal[:2] - self.state[:2]
p_err_mag = npl.norm(p_err)
if p_err_mag <= npl.norm(params.real_tol[:2]):
print("...looks like I am already nearby.")
self.set_goal(self.state)
return
npoints = int(p_err_mag / (params.boat_length / 2))
xline = np.linspace(self.goal[0], self.state[0], npoints)
yline = np.linspace(self.goal[1], self.state[1], npoints)
hline = [np.arctan2(p_err[1], p_err[0])] * npoints
sline = np.vstack((xline, yline, hline, np.zeros((3, npoints)))).T
found_free_goal = False
for i, x in enumerate(sline[:-1]):
if self.is_feasible(x, np.zeros(3)):
self.set_goal(sline[i + 1])
found_free_goal = True
break
if not found_free_goal:
print("\nCould not find a reasonable free goal!")
self.unreachable = True
self.failure_reason = "unreachable"
return
if boat.focus is not None and self.move_type == MoveGoal.SKID:
focus_vec = boat.focus[:2] - self.goal[:2]
focus_goal = np.copy(self.goal)
focus_goal[2] = np.arctan2(focus_vec[1], focus_vec[0])
if self.is_feasible(focus_goal, np.zeros(3)):
self.set_goal(focus_goal)
else:
print("Cancelling focus.")
boat.focus = None
for behavior in self.behaviors_list:
behavior.planner.kill_update()
return
# Check that the next reeval_time seconds in the current plan are still feasible
p_seq = np.copy(
x_seq[
iters_passed : min(
[
len(x_seq),
int(iters_passed + (params.reeval_time / self.dt)),
params.reeval_limit,
],
)
],
)
if len(p_seq):
p_seq[:, 3:] = 0
for i, (x, u) in enumerate(zip(p_seq, [np.zeros(3)] * len(p_seq))):
if not self.is_feasible(x, u):
time_till_collision = i * self.dt
if time_till_collision <= params.collision_threshold:
if not self.collided:
print("\nCollided! (*seppuku*)")
print("But we cannot throw away our shot!")
self.collided = True
self.failure_reason = "collided"
else:
print(
f"\nFound collision on current path!\nTime till collision: {time_till_collision}",
)
self.time_till_issue = time_till_collision
for behavior in self.behaviors_list:
behavior.planner.kill_update()
return
if self.collided:
print("\nNo longer collided! (*unseppuku*)")
self.collided = False
if self.failure_reason == "collided":
self.failure_reason = ""
# If we are escaping, check if we have a clear path again
if self.enroute_behavior is escape:
start = self.get_ref(self.rostime() - last_update_time)
p_err = self.goal[:2] - start[:2]
if npl.norm(p_err) > params.free_radius:
npoints = int(npl.norm(p_err) / params.vps_spacing)
xline = np.linspace(start[0], self.goal[0], npoints)
yline = np.linspace(start[1], self.goal[1], npoints)
hline = [np.arctan2(p_err[1], p_err[0])] * npoints
sline = np.vstack((xline, yline, hline, np.zeros((3, npoints)))).T
checks = []
for x in sline:
checks.append(self.is_feasible(x, np.zeros(3)))
if np.all(checks):
self.time_till_issue = np.inf
self.move_type = MoveGoal.DRIVE
for behavior in self.behaviors_list:
behavior.planner.kill_update()
self.stuck = False
self.stuck_count = 0
print("\nClear path found!")
return
# No concerns
self.time_till_issue = None
[docs] def action_check(self, _: rospy.timer.TimerEvent) -> None:
"""
Manages action preempting. Serves as the callback to a Timer operating
operating every self.revisit_period seconds.
"""
if self.preempted or not self.move_server.is_active():
return
if self.move_server.is_preempt_requested() or (
rospy.is_shutdown() and self.lock_tree
):
self.preempted = True
self.failure_reason = "preempted"
print("\nAction preempted!")
if self.behavior is not None:
for behavior in self.behaviors_list:
behavior.planner.kill_update()
while not self.done:
rospy.sleep(0.1)
return
next_runtime = self.next_runtime
last_update_time = self.last_update_time
if next_runtime is not None and last_update_time is not None:
time_till_next_branch = next_runtime - (self.rostime() - last_update_time)
if self.move_type == MoveGoal.SPIRAL:
self.move_server.publish_feedback(
MoveFeedback(
"boat",
0,
self.tracking,
self.erf(
self.goal,
self.get_ref(self.rostime() - last_update_time),
)[:3],
time_till_next_branch,
),
)
elif (
self.enroute_behavior is not None
and self.tree is not None
and self.tracking is not None
):
self.move_server.publish_feedback(
MoveFeedback(
self.enroute_behavior.__name__[10:],
self.tree.size,
self.tracking,
self.erf(
self.goal,
self.get_ref(self.rostime() - last_update_time),
)[:3],
time_till_next_branch,
),
)
# LIL MATH DOERS
[docs] def rotation_move(self, x: List[float], h: float, tol: float):
"""
Returns a state sequence, total time, success bool and effort
sequence for a simple rotate in place move. Success is False if
the move becomes infeasible before the state heading x[2] is within
the goal heading h+-tol. Give x (start state), and h (goal heading).
Args:
x (List[float]): The current state
h (float): The desired heading (similar to x[2])
tol (float): The tolerance of the heading
Returns:
Tuple[np.ndarray, float, bool, np.ndarray]: A tuple containing
the new state array, the amount of time the movement will take,
whether the operation is feasible, and the new effort array.
"""
# Set-up
x = np.array(x, dtype=np.float64)
xg = np.copy(x)
xg[2] = h
x_seq = []
u_seq = []
T = 0
i = 0
u = np.zeros(3)
# Simulate rotation move
while not rospy.is_shutdown():
# Stop if pose is infeasible
if not self.is_feasible(
np.concatenate((x[:3], np.zeros(3))),
np.zeros(3),
) and len(x_seq):
portion = params.FPR * len(x_seq)
return (
x_seq[: int(portion)],
T - portion * self.dt,
False,
u_seq[: int(portion)],
)
else:
x_seq.append(x)
u_seq.append(u)
# Keep rotating towards goal until tolerance is met
e = self.erf(xg, x)
if abs(e[2]) <= tol:
return (x_seq, T, True, u_seq)
# Step
u = 3 * boat.lqr(x, u)[1].dot(e)
x = boat.dynamics(x, u, self.dt)
T += self.dt
i += 1
[docs] def spiral_move(
self,
x: List[float],
c: List[float],
N: int,
direc: int,
mpr: float,
):
"""
Generates a movement pattern for a spiral movement.
Args:
x (List[float]): Initial state vector.
c (List[float]): Coordinates of initial point.
N (int): The number of revolutions to make.
direc (int): Whether to move + or - in regards to the z-axis
mpr (float): The meters expansion of the spiral per radian
Returns:
Tuple[np.ndarray, float, bool]: The state sequence of the spiral
movement, the amount of time (in seconds) required to make the
movement, and whether the movement is feasible.
"""
# Set-up
if direc < 0:
mpr *= -1
x = np.array(x, dtype=np.float64)
c = np.array(c, dtype=np.float64)
vt = 0
ang = 0
t = 0
x_seq = []
# Radial vector
r = x[:2] - c
rmag = npl.norm(r)
rdir = r / np.clip(rmag, 1e-6, rmag)
# Tangential velocity and acceleration
if direc >= 0:
ht = np.arctan2(rdir[0], -rdir[1])
else:
ht = np.arctan2(-rdir[0], rdir[1])
h0 = self.angle_diff(x[2], ht)
R_h0 = np.array([[np.cos(h0), -np.sin(h0)], [np.sin(h0), np.cos(h0)]])
vtmax = abs(R_h0.dot(self.speed_factor * params.velmax_pos[:2])[0])
atmax = abs(R_h0.dot([params.Fx_max, params.Fy_max])[0] / params.m)
magic_ratio = 0.5 / 5
# Simulate spiral move
anglim = abs(2 * np.pi * N)
while abs(ang) < anglim and not rospy.is_shutdown():
# Rotation for this instant
radfactor = np.clip(magic_ratio * rmag, 0, 1)
vt = np.clip(vt + radfactor * atmax * self.dt, 0, radfactor * vtmax)
w = direc * vt / np.clip(rmag, 1e-6, rmag)
dang = w * self.dt
R = np.array([[np.cos(dang), -np.sin(dang)], [np.sin(dang), np.cos(dang)]])
# Update
t += self.dt
ang += dang
rdir = R.dot(rdir)
rmag += mpr * dang
# Record
x[:2] = c + rmag * rdir
x[2] = x[2] + dang
x[3:5] = R_h0.T.dot([vt, mpr * w])
x[5] = w
if self.is_feasible(x, np.zeros(3)):
x_seq.append(np.copy(x))
else:
return x_seq, t, False
return x_seq, t, True
[docs] def erf(self, goal_state: List[float], state: List[float]) -> np.ndarray:
"""
Returns error given two states goal_state and state.
Angle differences are taken properly on SO2.
Args:
goal_state (List[float]): The goal state
state (List[float]): The state to derive the error from
Returns:
np.ndarray: The error between the goal and passed state.
"""
e = np.subtract(goal_state, state)
e[2] = self.angle_diff(goal_state[2], state[2])
return e
[docs] def angle_diff(self, angle_goal: float, angle: float) -> float:
"""
Takes an angle difference properly on SO2.
Args:
angle_goal (float): The goal angle.
angle (float): The angle to grab the difference from.
Returns:
float: The difference between the desired and goal angle.
"""
c = np.cos(angle)
s = np.sin(angle)
cg = np.cos(angle_goal)
sg = np.sin(angle_goal)
return np.arctan2(sg * c - cg * s, cg * c + sg * s)
[docs] def boundary_analysis(self, img, seed, goal):
"""
Returns a list of the two boundary points of the contour dividing seed from
goal in the occupancy image img. If the seed and goal are connected, returns
'connected' or if they are terminally isolated, returns 'isolated'. Make sure
seed and goal are intups and in the same pixel coordinates as img, and that
occupied pixels have value 255 in img.
"""
# Set-up
img = np.copy(img)
fc1 = 1
fc2 = 2
bpts = []
# If goal can flood to seed then done
flood_goal = np.copy(img)
cv2.floodFill(
flood_goal,
np.zeros((flood_goal.shape[0] + 2, flood_goal.shape[1] + 2), np.uint8),
goal,
fc1,
)
if flood_goal[seed[1], seed[0]] == fc1:
return "connected"
# Filter out the dividing boundary
flood_goal_thresh = fc1 * np.equal(flood_goal, fc1).astype(np.uint8)
flood_seed = np.copy(flood_goal_thresh)
cv2.floodFill(
flood_seed,
np.zeros((flood_seed.shape[0] + 2, flood_seed.shape[1] + 2), np.uint8),
seed,
fc2,
)
flood_seed_thresh = fc2 * np.equal(flood_seed, fc2).astype(np.uint8)
# Buffered boundaries and dimensions
left = img[1:-1, 1]
right = img[1:-1, -2]
top = img[1, 2:-2]
bottom = img[-2, 2:-2]
row_min = 1
col_min = 1
row_max = img.shape[0] - 2
col_max = img.shape[1] - 2
# Buffered boundary points that were occupied, in boundary coordinates
left_cands = np.argwhere(np.equal(left, 255))
right_cands = np.argwhere(np.equal(right, 255))
top_cands = np.argwhere(np.equal(top, 255))
bottom_cands = np.argwhere(np.equal(bottom, 255))
# Convert to original coordinates
left_cands = np.hstack((left_cands + 1, col_min * np.ones_like(left_cands)))
right_cands = np.hstack((right_cands + 1, col_max * np.ones_like(right_cands)))
top_cands = np.hstack((row_min * np.ones_like(top_cands), top_cands + 2))
bottom_cands = np.hstack(
(row_max * np.ones_like(bottom_cands), bottom_cands + 2),
)
cands = np.vstack((left_cands, right_cands, top_cands, bottom_cands))
# Iterate through candidates and store the dividing boundary points
for row, col in cands:
hood = self.get_hood(flood_seed_thresh, row, col)
if np.any(hood == fc2) and np.any(hood == 0):
bpts.append([row, col])
# If they are not connected but there are no boundary points,
# they are forever isolated
if len(bpts) == 0:
return "isolated"
else:
return bpts
# PUBDUBS
[docs] def set_goal(self, x: np.ndarray) -> None:
"""
Publishes the goal state to the goal publisher.
"""
# Create and establish goal
self.goal = np.copy(x)
for behavior in self.behaviors_list:
behavior.planner.set_goal(self.goal)
# Publish goal to the goal publisher
self.goal_pub.publish(
self.pack_posestamped(np.copy(self.goal), rospy.Time.now()),
)
[docs] def publish_ref(self, *args) -> None:
"""
Publishes the reference trajectory as an Odometry message to the ref
publisher. Also publishes the reference effort as a WrenchStamped
message to the effort publisher.
"""
# Make sure a plan exists
last_update_time = self.last_update_time
if self.get_ref is None or last_update_time is None:
return
# Time since last update
time_since = self.rostime() - last_update_time
stamp = rospy.Time.now()
# Publish interpolated reference
self.ref_pub.publish(self.pack_odom(self.get_ref(time_since), stamp))
# Not really necessary, but for fun-and-profit also publish the planner's effort wrench
if self.get_eff is not None:
self.eff_pub.publish(
self.pack_wrenchstamped(self.get_eff(time_since), stamp),
)
[docs] def publish_path(self) -> None:
"""
Publishes all tree-node poses along the current path as a PoseArray.
Publishes to the class' path publisher.
"""
# Make sure a plan exists
if self.x_seq is None:
return
# Construct pose array and publish
pose_list = []
for x in self.x_seq:
pose_list.append(self.pack_pose(x))
if len(pose_list):
msg = PoseArray(poses=pose_list)
msg.header.frame_id = self.world_frame_id
self.path_pub.publish(msg)
[docs] def publish_tree(self) -> None:
"""
Publishes all tree-node poses as a PoseArray. Publishes to the class'
tree publisher.
"""
# Make sure a plan exists
if self.tree is None:
return
# Construct pose array and publish
pose_list = []
for id in range(self.tree.size):
x = self.tree.state[id]
pose_list.append(self.pack_pose(x))
if len(pose_list):
msg = PoseArray(poses=pose_list)
msg.header.frame_id = self.world_frame_id
self.tree_pub.publish(msg)
[docs] def publish_expl(self) -> None:
"""
Publishes sample space as a PolygonStamped to the sample space
publisher and publishes the guide point as a PointStamped to the guide
publisher.
"""
# Make sure a plan exists
if self.sample_space is None or self.guide is None:
return
# Construct and publish
point_list = [
Point32(self.sample_space[0][0], self.sample_space[1][0], 0),
Point32(self.sample_space[0][1], self.sample_space[1][0], 0),
Point32(self.sample_space[0][1], self.sample_space[1][1], 0),
Point32(self.sample_space[0][0], self.sample_space[1][1], 0),
]
ss_msg = PolygonStamped()
ss_msg.header.frame_id = self.world_frame_id
ss_msg.polygon.points = point_list
# Publish to publishers
self.sampspace_pub.publish(ss_msg)
self.guide_pub.publish(self.pack_pointstamped(self.guide[:2], rospy.Time.now()))
# SUBSCRUBS
[docs] def ogrid_cb(self, msg: OccupancyGrid) -> None:
"""
Acts as the callback function to a subscriber to the odom topic.
Expects an OccupancyGrid message. Stores the ogrid array and origin
vector, and then reevaluates the path plan.
Args:
msg (OccupancyGrid): The message passed by the callback.
"""
# Store relevant values
start = self.rostime()
self.ogrid = np.array(msg.data).reshape((msg.info.height, msg.info.width))
self.ogrid_origin = np.array(
[msg.info.origin.position.x, msg.info.origin.position.y],
)
self.ogrid_cpm = 1 / msg.info.resolution
# Reevaluate plan based on occupancy grid
try:
self.reevaluate_plan()
except BaseException:
print("\n(WARNING: something went wrong in reevaluate_plan)\n")
# Check for plan reevaluation taking a long time
elapsed = abs(self.rostime() - start)
if elapsed > 1:
print(
f"\n(WARNING: ogrid callback is taking {np.round(elapsed, 2)} seconds)\n",
)
[docs] def odom_cb(self, msg: Odometry) -> None:
"""
Callback to the odometry topic subscriber. Stores the current state of
the tracking vehicle and its reference frame. Sets the class tracking
variable to True or False based on if the vehicle is successfully
tracking.
Args:
msg (Odometry): The Odometry message received by the callback.
"""
# Store relevant values
self.world_frame_id = msg.header.frame_id
self.body_frame_id = msg.child_frame_id
self.state = self.unpack_odom(msg)
last_update_time = self.last_update_time
# Calculate error between the goal and state and tell if tracking is successful
if self.get_ref is not None and last_update_time is not None:
error = np.abs(
self.erf(self.get_ref(self.rostime() - last_update_time), self.state),
)
if np.all(error < 2 * np.array(params.real_tol)):
self.tracking = True
else:
self.tracking = False
# CONVERTERS
[docs] def unpack_pose(self, msg: Pose) -> np.ndarray:
"""
Converts a Pose message into a state vector with zero velocity.
Args:
msg (Pose): The message to unpack.
Returns:
np.ndarray: A state array containing the x-position, y-position,
and yaw. The velocities are all zero.
"""
q = [msg.orientation.x, msg.orientation.y, msg.orientation.z, msg.orientation.w]
return np.array(
[msg.position.x, msg.position.y, trns.euler_from_quaternion(q)[2], 0, 0, 0],
)
[docs] def unpack_odom(self, msg: Odometry) -> np.ndarray:
"""
Converts an Odometry message into a state vector.
Args:
msg (Odometry): The Odometry message to unpack.
Returns:
np.ndarray: The new state vector, containing 6
float values.
"""
q = [
msg.pose.pose.orientation.x,
msg.pose.pose.orientation.y,
msg.pose.pose.orientation.z,
msg.pose.pose.orientation.w,
]
return np.array(
[
msg.pose.pose.position.x,
msg.pose.pose.position.y,
trns.euler_from_quaternion(q)[2],
msg.twist.twist.linear.x,
msg.twist.twist.linear.y,
msg.twist.twist.angular.z,
],
)
[docs] def pack_pose(self, state: List[float]) -> Pose:
"""
Converts the positional part of a state vector
into a Pose message.
For a timestamped message, use pack_posestamped.
Args:
state (List[float]): The current state of the robot. state[0]
and state[1] represent x + y orientations. state[2]
represents the orientation of the bot.
Returns:
Pose: The Pose message.
"""
msg = Pose()
msg.position.x, msg.position.y = state[:2]
quat = trns.quaternion_from_euler(0, 0, state[2])
msg.orientation = numpy_to_quaternion(quat)
return msg
[docs] def pack_posestamped(self, state: List[float], stamp: rospy.Time) -> PoseStamped:
"""
Converts the positional part of a state vector into a PoseStamped
message with a given header timestamp. For a non-stamped message, use
pack_pose.
Args:
state (List[float]): The current state of the robot. state[0]
and state[1] represent x + y orientations. state[2]
represents the orientation of the bot.
stamp (rospy.Time): The time that the bot's state existed as so.
Returns:
PoseStamped: The PoseStamped message.
"""
msg = PoseStamped()
msg.header.stamp = stamp
msg.header.frame_id = self.world_frame_id
msg.pose.position.x, msg.pose.position.y = state[:2]
quat = trns.quaternion_from_euler(0, 0, state[2])
msg.pose.orientation = numpy_to_quaternion(quat)
return msg
[docs] def pack_odom(self, state: List[float], stamp: rospy.Time) -> Odometry:
"""
Converts a state vector into an Odometry message with
given header timestamp.
Args:
state (List[float]): A list containing the following values:
state[0] - The x-position of the bot
state[1] - The y-position of the bot
state[2] - The orientation (yaw) of the bot
state[3] - The linear speed in the x-direction
state[4] - The linear speed in the y-direction
state[5] - The angular speed in the z-direction
stamp (rospy.Time): The timestamp of the message.
Returns:
Odometry: The packed Odometry message.
"""
msg = Odometry()
msg.header.stamp = stamp
msg.header.frame_id = self.world_frame_id
msg.child_frame_id = self.body_frame_id
msg.pose.pose.position.x, msg.pose.pose.position.y = state[:2]
quat = trns.quaternion_from_euler(0, 0, state[2])
msg.pose.pose.orientation = numpy_to_quaternion(quat)
msg.twist.twist.linear.x, msg.twist.twist.linear.y = state[3:5]
msg.twist.twist.angular.z = state[5]
return msg
[docs] def pack_pointstamped(self, point: List[float], stamp: rospy.Time) -> PointStamped:
"""
Converts a point vector into a PointStamped message with a
given header timestamp.
Args:
point (List[float]): The point vector. point[0] represents
the x-component of the point and point[1] represents the
y-component of the point.
stamp (rospy.Time): The timestamp to attach to the message.
Returns:
PointStamped: The message to pack the point as.
"""
msg = PointStamped()
msg.header.stamp = stamp
msg.header.frame_id = self.world_frame_id
msg.point.x, msg.point.y = point[:2]
msg.point.z = 0
return msg
[docs] def pack_wrenchstamped(
self,
effort: List[float],
stamp: rospy.Time,
) -> WrenchStamped:
"""
Converts an effort vector into a WrenchStamped message
with a given header timestamp.
Args:
effort (List[float]): The effort vector. effort[0] represents
the x-component of force, effort[1] represents the y-component
of force, and effort[2] represents the z-component of torque.
stamp (rospy.Time): The timestamp to attach to the message.
Returns:
WrenchStamped: The message to pack the effort as.
"""
msg = WrenchStamped()
msg.header.stamp = stamp
msg.header.frame_id = self.body_frame_id
msg.wrench.force.x, msg.wrench.force.y = effort[:2]
msg.wrench.torque.z = effort[2]
return msg
# Setup node
if __name__ == "__main__":
rospy.init_node("lqrrt_node")
print("")
move_topic = rospy.get_param("~move_topic", "/move_to")
odom_topic = rospy.get_param("~odom_topic", "/odom")
ogrid_topic = rospy.get_param("~ogrid_topic", "/ogrid")
ogrid_threshold = rospy.get_param("~ogrid_threshold", "90")
ref_topic = rospy.get_param("~ref_topic", "/lqrrt/ref")
path_topic = rospy.get_param("~path_topic", "/lqrrt/path")
tree_topic = rospy.get_param("~tree_topic", "/lqrrt/tree")
goal_topic = rospy.get_param("~goal_topic", "/lqrrt/goal")
focus_topic = rospy.get_param("~focus_topic", "/lqrrt/focus")
effort_topic = rospy.get_param("~effort_topic", "/lqrrt/effort")
sampspace_topic = rospy.get_param("~sampspace_topic", "/lqrrt/sampspace")
guide_topic = rospy.get_param("~guide_topic", "/lqrrt/guide")
better_than_Astar = LQRRT_Node(
odom_topic,
ref_topic,
move_topic,
path_topic,
tree_topic,
goal_topic,
focus_topic,
effort_topic,
ogrid_topic,
ogrid_threshold,
)
rospy.spin()