Source code for navigator_robotx_comms.navigator_robotx_comms

#!/usr/bin/env python3
"""
RobotX Communications Library: A library that handles serialization and deserialization
of messages for the RobotX Communication Protocol
"""

from __future__ import annotations

import math
from typing import Any

import tf.transformations as trans
from geographic_msgs.msg import GeoPoint
from mil_tools import rosmsg_to_numpy
from nav_msgs.msg import Odometry
from navigator_msgs.srv import (
    MessageDetectDockRequest,
    MessageEntranceExitGateRequest,
    MessageFindFlingRequest,
    MessageFollowPathRequest,
    MessageUAVReplenishmentRequest,
    MessageWildlifeEncounterRequest,
)


class BitwiseXORChecksum:
    """
    Running bitwise-XOR checksum over the units of a message.

    The checksum is stored on the instance and is NOT reset between calls, so
    feeding several messages to the same instance folds them all into one
    value. Create a fresh instance for each independent message.

    Attributes:
        tot_checksum (int): The XOR of every unit processed so far.
    """

    def __init__(self):
        self.tot_checksum = 0

    def ret_checksum(self, message):
        """
        Fold ``message`` into the running checksum and return the new total.

        Args:
            message (str | bytes | bytearray): The message to checksum. Each
                character of a ``str`` contributes its code point; each byte of
                a ``bytes``-like object contributes its integer value.

        Returns:
            int: The updated running XOR value.
        """
        for unit in message:
            # Iterating bytes yields ints already; str yields 1-char strings.
            value = unit if isinstance(unit, int) else ord(unit)
            self.tot_checksum ^= value
        return self.tot_checksum


class RobotXHeartbeatMessage:
    """
    Handles formation of the heartbeat message.

    .. warning::

        The following code pertains to the **2024 edition** of the AUVSI RobotX
        competition, held in Sarasota. Updates to the specifications may have
        changed since this competition, and therefore, the code may not accurately
        represent the specifications MIL must produce for the competition.

    Attributes:
        message_id (str): The ID identifying the message as a MIL heartbeat
            message. Set to ``RXHRB``.
        timestamp_last: Initialised to ``None``; not otherwise used by this class.
    """

    def __init__(self):
        self.message_id = "RXHRB"
        self.timestamp_last = None

    def from_string(
        self,
        delim: bytes | str,
        string: bytes | str,
    ) -> tuple[list[Any], list[Any]]:
        """
        Split a received message into its data fields and its checksum parts.

        The message may be ``bytes`` or ``str``. ``delim`` is converted to the
        type of ``string`` before splitting, and the ``*`` separator is chosen
        to match, so the two types can never be mixed in a ``split`` call.

        Args:
            delim (bytes | str): The delimiter separating the values in the
                data list. Frequently a comma.
            string (bytes | str): The message to split.

        Returns:
            tuple[list, list]: The message split on ``delim``, followed by the
            message split on ``*``. Elements have the same type as ``string``.
        """
        if isinstance(string, (bytes, bytearray)):
            star = b"*"
            if isinstance(delim, str):
                delim = delim.encode()
        else:
            star = "*"
            if isinstance(delim, (bytes, bytearray)):
                delim = delim.decode()
        return string.split(delim), string.split(star)

    def to_string(
        self,
        delim: str,
        team_id: str,
        edt_date_time: Any,
        gps_array: Any,
        odom: Odometry | None,
        uav_status: int | None,
        system_mode: int | None,
        use_test_data: bool,
    ) -> str:
        """
        Build the heartbeat message, framed as ``$<payload>*<checksum>\\r\\n``.

        Args:
            delim (str): The delimiter to use when separating the data.
            team_id (str): The team ID used by MIL when sending messages.
            edt_date_time (Any): Value interpolated as the date/time field(s);
                presumably an already formatted string (confirm with caller).
            gps_array (Optional[Any]): Object exposing ``point.x`` (used as the
                latitude field) and ``point.y`` (used as the longitude field).
                If ``None``, both fields are left empty.
            odom (Optional[Odometry]): Odometry whose orientation yaw selects
                the N/S and E/W indicator fields. If ``None``, both indicators
                are left empty.
            uav_status (Optional[int]): The status of the UAV. ``None`` is sent
                as zero.
            system_mode (Optional[int]): The current mode of the boat. ``None``
                is sent as zero.
            use_test_data (bool): Whether to send a fixed sample payload. If
                ``True``, every other argument is ignored and never accessed.

        Returns:
            str: The constructed message.
        """
        if use_test_data:
            full_data = "RXHRB,111221,161229,21.31198,N,157.88972,W,ROBOT,2,1"
        else:
            if gps_array is not None:
                latitude = gps_array.point.x
                longitude = gps_array.point.y
            else:
                latitude = ""
                longitude = ""

            north_south = ""
            east_west = ""
            if odom is not None:
                quaternion_numpy = rosmsg_to_numpy(odom.pose.pose.orientation)
                yaw = trans.euler_from_quaternion(quaternion_numpy)[2]
                # NOTE(review): the N/S and E/W indicators are chosen from the
                # heading quadrant, not from the sign of latitude/longitude.
                # That looks unintended for hemisphere indicators -- confirm
                # against the protocol before changing. A yaw exactly on a
                # quadrant boundary leaves both indicators empty.
                if 0 < yaw < math.pi / 2:
                    north_south, east_west = "N", "E"
                elif math.pi / 2 < yaw < math.pi:
                    north_south, east_west = "N", "W"
                elif -math.pi / 2 > yaw > -math.pi:
                    north_south, east_west = "S", "W"
                elif 0 > yaw > -math.pi / 2:
                    north_south, east_west = "S", "E"

            if uav_status is None:
                uav_status = 0
            if system_mode is None:
                system_mode = 0

            first_half_data = f"{self.message_id}{delim}{edt_date_time}{delim}{latitude}{delim}{north_south}"
            second_half_data = f"{longitude}{delim}{east_west}{delim}{team_id}{delim}{system_mode}{delim}{uav_status!s}"
            full_data = first_half_data + delim + second_half_data

        # A fresh checksum object per message: its state accumulates across calls.
        checksum = BitwiseXORChecksum().ret_checksum(full_data)
        # "02X" already zero-pads to two upper-case hex digits.
        return f"${full_data}*{checksum:02X}\r\n"
