-
Notifications
You must be signed in to change notification settings - Fork 0
/
scenario_manager.py
231 lines (181 loc) · 7.24 KB
/
scenario_manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
#!/usr/bin/env python
# Copyright (c) 2018-2020 Intel Corporation
#
# This work is licensed under the terms of the MIT license.
# For a copy, see <https://opensource.org/licenses/MIT>.
"""
This module provides the ScenarioManager implementation.
It must not be modified and is for reference only!
"""
from __future__ import print_function
import sys
import time
import py_trees
from srunner.autoagents.agent_wrapper import AgentWrapper
from srunner.scenariomanager.carla_data_provider import CarlaDataProvider
from srunner.scenariomanager.result_writer import ResultOutputProvider
from srunner.scenariomanager.timer import GameTime
from srunner.scenariomanager.watchdog import Watchdog
class ScenarioManager(object):
"""
Basic scenario manager class. This class holds all functionality
required to start, and analyze a scenario.
The user must not modify this class.
To use the ScenarioManager:
1. Create an object via manager = ScenarioManager()
2. Load a scenario via manager.load_scenario()
3. Trigger the execution of the scenario manager.run_scenario()
This function is designed to explicitly control start and end of
the scenario execution
4. Trigger a result evaluation with manager.analyze_scenario()
5. If needed, cleanup with manager.stop_scenario()
"""
def __init__(self, debug_mode=False, sync_mode=False, timeout=2.0):
"""
Setups up the parameters, which will be filled at load_scenario()
"""
self.scenario = None
self.scenario_tree = None
self.scenario_class = None
self.ego_vehicles = None
self.other_actors = None
self._debug_mode = debug_mode
self._agent = None
self._sync_mode = sync_mode
self._running = False
self._timestamp_last_run = 0.0
self._timeout = timeout
self._watchdog = Watchdog(float(self._timeout))
self.scenario_duration_system = 0.0
self.scenario_duration_game = 0.0
self.start_system_time = None
self.end_system_time = None
def _reset(self):
"""
Reset all parameters
"""
self._running = False
self._timestamp_last_run = 0.0
self.scenario_duration_system = 0.0
self.scenario_duration_game = 0.0
self.start_system_time = None
self.end_system_time = None
GameTime.restart()
def cleanup(self):
"""
This function triggers a proper termination of a scenario
"""
if self.scenario is not None:
self.scenario.terminate()
if self._agent is not None:
self._agent.cleanup()
self._agent = None
CarlaDataProvider.cleanup()
def load_scenario(self, scenario, agent=None):
"""
Load a new scenario
"""
self._reset()
self._agent = AgentWrapper(agent) if agent else None
if self._agent is not None:
self._sync_mode = True
self.scenario_class = scenario
self.scenario = scenario.scenario
self.scenario_tree = self.scenario.scenario_tree
self.ego_vehicles = scenario.ego_vehicles
self.other_actors = scenario.other_actors
# To print the scenario tree uncomment the next line
# py_trees.display.render_dot_tree(self.scenario_tree)
if self._agent is not None:
self._agent.setup_sensors(self.ego_vehicles[0], self._debug_mode)
def run_scenario(self):
"""
Trigger the start of the scenario and wait for it to finish/fail
"""
print("ScenarioManager: Running scenario {}".format(self.scenario_tree.name))
self.start_system_time = time.time()
start_game_time = GameTime.get_time()
self._watchdog.start()
self._running = True
while self._running:
timestamp = None
world = CarlaDataProvider.get_world()
if world:
snapshot = world.get_snapshot()
if snapshot:
timestamp = snapshot.timestamp
if timestamp:
self._tick_scenario(timestamp)
self._watchdog.stop()
self.cleanup()
self.end_system_time = time.time()
end_game_time = GameTime.get_time()
self.scenario_duration_system = self.end_system_time - \
self.start_system_time
self.scenario_duration_game = end_game_time - start_game_time
if self.scenario_tree.status == py_trees.common.Status.FAILURE:
print("ScenarioManager: Terminated due to failure")
def _tick_scenario(self, timestamp):
"""
Run next tick of scenario and the agent.
If running synchornously, it also handles the ticking of the world.
"""
if self._timestamp_last_run < timestamp.elapsed_seconds and self._running:
self._timestamp_last_run = timestamp.elapsed_seconds
self._watchdog.update()
if self._debug_mode:
print("\n--------- Tick ---------\n")
# Update game time and actor information
GameTime.on_carla_tick(timestamp)
CarlaDataProvider.on_carla_tick()
if self._agent is not None:
ego_action = self._agent()
# Tick scenario
self.scenario_tree.tick_once()
if self._debug_mode:
print("\n")
py_trees.display.print_ascii_tree(self.scenario_tree, show_status=True)
sys.stdout.flush()
if self.scenario_tree.status != py_trees.common.Status.RUNNING:
self._running = False
if self._agent is not None:
self.ego_vehicles[0].apply_control(ego_action)
if self._sync_mode and self._running and self._watchdog.get_status():
CarlaDataProvider.get_world().tick()
def get_running_status(self):
"""
returns:
bool: False if watchdog exception occured, True otherwise
"""
return self._watchdog.get_status()
def stop_scenario(self):
"""
This function is used by the overall signal handler to terminate the scenario execution
"""
self._running = False
def analyze_scenario(self, stdout, filename, junit):
"""
This function is intended to be called from outside and provide
the final statistics about the scenario (human-readable, in form of a junit
report, etc.)
"""
failure = False
timeout = False
result = "SUCCESS"
if self.scenario.test_criteria is None:
print("Nothing to analyze, this scenario has no criteria")
return True
for criterion in self.scenario.get_criteria():
if (not criterion.optional and
criterion.test_status != "SUCCESS" and
criterion.test_status != "ACCEPTABLE"):
failure = True
result = "FAILURE"
elif criterion.test_status == "ACCEPTABLE":
result = "ACCEPTABLE"
if self.scenario.timeout_node.timeout and not failure:
timeout = True
result = "TIMEOUT"
output = ResultOutputProvider(self, result, stdout, filename, junit)
output.write()
return failure or timeout