-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpam_ball_trajectories.py
More file actions
executable file
·204 lines (163 loc) · 6.53 KB
/
pam_ball_trajectories.py
File metadata and controls
executable file
·204 lines (163 loc) · 6.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
#!/usr/bin/env python3
"""
For manipulating a ball trajectory hdf5 file:
- adding trajectories to it (via json or tennicam files)
- for deleting trajectories
- for getting info about the file
- for translating all the points of a group of trajectories.
"""
import sys
import argparse
import logging
import pathlib
import typing
import numpy as np
import context.ball_trajectories as bt
def _info_group(rbt: bt.RecordedBallTrajectories, group_name: str):
indexes = rbt.get_indexes(group_name)
print("\ngroup: {} , found {} trajectories".format(group_name, len(indexes)))
for index in sorted(indexes):
stamps, positions = rbt.get_stamped_trajectory(group_name, index)
nb_points = len(stamps)
duration = (stamps[-1] - stamps[0]) * 1e-6
first_position = str("{:.2f} " * 3).format(*list(positions[0, :]))
print(
"\tindex: {}\t {} points\t{:.2f} seconds ( first: {})".format(
index, nb_points, duration, first_position
)
)
print()
def _info_whole_file(rbt: bt.RecordedBallTrajectories):
groups = rbt.get_groups()
print()
for group in groups:
print("group: {} , {} trajectories".format(group, len(rbt.get_indexes(group))))
print()
def _info(hdf5_path: pathlib.Path, group_name: str = None):
with bt.RecordedBallTrajectories(hdf5_path) as rbt:
print("\nhdf5 trajectories file: {}".format(hdf5_path))
if group_name:
_info_group(rbt, group_name)
else:
_info_whole_file(rbt)
def _add_json(hdf5_path: pathlib.Path, group_name: str, sampling: int):
logging.info("recording trajectories in {}".format(hdf5_path))
with bt.MutableRecordedBallTrajectories(path=hdf5_path) as rbt:
if group_name in rbt.get_groups():
raise ValueError("group {} already present in the file")
nb_added = rbt.add_json_trajectories(group_name, pathlib.Path.cwd(), sampling)
logging.info("added {} trajectories".format(nb_added))
def _add_tennicam(hdf5_path: pathlib.Path, group_name: str):
logging.info("recording trajectories in {}".format(hdf5_path))
with bt.MutableRecordedBallTrajectories(path=hdf5_path) as rbt:
if group_name in rbt.get_groups():
raise ValueError("group {} already present in the file")
nb_added = rbt.add_tennicam_trajectories(group_name, pathlib.Path.cwd())
logging.info("added {} trajectories".format(nb_added))
def _rm_group(hdf5_path: pathlib.Path, group_name: str):
with bt.MutableRecordedBallTrajectories(path=hdf5_path) as rbt:
rbt.rm_group(group_name)
def _translate(hdf5_path: pathlib.Path, group_name: str, coords: typing.List[float]):
coords = np.array(coords, np.float32)
with bt.RecordedBallTrajectories(path=hdf5_path) as rbt:
for index in rbt.get_indexes(group_name):
stamps, trajectory = rbt.get_stamped_trajectory(
group_name, index, direct=True
)
trajectory += coords
rbt.overwrite(group_name, index, (stamps, trajectory))
def run():
parser = argparse.ArgumentParser()
# the executable will use by default the hdf5 managed by
# pam_configuration, but user has the option to point to
# another file.
parser.add_argument(
"--path",
type=str,
required=False,
help="path to the hdf5 file encapsulating the ball trajectories",
)
# 5 commands supported: info, add-json, add-tennicam,
# rm and translate.
subparser = parser.add_subparsers(dest="command", required=True)
# for displaying info about the hdf5 file
info = subparser.add_parser(
"info",
help="print information about the whole file, or a given group in the file.",
)
info.add_argument(
"--group", type=str, required=False, help="the group of trajectories"
)
# for adding the json files of the current folder
# to a hdf5 trajectory file
add_json = subparser.add_parser(
"add-json",
help="""for saving in a new group all json trajectories present in the current
directory
""",
)
add_json.add_argument(
"--group", type=str, required=True, help="the group of trajectories"
)
add_json.add_argument(
"--sampling-rate-us",
type=int,
required=True,
help="record sampling rate, in microseconds (int)",
)
# for adding the tennicam files of the current folder
# to a hdf5 trajectory file
add_tennicam = subparser.add_parser(
"add-tennicam",
help="for saving in a new group all tennicam present in the current directory",
)
add_tennicam.add_argument(
"--group", type=str, required=True, help="the group of trajectories"
)
# for removing a group from the hdf5 file
rm_group = subparser.add_parser(
"rm", help="for removing trajectories from the file"
)
rm_group.add_argument(
"--group", type=str, required=True, help="the group of trajectories"
)
# for translating all points in all the trajectories of the group
translate = subparser.add_parser(
"translate", help="translate all the positions of all trajectories of the group"
)
translate.add_argument(
"--group", type=str, required=True, help="the group of trajectories"
)
translate.add_argument(
"--coords", type=float, nargs=3, required=True, help="x y z coordinates (float)"
)
# parsing the arguments
args = parser.parse_args()
# all subparsers asks for an optional path. Using this path if provided,
# the default hdf5 path otherwise
if args.path:
hdf5_path = pathlib.Path(args.path)
else:
hdf5_path = bt.RecordedBallTrajectories.get_default_path(create=True)
# checking the file exists
if not pathlib.Path(hdf5_path).is_file():
raise FileNotFoundError("failed to find the file {}".format(hdf5_path))
# going ahead based on the arguments
if args.command == "info":
_info(hdf5_path, args.group)
elif args.command == "add-json":
_add_json(hdf5_path, args.group, args.sampling_rate_us)
elif args.command == "add-tennicam":
_add_tennicam(hdf5_path, args.group)
elif args.command == "rm":
_rm_group(hdf5_path, args.group)
elif args.command == "translate":
_translate(hdf5_path, args.group, args.coords)
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
# try:
run()
# except Exception as e:
# logging.error("failed with error: {}".format(e))
# sys.exit(1)
sys.exit(0)