class RobotXEntranceExitGateMessage:
    """
    Handles formation of entrance and exit gates message.

    .. warning::

        The following code pertains to the **2024 edition** of the AUVSI RobotX
        competition, held in Sarasota. Updates to the specifications may have
        changed since this competition, and therefore, the code may not accurately
        represent the specifications MIL must produce for the competition.

    Attributes:
        message_id (str): The ID of the message to signal that it is related to
            the entrance and exit gates. Set to ``RXGAT``.
    """

    def __init__(self):
        self.message_id = "RXGAT"

    def from_string(
        self,
        delim: bytes | str,
        string: bytes | str,
    ) -> tuple[list[Any], list[Any]]:
        """
        Split a received message into its data fields and its checksum parts.

        The message may be ``bytes`` or ``str``. ``delim`` is converted to the
        type of ``string`` before splitting, and the ``*`` separator is chosen
        to match, so the two types can never be mixed in a ``split`` call.

        Args:
            delim (bytes | str): The delimiter splitting up the data values.
            string (bytes | str): The message to get the data values from.

        Returns:
            tuple[list, list]: The message split on ``delim``, followed by the
            message split on ``*``. Elements have the same type as ``string``.
        """
        if isinstance(string, (bytes, bytearray)):
            star = b"*"
            if isinstance(delim, str):
                delim = delim.encode()
        else:
            star = "*"
            if isinstance(delim, (bytes, bytearray)):
                delim = delim.decode()
        return string.split(delim), string.split(star)

    def to_string(
        self,
        delim: str,
        team_id: Any,
        edt_date_time: Any,
        data: MessageEntranceExitGateRequest,
        use_test_data: bool,
    ) -> str:
        """
        Build the entrance/exit gate message, framed as
        ``$<payload>*<checksum>\\r\\n``.

        Args:
            delim (str): The delimiter to use in between data values.
            team_id (Any): Value interpolated as the team ID field.
            edt_date_time (Any): Value interpolated as the date/time field(s).
            data (MessageEntranceExitGateRequest): Request providing
                ``entrance_gate`` and ``exit_gate``.
            use_test_data (bool): Whether to send a fixed sample payload. If
                ``True``, the other arguments are ignored and ``data`` is never
                accessed.

        Returns:
            str: The encoded message.
        """
        if use_test_data:
            payload = "RXGAT,111221,161229,ROBOT,1,2"
        else:
            payload = f"{self.message_id}{delim}{edt_date_time}{delim}{team_id}{delim}{data.entrance_gate!s}{delim}{data.exit_gate!s}"

        # A fresh checksum object per message: its state accumulates across calls.
        checksum = BitwiseXORChecksum().ret_checksum(payload)
        return f"${payload}*{checksum:02X}\r\n"
