Skip to content
Open
41 changes: 41 additions & 0 deletions src/local_pathfinding/local_pathfinding/local_path.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""The path to the next global waypoint, represented by the LocalPath class."""

import math
from collections import deque
from typing import List, Optional

import custom_interfaces.msg as ci
Expand All @@ -14,6 +15,7 @@

WIND_SPEED_CHANGE_THRESH_PROP = 0.3
WIND_DIRECTION_CHANGE_THRESH_DEG = 10
WIND_HISTORY_LEN = 30
LOCAL_WAYPOINT_REACHED_THRESH_KM = 0.5
HEADING_WEIGHT = 0.6
COST_WEIGHT = 0.4
Expand Down Expand Up @@ -104,13 +106,19 @@ class LocalPath:
path (Path): Collection of coordinates that form the local path to the next
global waypoint.
state (LocalPathState): the current local path state.
wind_history (List[wcs.Wind]): History of wind sensor readings
(Queue with max length WIND_HISTORY_LEN).
wind_average (Optional[wcs.Wind]): Average of the wind history, used for path planning.
Updated every time a new wind sensor reading is added to wind_history.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make it clear here that the direction for this wind object is in degrees.

"""

def __init__(self, parent_logger: RcutilsLogger):
self._logger = parent_logger.get_child(name="local_path")
self._ompl_path: Optional[OMPLPath] = None
self.path: Optional[ci.Path] = None
self.state: Optional[LocalPathState] = None
self.wind_history: deque = deque(maxlen=WIND_HISTORY_LEN)
self.wind_average: Optional[wcs.Wind] = None

@staticmethod
def calculate_desired_heading_and_wp_index(
Expand Down Expand Up @@ -362,3 +370,36 @@ def _update(self, ompl_path: OMPLPath):

self._ompl_path = ompl_path
self.path = self._ompl_path.get_path()

def update_wind_history(self, current_wind: wcs.Wind):
"""Updates wind history and recalculates the average wind. The wind values are all
apparent wind.

Maintains a history of up to WIND_HISTORY_LEN wind readings. When the history
exceeds the max length, the oldest reading is removed.

Args:
current_wind (wcs.Wind): Current wind speed (kmph) and direction (deg)
"""

self.wind_history.append(current_wind)

# Recalculate average wind from history once minimum wind readings reached
if len(self.wind_history) == WIND_HISTORY_LEN:
self.wind_average = self._calculate_average_wind()

def _calculate_average_wind(self) -> Optional[wcs.Wind]:
"""Calculates the average apparent wind from the wind history once the deque is full.

Returns:
Optional[wcs.Wind]: Average wind object, or None if history is empty.
"""
avg_speed = sum(wind.speed_kmph for wind in self.wind_history) / len(self.wind_history)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Return none if wind_history is empty before this line or it may cause an error.


# Use circular mean to handle wrap-around
sin_sum = sum(math.sin(math.radians(wind.dir_deg)) for wind in self.wind_history)
cos_sum = sum(math.cos(math.radians(wind.dir_deg)) for wind in self.wind_history)
avg_direction = math.degrees(math.atan2(sin_sum, cos_sum))
avg_direction = cs.bound_to_180(avg_direction)

return wcs.Wind(avg_speed, avg_direction)
142 changes: 139 additions & 3 deletions src/local_pathfinding/test/test_local_path.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,12 +249,148 @@ def test_in_collision_zone(local_wp_index, reference_latlon, path, obstacles, re
),
],
)
def test_is_significant_wind_change(new_tw_data,
previous_tw_data,
result):
def test_is_significant_wind_change(new_tw_data, previous_tw_data, result):
assert PATH.is_significant_wind_change(new_tw_data, previous_tw_data) == result


@pytest.mark.parametrize(
"wind_readings, expected_length",
[
# Single reading
([Wind(speed_kmph=10.0, dir_deg=45.0)], 1),
# Multiple readings below max
([Wind(speed_kmph=10.0 + i, dir_deg=45.0) for i in range(5)], 5),
# Exactly at max length
(
[Wind(speed_kmph=10.0, dir_deg=45.0) for _ in range(lp.WIND_HISTORY_LEN)],
lp.WIND_HISTORY_LEN,
),
# Exceeding max length (should stay at max)
(
[Wind(speed_kmph=10.0, dir_deg=45.0) for _ in range(lp.WIND_HISTORY_LEN + 5)],
lp.WIND_HISTORY_LEN,
),
],
)
def test_update_wind_history_length(wind_readings, expected_length):
"""Test that wind history respects max length constraint."""
PATH.wind_history.clear()

