Source code for navigator_robotx_comms.nodes.robotx_comms_client

#!/usr/bin/env python3

"""
RobotX Communications: A node that handles message communications
with the Technical Director server for the RobotX Communication Protocol
"""

import datetime
import socket
import threading
from enum import IntEnum

import rospy
from geometry_msgs.msg import PointStamped
from mil_tools import thread_lock
from nav_msgs.msg import Odometry
from navigator_msgs.msg import ScanTheCode
from navigator_msgs.srv import (
    MessageDetectDock,
    MessageDetectDockRequest,
    MessageDetectDockResponse,
    MessageEntranceExitGate,
    MessageEntranceExitGateRequest,
    MessageEntranceExitGateResponse,
    MessageFindFling,
    MessageFindFlingRequest,
    MessageFindFlingResponse,
    MessageFollowPath,
    MessageFollowPathRequest,
    MessageFollowPathResponse,
    MessageReactReport,
    MessageReactReportRequest,
    MessageReactReportResponse,
    MessageUAVReplenishment,
    MessageUAVReplenishmentRequest,
    MessageUAVReplenishmentResponse,
    MessageUAVSearchReport,
    MessageUAVSearchReportRequest,
    MessageUAVSearchReportResponse,
)
from navigator_robotx_comms.navigator_robotx_comms import (
    RobotXDetectDockMessage,
    RobotXEntranceExitGateMessage,
    RobotXFindFlingMessage,
    RobotXFollowPathMessage,
    RobotXHeartbeatMessage,
    RobotXReactReportMessage,
    RobotXScanCodeMessage,
    RobotXUAVReplenishmentMessage,
    RobotXUAVSearchReportMessage,
)
from ros_alarms import AlarmListener
from ros_alarms_msgs.msg import Alarm
from std_msgs.msg import String

lock = threading.Lock()


class SystemModes(IntEnum):
    """
    Enumerates constants of friendly system mode names to ints
    """

    KILLED = 3
    AUTONOMOUS = 2
    REMOTE_CONTROLLED = 1


