diff --git a/src/local_pathfinding/local_pathfinding/local_path.py b/src/local_pathfinding/local_pathfinding/local_path.py index 0cb892d3a..320b77f64 100644 --- a/src/local_pathfinding/local_pathfinding/local_path.py +++ b/src/local_pathfinding/local_pathfinding/local_path.py @@ -1,6 +1,7 @@ """The path to the next global waypoint, represented by the LocalPath class.""" import math +from collections import deque from typing import List, Optional import custom_interfaces.msg as ci @@ -14,6 +15,7 @@ WIND_SPEED_CHANGE_THRESH_PROP = 0.3 WIND_DIRECTION_CHANGE_THRESH_DEG = 10 +WIND_HISTORY_LEN = 30 LOCAL_WAYPOINT_REACHED_THRESH_KM = 0.5 HEADING_WEIGHT = 0.6 COST_WEIGHT = 0.4 @@ -104,6 +106,11 @@ class LocalPath: path (Path): Collection of coordinates that form the local path to the next global waypoint. state (LocalPathState): the current local path state. + wind_history (List[wcs.Wind]): History of wind sensor readings + (Queue with max length WIND_HISTORY_LEN). + wind_average (Optional[wcs.Wind]): Average of the wind history, used for path planning. + Speed is in kmph, direction in degrees. Updated every time a new wind sensor reading is + added to wind_history. """ def __init__(self, parent_logger: RcutilsLogger): @@ -111,6 +118,8 @@ def __init__(self, parent_logger: RcutilsLogger): self._ompl_path: Optional[OMPLPath] = None self.path: Optional[ci.Path] = None self.state: Optional[LocalPathState] = None + self.wind_history: deque = deque(maxlen=WIND_HISTORY_LEN) + self.wind_average: Optional[wcs.Wind] = None @staticmethod def calculate_desired_heading_and_wp_index( @@ -362,3 +371,42 @@ def _update(self, ompl_path: OMPLPath): self._ompl_path = ompl_path self.path = self._ompl_path.get_path() + + def update_wind_history(self, current_wind: wcs.Wind): + """Updates wind history and recalculates the average wind. The wind values are all + apparent wind. + + Maintains a history of up to WIND_HISTORY_LEN wind readings. When the history + exceeds the max length, the oldest reading is removed. + + Args: + current_wind (wcs.Wind): Current wind speed (kmph) and direction (deg) + """ + + self.wind_history.append(current_wind) + + # Recalculate average wind from history once minimum wind readings reached + if len(self.wind_history) == WIND_HISTORY_LEN: + self.wind_average = self._calculate_average_wind() + + def _calculate_average_wind(self) -> Optional[wcs.Wind]: + """Calculates the average apparent wind from the wind history once the deque is full. + + Returns: + Optional[wcs.Wind]: Average wind object, or None if history is empty. + """ + if len(self.wind_history) == 0: + return None + + avg_speed, sin_sum, cos_sum = 0.0, 0.0, 0.0 + + for wind in self.wind_history: + avg_speed += wind.speed_kmph / len(self.wind_history) + # Use circular mean to handle wrap-around + sin_sum += math.sin(math.radians(wind.dir_deg)) + cos_sum += math.cos(math.radians(wind.dir_deg)) + + avg_direction = math.degrees(math.atan2(sin_sum, cos_sum)) + avg_direction = cs.bound_to_180(avg_direction) + + return wcs.Wind(avg_speed, avg_direction) diff --git a/src/local_pathfinding/test/test_local_path.py b/src/local_pathfinding/test/test_local_path.py index 466a77cf4..399292761 100644 --- a/src/local_pathfinding/test/test_local_path.py +++ b/src/local_pathfinding/test/test_local_path.py @@ -249,12 +249,148 @@ def test_in_collision_zone(local_wp_index, reference_latlon, path, obstacles, re ), ], ) -def test_is_significant_wind_change(new_tw_data, - previous_tw_data, - result): +def test_is_significant_wind_change(new_tw_data, previous_tw_data, result): assert PATH.is_significant_wind_change(new_tw_data, previous_tw_data) == result +@pytest.mark.parametrize( + "wind_readings, expected_length", + [ + # Single reading + ([Wind(speed_kmph=10.0, dir_deg=45.0)], 1), + # Multiple readings below max + ([Wind(speed_kmph=10.0 + i, dir_deg=45.0) for i in range(5)], 5), + # Exactly at max length + ( + [Wind(speed_kmph=10.0, dir_deg=45.0) for _ in range(lp.WIND_HISTORY_LEN)], + lp.WIND_HISTORY_LEN, + ), + # Exceeding max length (should stay at max) + ( + [Wind(speed_kmph=10.0, dir_deg=45.0) for _ in range(lp.WIND_HISTORY_LEN + 5)], + lp.WIND_HISTORY_LEN, + ), + ], +) +def test_update_wind_history_length(wind_readings, expected_length): + """Test that wind history respects max length constraint.""" + PATH.wind_history.clear() + + for wind in wind_readings: + PATH.update_wind_history(wind) + + assert len(PATH.wind_history) == expected_length + + +def test_wind_history_fifo_order(): + """Test that oldest wind readings are removed first.""" + # Clear history for this test + local_path = lp.LocalPath(parent_logger=RcutilsLogger()) + + for i in range(lp.WIND_HISTORY_LEN + 3): + wind = Wind(speed_kmph=10.0 + i, dir_deg=45.0 + i) + local_path.update_wind_history(wind) + + # Should contain the last WIND_HISTORY_LEN readings + # First reading should have speed 10.0 + 3.0 (index 3 of original sequence) + assert local_path.wind_history[0].speed_kmph == 13.0 + # Last reading should have speed 10.0 + 2.0 + WIND_HISTORY_LEN + assert local_path.wind_history[-1].speed_kmph == 12.0 + lp.WIND_HISTORY_LEN + + +@pytest.mark.parametrize( + "wind_readings, result", + [ + # No readings (No average) + ( + [], + None, + ), + # Single reading (No average) + ( + [Wind(speed_kmph=15.0, dir_deg=90.0)], + None, + ), + # Same speed and direction + ( + [Wind(speed_kmph=10.0, dir_deg=45.0) for _ in range(lp.WIND_HISTORY_LEN)], + Wind(speed_kmph=10.0, dir_deg=45.0), + ), + # Different speeds, same direction + ( + [Wind(speed_kmph=0, dir_deg=0) for _ in range(lp.WIND_HISTORY_LEN - 3)] + + [ + Wind(speed_kmph=10.0, dir_deg=0.0), + Wind(speed_kmph=20.0, dir_deg=0.0), + Wind(speed_kmph=30.0, dir_deg=0.0), + ], + Wind(speed_kmph=60.0/lp.WIND_HISTORY_LEN, dir_deg=0.0), + ), + # Same speed, opposite directions + ( + [Wind(speed_kmph=10.0, dir_deg=0.0) for _ in range(lp.WIND_HISTORY_LEN // 2)] + + [Wind(speed_kmph=10.0, dir_deg=180.0) for _ in range(lp.WIND_HISTORY_LEN // 2)], + Wind(speed_kmph=10.0, dir_deg=90.0), + ), + # Circular mean of Angles, same speed + ( + [Wind(speed_kmph=0, dir_deg=180) for _ in range(lp.WIND_HISTORY_LEN - 2)] + + [ + Wind(speed_kmph=10.0, dir_deg=175.0), + Wind(speed_kmph=10.0, dir_deg=-175.0), + ], + Wind(speed_kmph=20.0/lp.WIND_HISTORY_LEN, dir_deg=180.0), + ), + ], +) +def test_calculate_average_wind(wind_readings, result): + """Test average wind calculation with basic cases.""" + local_path = lp.LocalPath(parent_logger=RcutilsLogger()) + + for wind in wind_readings: + local_path.update_wind_history(wind) + + avg = local_path.wind_average + if result is None: + assert avg is None + else: + assert avg.speed_kmph == result.speed_kmph + assert avg.dir_deg == result.dir_deg + + +def test_wind_average_not_set_before_full_history(): + """Test that wind_average isn't set until we have WIND_HISTORY_LEN wind readings.""" + local_path = lp.LocalPath(parent_logger=RcutilsLogger()) + + for _ in range(lp.WIND_HISTORY_LEN - 1): + wind = Wind(speed_kmph=10.0, dir_deg=45.0) + local_path.update_wind_history(wind) + + assert local_path.wind_average is None + + +def test_wind_average_updates_with_new_readings(): + """Test that wind_average updates when new readings are added.""" + local_path = lp.LocalPath(parent_logger=RcutilsLogger()) + + # Fill history with initial readings + for _ in range(lp.WIND_HISTORY_LEN): + wind = Wind(speed_kmph=10.0, dir_deg=45.0) + local_path.update_wind_history(wind) + + first_avg = local_path.wind_average + + # Add a different wind reading (oldest will be removed from deque) + new_wind = Wind(speed_kmph=30.0, dir_deg=45.0) + local_path.update_wind_history(new_wind) + second_avg = local_path.wind_average + + # Average should increase since we're replacing a 10.0 with a 30.0 + assert second_avg is not None + assert first_avg is not None + assert second_avg.speed_kmph > first_avg.speed_kmph + + def test_LocalPathState_parameter_checking(): with pytest.raises(ValueError): lps = (