class RobotXFollowPathMessage:
    """
    Handles formation of follow path message.

    .. warning::

        The following code pertains to the **2024 edition** of the AUVSI RobotX
        competition, held in Sarasota. Updates to the specifications may have
        changed since this competition, and therefore, the code may not accurately
        represent the specifications MIL must produce for the competition.

    Attributes:
        message_id (str): The ID of the message to signal that it is related to
            the follow path message. Set to ``RXPTH``.
    """

    def __init__(self):
        self.message_id = "RXPTH"

    def from_string(
        self,
        delim: bytes | str,
        string: bytes | str,
    ) -> tuple[list[Any], list[Any]]:
        """
        Split a received message into its data fields and its checksum parts.

        The message may be ``bytes`` or ``str``. ``delim`` is converted to the
        type of ``string`` before splitting, and the ``*`` separator is chosen
        to match, so the two types can never be mixed in a ``split`` call.

        Args:
            delim (bytes | str): The delimiter splitting up the data values.
            string (bytes | str): The message to get the data values from.

        Returns:
            tuple[list, list]: The message split on ``delim``, followed by the
            message split on ``*``. Elements have the same type as ``string``.
        """
        if isinstance(string, (bytes, bytearray)):
            star = b"*"
            if isinstance(delim, str):
                delim = delim.encode()
        else:
            star = "*"
            if isinstance(delim, (bytes, bytearray)):
                delim = delim.decode()
        return string.split(delim), string.split(star)

    def to_string(
        self,
        delim: str,
        team_id: Any,
        edt_date_time: Any,
        data: MessageFollowPathRequest,
        use_test_data: bool,
    ) -> str:
        """
        Build the follow path message, framed as ``$<payload>*<checksum>\\r\\n``.

        Args:
            delim (str): The delimiter to use in between data values.
            team_id (Any): Value interpolated as the team ID field.
            edt_date_time (Any): Value interpolated as the date/time field(s).
            data (MessageFollowPathRequest): Request providing ``finished``.
            use_test_data (bool): Whether to send a fixed sample payload. If
                ``True``, the other arguments are ignored and ``data`` is never
                accessed.

        Returns:
            str: The encoded message.
        """
        if use_test_data:
            payload = "RXPTH,111221,161229,ROBOT,1"
        else:
            payload = f"{self.message_id}{delim}{edt_date_time}{delim}{team_id}{delim}{data.finished!s}"

        # A fresh checksum object per message: its state accumulates across calls.
        checksum = BitwiseXORChecksum().ret_checksum(payload)
        return f"${payload}*{checksum:02X}\r\n"
