Skip to content

Ball velocity filtering and small bug fix #21

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bin/pam_ball_trajectories.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def _add_tennicam(hdf5_path: pathlib.Path, group_name: str):
with bt.MutableRecordedBallTrajectories(path=hdf5_path) as rbt:
if group_name in rbt.get_groups():
raise ValueError("group {} already present in the file")
nb_added = rbt.add_tennicam_trajectories(group_name, pathlib.Path.cwd())
nb_added = rbt.add_tennicam_trajectories(group_name, pathlib.Path.cwd())
logging.info("added {} trajectories".format(nb_added))


Expand Down
96 changes: 88 additions & 8 deletions python/context/ball_trajectories.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from __future__ import annotations
import typing
import nptyping as npt
from typing import Union, Tuple


import random
import math
Expand All @@ -17,7 +19,8 @@

import o80
import pam_configuration
import tennicam_client

from context import LowPassFilter


assert int(npt.__version__[0]) >= 2, "Need nptyping >=2."
Expand Down Expand Up @@ -83,7 +86,7 @@ def to_stamped_trajectory(input: DurationTrajectory) -> StampedTrajectory:
return stamps, positions


def to_duration_trajectory(input: StampedTrajectory) -> DurationTrajectory:
def to_duration_trajectory(input: StampedTrajectory, vel_filter_window_size: int = 1) -> DurationTrajectory:
"""
Converts a StampedTrajectories to a DurationTrajectory.
The velocities are estimated by performing finite differences.
Expand All @@ -92,6 +95,12 @@ def to_duration_trajectory(input: StampedTrajectory) -> DurationTrajectory:
dp = np.diff(input[1], axis=0)
velocities = (dp.T / (dt * 1e-6)).T
positions = input[1][:-1, :]

#velocity filtering
if vel_filter_window_size > 1:
filter_lp = [LowPassFilter(vel_filter_window_size), LowPassFilter(vel_filter_window_size), LowPassFilter(vel_filter_window_size)]
velocities = [[filter_lp[i].get(v[i]) for i in range(3)] for v in velocities]

return dt, positions, velocities


Expand Down Expand Up @@ -290,6 +299,8 @@ def add_tennicam_trajectories(
The number of trajectories added to the file.
"""

import tennicam_client

def _read_trajectory(tennicam_file: pathlib.Path) -> StampedTrajectory:
"""
Parse the file and returned the corresponding
Expand Down Expand Up @@ -475,12 +486,60 @@ def get_trajectory(self, index: int) -> StampedTrajectory:
"""
return self._data[index]

def random_trajectory(self) -> StampedTrajectory:
def get_trajectory_with_translation(
self, index: int, translation: typing.Sequence[float]
) -> StampedTrajectory:
"""
Returns the trajectory at the requested index, translated
by the provided translation vector.
"""
if len(translation) != 3:
raise ValueError("Translation must be a 3d vector")
trajectory = self._data[index]
positions = trajectory[1] + np.array(translation, np.float32)
return trajectory[0], positions

def get_trajectory_with_random_translation(
self, index: int, translation_range: typing.Sequence[float]
) -> StampedTrajectory:
"""
Returns the trajectory at the requested index, translated
by a random vector in the provided range.
The range is expected to be a 3d vector, i.e. [x_min, x_max, y_min, y_max, z_min, z_max].
"""
if len(translation_range) != 6:
raise ValueError("Translation range must be a 6d vector")
translation = [
random.uniform(translation_range[i*2], translation_range[i*2 + 1])
for i in range(3)
]
return self.get_trajectory_with_translation(index, translation)

def random_trajectory(self, return_index: bool = False) -> Union[StampedTrajectory, Tuple[StampedTrajectory, int]]:
"""
Returns one of the trajectory, randomly selected.
If return_index is True, also returns the selected index.
"""
index = random.choice(list(range(len(self._data.keys()))))
return self._data[index]
trajectory = self._data[index]

return (trajectory, index) if return_index else trajectory

def random_trajectory_with_random_translation(
self, translation_range: typing.Sequence[float], return_index: bool = False
) -> Union[StampedTrajectory, Tuple[StampedTrajectory, int]]:
"""
Returns one of the trajectory, randomly selected,
translated by a random vector in the provided range.
The range is expected to be a 3d vector, i.e. [x_min, x_max, y_min, y_max, z_min, z_max].
If return_index is True, also returns the selected index.
"""
index = random.choice(list(range(len(self._data.keys()))))
translated_trajectory = self.get_trajectory_with_random_translation(
index, translation_range
)

return (translated_trajectory, index) if return_index else translated_trajectory

def get_different_random_trajectories(
self, nb_trajectories: int
Expand All @@ -498,23 +557,44 @@ def get_different_random_trajectories(
random.shuffle(indexes)
return [self._data[index] for index in indexes[:nb_trajectories]]

def get_different_random_trajectories_with_random_translation(
self, nb_trajectories: int, translation_range: typing.Sequence[float]
) -> StampedTrajectories:
"""
Returns a list of trajectories, randomly
ordered and selected, translated by a random vector
in the provided range.
The range is expected to be a 3d vector, i.e. [x_min, x_max, y_min, y_max, z_min, z_max].
"""
if nb_trajectories > self.size():
raise ValueError(
"BallTrajectories: only {} trajectories "
"available ({} requested)".format(self.size(), nb_trajectories)
)
indexes = list(self._data.keys())
random.shuffle(indexes)
return [
self.get_trajectory_with_random_translation(index, translation_range)
for index in indexes[:nb_trajectories]
]

@staticmethod
def to_duration(input: StampedTrajectory) -> DurationTrajectory:
def to_duration(input: StampedTrajectory, vel_filter_window_size: int = 1) -> DurationTrajectory:
"""
Returns a corresponding duration trajectory
"""
return to_duration_trajectory(input)
return to_duration_trajectory(input, vel_filter_window_size)

@classmethod
def iterate(
cls, input: StampedTrajectory
cls, input: StampedTrajectory, vel_filter_window_size: int = 1
) -> typing.Generator[DurationPoint, None, None]:
"""
Generator over the trajectory.
Yields tuples (duration in microseconds, state), state having
a position and a velocity attribute.
"""
durations, positions, velocities = cls.to_duration(input)
durations, positions, velocities = cls.to_duration(input, vel_filter_window_size)
for d, p, v in zip(durations, positions, velocities):
yield d, o80.Item3dState(p, v)
return
Expand Down
Loading