[docs]class RobotXStartServices: """ Initializes services and subscribes to necessary publishers in order to facilitate the transmission of messages between the robot client and the Technical Director server. This class is part of a node that should be launched when communication is necessary with official AUVSI software. Attributes: robotx_client (RobotXClient): The client used to connect with the Technical Director server. system_mode (int): The mode of the system. wrench (str): The current wrench mode of the system. For example, ``autonomous``. """ def __init__(self): # define all variables for subscribers self.gps_array = None self.odom = None self.uav_status = 1 self.system_mode = None self.wrench = None self.kill = False # define delimiter for messages self.delim = "," # define parameters self.team_id = rospy.get_param("~team_id") self.td_ip = rospy.get_param("~td_ip") self.td_port = rospy.get_param("~td_port") self.use_test_data = rospy.get_param("~use_test_data") # time last called self.time_last_entrance_exit = None self.time_last_follow_path = None self.time_last_react_report = None self.time_last_find_fling = None self.time_last_uav_replenishment = None self.time_last_uav_search_report = None self.time_last_scan_code = None self.time_last_detect_dock = None # initialize connection to server self.robotx_client = RobotXClient(self.td_ip, self.td_port) self.robotx_client.connect() # setup all message types self.robotx_heartbeat_message = RobotXHeartbeatMessage() self.robotx_entrance_exit_gate_message = RobotXEntranceExitGateMessage() self.robotx_scan_code_message = RobotXScanCodeMessage() self.robotx_detect_dock_message = RobotXDetectDockMessage() self.robotx_follow_path_message = RobotXFollowPathMessage() self.robotx_react_report_message = RobotXReactReportMessage() self.robotx_find_fling_message = RobotXFindFlingMessage() self.robotx_uav_replenishment_message = RobotXUAVReplenishmentMessage() self.robotx_uav_search_report_message = RobotXUAVSearchReportMessage() # setup all subscribers rospy.Subscriber("lla", PointStamped, self.gps_coord_callback) rospy.Subscriber("odom", Odometry, self.gps_odom_callback) rospy.Subscriber("/wrench/selected", String, self.wrench_callback) rospy.Subscriber("/scan_the_code", ScanTheCode, self.scan_the_code_callback) # TODO: setup subscriber for uav status callback update # track kill state for inferring system mode self.kill_listener = AlarmListener("kill", self.kill_callback) self.kill_listener.wait_for_server() # setup all services self.service_entrance_exit_gate_message = rospy.Service( "entrance_exit_gate_message", MessageEntranceExitGate, self.handle_entrance_exit_gate_message, ) self.service_follow_path_message = rospy.Service( "follow_path_message", MessageFollowPath, self.handle_follow_path_message, ) self.service_react_report_message = rospy.Service( "react_report_message", MessageReactReport, self.handle_react_report_message, ) self.service_detect_dock_message = rospy.Service( "detect_dock_message", MessageDetectDock, self.handle_detect_dock_message, ) self.service_find_fling_message = rospy.Service( "find_fling_message", MessageFindFling, self.handle_find_fling_message, ) self.service_uav_replenishment_message = rospy.Service( "uav_replenishment_message", MessageUAVReplenishment, self.handle_uav_replenishment_message, ) self.service_uav_search_report_message = rospy.Service( "uav_search_report_message", MessageUAVSearchReport, self.handle_uav_search_report_message, ) # start sending heartbeat every second rospy.Timer(rospy.Duration(1), self.handle_heartbeat_message)
[docs] def update_system_mode(self) -> None: """ Sets :attr:`.system_mode` according to whether the kill is ``True`` and the mode of the boat. """ if self.kill is True: self.system_mode = SystemModes.KILLED elif self.wrench == "autonomous" or self.wrench == "/wrench/autonomous": self.system_mode = SystemModes.AUTONOMOUS else: self.system_mode = SystemModes.REMOTE_CONTROLLED
[docs] def wrench_callback(self, wrench: String) -> None: """ Updates the :attr:`.wrench` attribute with the most recent wrench value. """ self.wrench = wrench.data
[docs] def kill_callback(self, alarm: Alarm): """ Updates the :attr:`.kill` attribute depending on whether the most recent alarm received by the boat was raised or not. """ self.kill = alarm.raised
[docs] def gps_coord_callback(self, lla: PointStamped) -> None: """ Updates the :attr:`.gps_array` attribute with the most recent :class:`PointStamped` message received. """ self.gps_array = lla
[docs] def gps_odom_callback(self, odom: Odometry): """ Stores the most recent :class:`Odometry` message. """ self.odom = odom
[docs] def uav_status_callback(self, uav_status: int): """ Stores the most recent AUV status experienced by the boat. """ self.uav_status = uav_status
[docs] def system_mode_callback(self, system_mode: int): """ Sets the class' :attr:`system_mode` attribute when given the most recent system mode. """ self.system_mode = system_mode
[docs] def scan_the_code_callback(self, scan_the_code: ScanTheCode): """ Handles the ScanTheCode message requests by calling :meth:`.handle_scan_code_message` with the color pattern specified in the message. Args: scan_the_code (ScanTheCode): The message to format and send to the Technical Director server. """ self.handle_scan_code_message(scan_the_code.color_pattern)
[docs] def handle_heartbeat_message(self, _) -> None: """ Constructs a heartbeat message according to a timer and sends the formatted message to the AUVSI Technical Director station. """ self.update_system_mode() aedt_date_time = self.get_aedt_date_time() message = self.robotx_heartbeat_message.to_string( self.delim, self.team_id, aedt_date_time, self.gps_array, self.odom, self.uav_status, self.system_mode, self.use_test_data, ) self.robotx_client.send_message(message)
[docs] def handle_entrance_exit_gate_message( self, data: MessageEntranceExitGateRequest, ) -> MessageEntranceExitGateResponse: """ Handles requests to make messages to use in the Entrance and Exit Gate mission. Args: data (MessageEntranceExitGateRequest): The request to the service. Returns: MessageEntranceExitGateResponse: The response from the service. The response contains the message needed to send to AUVSI. """ if self.time_last_entrance_exit is not None: seconds_elapsed = rospy.get_time() - self.time_last_entrance_exit if seconds_elapsed < 1: rospy.sleep(1 - seconds_elapsed) self.time_last_entrance_exit = rospy.get_time() aedt_date_time = self.get_aedt_date_time() message = self.robotx_entrance_exit_gate_message.to_string( self.delim, self.team_id, aedt_date_time, data, self.use_test_data, ) self.robotx_client.send_message(message) return MessageEntranceExitGateResponse(message)
[docs] def handle_follow_path_message( self, data: MessageFollowPathRequest, ) -> MessageFollowPathResponse: """ Handles requests to make messages to use in the Follow Path Mission Args: data (MessageFollowPathRequest): The request to the service. Returns: MessageFollowPathResponse: The response from the service. The response contains the message needed to send to AUVSI. """ if self.time_last_follow_path is not None: seconds_elapsed = rospy.get_time() - self.time_last_follow_path if seconds_elapsed < 1: rospy.sleep(1 - seconds_elapsed) self.time_last_follow_path = rospy.get_time() aedt_date_time = self.get_aedt_date_time() message = self.robotx_follow_path_message.to_string( self.delim, self.team_id, aedt_date_time, data, self.use_test_data, ) self.robotx_client.send_message(message) return MessageFollowPathResponse(message)
[docs] def handle_react_report_message( self, data: MessageReactReportRequest, ) -> MessageReactReportResponse: """ Handles requests to make messages to use in the React Report Mission Args: data (MessageReactReportRequest): The request to the service. Returns: MessageReactReportResponse: The response from the service. The response contains the message needed to send to AUVSI. """ if self.time_last_react_report is not None: seconds_elapsed = rospy.get_time() - self.time_last_react_report if seconds_elapsed < 1: rospy.sleep(1 - seconds_elapsed) self.time_last_react_report = rospy.get_time() aedt_date_time = self.get_aedt_date_time() message = self.robotx_react_report_message.to_string( self.delim, self.team_id, aedt_date_time, data, self.use_test_data, ) self.robotx_client.send_message(message) return MessageReactReportResponse(message)
[docs] def handle_scan_code_message(self, color_pattern: str) -> None: """ Handles the color pattern reported over the topic dedicated to the Scan the Code mission. The color is encoded into a message and sent to the AUVSI Technical Director server. Args: color_pattern (str): The color pattern reported by the handled message in the Scan the Code topic. """ if self.time_last_scan_code is not None: seconds_elapsed = rospy.get_time() - self.time_last_scan_code if seconds_elapsed < 1: rospy.sleep(1 - seconds_elapsed) self.time_last_scan_code = rospy.get_time() aedt_date_time = self.get_aedt_date_time() message = self.robotx_scan_code_message.to_string( self.delim, self.team_id, aedt_date_time, color_pattern, self.use_test_data, ) self.robotx_client.send_message(message)
[docs] def handle_detect_dock_message( self, data: MessageDetectDockRequest, ) -> MessageDetectDockResponse: """ Handles requests to make messages to use in the Detect Dock Mission Args: data (MessageDetectDockRequest): The request to the service. Returns: MessageDetectDockResponse: The response from the service. The response contains the message needed to send to AUVSI. """ if self.time_last_detect_dock is not None: seconds_elapsed = rospy.get_time() - self.time_last_detect_dock if seconds_elapsed < 1: rospy.sleep(1 - seconds_elapsed) self.time_last_detect_dock = rospy.get_time() aedt_date_time = self.get_aedt_date_time() message = self.robotx_detect_dock_message.to_string( self.delim, self.team_id, aedt_date_time, data, self.use_test_data, ) self.robotx_client.send_message(message) return MessageDetectDockResponse(message)
[docs] def handle_find_fling_message( self, data: MessageFindFlingRequest, ) -> MessageFindFlingResponse: """ Handles requests to make messages to use in the Find Fling Mission Args: data (MessageFindFlingRequest): The request to the service. Returns: MessageFindFlingResponse: The response from the service. The response contains the message needed to send to AUVSI. """ if self.time_last_find_fling is not None: seconds_elapsed = rospy.get_time() - self.time_last_find_fling if seconds_elapsed < 1: rospy.sleep(1 - seconds_elapsed) self.time_last_find_fling = rospy.get_time() aedt_date_time = self.get_aedt_date_time() message = self.robotx_find_fling_message.to_string( self.delim, self.team_id, aedt_date_time, data, self.use_test_data, ) self.robotx_client.send_message(message) return MessageFindFlingResponse(message)
[docs] def handle_uav_replenishment_message( self, data: MessageUAVReplenishmentRequest, ) -> MessageUAVReplenishmentResponse: """ Handles requests to make messages to use in the UAV Replenishment Mission Args: data (MessageUAVReplenishmentRequest): The request to the service. Returns: MessageUAVReplenishmentResponse: The response from the service. The response contains the message needed to send to AUVSI. """ if self.time_last_uav_replenishment is not None: seconds_elapsed = rospy.get_time() - self.time_last_uav_replenishment if seconds_elapsed < 1: rospy.sleep(1 - seconds_elapsed) self.time_last_uav_replenishment = rospy.get_time() aedt_date_time = self.get_aedt_date_time() message = self.robotx_uav_replenishment_message.to_string( self.delim, self.team_id, aedt_date_time, data, self.use_test_data, ) self.robotx_client.send_message(message) return MessageUAVReplenishmentResponse(message)
[docs] def handle_uav_search_report_message( self, data: MessageUAVSearchReportRequest, ) -> MessageUAVSearchReportResponse: """ Handles requests to make messages to use in the UAV Search Report Mission Args: data (MessageUAVSearchReportRequest): The request to the service. Returns: MessageUAVSearchReportResponse: The response from the service. The response contains the message needed to send to AUVSI. """ if self.time_last_uav_search_report is not None: seconds_elapsed = rospy.get_time() - self.time_last_uav_search_report if seconds_elapsed < 1: rospy.sleep(1 - seconds_elapsed) self.time_last_uav_search_report = rospy.get_time() aedt_date_time = self.get_aedt_date_time() message = self.robotx_uav_search_report_message.to_string( self.delim, self.team_id, aedt_date_time, data, self.use_test_data, ) self.robotx_client.send_message(message) return MessageUAVSearchReportResponse(message)
[docs] def get_aedt_date_time(self) -> str: """ Gets the current time in AEDT in the format of ``%d%m%y{self.delim}%H%M%S``. This is the format specified by AUVSI to use in messages. Returns: str: The constructed string representing the date and time. """ # AEDT is 11 hours ahead of UTC aedt_time = datetime.datetime.utcnow() + datetime.timedelta(hours=11, minutes=0) date_string = aedt_time.strftime("%d%m%y") time_string = aedt_time.strftime("%H%M%S") return date_string + self.delim + time_string
[docs]class RobotXClient: """ Handles communication with Technical Director server at the AUVSI RobotX competition. This class is generally handled by :class:`RobotXStartServices`. Attributes: tcp_port (int): The TCP port allotted for communication between MIL and the Technical Director server at the competition site. connected (bool): Whether a connection is intact between the client and the Technical Director server. socket_connection (Optional[socket.socket]): The socket connection. This is initially set to ``None``, until it is made in :meth:`.connect`. """ def __init__(self, tcp_ip: str, tcp_port: int): """ Args: tcp_ip (str): The IP address used by the Technical Director server to connect to. tcp_port (int): The port used by the Technical Director server that MIL is asked to connect to. """ while True: try: self.tcp_ip = socket.gethostbyname(tcp_ip) break except socket.gaierror as e: rospy.logwarn(f"Failed to resolved {tcp_ip}: {e}") rospy.sleep(1.0) continue self.tcp_port = tcp_port self.connected = False self.socket_connection = None
[docs] def connect(self) -> None: """ Attempts to establish a socket connection at the given IP and port number if no connection has already been established. If a connection can not be established, then the method sleeps for 2 seconds before retrying to establish a connection. """ if not self.connected: rospy.loginfo( f"Attempting Connection to TD Server at {self.tcp_ip}:{self.tcp_port}", ) while not self.connected and not rospy.is_shutdown(): # recreate socket self.socket_connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # attempt to reconnect, otherwise sleep for 2 seconds try: self.socket_connection.connect((self.tcp_ip, self.tcp_port)) self.connected = True rospy.loginfo("Connection to TD Server Successful") except OSError: rospy.sleep(2)
@thread_lock(lock) def send_message(self, message: bytes) -> None: """ Sends a message over the established socket connection. This method is thread-locked. If any error occurs in the transmission of the packet, then :attr:`.connection` is immediately set to ``False``, and a connection is attempted to be re-established through :meth:`.connect`. Args: message (bytes): The message to send over the connection. """ while not rospy.is_shutdown(): try: self.socket_connection.send(bytes(message, "ascii")) break except OSError: rospy.loginfo("Connection to TD Server Lost") self.connected = False self.connect()
if __name__ == "__main__": rospy.init_node("robotx_comms_client") robotx_start_services = RobotXStartServices() rospy.spin()