class RobotXWildlifeEncounterMessage:
    """
    Handles formation of the wildlife encounter message.

    .. warning::

        The following code pertains to the **2024 edition** of the AUVSI RobotX
        competition, held in Sarasota. Updates to the specifications may have
        changed since this competition, and therefore, the code may not accurately
        represent the specifications MIL must produce for the competition.

    Attributes:
        message_id (str): The ID of the message to signal that it is related to
            the wildlife encounter message. Set to ``RXENC``.
    """

    def __init__(self):
        self.message_id = "RXENC"

    def from_string(
        self,
        delim: bytes | str,
        string: bytes | str,
    ) -> tuple[list[Any], list[Any]]:
        """
        Split a received message into its data fields and its checksum parts.

        The message may be ``bytes`` or ``str``. ``delim`` is converted to the
        type of ``string`` before splitting, and the ``*`` separator is chosen
        to match, so the two types can never be mixed in a ``split`` call.

        Args:
            delim (bytes | str): The delimiter splitting up the data values.
            string (bytes | str): The message to get the data values from.

        Returns:
            tuple[list, list]: The message split on ``delim``, followed by the
            message split on ``*``. Elements have the same type as ``string``.
        """
        if isinstance(string, (bytes, bytearray)):
            star = b"*"
            if isinstance(delim, str):
                delim = delim.encode()
        else:
            star = "*"
            if isinstance(delim, (bytes, bytearray)):
                delim = delim.decode()
        return string.split(delim), string.split(star)

    def to_string(
        self,
        delim: str,
        team_id: Any,
        edt_date_time: Any,
        data: MessageWildlifeEncounterRequest,
        use_test_data: bool,
    ) -> str:
        """
        Build the wildlife encounter message, framed as
        ``$<payload>*<checksum>\\r\\n``.

        Args:
            delim (str): The delimiter to use in between data values.
            team_id (Any): Value interpolated as the team ID field.
            edt_date_time (Any): Value interpolated as the date/time field(s).
            data (MessageWildlifeEncounterRequest): Request providing
                ``circling_wildlife``, ``clockwise`` (sent as ``CW`` when
                truthy, otherwise ``CCW``) and ``number_of_circles``.
            use_test_data (bool): Whether to send a fixed sample payload. If
                ``True``, the other arguments are ignored and ``data`` is never
                accessed.

        Returns:
            str: The encoded message.
        """
        if use_test_data:
            payload = "RXENC,111221,161229,ROBOT,3,P,C,T"
        else:
            direction = "CW" if data.clockwise else "CCW"
            payload = f"{self.message_id}{delim}{edt_date_time}{delim}{team_id}{delim}{data.circling_wildlife}{delim}{direction}{delim}{data.number_of_circles}"

        # A fresh checksum object per message: its state accumulates across calls.
        checksum = BitwiseXORChecksum().ret_checksum(payload)
        return f"${payload}*{checksum:02X}\r\n"
class RobotXScanCodeMessage:
    """
    Handles formation and sending of scan the code message.

    .. warning::

        The following code pertains to the **2024 edition** of the AUVSI RobotX
        competition, held in Sarasota. Updates to the specifications may have
        changed since this competition, and therefore, the code may not accurately
        represent the specifications MIL must produce for the competition.

    Attributes:
        message_id (str): The ID identifying the message as a Scan the Code
            message type. Equal to ``RXCOD``.
    """

    def __init__(self):
        self.message_id = "RXCOD"

    def from_string(
        self,
        delim: bytes | str,
        string: bytes | str,
    ) -> tuple[list[Any], list[Any]]:
        """
        Split a received message into its data fields and its checksum parts.

        The message may be ``bytes`` or ``str``. ``delim`` is converted to the
        type of ``string`` before splitting, and the ``*`` separator is chosen
        to match, so the two types can never be mixed in a ``split`` call.

        Args:
            delim (bytes | str): The delimiter separating values in the data
                list encoded in the message.
            string (bytes | str): The message to get data from.

        Returns:
            tuple[list, list]: The message split on ``delim``, followed by the
            message split on ``*``. Elements have the same type as ``string``.
        """
        if isinstance(string, (bytes, bytearray)):
            star = b"*"
            if isinstance(delim, str):
                delim = delim.encode()
        else:
            star = "*"
            if isinstance(delim, (bytes, bytearray)):
                delim = delim.decode()
        return string.split(delim), string.split(star)

    def to_string(
        self,
        delim: str,
        team_id: Any,
        edt_date_time: Any,
        color_pattern: str,
        use_test_data: bool,
    ) -> str:
        """
        Build the Scan the Code status message, framed as
        ``$<payload>*<checksum>\\r\\n``.

        Args:
            delim (str): The delimiter used to separate distinct data points in
                the message.
            team_id (Any): Value interpolated as the team ID field.
            edt_date_time (Any): Value interpolated as the date/time field(s).
            color_pattern (str): The color pattern to send in the message.
            use_test_data (bool): Whether to send a fixed sample payload. If
                ``True``, the other arguments are ignored.

        Returns:
            str: The constructed message.
        """
        if use_test_data:
            payload = "RXCOD,111221,161229,ROBOT,RBG"
        else:
            payload = f"{self.message_id}{delim}{edt_date_time}{delim}{team_id}{delim}{color_pattern}"

        # A fresh checksum object per message: its state accumulates across calls.
        checksum = BitwiseXORChecksum().ret_checksum(payload)
        return f"${payload}*{checksum:02X}\r\n"
