diff --git a/src/local_pathfinding/local_pathfinding/coord_systems.py b/src/local_pathfinding/local_pathfinding/coord_systems.py index 16c5f73d3..880007b76 100644 --- a/src/local_pathfinding/local_pathfinding/coord_systems.py +++ b/src/local_pathfinding/local_pathfinding/coord_systems.py @@ -92,10 +92,7 @@ def polar_to_cartesian(angle_rad: float, magnitude: float) -> XY: - x → east component - y → north component """ - return XY( - x=magnitude * math.sin(angle_rad), - y=magnitude * math.cos(angle_rad) - ) + return XY(x=magnitude * math.sin(angle_rad), y=magnitude * math.cos(angle_rad)) def meters_to_km(meters: float) -> float: @@ -246,3 +243,25 @@ def _latlon_point_to_xy_point(latlon_point: tuple) -> Point: def latlon_list_to_xy_list(reference_latlon, lat_lon_list: List[ci.HelperLatLon]) -> List[XY]: """Converts a list of lat/lon coordinates to x/y coordinates.""" return [latlon_to_xy(reference=reference_latlon, latlon=pos) for pos in lat_lon_list] + + +def rot_to_rad_per_sec(rot: int) -> float: + """ + Convert AIS ROT (rate of turn) int8 value into radians per second. + + Returns: + float: rate of turn in radians per second. + """ + + if rot == -128: + return 0.0 + + if abs(rot) == 127: + rot_dpm = 10.0 + else: + rot_dpm = (abs(rot) / 4.733) ** 2 + + if rot < 0: + rot_dpm *= -1 + + return math.radians(rot_dpm / 60.0) diff --git a/src/local_pathfinding/local_pathfinding/mock_nodes/node_mock_ais.py b/src/local_pathfinding/local_pathfinding/mock_nodes/node_mock_ais.py index ee7423354..918abe2b4 100644 --- a/src/local_pathfinding/local_pathfinding/mock_nodes/node_mock_ais.py +++ b/src/local_pathfinding/local_pathfinding/mock_nodes/node_mock_ais.py @@ -94,15 +94,7 @@ def update_ship_position(self, ship): # Get ROT in radians per second rot = ship.rot.rot - if rot == -128: - rot_dpm = 0 - elif abs(rot) == 127: - rot_dpm = 10 - else: - rot_dpm = (rot / 4.733) ** 2 - rot_rps = math.radians(rot_dpm / 60) - if rot < 0: - rot_rps *= -1 + rot_rps = cs.rot_to_rad_per_sec(rot) # Update heading ship.cog.heading += math.degrees(rot_rps * time) diff --git a/src/local_pathfinding/local_pathfinding/obstacles.py b/src/local_pathfinding/local_pathfinding/obstacles.py index 489be210a..4bd9829d2 100644 --- a/src/local_pathfinding/local_pathfinding/obstacles.py +++ b/src/local_pathfinding/local_pathfinding/obstacles.py @@ -13,6 +13,8 @@ import local_pathfinding.coord_systems as cs # Constants +PREDICTION_HORIZON = 10.0 # seconds +DT = 0.5 PROJ_DISTANCE_NO_COLLISION = 0.0 BOAT_BUFFER = 0.25 # km COLLISION_ZONE_STRETCH_FACTOR = 1.25 # This factor changes the width of the boat collision zone @@ -204,6 +206,7 @@ def __init__( self.ais_ship = ais_ship self.width = cs.meters_to_km(self.ais_ship.width.dimension) self.length = cs.meters_to_km(self.ais_ship.length.dimension) + self._raw_collision_zone = None self.update_collision_zone() def update_sailbot_data( @@ -233,21 +236,78 @@ def update_collision_zone(self, **kwargs) -> None: raise ValueError("Argument AIS Ship ID does not match this Boat instance's ID") self.ais_ship = ais_ship - projected_distance = self._calculate_projected_distance() + # Calculate ROT in radians per second + rot = self.ais_ship.rot.rot + rot_rps = cs.rot_to_rad_per_sec(rot) - # TODO maybe incorporate ROT in this calculation at a later time - collision_zone_width = projected_distance * COLLISION_ZONE_STRETCH_FACTOR * self.width + # Calculate current and future heading and projected distances + dt = DT + speed_kmps = self.ais_ship.sog.speed / 3600.0 + cog_rad = math.radians(self.ais_ship.cog.heading) + x, y = cs.latlon_to_xy(self.reference, self.ais_ship.lat_lon) + projected_distance = self._calculate_projected_distance(cog_rad) + bow_y = self.length / 2 + + if projected_distance == PROJ_DISTANCE_NO_COLLISION: + # If no collision detected, create small collision zone around the boat + RADIUS_MULTIPLIER = 5 + radius = cs.km_to_meters(max(self.width, self.length)) * RADIUS_MULTIPLIER + boat_collision_zone = Point(-self.width / 2, -self.length / 2).buffer( + radius, resolution=16 + ) + raw = self._translate_collision_zone(boat_collision_zone) + self._raw_collision_zone = raw + self.collision_zone = raw.buffer(radius + BOAT_BUFFER, join_style=2) + prepared.prep(self.collision_zone) - # Points of the boat collision cone polygon before rotation and centred at the origin - boat_collision_zone = Polygon( - [ - [-self.width / 2, -self.length / 2], - [-collision_zone_width, self.length / 2 + projected_distance], - [collision_zone_width, self.length / 2 + projected_distance], - [self.width / 2, -self.length / 2], + elif abs(rot_rps) < 1e-4: + # Straight line + collision_zone_width = projected_distance * COLLISION_ZONE_STRETCH_FACTOR * self.width + boat_collision_zone = Polygon( + [ + [-self.width / 2, -self.length / 2], + [-collision_zone_width, self.length / 2 + projected_distance], + [collision_zone_width, self.length / 2 + projected_distance], + [self.width / 2, -self.length / 2], + ] + ) + raw = self._translate_collision_zone(boat_collision_zone) + self._raw_collision_zone = raw + self.collision_zone = raw.buffer(BOAT_BUFFER, join_style=2) + prepared.prep(self.collision_zone) + else: + # Turning motion + R = speed_kmps / abs(rot_rps) + turn_angle = rot_rps * dt + + # Center of rotation in world frame + cx = x - R * math.sin(cog_rad) * math.copysign(1, rot_rps) + cy = y + R * math.cos(cog_rad) * math.copysign(1, rot_rps) + + # Rotate the ship around the center + future_x = cx + R * math.sin(cog_rad + turn_angle) + future_y = cy + R * math.cos(cog_rad + turn_angle) + + delta_heading = rot_rps * dt + future_cog_rad = cog_rad + delta_heading + future_projected_distance = self._calculate_projected_distance( + future_cog_rad, position_override=(future_x, future_y) + ) + + A = [0.0, bow_y] + B = [0.0, bow_y + projected_distance] + C = [ + future_projected_distance * math.sin(delta_heading), + bow_y + future_projected_distance * math.cos(delta_heading), ] - ) + boat_collision_zone = Polygon([A, B, C]) + raw = self._translate_collision_zone(boat_collision_zone) + self._raw_collision_zone = raw + self.collision_zone = raw.buffer(BOAT_BUFFER, join_style=2) + prepared.prep(self.collision_zone) + + def _translate_collision_zone(self, boat_collision_zone): # this code block translates and rotates the collision zone to the correct position dx, dy = cs.latlon_to_xy(self.reference, self.ais_ship.lat_lon) angle_rad = math.radians(-self.ais_ship.cog.heading) @@ -255,11 +315,9 @@ def update_collision_zone(self, **kwargs) -> None: cos_theta = math.cos(angle_rad) transformation = np.array([cos_theta, -sin_theta, sin_theta, cos_theta, dx, dy]) collision_zone = affine_transform(boat_collision_zone, transformation) + return collision_zone - self.collision_zone = collision_zone.buffer(BOAT_BUFFER, join_style=2) - prepared.prep(self.collision_zone) - - def _calculate_projected_distance(self) -> float: + def _calculate_projected_distance(self, cog_rad, position_override=None) -> float: """Calculates the distance the boat obstacle will travel before collision, if Sailbot moves directly towards the soonest possible collision point at its current speed. The system is modeled by two parametric lines extending from the positions of the boat @@ -276,8 +334,10 @@ def _calculate_projected_distance(self) -> float: float: Distance in km that the boat will travel before collision or the constant value PROJ_DISTANCE_NO_COLLISION if a collision is not possible. """ - position = cs.latlon_to_xy(self.reference, self.ais_ship.lat_lon) - cog_rad = math.radians(self.ais_ship.cog.heading) + if position_override is None: + position = cs.latlon_to_xy(self.reference, self.ais_ship.lat_lon) + else: + position = position_override v1 = self.ais_ship.sog.speed * math.sin(cog_rad) v2 = self.ais_ship.sog.speed * math.cos(cog_rad) a, b = position diff --git a/src/local_pathfinding/test/test_coord_systems.py b/src/local_pathfinding/test/test_coord_systems.py index c0b3f4a13..238643581 100644 --- a/src/local_pathfinding/test/test_coord_systems.py +++ b/src/local_pathfinding/test/test_coord_systems.py @@ -91,7 +91,7 @@ def test_true_bearing_to_plotly_cartesian(true_bearing: float, plotly_cartesian: SOUTH_NEG_PI = -math.pi # -π, same physical direction as +π WEST = -math.pi / 2 NORTHEAST = math.pi / 4 -NORTHWEST = - math.pi / 4 +NORTHWEST = -math.pi / 4 SOUTHEAST = 3 * math.pi / 4 SOUTHWEST = -3 * math.pi / 4 @@ -104,19 +104,18 @@ def test_true_bearing_to_plotly_cartesian(true_bearing: float, plotly_cartesian: "angle_rad,speed,expected_xy", [ # Cardinal directions - (NORTH, 10.0, cs.XY(0.0, 10.0)), # North - (EAST, 5.0, cs.XY(5.0, 0.0)), # East - (SOUTH_PI, 2.0, cs.XY(0.0, -2.0)), # South (+π) - (WEST, 4.0, cs.XY(-4.0, 0.0)), # West (-π/2) + (NORTH, 10.0, cs.XY(0.0, 10.0)), # North + (EAST, 5.0, cs.XY(5.0, 0.0)), # East + (SOUTH_PI, 2.0, cs.XY(0.0, -2.0)), # South (+π) + (WEST, 4.0, cs.XY(-4.0, 0.0)), # West (-π/2) (SOUTH_NEG_PI, 3.0, cs.XY(0.0, -3.0)), # South (-π), same as +π - # Diagonals / 45° bearings (NORTHEAST, 10.0, cs.XY(10.0 * sin45, 10.0 * cos45)), (NORTHWEST, 10.0, cs.XY(-10.0 * sin45, 10.0 * cos45)), (SOUTHEAST, 10.0, cs.XY(10.0 * sin45, -10.0 * cos45)), (SOUTHWEST, 10.0, cs.XY(-10.0 * sin45, -10.0 * cos45)), # Zero speed should always give zero vector regardless of angle - (NORTHEAST, 0.0, cs.XY(0.0, 0.0)) + (NORTHEAST, 0.0, cs.XY(0.0, 0.0)), ], ) def test_polar_to_cartesian(angle_rad: float, speed: float, expected_xy: cs.XY): @@ -124,7 +123,8 @@ def test_polar_to_cartesian(angle_rad: float, speed: float, expected_xy: cs.XY): # Component-wise check assert (result.x, result.y) == pytest.approx( - (expected_xy.x, expected_xy.y), rel=1e-6), "incorrect angle(rad) to XY conversion" + (expected_xy.x, expected_xy.y), rel=1e-6 + ), "incorrect angle(rad) to XY conversion" # Magnitude should match the input speed mag = math.hypot(result.x, result.y) @@ -388,3 +388,22 @@ def test_latlon_polygons_to_xy_polygons_empty_Polygon(): assert isinstance(result, List) assert len(result) == 1 assert result[0].is_empty + + +@pytest.mark.parametrize( + "rot, expected_rps", + [ + (-128, 0.0), + (127, math.radians(10 / 60)), + (-127, -math.radians(10 / 60)), + (0, 0.0), + (10, math.radians(((10 / 4.733) ** 2) / 60)), + (-10, -math.radians(((10 / 4.733) ** 2) / 60)), + (50, math.radians(((50 / 4.733) ** 2) / 60)), + (-50, -math.radians(((50 / 4.733) ** 2) / 60)), + ], +) +def test_rot_to_rad_per_sec(rot: int, expected_rps: float): + assert cs.rot_to_rad_per_sec(rot) == pytest.approx( + expected_rps + ), f"Incorrect ROT conversion for rot={rot}" diff --git a/src/local_pathfinding/test/test_obstacles.py b/src/local_pathfinding/test/test_obstacles.py index 19d1b3fe6..9c983e05c 100644 --- a/src/local_pathfinding/test/test_obstacles.py +++ b/src/local_pathfinding/test/test_obstacles.py @@ -14,7 +14,7 @@ from shapely.geometry import MultiPolygon, Point, Polygon, box import local_pathfinding.coord_systems as cs -from local_pathfinding.obstacles import BOAT_BUFFER, Boat, Land, Obstacle +from local_pathfinding.obstacles import Boat, Land, Obstacle def load_pkl(file_path: str) -> Any: @@ -321,8 +321,9 @@ def test_calculate_projected_distance_same_loc( sailbot_speed: float, ): boat1 = Boat(reference_point, sailbot_position, sailbot_speed, ais_ship) + cog_rad = np.radians(ais_ship.cog.heading) - assert boat1._calculate_projected_distance() == pytest.approx( + assert boat1._calculate_projected_distance(cog_rad) == pytest.approx( 0.0 ), "incorrect projected distance" @@ -357,7 +358,8 @@ def test_calculate_projected_distance_diff_loc( target_ship = Boat( reference_point_latlon, sailbot_position_latlon, sailbot_speed_kmph, ais_ship ) - target_ship_dist_km = target_ship._calculate_projected_distance() + cog_rad = np.radians(ais_ship.cog.heading) + target_ship_dist_km = target_ship._calculate_projected_distance(cog_rad) travel_time_hr = target_ship_dist_km / ais_ship.sog.speed target_ship_speed_x_kmph = np.sin(np.radians(ais_ship.cog.heading)) * ais_ship.sog.speed target_ship_speed_y_kmph = np.cos(np.radians(ais_ship.cog.heading)) * ais_ship.sog.speed @@ -381,7 +383,7 @@ def test_calculate_projected_distance_diff_loc( ) -# Test collision zone is created successfully +# Test collision zone for straight line case @pytest.mark.parametrize( "reference_point,sailbot_position,ais_ship,sailbot_speed", [ @@ -401,7 +403,7 @@ def test_calculate_projected_distance_diff_loc( ) ], ) -def test_collision_zone_boat( +def test_collision_zone_straight_line_boat( reference_point: HelperLatLon, sailbot_position: HelperLatLon, ais_ship: HelperAISShip, @@ -411,32 +413,131 @@ def test_collision_zone_boat( boat1.update_collision_zone() assert isinstance(boat1.collision_zone, Polygon) - if boat1.collision_zone is not None: - assert boat1.collision_zone.exterior.coords is not None + assert boat1.collision_zone is not None + assert boat1.collision_zone.exterior.coords is not None -# Test collision zone is positioned correctly -# ais_ship is positioned at the reference point +# Test collision zone for no collision case @pytest.mark.parametrize( "reference_point,sailbot_position,ais_ship,sailbot_speed", [ ( - HelperLatLon(latitude=52.0, longitude=-136.0), + HelperLatLon(latitude=52.268119490007756, longitude=-136.9133983613776), + HelperLatLon(latitude=51.957, longitude=-136.262), + HelperAISShip( + id=1, + lat_lon=HelperLatLon(latitude=51.957, longitude=-136.262), # Same as sailbot + cog=HelperHeading(heading=30.0), + sog=HelperSpeed(speed=20.0), + width=HelperDimension(dimension=20.0), + length=HelperDimension(dimension=100.0), + rot=HelperROT(rot=0), + ), + 15.0, + ) + ], +) +def test_collision_zone_no_collision_boat( + reference_point: HelperLatLon, + sailbot_position: HelperLatLon, + ais_ship: HelperAISShip, + sailbot_speed: float, +): + boat1 = Boat(reference_point, sailbot_position, sailbot_speed, ais_ship) + boat1.update_collision_zone() + + assert isinstance(boat1.collision_zone, Polygon) + assert boat1.collision_zone is not None + # Circular zone should have many vertices + assert len(boat1.collision_zone.exterior.coords) > 10 + + +# Test collision zone for turning motion case +@pytest.mark.parametrize( + "reference_point,sailbot_position,ais_ship,sailbot_speed", + [ + ( + HelperLatLon(latitude=52.268119490007756, longitude=-136.9133983613776), + HelperLatLon(latitude=51.95785651405779, longitude=-136.26282894969611), + HelperAISShip( + id=1, + lat_lon=HelperLatLon(latitude=51.97917631092298, longitude=-137.1106454702385), + cog=HelperHeading(heading=30.0), + sog=HelperSpeed(speed=20.0), + width=HelperDimension(dimension=20.0), + length=HelperDimension(dimension=100.0), + rot=HelperROT(rot=20), + ), + 15.0, + ), + ( + HelperLatLon(latitude=52.268119490007756, longitude=-136.9133983613776), HelperLatLon(latitude=51.95785651405779, longitude=-136.26282894969611), + HelperAISShip( + id=1, + lat_lon=HelperLatLon(latitude=51.97917631092298, longitude=-137.1106454702385), + cog=HelperHeading(heading=30.0), + sog=HelperSpeed(speed=20.0), + width=HelperDimension(dimension=20.0), + length=HelperDimension(dimension=100.0), + rot=HelperROT(rot=-20), + ), + 15.0, + ), + ], +) +def test_collision_zone_turning_boat( + reference_point: HelperLatLon, + sailbot_position: HelperLatLon, + ais_ship: HelperAISShip, + sailbot_speed: float, +): + boat1 = Boat(reference_point, sailbot_position, sailbot_speed, ais_ship) + boat1.update_collision_zone() + + assert isinstance(boat1.collision_zone, Polygon) + assert boat1.collision_zone is not None + assert boat1.collision_zone.exterior.coords is not None + + +# COLLISION ZONE LOGIC + + +# Test collision zone positioning for straight-line motion +@pytest.mark.parametrize( + "reference_point,sailbot_position,ais_ship,sailbot_speed", + [ + ( + HelperLatLon(latitude=52.0, longitude=-136.0), + HelperLatLon(latitude=52.01, longitude=-136.0), HelperAISShip( id=1, lat_lon=HelperLatLon(latitude=52.0, longitude=-136.0), - cog=HelperHeading(heading=0.0), + cog=HelperHeading(heading=0.0), # north sog=HelperSpeed(speed=20.0), width=HelperDimension(dimension=20.0), length=HelperDimension(dimension=100.0), rot=HelperROT(rot=0), ), 15.0, - ) + ), + ( + HelperLatLon(latitude=52.0, longitude=-136.0), + HelperLatLon(latitude=52.0, longitude=-135.99), + HelperAISShip( + id=1, + lat_lon=HelperLatLon(latitude=52.0, longitude=-136.0), + cog=HelperHeading(heading=90.0), + sog=HelperSpeed(speed=20.0), + width=HelperDimension(dimension=20.0), + length=HelperDimension(dimension=100.0), + rot=HelperROT(rot=0), + ), + 15.0, + ), ], ) -def test_position_collision_zone_boat( +def test_straight_line_collision_zone_geometry( reference_point: HelperLatLon, sailbot_position: HelperLatLon, ais_ship: HelperAISShip, @@ -444,18 +545,118 @@ def test_position_collision_zone_boat( ): boat1 = Boat(reference_point, sailbot_position, sailbot_speed, ais_ship) - if boat1.collision_zone is not None: - unbuffered = boat1.collision_zone.buffer(-BOAT_BUFFER, join_style=2) - x, y = np.array(unbuffered.exterior.coords.xy) - x = np.array(x) - y = np.array(y) - assert (x[0] + cs.meters_to_km(boat1.ais_ship.width.dimension) / 2) == pytest.approx( - 0, abs=0.001 + if boat1._raw_collision_zone is not None: + unbuffered = boat1._raw_collision_zone + + stern_x = -cs.meters_to_km(boat1.ais_ship.width.dimension) / 2 + stern_y = -cs.meters_to_km(boat1.ais_ship.length.dimension) / 2 + + dx, dy = cs.latlon_to_xy(reference_point, ais_ship.lat_lon) + angle = np.radians(-ais_ship.cog.heading) + cos_t, sin_t = np.cos(angle), np.sin(angle) + + stern_world = np.array( + [stern_x * cos_t - stern_y * sin_t + dx, stern_x * sin_t + stern_y * cos_t + dy] ) - assert (y[0] + cs.meters_to_km(boat1.ais_ship.length.dimension) / 2) == pytest.approx( - 0, abs=0.001 + + coords = np.array(unbuffered.exterior.coords) + dists = np.linalg.norm(coords - stern_world, axis=1) + closest = coords[np.argmin(dists)] + + assert np.allclose(closest, stern_world, atol=0.001) + + +# Test collision zone positioning for turning motion +@pytest.mark.parametrize( + "reference_point,sailbot_position,ais_ship,sailbot_speed", + [ + ( + HelperLatLon(latitude=52.0, longitude=-136.0), + HelperLatLon(latitude=52.002, longitude=-135.998), # ahead + right + HelperAISShip( + id=1, + lat_lon=HelperLatLon(latitude=52.0, longitude=-136.0), + cog=HelperHeading(heading=0.0), + sog=HelperSpeed(speed=20.0), + width=HelperDimension(dimension=20.0), + length=HelperDimension(dimension=100.0), + rot=HelperROT(rot=30), # turning right + ), + 15.0, + ), + ], +) +def test_turning_collision_zone_geometry( + reference_point: HelperLatLon, + sailbot_position: HelperLatLon, + ais_ship: HelperAISShip, + sailbot_speed: float, +): + boat = Boat(reference_point, sailbot_position, sailbot_speed, ais_ship) + + if boat._raw_collision_zone is not None: + + unbuffered = boat._raw_collision_zone + coords = np.array(unbuffered.exterior.coords[:-1]) + assert coords.shape == (3, 2) + + bow_y = cs.meters_to_km(ais_ship.length.dimension) / 2 + cog_rad = np.radians(ais_ship.cog.heading) + + projected_distance = boat._calculate_projected_distance(cog_rad) + + rot = ais_ship.rot.rot + rot_rps = cs.rot_to_rad_per_sec(rot) + dt = 5.0 + delta_heading = rot_rps * dt + future_cog_rad = cog_rad + delta_heading + + # Compute future boat position + speed_kmps = ais_ship.sog.speed / 3600.0 + R = speed_kmps / abs(rot_rps) + x, y = cs.latlon_to_xy(reference_point, ais_ship.lat_lon) + + cx = x - R * np.sin(cog_rad) * np.sign(rot_rps) + cy = y + R * np.cos(cog_rad) * np.sign(rot_rps) + + future_x = cx + R * np.sin(cog_rad + delta_heading) + future_y = cy + R * np.cos(cog_rad + delta_heading) + + future_projected_distance = boat._calculate_projected_distance( + future_cog_rad, position_override=(future_x, future_y) ) + A = np.array([0.0, bow_y]) + B = np.array([0.0, bow_y + projected_distance]) + C = np.array( + [ + future_projected_distance * np.sin(delta_heading), + bow_y + future_projected_distance * np.cos(delta_heading), + ] + ) + + expected_local = np.vstack([A, B, C]) + + dx, dy = cs.latlon_to_xy(reference_point, ais_ship.lat_lon) + angle_rad = np.radians(-ais_ship.cog.heading) + sin_t = np.sin(angle_rad) + cos_t = np.cos(angle_rad) + + T = np.array( + [ + [cos_t, -sin_t], + [sin_t, cos_t], + ] + ) + + rotated = np.matmul(expected_local, T.T) + translation = np.array([dx, dy]) + expected_world = rotated + translation + coords_sorted = coords[np.lexsort((coords[:, 0], coords[:, 1]))] + expected_sorted = expected_world[np.lexsort((expected_world[:, 0], expected_world[:, 1]))] + + assert np.allclose(coords_sorted, expected_sorted, atol=1e-6) + # Test create collision zone raises error when id of passed ais_ship does not match self's id @pytest.mark.parametrize(