-
Notifications
You must be signed in to change notification settings - Fork 20
/
Copy pathmagicrobot.py
751 lines (573 loc) · 24 KB
/
magicrobot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
import contextlib
import inspect
import logging
import sys
import types
import typing
from typing import Any, Callable, Dict, List, Tuple
import hal
import wpilib
from ntcore import NetworkTableInstance, NetworkTableEntry
# from wpilib.shuffleboard import Shuffleboard
from robotpy_ext.autonomous import AutonomousModeSelector
from robotpy_ext.misc import NotifierDelay
from robotpy_ext.misc.simple_watchdog import SimpleWatchdog
from .inject import get_injection_requests, find_injections
from .magic_tunable import setup_tunables, tunable, collect_feedbacks
from .magic_reset import collect_resets
__all__ = ["MagicRobot"]
class MagicInjectError(ValueError):
pass
class MagicRobot(wpilib.RobotBase):
"""
Robots that use the MagicBot framework should use this as their
base robot class. If you use this as your base, you must
implement the following methods:
- :meth:`createObjects`
- :meth:`teleopPeriodic`
MagicRobot uses the :class:`.AutonomousModeSelector` to allow you
to define multiple autonomous modes and to select one of them via
the SmartDashboard/Shuffleboard.
MagicRobot will set the following NetworkTables variables
automatically:
- ``/robot/mode``: one of 'disabled', 'auto', 'teleop', or 'test'
- ``/robot/is_simulation``: True/False
- ``/robot/is_ds_attached``: True/False
"""
#: Amount of time each loop takes (default is 20ms)
control_loop_wait_time = 0.020
#: Error report interval: when an FMS is attached, how often should
#: uncaught exceptions be reported?
error_report_interval = 0.5
#: A Python logging object that you can use to send messages to the log.
#: It is recommended to use this instead of print statements.
logger = logging.getLogger("robot")
#: If True, teleopPeriodic will be called in autonomous mode
use_teleop_in_autonomous = False
def __init__(self) -> None:
super().__init__()
self._exclude_from_injection = ["logger"]
self.__last_error_report = -10
self._components: List[Tuple[str, Any]] = []
self._feedbacks: List[Tuple[Callable[[], Any], NetworkTableEntry]] = []
self._reset_components: List[Tuple[Dict[str, Any], Any]] = []
self.__done = False
# cache these
self.__is_ds_attached = wpilib.DriverStation.isDSAttached
self.__sd_update = wpilib.SmartDashboard.updateValues
self.__lv_update = wpilib.LiveWindow.updateValues
# self.__sf_update = Shuffleboard.update
def _simulationInit(self) -> None:
pass
def _simulationPeriodic(self) -> None:
pass
def __simulationPeriodic(self) -> None:
hal.simPeriodicBefore()
self._simulationPeriodic()
hal.simPeriodicAfter()
def robotInit(self) -> None:
"""
.. warning:: Internal API, don't override; use :meth:`createObjects` instead
"""
# Create the user's objects and stuff here
self.createObjects()
# Load autonomous modes
self._automodes = AutonomousModeSelector("autonomous")
# Next, create the robot components and wire them together
self._create_components()
# cache these
self.__is_ds_attached = wpilib.DriverStation.isDSAttached
self.__sd_update = wpilib.SmartDashboard.updateValues
self.__lv_update = wpilib.LiveWindow.updateValues
# self.__sf_update = Shuffleboard.update
self.__nt = NetworkTableInstance.getDefault().getTable("/robot")
self.__nt_put_is_ds_attached = self.__nt.getEntry("is_ds_attached").setBoolean
self.__nt_put_mode = self.__nt.getEntry("mode").setString
self.__nt.putBoolean("is_simulation", self.isSimulation())
self.__nt_put_is_ds_attached(self.__is_ds_attached())
self.watchdog = SimpleWatchdog(self.control_loop_wait_time)
self.__periodics: List[Tuple[Callable[[], None], str]] = [
(self.robotPeriodic, "robotPeriodic()"),
]
if self.isSimulation():
self._simulationInit()
self.__periodics.append((self.__simulationPeriodic, "simulationPeriodic()"))
def createObjects(self) -> None:
"""
You should override this and initialize all of your wpilib
objects here (and not in your components, for example). This
serves two purposes:
- It puts all of your motor/sensor initialization in the same
place, so that if you need to change a port/pin number it
makes it really easy to find it. Additionally, if you want
to create a simplified robot program to test a specific
thing, it makes it really easy to copy/paste it elsewhere
- It allows you to use the magic injection mechanism to share
variables between components
.. note:: Do not access your magic components in this function,
as their instances have not been created yet. Do not
create them either.
"""
raise NotImplementedError
def autonomousInit(self) -> None:
"""Initialization code for autonomous mode may go here.
Users may override this method for initialization code which
will be called each time the robot enters autonomous mode,
regardless of the selected autonomous mode.
This can be useful for code that must be run at the beginning of a match.
.. note::
This method is called after every component's ``on_enable`` method,
but before the selected autonomous mode's ``on_enable`` method.
"""
pass
def teleopInit(self) -> None:
"""
Initialization code for teleop control code may go here.
Users may override this method for initialization code which will be
called each time the robot enters teleop mode.
.. note:: The ``on_enable`` functions of all components are called
before this function is called.
"""
pass
def teleopPeriodic(self):
"""
Periodic code for teleop mode should go here.
Users should override this method for code which will be called
periodically at a regular rate while the robot is in teleop mode.
This code executes before the ``execute`` functions of all
components are called.
.. note:: If you want this function to be called in autonomous
mode, set ``use_teleop_in_autonomous`` to True in your
robot class.
"""
func = self.teleopPeriodic.__func__
if not hasattr(func, "firstRun"):
self.logger.warning(
"Default MagicRobot.teleopPeriodic() method... Override me!"
)
func.firstRun = False
def disabledInit(self) -> None:
"""
Initialization code for disabled mode may go here.
Users may override this method for initialization code which will be
called each time the robot enters disabled mode.
.. note:: The ``on_disable`` functions of all components are called
before this function is called.
"""
pass
def disabledPeriodic(self):
"""
Periodic code for disabled mode should go here.
Users should override this method for code which will be called
periodically at a regular rate while the robot is in disabled mode.
This code executes before the ``execute`` functions of all
components are called.
"""
func = self.disabledPeriodic.__func__
if not hasattr(func, "firstRun"):
self.logger.warning(
"Default MagicRobot.disabledPeriodic() method... Override me!"
)
func.firstRun = False
def testInit(self) -> None:
"""Initialization code for test mode should go here.
Users should override this method for initialization code which will be
called each time the robot enters disabled mode.
"""
pass
def testPeriodic(self) -> None:
"""Periodic code for test mode should go here."""
pass
def robotPeriodic(self) -> None:
"""
Periodic code for all modes should go here.
Users must override this method to utilize it
but it is not required.
This function gets called last in each mode.
You may use it for any code you need to run
during all modes of the robot (e.g NetworkTables updates)
The default implementation will update
SmartDashboard, LiveWindow and Shuffleboard.
"""
watchdog = self.watchdog
self.__sd_update()
watchdog.addEpoch("SmartDashboard")
self.__lv_update()
watchdog.addEpoch("LiveWindow")
# self.__sf_update()
# watchdog.addEpoch("Shuffleboard")
def onException(self, forceReport: bool = False) -> None:
"""
This function must *only* be called when an unexpected exception
has occurred that would otherwise crash the robot code. Use this
inside your :meth:`operatorActions` function.
If the FMS is attached (eg, during a real competition match),
this function will return without raising an error. However,
it will try to report one-off errors to the Driver Station so
that it will be recorded in the Driver Station Log Viewer.
Repeated errors may not get logged.
Example usage::
def teleopPeriodic(self):
try:
if self.joystick.getTrigger():
self.shooter.shoot()
except:
self.onException()
try:
if self.joystick.getRawButton(2):
self.ball_intake.run()
except:
self.onException()
# and so on...
:param forceReport: Always report the exception to the DS. Don't
set this to True
"""
# If the FMS is not attached, crash the robot program
if not wpilib.DriverStation.isFMSAttached():
raise
# Otherwise, if the FMS is attached then try to report the error via
# the driver station console. Maybe.
now = wpilib.Timer.getFPGATimestamp()
try:
if (
forceReport
or (now - self.__last_error_report) > self.error_report_interval
):
wpilib.reportError("Unexpected exception", True)
except:
pass # ok, can't do anything here
self.__last_error_report = now
@contextlib.contextmanager
def consumeExceptions(self, forceReport: bool = False):
"""
This returns a context manager which will consume any uncaught
exceptions that might otherwise crash the robot.
Example usage::
def teleopPeriodic(self):
with self.consumeExceptions():
if self.joystick.getTrigger():
self.shooter.shoot()
with self.consumeExceptions():
if self.joystick.getRawButton(2):
self.ball_intake.run()
# and so on...
:param forceReport: Always report the exception to the DS. Don't
set this to True
.. seealso:: :meth:`onException` for more details
"""
try:
yield
except:
self.onException(forceReport=forceReport)
#
# Internal API
#
def startCompetition(self) -> None:
"""
This runs the mode-switching loop.
.. warning:: Internal API, don't override
"""
# TODO: usage reporting?
self.robotInit()
# Tell the DS the robot is ready to be enabled
hal.observeUserProgramStarting()
while not self.__done:
isEnabled, isAutonomous, isTest = self.getControlState()
if not isEnabled:
self._disabled()
elif isAutonomous:
self.autonomous()
elif isTest:
self._test()
else:
self._operatorControl()
def endCompetition(self) -> None:
self.__done = True
self._automodes.endCompetition()
def autonomous(self) -> None:
"""
MagicRobot will do The Right Thing and automatically load all
autonomous mode routines defined in the autonomous folder.
.. warning:: Internal API, don't override
"""
self.__nt_put_mode("auto")
self.__nt_put_is_ds_attached(self.__is_ds_attached())
self._on_mode_enable_components()
try:
self.autonomousInit()
except:
self.onException(forceReport=True)
auto_functions: Tuple[Callable[[], None], ...] = (self._enabled_periodic,)
if self.use_teleop_in_autonomous:
auto_functions = (self.teleopPeriodic,) + auto_functions
self._automodes.run(
self.control_loop_wait_time,
auto_functions,
self.onException,
watchdog=self.watchdog,
)
self._on_mode_disable_components()
def _disabled(self) -> None:
"""
This function is called in disabled mode. You should not
override this function; rather, you should override the
:meth:`disabledPeriodic` function instead.
.. warning:: Internal API, don't override
"""
watchdog = self.watchdog
watchdog.reset()
self.__nt_put_mode("disabled")
ds_attached = None
self._on_mode_disable_components()
try:
self.disabledInit()
except:
self.onException(forceReport=True)
watchdog.addEpoch("disabledInit()")
refreshData = wpilib.DriverStation.refreshData
DSControlWord = wpilib.DSControlWord
with NotifierDelay(self.control_loop_wait_time) as delay:
while not self.__done:
refreshData()
cw = DSControlWord()
if cw.isEnabled():
break
if ds_attached != cw.isDSAttached():
ds_attached = not ds_attached
self.__nt_put_is_ds_attached(ds_attached)
hal.observeUserProgramDisabled()
try:
self.disabledPeriodic()
except:
self.onException()
watchdog.addEpoch("disabledPeriodic()")
self._do_periodics()
# watchdog.disable()
watchdog.printIfExpired()
delay.wait()
watchdog.reset()
def _operatorControl(self) -> None:
"""
This function is called in teleoperated mode. You should not
override this function; rather, you should override the
:meth:`teleopPeriodics` function instead.
.. warning:: Internal API, don't override
"""
watchdog = self.watchdog
watchdog.reset()
self.__nt_put_mode("teleop")
# don't need to update this during teleop -- presumably will switch
# modes when ds is no longer attached
self.__nt_put_is_ds_attached(self.__is_ds_attached())
# initialize things
self._on_mode_enable_components()
try:
self.teleopInit()
except:
self.onException(forceReport=True)
watchdog.addEpoch("teleopInit()")
observe = hal.observeUserProgramTeleop
refreshData = wpilib.DriverStation.refreshData
isTeleopEnabled = wpilib.DriverStation.isTeleopEnabled
with NotifierDelay(self.control_loop_wait_time) as delay:
while not self.__done:
refreshData()
if not isTeleopEnabled():
break
observe()
try:
self.teleopPeriodic()
except:
self.onException()
watchdog.addEpoch("teleopPeriodic()")
self._enabled_periodic()
# watchdog.disable()
watchdog.printIfExpired()
delay.wait()
watchdog.reset()
self._on_mode_disable_components()
def _test(self) -> None:
"""Called when the robot is in test mode"""
watchdog = self.watchdog
watchdog.reset()
self.__nt_put_mode("test")
self.__nt_put_is_ds_attached(self.__is_ds_attached())
wpilib.LiveWindow.setEnabled(True)
# Shuffleboard.enableActuatorWidgets()
try:
self.testInit()
except:
self.onException(forceReport=True)
watchdog.addEpoch("testInit()")
refreshData = wpilib.DriverStation.refreshData
DSControlWord = wpilib.DSControlWord
with NotifierDelay(self.control_loop_wait_time) as delay:
while not self.__done:
refreshData()
cw = DSControlWord()
if not (cw.isTest() and cw.isEnabled()):
break
hal.observeUserProgramTest()
try:
self.testPeriodic()
except:
self.onException()
watchdog.addEpoch("testPeriodic()")
self._do_periodics()
# watchdog.disable()
watchdog.printIfExpired()
delay.wait()
watchdog.reset()
wpilib.LiveWindow.setEnabled(False)
# Shuffleboard.disableActuatorWidgets()
def _on_mode_enable_components(self) -> None:
# initialize things
for _, component in self._components:
on_enable = getattr(component, "on_enable", None)
if on_enable is not None:
try:
on_enable()
except:
self.onException(forceReport=True)
def _on_mode_disable_components(self) -> None:
# deinitialize things
for _, component in self._components:
on_disable = getattr(component, "on_disable", None)
if on_disable is not None:
try:
on_disable()
except:
self.onException(forceReport=True)
def _create_components(self) -> None:
#
# TODO: Will need to inject into any autonomous mode component
# too, as they're a bit different
#
# TODO: Will need to order state machine components before
# other components just in case
components = []
self.logger.info("Creating magic components")
# Identify all of the types, and create them
cls = type(self)
# - Iterate over class variables with type annotations
# .. this hack is necessary for pybind11 based modules
sys.modules["pybind11_builtins"] = types.SimpleNamespace() # type: ignore
injectables = self._collect_injectables()
for m, ctyp in typing.get_type_hints(cls).items():
# Ignore private variables
if m.startswith("_"):
continue
# If the variable has been set, skip it
if hasattr(self, m):
continue
# If the type is not actually a type, give a meaningful error
if not isinstance(ctyp, type):
raise TypeError(
"%s has a non-type annotation on %s (%r); lone non-injection variable annotations are disallowed, did you want to assign a static variable?"
% (cls.__name__, m, ctyp)
)
component = self._create_component(m, ctyp, injectables)
# Store for later
components.append((m, component))
injectables[m] = component
# For each new component, perform magic injection
for cname, component in components:
setup_tunables(component, cname, "components")
self._setup_vars(cname, component, injectables)
self._setup_reset_vars(component)
# Do it for autonomous modes too
for mode in self._automodes.modes.values():
mode.logger = logging.getLogger(mode.MODE_NAME)
setup_tunables(mode, mode.MODE_NAME, "autonomous")
self._setup_vars(mode.MODE_NAME, mode, injectables)
# And for self too
setup_tunables(self, "robot", None)
self._feedbacks += collect_feedbacks(self, "robot", None)
# Call setup functions for components
for cname, component in components:
setup = getattr(component, "setup", None)
if setup is not None:
setup()
# ... and grab all the feedback methods
self._feedbacks += collect_feedbacks(component, cname, "components")
# Call setup functions for autonomous modes
for mode in self._automodes.modes.values():
if hasattr(mode, "setup"):
mode.setup()
self._components = components
def _collect_injectables(self) -> Dict[str, Any]:
injectables = {}
cls = type(self)
for n in dir(self):
if (
n.startswith("_")
or n in self._exclude_from_injection
or isinstance(getattr(cls, n, None), (property, tunable))
):
continue
o = getattr(self, n)
# Don't inject methods
# TODO: This could actually be a cool capability..
if inspect.ismethod(o):
continue
injectables[n] = o
return injectables
def _create_component(self, name: str, ctyp: type, injectables: Dict[str, Any]):
type_hints = typing.get_type_hints(ctyp.__init__)
NoneType = type(None)
init_return_type = type_hints.pop("return", NoneType)
assert (
init_return_type is NoneType
), f"{ctyp!r} __init__ had an unexpected non-None return type hint"
requests = get_injection_requests(type_hints, name)
injections = find_injections(requests, injectables, name)
# Create instance, set it on self
component = ctyp(**injections)
setattr(self, name, component)
# Ensure that mandatory methods are there
if not callable(getattr(component, "execute", None)):
raise ValueError(
"Component %s (%r) must have a method named 'execute'"
% (name, component)
)
# Automatically inject a logger object
component.logger = logging.getLogger(name)
self.logger.info("-> %s (class: %s)", name, ctyp.__name__)
return component
def _setup_vars(self, cname: str, component, injectables: Dict[str, Any]) -> None:
self.logger.debug("Injecting magic variables into %s", cname)
type_hints = typing.get_type_hints(type(component))
requests = get_injection_requests(type_hints, cname, component)
injections = find_injections(requests, injectables, cname)
component.__dict__.update(injections)
def _setup_reset_vars(self, component) -> None:
reset_dict = collect_resets(type(component))
if reset_dict:
component.__dict__.update(reset_dict)
self._reset_components.append((reset_dict, component))
def _do_periodics(self) -> None:
"""Run periodic methods which run in every mode."""
watchdog = self.watchdog
for method, entry in self._feedbacks:
try:
value = method()
except:
self.onException()
else:
entry.setValue(value)
watchdog.addEpoch("@magicbot.feedback")
for periodic, name in self.__periodics:
periodic()
watchdog.addEpoch(name)
def _enabled_periodic(self) -> None:
"""Run components and all periodic methods."""
watchdog = self.watchdog
for name, component in self._components:
try:
component.execute()
except:
self.onException()
watchdog.addEpoch(name)
self._do_periodics()
for reset_dict, component in self._reset_components:
component.__dict__.update(reset_dict)