class RobotXDetectDockMessage:
    """
    Handles formation of detect dock message.

    .. warning::

        The following code pertains to the **2024 edition** of the AUVSI RobotX
        competition, held in Sarasota. Updates to the specifications may have
        changed since this competition, and therefore, the code may not accurately
        represent the specifications MIL must produce for the competition.

    Attributes:
        message_id (str): The ID of the message to signal that it is related to
            the detect dock message. Set to ``RXDOK``.
    """

    def __init__(self):
        self.message_id = "RXDOK"

    def from_string(
        self,
        delim: bytes | str,
        string: bytes | str,
    ) -> tuple[list[Any], list[Any]]:
        """
        Split a received message into its data fields and its checksum parts.

        The message may be ``bytes`` or ``str``. ``delim`` is converted to the
        type of ``string`` before splitting, and the ``*`` separator is chosen
        to match, so the two types can never be mixed in a ``split`` call.

        Args:
            delim (bytes | str): The delimiter splitting up the data values.
            string (bytes | str): The message to get the data values from.

        Returns:
            tuple[list, list]: The message split on ``delim``, followed by the
            message split on ``*``. Elements have the same type as ``string``.
        """
        if isinstance(string, (bytes, bytearray)):
            star = b"*"
            if isinstance(delim, str):
                delim = delim.encode()
        else:
            star = "*"
            if isinstance(delim, (bytes, bytearray)):
                delim = delim.decode()
        return string.split(delim), string.split(star)

    def to_string(
        self,
        delim: str,
        team_id: Any,
        edt_date_time: Any,
        data: MessageDetectDockRequest,
        use_test_data: bool,
    ) -> str:
        """
        Build the detect dock message, framed as ``$<payload>*<checksum>\\r\\n``.

        Args:
            delim (str): The delimiter to use in between data values.
            team_id (Any): Value interpolated as the team ID field.
            edt_date_time (Any): Value interpolated as the date/time field(s).
            data (MessageDetectDockRequest): Request providing ``color`` and
                ``ams_status``.
            use_test_data (bool): Whether to send a fixed sample payload. If
                ``True``, the other arguments are ignored and ``data`` is never
                accessed.

        Returns:
            str: The encoded message.
        """
        if use_test_data:
            payload = "RXDOK,111221,161229,ROBOT,R,1"
        else:
            payload = f"{self.message_id}{delim}{edt_date_time}{delim}{team_id}{delim}{data.color!s}{delim}{data.ams_status!s}"

        # A fresh checksum object per message: its state accumulates across calls.
        checksum = BitwiseXORChecksum().ret_checksum(payload)
        return f"${payload}*{checksum:02X}\r\n"