for wind in wind_readings:
PATH.update_wind_history(wind)

assert len(PATH.wind_history) == expected_length


def test_wind_history_fifo_order():
"""Test that oldest wind readings are removed first."""
# Clear history for this test
local_path = lp.LocalPath(parent_logger=RcutilsLogger())

for i in range(lp.WIND_HISTORY_LEN + 3):
wind = Wind(speed_kmph=10.0 + i, dir_deg=45.0 + i)
local_path.update_wind_history(wind)

# Should contain the last WIND_HISTORY_LEN readings
# First reading should have speed 10.0 + 3.0 (index 3 of original sequence)
assert local_path.wind_history[0].speed_kmph == 13.0
# Last reading should have speed 10.0 + 2.0 + WIND_HISTORY_LEN
assert local_path.wind_history[-1].speed_kmph == 12.0 + lp.WIND_HISTORY_LEN


@pytest.mark.parametrize(
"wind_readings, result",
[
# No readings (No average)
(
[],
None,
),
# Single reading (No average)
(
[Wind(speed_kmph=15.0, dir_deg=90.0)],
None,
),
# Same speed and direction
(
[Wind(speed_kmph=10.0, dir_deg=45.0) for _ in range(lp.WIND_HISTORY_LEN)],
Wind(speed_kmph=10.0, dir_deg=45.0),
),
# Different speeds, same direction
(
[Wind(speed_kmph=0, dir_deg=0) for _ in range(lp.WIND_HISTORY_LEN - 3)] +
[
Wind(speed_kmph=10.0, dir_deg=0.0),
Wind(speed_kmph=20.0, dir_deg=0.0),
Wind(speed_kmph=30.0, dir_deg=0.0),
],
Wind(speed_kmph=60.0/lp.WIND_HISTORY_LEN, dir_deg=0.0),
),
# Same speed, opposite directions
(
[Wind(speed_kmph=10.0, dir_deg=0.0) for _ in range(lp.WIND_HISTORY_LEN // 2)] +
[Wind(speed_kmph=10.0, dir_deg=180.0) for _ in range(lp.WIND_HISTORY_LEN // 2)],
Wind(speed_kmph=10.0, dir_deg=90.0),
),
# Circular mean of Angles, same speed
(
[Wind(speed_kmph=0, dir_deg=180) for _ in range(lp.WIND_HISTORY_LEN - 2)] +
[
Wind(speed_kmph=10.0, dir_deg=175.0),
Wind(speed_kmph=10.0, dir_deg=-175.0),
],
Wind(speed_kmph=20.0/lp.WIND_HISTORY_LEN, dir_deg=180.0),
),
],
)
def test_calculate_average_wind(wind_readings, result):
"""Test average wind calculation with basic cases."""
local_path = lp.LocalPath(parent_logger=RcutilsLogger())

for wind in wind_readings:
local_path.update_wind_history(wind)

avg = local_path.wind_average
if result is None:
assert avg is None
else:
assert avg.speed_kmph == result.speed_kmph
assert avg.dir_deg == result.dir_deg


def test_wind_average_not_set_before_full_history():
"""Test that wind_average isn't set until we have WIND_HISTORY_LEN wind readings."""
local_path = lp.LocalPath(parent_logger=RcutilsLogger())

for _ in range(lp.WIND_HISTORY_LEN - 1):
wind = Wind(speed_kmph=10.0, dir_deg=45.0)
local_path.update_wind_history(wind)

assert local_path.wind_average is None


def test_wind_average_updates_with_new_readings():
"""Test that wind_average updates when new readings are added."""
local_path = lp.LocalPath(parent_logger=RcutilsLogger())

# Fill history with initial readings
for _ in range(lp.WIND_HISTORY_LEN):
wind = Wind(speed_kmph=10.0, dir_deg=45.0)
local_path.update_wind_history(wind)

first_avg = local_path.wind_average

# Add a different wind reading (oldest will be removed from deque)
new_wind = Wind(speed_kmph=30.0, dir_deg=45.0)
local_path.update_wind_history(new_wind)
second_avg = local_path.wind_average

# Average should increase since we're replacing a 10.0 with a 30.0
assert second_avg is not None
assert first_avg is not None
assert second_avg.speed_kmph > first_avg.speed_kmph


def test_LocalPathState_parameter_checking():
with pytest.raises(ValueError):
lps = (
Expand Down