class RobotXFindFlingMessage:
    """
    Handles formation of find fling message.

    .. warning::

        The following code pertains to the **2024 edition** of the AUVSI RobotX
        competition, held in Sarasota. Updates to the specifications may have
        changed since this competition, and therefore, the code may not accurately
        represent the specifications MIL must produce for the competition.

    Attributes:
        message_id (str): The ID of the message to signal that it is related to
            the find fling message. Set to ``RXFLG``.
    """

    def __init__(self):
        self.message_id = "RXFLG"

    def from_string(
        self,
        delim: bytes | str,
        string: bytes | str,
    ) -> tuple[list[Any], list[Any]]:
        """
        Split a received message into its data fields and its checksum parts.

        The message may be ``bytes`` or ``str``. ``delim`` is converted to the
        type of ``string`` before splitting, and the ``*`` separator is chosen
        to match, so the two types can never be mixed in a ``split`` call.

        Args:
            delim (bytes | str): The delimiter splitting up the data values.
            string (bytes | str): The message to get the data values from.

        Returns:
            tuple[list, list]: The message split on ``delim``, followed by the
            message split on ``*``. Elements have the same type as ``string``.
        """
        if isinstance(string, (bytes, bytearray)):
            star = b"*"
            if isinstance(delim, str):
                delim = delim.encode()
        else:
            star = "*"
            if isinstance(delim, (bytes, bytearray)):
                delim = delim.decode()
        return string.split(delim), string.split(star)

    def to_string(
        self,
        delim: str,
        team_id: Any,
        edt_date_time: Any,
        data: MessageFindFlingRequest,
        use_test_data: bool,
    ) -> str:
        """
        Build the find fling message, framed as ``$<payload>*<checksum>\\r\\n``.

        Args:
            delim (str): The delimiter to use in between data values.
            team_id (Any): Value interpolated as the team ID field.
            edt_date_time (Any): Value interpolated as the date/time field(s).
            data (MessageFindFlingRequest): Request providing ``color`` and
                ``ams_status``.
            use_test_data (bool): Whether to send a fixed sample payload. If
                ``True``, the other arguments are ignored and ``data`` is never
                accessed.

        Returns:
            str: The encoded message.
        """
        if use_test_data:
            payload = "RXFLG,111221,161229,ROBOT,R,2"
        else:
            payload = f"{self.message_id}{delim}{edt_date_time}{delim}{team_id}{delim}{data.color!s}{delim}{data.ams_status!s}"

        # A fresh checksum object per message: its state accumulates across calls.
        checksum = BitwiseXORChecksum().ret_checksum(payload)
        return f"${payload}*{checksum:02X}\r\n"
class RobotXUAVReplenishmentMessage:
    """
    Handles formation of UAV replenishment message.

    .. warning::

        The following code pertains to the **2024 edition** of the AUVSI RobotX
        competition, held in Sarasota. Updates to the specifications may have
        changed since this competition, and therefore, the code may not accurately
        represent the specifications MIL must produce for the competition.

    Attributes:
        message_id (str): The ID of the message to signal that it is related to
            the UAV replenishment message. Set to ``RXUAV``.
    """

    def __init__(self):
        self.message_id = "RXUAV"

    def from_string(
        self,
        delim: bytes | str,
        string: bytes | str,
    ) -> tuple[list[Any], list[Any]]:
        """
        Split a received message into its data fields and its checksum parts.

        The message may be ``bytes`` or ``str``. ``delim`` is converted to the
        type of ``string`` before splitting, and the ``*`` separator is chosen
        to match, so the two types can never be mixed in a ``split`` call.

        Args:
            delim (bytes | str): The delimiter splitting up the data values.
            string (bytes | str): The message to get the data values from.

        Returns:
            tuple[list, list]: The message split on ``delim``, followed by the
            message split on ``*``. Elements have the same type as ``string``.
        """
        if isinstance(string, (bytes, bytearray)):
            star = b"*"
            if isinstance(delim, str):
                delim = delim.encode()
        else:
            star = "*"
            if isinstance(delim, (bytes, bytearray)):
                delim = delim.decode()
        return string.split(delim), string.split(star)

    def to_string(
        self,
        delim: str,
        team_id: Any,
        edt_date_time: Any,
        data: MessageUAVReplenishmentRequest,
        use_test_data: bool,
    ) -> str:
        """
        Build the UAV replenishment message, framed as
        ``$<payload>*<checksum>\\r\\n``.

        Args:
            delim (str): The delimiter to use in between data values.
            team_id (Any): Value interpolated as the team ID field.
            edt_date_time (Any): Value interpolated as the date/time field(s).
            data (MessageUAVReplenishmentRequest): Request providing
                ``uav_status`` and ``item_status``.
            use_test_data (bool): Whether to send a fixed sample payload. If
                ``True``, the other arguments are ignored and ``data`` is never
                accessed.

        Returns:
            str: The encoded message.
        """
        if use_test_data:
            payload = "RXUAV,111221,161229,ROBOT,2,1"
        else:
            payload = f"{self.message_id}{delim}{edt_date_time}{delim}{team_id}{delim}{data.uav_status!s}{delim}{data.item_status!s}"

        # A fresh checksum object per message: its state accumulates across calls.
        checksum = BitwiseXORChecksum().ret_checksum(payload)
        return f"${payload}*{checksum:02X}\r\n"
class RobotXUAVSearchReportMessage:
    """
    Handles formation of UAV search report message.

    .. warning::

        The following code pertains to the **2024 edition** of the AUVSI RobotX
        competition, held in Sarasota. Updates to the specifications may have
        changed since this competition, and therefore, the code may not accurately
        represent the specifications MIL must produce for the competition.

    Attributes:
        message_id (str): The ID of the message to signal that it is related to
            the UAV search report message. Set to ``RXSAR``.
    """

    def __init__(self):
        self.message_id = "RXSAR"

    def from_string(
        self,
        delim: bytes | str,
        string: bytes | str,
    ) -> tuple[list[Any], list[Any]]:
        """
        Split a received message into its data fields and its checksum parts.

        The message may be ``bytes`` or ``str``. ``delim`` is converted to the
        type of ``string`` before splitting, and the ``*`` separator is chosen
        to match, so the two types can never be mixed in a ``split`` call.

        Args:
            delim (bytes | str): The delimiter splitting up the data values.
            string (bytes | str): The message to get the data values from.

        Returns:
            tuple[list, list]: The message split on ``delim``, followed by the
            message split on ``*``. Elements have the same type as ``string``.
        """
        if isinstance(string, (bytes, bytearray)):
            star = b"*"
            if isinstance(delim, str):
                delim = delim.encode()
        else:
            star = "*"
            if isinstance(delim, (bytes, bytearray)):
                delim = delim.decode()
        return string.split(delim), string.split(star)

    def lat_lon_ns(self, point: GeoPoint) -> tuple[float, str, float, str]:
        """
        Convert a signed latitude/longitude pair to magnitudes plus indicators.

        Args:
            point (GeoPoint): Object exposing ``latitude`` and ``longitude``.

        Returns:
            tuple[float, str, float, str]: ``(|latitude|, "N" or "S",
            |longitude|, "E" or "W")``. Zero is reported as ``N`` / ``E``.
        """
        return (
            abs(point.latitude),
            "N" if point.latitude >= 0 else "S",
            abs(point.longitude),
            "E" if point.longitude >= 0 else "W",
        )

    def to_string(
        self,
        delim: str,
        team_id: Any,
        edt_date_time: Any,
        data: MessageUAVReplenishmentRequest,
        use_test_data: bool,
    ) -> str:
        """
        Build the UAV search report message, framed as
        ``$<payload>*<checksum>\\r\\n``.

        Args:
            delim (str): The delimiter to use in between data values.
            team_id (Any): Value interpolated as the team ID field.
            edt_date_time (Any): Value interpolated as the date/time field(s).
            data: Request providing ``object1``, ``object1_location``,
                ``object2``, ``object2_location`` and ``uav_status``.
                NOTE(review): the annotation names
                ``MessageUAVReplenishmentRequest``, which does not match the
                fields read here -- presumably a dedicated search-report
                request type is intended; confirm and update the annotation.
            use_test_data (bool): Whether to send a fixed sample payload. If
                ``True``, the other arguments are ignored and ``data`` is never
                accessed.

        Returns:
            str: The encoded message.
        """
        if use_test_data:
            payload = "RXSAR,111221,161229,R,21.31198,N,157.88972,W,N, 21.32198,N,157.89972,W,ROBOT,2"
        else:
            o1_lat, o1_ns, o1_lon, o1_ew = self.lat_lon_ns(data.object1_location)
            o2_lat, o2_ns, o2_lon, o2_ew = self.lat_lon_ns(data.object2_location)
            payload = f"{self.message_id}{delim}{edt_date_time}{delim}{data.object1!s}{delim}{o1_lat!s}{delim}{o1_ns!s}{delim}{o1_lon!s}{delim}{o1_ew!s}{delim}{data.object2!s}{delim}{o2_lat!s}{delim}{o2_ns!s}{delim}{o2_lon!s}{delim}{o2_ew!s}{delim}{team_id}{delim}{data.uav_status!s}"

        # A fresh checksum object per message: its state accumulates across calls.
        checksum = BitwiseXORChecksum().ret_checksum(payload)
        return f"${payload}*{checksum:02X}\r\n"