forked from OpenJobDescription/openjd-sessions-for-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_runner_base.py
More file actions
1071 lines (955 loc) · 41.9 KB
/
test_runner_base.py
File metadata and controls
1071 lines (955 loc) · 41.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
import json
import os
import time
from datetime import datetime, timedelta, timezone
from logging.handlers import QueueHandler
from pathlib import Path
from queue import SimpleQueue
from typing import Optional, cast
from unittest.mock import MagicMock, call
import pytest
from openjd.model import SymbolTable
from openjd.model.v2023_09 import Action as Action_2023_09
from openjd.model.v2023_09 import DataString as DataString_2023_09
from openjd.model.v2023_09 import (
EmbeddedFileText as EmbeddedFileText_2023_09,
)
from openjd.model.v2023_09 import (
EmbeddedFileTypes as EmbeddedFileTypes_2023_09,
)
from openjd.model.v2023_09 import (
CommandString as CommandString_2023_09,
ArgString as ArgString_2023_09,
)
from openjd.sessions import ActionState, PosixSessionUser, WindowsSessionUser
from openjd.sessions._embedded_files import EmbeddedFilesScope
from openjd.sessions._os_checker import is_posix, is_windows
from openjd.sessions._runner_base import (
NotifyCancelMethod,
ScriptRunnerBase,
ScriptRunnerState,
TerminateCancelMethod,
)
from openjd.sessions._tempdir import TempDir
from .conftest import (
build_logger,
collect_queue_messages,
has_posix_target_user,
has_windows_user,
WIN_SET_TEST_ENV_VARS_MESSAGE,
POSIX_SET_TARGET_USER_ENV_VARS_MESSAGE,
)
# For testing, since ScriptRunnerBase is an abstract base class.
class TerminatingRunner(ScriptRunnerBase):
    """Concrete ScriptRunnerBase for tests; cancel() always uses TerminateCancelMethod."""

    # Class-level default; cancel() overrides it with True on the instance.
    _cancel_called = False

    def cancel(
        self, *, time_limit: Optional[timedelta] = None, mark_action_failed: bool = False
    ) -> None:
        """Record that a cancel was requested, then cancel via TerminateCancelMethod.

        Neither ``time_limit`` nor ``mark_action_failed`` is consulted here.
        """
        self._cancel_called = True
        terminate_method = TerminateCancelMethod()
        self._cancel(terminate_method)
class NotifyingRunner(ScriptRunnerBase):
    """Concrete ScriptRunnerBase for tests; cancel() always uses NotifyCancelMethod."""

    def cancel(
        self, *, time_limit: Optional[timedelta] = None, mark_action_failed: bool = False
    ) -> None:
        """Timestamp the request (UTC) and cancel via NotifyCancelMethod.

        ``time_limit`` is handed to NotifyCancelMethod; when it is None a
        2 second period is used instead. ``mark_action_failed`` is not consulted.
        """
        self._cancel_called_at = datetime.now(timezone.utc)
        notify_period = timedelta(seconds=2) if time_limit is None else time_limit
        self._cancel(NotifyCancelMethod(notify_period))
# tmp_path - builtin temporary directory
@pytest.mark.usefixtures("tmp_path")
class TestScriptRunnerBase:
# Environment variables handed to runners (via os_env_vars) by the *_env_vars
# tests below, which then assert that each "key = value" pair is printed back
# verbatim by the subprocess. The values deliberately contain characters that
# are special to shells (quotes, $, |, &, ;, ^, %, ...) plus a few
# command-injection style payloads.
# NOTE(review): "win_injection4" has the same value as "win_injection1", and
# "win_injection5"/"win_injection6" begin with a bare "n"/"rn" -- presumably an
# escape character was lost at some point; confirm the intended payloads.
test_env_vars: dict[str, Optional[str]] = {
"FOO": "BAR",
"dollar_sign": "This costs $100",
"single_quote": "They're smart",
"double_quote": 'They said, "Hello!"',
"back_slash": "C:\\Windows\\System32",
"caret_symbol": "Up^Down",
"pipe_symbol": "Left|Right",
"ampersand_symbol": "Fish&Chips",
"less_than": "1 < 2",
"greater_than": "3 > 2",
"asterisk_star": "Twinkle*twinkle",
"question_mark": "Who? What? Where?",
"colon_symbol": "Time: 12:00 PM",
"semicolon_symbol": "Item1; Item2; Item3",
"equal_sign": "1 + 1 = 2",
"at_symbol": "user@example.com",
"hash_symbol": "#1 Winner",
"tilde_symbol": "Approximately~100",
"percent_symbol": "50% off",
"exclamation_mark": "Surprise!",
"square_brackets": "Array[5]",
"win_injection1": "& Get-Process",
"win_injection2": "; Get-Process",
"win_injection3": "| Get-Process",
"win_injection4": "& Get-Process",
"win_injection5": "nGet-ChildItem C:\\",
"win_injection6": "rnStart-Process notepad.exe",
"win_injection7": "$(Get-Process)",
"posix_injection1": "$(whoami)",
"posix_injection2": "; whoami",
"posix_injection3": "| whoami",
}
def test_initialized(self, tmp_path: Path) -> None:
    """A runner that was constructed but never run reports READY and no exit code."""
    # GIVEN
    mock_logger = MagicMock()
    with TerminatingRunner(logger=mock_logger, session_working_directory=tmp_path) as runner:
        pass

    # THEN
    assert runner.state == ScriptRunnerState.READY
    assert runner.exit_code is None
def test_basic_run(self, tmp_path: Path, python_exe: str) -> None:
    """Run a short command without a timeout; check state while running and after exit."""
    # GIVEN
    state_callback = MagicMock()
    command = [python_exe, "-c", "import time; time.sleep(0.25)"]
    with TerminatingRunner(
        logger=MagicMock(), session_working_directory=tmp_path, callback=state_callback
    ) as runner:
        # WHEN
        runner._run(command)

        # THEN
        assert runner.state == ScriptRunnerState.RUNNING
        assert runner.exit_code is None
        # Poll once per second, sleeping at most ten times.
        for _ in range(10):
            if runner.state != ScriptRunnerState.RUNNING:
                break
            time.sleep(1)
        assert runner.state == ScriptRunnerState.SUCCESS
        assert runner.exit_code == 0
        expected_calls = [call(ActionState.RUNNING), call(ActionState.SUCCESS)]
        state_callback.assert_has_calls(expected_calls)
@pytest.mark.parametrize("attempt", range(100))
def test_fast_run_no_deadlock(self, attempt: int, tmp_path: Path) -> None:
    """Run a very fast command repeatedly to probe for a lock-ordering deadlock.

    The goal is to ensure there is no deadlock between _run() and
    _on_process_exit() obtaining the lock. This is a probabilistic test; it is
    not 100% reliable for reproducing the deadlock. ``attempt`` only serves to
    repeat the test and is otherwise unused.
    """
    # GIVEN
    callback = MagicMock()
    with TerminatingRunner(
        logger=MagicMock(), session_working_directory=tmp_path, callback=callback
    ) as runner:
        # WHEN
        runner._run(["whoami"])

        # THEN
        # Nothing to check. We just want to run it fast. The test will hang here
        # if there is a deadlock, so simply wait for the application to exit.
        while runner.state == ScriptRunnerState.RUNNING:
            time.sleep(0.0001)
def test_working_dir_is_cwd(
    self,
    tmp_path: Path,
    message_queue: SimpleQueue,
    queue_handler: QueueHandler,
    python_exe: str,
) -> None:
    """The command's current working directory is the runner's startup directory."""
    # GIVEN
    logger = build_logger(queue_handler)
    print_cwd_command = [python_exe, "-c", "import os; print(os.getcwd())"]
    with TerminatingRunner(
        logger=logger, session_working_directory=tmp_path, startup_directory=tmp_path
    ) as runner:
        # WHEN
        runner._run(print_cwd_command)
        # Block until the subprocess is no longer running.
        while runner.state == ScriptRunnerState.RUNNING:
            time.sleep(0.1)

        # THEN
        logged_lines = collect_queue_messages(message_queue)
        expected_cwd = str(tmp_path)
        assert expected_cwd in logged_lines
def test_failing_run(self, tmp_path: Path, python_exe: str) -> None:
    """A process that exits non-zero is reported as FAILED with its exit code."""
    # GIVEN
    exit_one_command = [python_exe, "-c", "import sys; sys.exit(1)"]
    with TerminatingRunner(logger=MagicMock(), session_working_directory=tmp_path) as runner:
        # WHEN
        runner._run(exit_one_command)

        # THEN
        # Let the subprocess finish before inspecting the outcome.
        while runner.state == ScriptRunnerState.RUNNING:
            time.sleep(0.1)
        assert runner.state == ScriptRunnerState.FAILED
        assert runner.exit_code == 1
@pytest.mark.usefixtures("message_queue", "queue_handler")
def test_fail_to_run(
    self, tmp_path: Path, message_queue: SimpleQueue, queue_handler: QueueHandler
) -> None:
    """The runner fails cleanly when the subprocess cannot actually be run."""
    # GIVEN
    logger = build_logger(queue_handler)
    runner = TerminatingRunner(logger=logger, session_working_directory=tmp_path)
    # On posix the "command" is a directory path; elsewhere it is a
    # command name that is expected not to exist.
    bad_command = [str(tmp_path)] if is_posix() else ["test_failed_command"]
    finished_states = (
        ScriptRunnerState.FAILED,
        ScriptRunnerState.SUCCESS,
        ScriptRunnerState.TIMEOUT,
    )

    # WHEN
    runner._run(bad_command)
    # This process should finish within 25s (125 polls, 0.2s apart).
    polls_remaining = 125
    while polls_remaining and runner.state not in finished_states:
        # Give the command time to fail out.
        time.sleep(0.2)
        polls_remaining -= 1
    messages = collect_queue_messages(message_queue)

    # THEN
    if is_windows():
        # Note: On posix, we embed the command in a shell script. That shell script
        # starts running just fine, but then will return non-0.
        assert any(
            item.startswith("Process failed to start") for item in messages
        ), "Logged error message is not correct."
    assert runner.state == ScriptRunnerState.FAILED
    assert runner.exit_code != 0
@pytest.mark.usefixtures("message_queue", "queue_handler")
def test_run_with_env_vars(
    self,
    tmp_path: Path,
    message_queue: SimpleQueue,
    queue_handler: QueueHandler,
    python_exe: str,
) -> None:
    """Variables passed as os_env_vars reach the subprocess's environment verbatim."""
    # GIVEN
    logger = build_logger(queue_handler)
    dump_env_script = (
        r"import os;print(*(f'{k} = {v}' for k,v in os.environ.items()), sep='\n')"
    )
    with TerminatingRunner(
        logger=logger, session_working_directory=tmp_path, os_env_vars=self.test_env_vars
    ) as runner:
        # WHEN
        runner._run([python_exe, "-c", dump_env_script])
        # Block until the subprocess is no longer running.
        while runner.state == ScriptRunnerState.RUNNING:
            time.sleep(0.1)

        # THEN
        messages = collect_queue_messages(message_queue)
        for name, value in self.test_env_vars.items():
            # The Windows branch compares against the upper-cased variable name.
            printed_name = name.upper() if is_windows() else name
            assert f"{printed_name} = {value}" in messages
@pytest.mark.skipif(not is_posix(), reason="posix-only test")
@pytest.mark.xfail(
not has_posix_target_user(),
reason=POSIX_SET_TARGET_USER_ENV_VARS_MESSAGE,
)
@pytest.mark.usefixtures("message_queue", "queue_handler", "posix_target_user")
def test_run_as_posix_user(
    self,
    posix_target_user: PosixSessionUser,
    message_queue: SimpleQueue,
    queue_handler: QueueHandler,
) -> None:
    """Verify that the subprocess runs as the requested POSIX user.

    The temporary session directory is released in a ``finally`` block so a
    failed assertion does not leave the target user's directory behind.
    """
    # Imported locally: the pwd module is only available on POSIX platforms.
    import pwd

    # GIVEN
    tmpdir = TempDir(user=posix_target_user)
    try:
        logger = build_logger(queue_handler)
        with TerminatingRunner(
            logger=logger, session_working_directory=tmpdir.path, user=posix_target_user
        ) as runner:
            # WHEN
            runner._run(
                [
                    # Note: Intentionally not `sys.executable`. Reasons:
                    #  1) This is a cross-account command, and python_exe may be in a user-specific venv
                    #  2) This test is, generally, intended to be run in a docker container where the system
                    #     python is the correct version that we want to run under.
                    "python",
                    "-c",
                    "import os; print(os.getuid())",
                ]
            )
            # Wait until the process exits.
            while runner.state == ScriptRunnerState.RUNNING:
                time.sleep(0.1)

            # THEN
            assert runner.state == ScriptRunnerState.SUCCESS
            assert runner.exit_code == 0
            messages = collect_queue_messages(message_queue)
            # The printed uid must be the target user's, not this process's.
            assert str(os.getuid()) not in messages  # type: ignore
            uid = pwd.getpwnam(posix_target_user.user).pw_uid  # type: ignore
            assert str(uid) in messages
    finally:
        # Always remove the temp dir, even when an assertion above failed.
        tmpdir.cleanup()
@pytest.mark.skipif(not is_posix(), reason="posix-only test")
@pytest.mark.xfail(
not has_posix_target_user(),
reason=POSIX_SET_TARGET_USER_ENV_VARS_MESSAGE,
)
@pytest.mark.usefixtures("message_queue", "queue_handler", "posix_target_user")
def test_run_as_posix_user_with_env_vars(
    self,
    posix_target_user: PosixSessionUser,
    message_queue: SimpleQueue,
    queue_handler: QueueHandler,
) -> None:
    """Verify a process run as a specific POSIX user sees the expected env vars.

    The temporary session directory is released in a ``finally`` block so a
    failed assertion does not leave the target user's directory behind.
    """
    # GIVEN
    tmpdir = TempDir(user=posix_target_user)
    try:
        logger = build_logger(queue_handler)
        with TerminatingRunner(
            logger=logger,
            session_working_directory=tmpdir.path,
            user=posix_target_user,
            os_env_vars=self.test_env_vars,
        ) as runner:
            # WHEN
            runner._run(
                [
                    # Note: Intentionally not `python_exe`. Reasons:
                    #  1) This is a cross-account command, and python_exe may be in a user-specific venv
                    #  2) This test is, generally, intended to be run in a docker container where the system
                    #     python is the correct version that we want to run under.
                    "python",
                    "-c",
                    r"import os;print(*(f'{k} = {v}' for k,v in os.environ.items()), sep='\n')",
                ]
            )
            # Wait until the process exits.
            while runner.state == ScriptRunnerState.RUNNING:
                time.sleep(0.1)

            # THEN
            messages = collect_queue_messages(message_queue)
            for key, value in self.test_env_vars.items():
                assert f"{key} = {value}" in messages
    finally:
        # Always remove the temp dir, even when an assertion above failed.
        tmpdir.cleanup()
@pytest.mark.skipif(not is_windows(), reason="Windows-only test")
@pytest.mark.xfail(
not has_windows_user(),
reason=WIN_SET_TEST_ENV_VARS_MESSAGE,
)
@pytest.mark.timeout(90)
def test_run_as_windows_user(
    self,
    windows_user: WindowsSessionUser,
    message_queue: SimpleQueue,
    queue_handler: QueueHandler,
) -> None:
    """Verify that the subprocess runs as the requested Windows user.

    The temporary session directory is released in a ``finally`` block so a
    failed assertion does not leave the target user's directory behind.
    """
    # GIVEN
    # Imported locally: this helper lives in the Windows-only _win32 package.
    from openjd.sessions._win32._helpers import get_process_user  # type: ignore

    tmpdir = TempDir(user=windows_user)
    try:
        logger = build_logger(queue_handler)
        with TerminatingRunner(
            logger=logger, session_working_directory=tmpdir.path, user=windows_user
        ) as runner:
            # WHEN
            runner._run(["whoami"])
            # Wait until the process exits.
            while runner.state == ScriptRunnerState.RUNNING:
                time.sleep(0.1)

            # THEN
            assert runner.state == ScriptRunnerState.SUCCESS
            assert runner.exit_code == 0
            messages = collect_queue_messages(message_queue)
            # No logged line may mention the user running this test process,
            # and at least one line must mention the target user.
            process_user = get_process_user()
            assert all(process_user not in message for message in messages)
            assert any(windows_user.user in message for message in messages)
    finally:
        # Always remove the temp dir, even when an assertion above failed.
        tmpdir.cleanup()
@pytest.mark.skipif(not is_windows(), reason="Windows-only test")
@pytest.mark.xfail(
not has_windows_user(),
reason=WIN_SET_TEST_ENV_VARS_MESSAGE,
)
@pytest.mark.timeout(90)
def test_failed_run_as_windows_user(
    self,
    windows_user: WindowsSessionUser,
    message_queue: SimpleQueue,
    queue_handler: QueueHandler,
) -> None:
    """Verify a clean failure when the command to run as a Windows user does not exist.

    The temporary session directory is released in a ``finally`` block so a
    failed assertion does not leave the target user's directory behind.
    """
    # GIVEN
    tmpdir = TempDir(user=windows_user)
    try:
        logger = build_logger(queue_handler)
        with TerminatingRunner(
            logger=logger, session_working_directory=tmpdir.path, user=windows_user
        ) as runner:
            # WHEN
            runner._run(["test_not_a_command"])
            # Wait until the process exits.
            while runner.state == ScriptRunnerState.RUNNING:
                time.sleep(0.1)

            # THEN
            assert runner.state == ScriptRunnerState.FAILED
            assert runner.exit_code is None
            messages = collect_queue_messages(message_queue)
            assert messages == ["openjd_fail: Could not find executable file: test_not_a_command"]
    finally:
        # Always remove the temp dir, even when an assertion above failed.
        tmpdir.cleanup()
@pytest.mark.skipif(not is_windows(), reason="Windows-only test")
@pytest.mark.xfail(
not has_windows_user(),
reason=WIN_SET_TEST_ENV_VARS_MESSAGE,
)
@pytest.mark.timeout(30)
@pytest.mark.usefixtures("message_queue", "queue_handler", "windows_user")
def test_run_as_windows_user_with_env_vars(
    self,
    windows_user: WindowsSessionUser,
    message_queue: SimpleQueue,
    queue_handler: QueueHandler,
) -> None:
    """Verify a process run as a specific Windows user sees the expected env vars.

    The temporary session directory is released in a ``finally`` block so a
    failed assertion does not leave the target user's directory behind.
    """
    # GIVEN
    tmpdir = TempDir(user=windows_user)
    try:
        logger = build_logger(queue_handler)
        with TerminatingRunner(
            logger=logger,
            session_working_directory=tmpdir.path,
            user=windows_user,
            os_env_vars=self.test_env_vars,
        ) as runner:
            # WHEN
            runner._run(
                [
                    # Note: Intentionally not `python_exe`. Reasons:
                    #  1) This is a cross-account command, and python_exe may be in a user-specific venv
                    #  2) This test is, generally, intended to be run in a docker container where the system
                    #     python is the correct version that we want to run under.
                    "python",
                    "-c",
                    r"import os;print(*(f'{k} = {v}' for k,v in os.environ.items()), sep='\n')",
                ]
            )
            # Wait until the process exits.
            while runner.state == ScriptRunnerState.RUNNING:
                time.sleep(0.1)

            # THEN
            messages = collect_queue_messages(message_queue)
            for key, value in self.test_env_vars.items():
                # This Windows test compares against the upper-cased variable name.
                assert f"{key.upper()} = {value}" in messages
    finally:
        # Always remove the temp dir, even when an assertion above failed.
        tmpdir.cleanup()
@pytest.mark.skipif(not is_posix(), reason="posix-specific test")
@pytest.mark.xfail(
not has_posix_target_user(),
reason=POSIX_SET_TARGET_USER_ENV_VARS_MESSAGE,
)
@pytest.mark.usefixtures("message_queue", "queue_handler", "posix_target_user")
@pytest.mark.timeout(40)
def test_does_not_inherit_env_vars_posix(
    self,
    posix_target_user: PosixSessionUser,
    message_queue: SimpleQueue,
    queue_handler: QueueHandler,
) -> None:
    """Security test: a cross-user subprocess must not inherit this process's environment.

    Run a command that tries to read a variable from this process's environment;
    it should not be able to obtain the value. Only the cross-user case ensures
    that the environment is not passed through; this is to ensure that sensitive
    information defined in the initiating process' environment is not propagated
    through a user boundary to the subprocess.

    The variable set in ``os.environ`` is restored to its prior state and the
    temporary session directory is removed in a ``finally`` block, so the test
    leaves no trace behind whether it passes or fails.
    """
    # GIVEN
    var_name = "TEST_DOES_NOT_INHERIT_ENV_VARS_VAR"
    previous_value = os.environ.get(var_name)
    tmpdir = TempDir(user=posix_target_user)
    os.environ[var_name] = "TEST_VALUE"
    try:
        logger = build_logger(queue_handler)
        with TerminatingRunner(
            logger=logger, session_working_directory=tmpdir.path, user=posix_target_user
        ) as runner:
            # WHEN
            runner._run(
                [
                    # Note: Intentionally not `python_exe`. Reasons:
                    #  1) This is a cross-account command, and python_exe may be in a user-specific venv
                    #  2) This test is, generally, intended to be run in a docker container where the system
                    #     python is the correct version that we want to run under.
                    "python",
                    "-c",
                    f"import time; import os; time.sleep(0.25); print(os.environ.get('{var_name}', 'NOT_PRESENT')); print(os.environ)",
                ]
            )

            # THEN
            assert runner.state == ScriptRunnerState.RUNNING
            assert runner.exit_code is None
            # Poll once per second for up to roughly 10 seconds.
            current_wait_seconds = 0
            while runner.state == ScriptRunnerState.RUNNING and current_wait_seconds < 10:
                time.sleep(1)
                current_wait_seconds += 1
            assert runner.state == ScriptRunnerState.SUCCESS
            assert runner.exit_code == 0
            messages = collect_queue_messages(message_queue)
            assert os.environ[var_name] not in messages
            assert "NOT_PRESENT" in messages
    finally:
        # Put this process's environment back exactly as it was found.
        if previous_value is None:
            os.environ.pop(var_name, None)
        else:
            os.environ[var_name] = previous_value
        tmpdir.cleanup()
@pytest.mark.skipif(not is_windows(), reason="Windows-specific test")
@pytest.mark.xfail(
not has_windows_user(),
reason=WIN_SET_TEST_ENV_VARS_MESSAGE,
)
def test_does_not_inherit_env_vars_windows(
    self,
    windows_user: WindowsSessionUser,
    message_queue: SimpleQueue,
    queue_handler: QueueHandler,
) -> None:
    """Security test: a cross-user subprocess must not inherit this process's environment.

    Run a command that tries to read a variable from this process's environment;
    it should not be able to obtain the value. Only the cross-user case ensures
    that the environment is not passed through; this is to ensure that sensitive
    information defined in the initiating process' environment is not propagated
    through a user boundary to the subprocess.

    The variable set in ``os.environ`` is restored to its prior state and the
    temporary session directory is removed in a ``finally`` block, so the test
    leaves no trace behind whether it passes or fails.
    """
    # GIVEN
    var_name = "TEST_DOES_NOT_INHERIT_ENV_VARS_VAR"
    previous_value = os.environ.get(var_name)
    tmpdir = TempDir(user=windows_user)
    os.environ[var_name] = "TEST_VALUE"
    try:
        logger = build_logger(queue_handler)
        with TerminatingRunner(
            logger=logger, session_working_directory=tmpdir.path, user=windows_user
        ) as runner:
            # WHEN
            py_script = f"import os; v=os.environ.get('{var_name}'); print('NOT_PRESENT' if v is None else v)"
            # Use the default 'python' rather than 'sys.executable' since we typically do not have access to
            # python_exe when running with impersonation since it's in a hatch environment for the local user.
            runner._run(["python", "-c", py_script])

            # THEN
            assert runner.state == ScriptRunnerState.RUNNING
            assert runner.exit_code is None
            # Poll once per second for up to roughly 10 seconds.
            current_wait_seconds = 0
            while runner.state == ScriptRunnerState.RUNNING and current_wait_seconds < 10:
                time.sleep(1)
                current_wait_seconds += 1
            assert runner.state == ScriptRunnerState.SUCCESS
            assert runner.exit_code == 0
            messages = collect_queue_messages(message_queue)
            assert os.environ[var_name] not in messages
            assert "NOT_PRESENT" in messages
    finally:
        # Put this process's environment back exactly as it was found.
        if previous_value is None:
            os.environ.pop(var_name, None)
        else:
            os.environ[var_name] = previous_value
        tmpdir.cleanup()
def test_cannot_run_twice(self, tmp_path: Path, python_exe: str) -> None:
    """Calling _run() a second time on the same runner raises RuntimeError."""
    # GIVEN
    state_callback = MagicMock()
    hello_command = [python_exe, "-c", "print('hello')"]
    with TerminatingRunner(
        logger=MagicMock(), session_working_directory=tmp_path, callback=state_callback
    ) as runner:
        # WHEN
        runner._run(hello_command)

        # THEN
        # An equal, but separately built, command list is used for the retry.
        with pytest.raises(RuntimeError):
            runner._run(list(hello_command))
@pytest.mark.usefixtures("message_queue", "queue_handler")
def test_run_action(
    self,
    tmp_path: Path,
    message_queue: SimpleQueue,
    queue_handler: QueueHandler,
    python_exe: str,
) -> None:
    """_run_action resolves the action's format strings, runs it, and applies its timeout."""
    # GIVEN
    script_path = (Path(__file__).parent / "support_files" / "app_20s_run.py").resolve()
    symtab = SymbolTable(
        source={
            "Task.PythonInterpreter": python_exe,
            "Task.ScriptFile": str(script_path),
        }
    )
    action = Action_2023_09(
        command=CommandString_2023_09("{{Task.PythonInterpreter}}"),
        args=[ArgString_2023_09("{{Task.ScriptFile}}")],
        timeout=5,
    )
    logger = build_logger(queue_handler)
    with TerminatingRunner(logger=logger, session_working_directory=tmp_path) as runner:
        # WHEN
        runner._run_action(action, symtab)
        # Block until the subprocess is no longer running.
        while runner.state == ScriptRunnerState.RUNNING:
            time.sleep(0.2)

        # THEN
        assert runner.state == ScriptRunnerState.TIMEOUT
        messages = collect_queue_messages(message_queue)
        # The script is expected to log "Log from test <n>" as it counts up.
        # Since the run was cut short, the first line must be present and
        # a later one must not.
        print(messages)
        assert "Log from test 0" in messages
        assert "Log from test 9" not in messages
@pytest.mark.usefixtures("message_queue", "queue_handler")
@pytest.mark.parametrize(
argnames=("default_timeout_seconds", "action_timeout_seconds"),
argvalues=(
pytest.param(1, 5, id="action-timeout-prevails"),
pytest.param(2, None, id="default-applied"),
pytest.param(None, None, id="no-timeout"),
),
)
def test_run_action_default_timeout(
    self,
    tmp_path: Path,
    message_queue: SimpleQueue,
    queue_handler: QueueHandler,
    default_timeout_seconds: Optional[int],
    action_timeout_seconds: Optional[int],
    python_exe: str,
) -> None:
    """The effective timeout is the action's own timeout, else the supplied default, else none."""
    # GIVEN
    # The action's timeout wins; the default only applies when the action has none.
    expected_effective_timeout_seconds: Optional[int] = (
        action_timeout_seconds
        if action_timeout_seconds is not None
        else default_timeout_seconds
    )
    default_timeout = None
    if default_timeout_seconds is not None:
        default_timeout = timedelta(seconds=default_timeout_seconds)
    script_path = (Path(__file__).parent / "support_files" / "app_20s_run.py").resolve()
    symtab = SymbolTable(
        source={
            "Task.PythonInterpreter": python_exe,
            "Task.ScriptFile": str(script_path),
        }
    )
    action = Action_2023_09(
        command=CommandString_2023_09("{{Task.PythonInterpreter}}"),
        args=[ArgString_2023_09("{{Task.ScriptFile}}")],
        timeout=action_timeout_seconds,
    )
    logger = build_logger(queue_handler)
    with TerminatingRunner(logger=logger, session_working_directory=tmp_path) as runner:
        # WHEN
        runner._run_action(action, symtab, default_timeout=default_timeout)
        # Block until the subprocess is no longer running.
        while runner.state == ScriptRunnerState.RUNNING:
            time.sleep(0.2)

        # THEN
        timeout_expected = expected_effective_timeout_seconds is not None
        expected_state = (
            ScriptRunnerState.TIMEOUT if timeout_expected else ScriptRunnerState.SUCCESS
        )
        assert runner.state == expected_state
        messages = collect_queue_messages(message_queue)
        # The application prints out 0, ..., 19 once a second for 20s.
        # If it was cut short, the line just before the timeout is present and
        # the line just after it is not.
        print(messages)
        if not timeout_expected:
            assert "Log from test 19" in messages
        else:
            assert f"Log from test {expected_effective_timeout_seconds - 1}" in messages
            assert f"Log from test {expected_effective_timeout_seconds + 1}" not in messages
@pytest.mark.usefixtures("message_queue", "queue_handler")
def test_run_action_bad_formatstring(
    self,
    tmp_path: Path,
    message_queue: SimpleQueue,
    queue_handler: QueueHandler,
) -> None:
    """_run_action fails early, without running anything, when format strings cannot be resolved."""
    # GIVEN
    # The symbol table is empty, so neither template below has a value.
    empty_symtab = SymbolTable()
    action = Action_2023_09(
        command=CommandString_2023_09("{{Task.PythonInterpreter}}"),
        args=[ArgString_2023_09("{{Task.ScriptFile}}")],
        timeout=1,
    )
    logger = build_logger(queue_handler)
    with TerminatingRunner(logger=logger, session_working_directory=tmp_path) as runner:
        # WHEN
        runner._run_action(action, empty_symtab)

        # THEN
        assert runner.state == ScriptRunnerState.FAILED
        assert runner.exit_code is None
        logged = collect_queue_messages(message_queue)
        assert any(line.startswith("openjd_fail") for line in logged)
@pytest.mark.usefixtures("message_queue", "queue_handler")
def test_cancel_terminate(
    self,
    tmp_path: Path,
    message_queue: SimpleQueue,
    queue_handler: QueueHandler,
    python_exe: str,
) -> None:
    """A TERMINATE style cancelation ends the subprocess and reports CANCELED."""
    # GIVEN
    state_callback = MagicMock()
    logger = build_logger(queue_handler)
    script_path = (Path(__file__).parent / "support_files" / "app_20s_run.py").resolve()
    with TerminatingRunner(
        logger=logger, session_working_directory=tmp_path, callback=state_callback
    ) as runner:
        runner._run([python_exe, str(script_path)])

        # WHEN
        runner.cancel()

        # THEN
        # Hold here while the cancelation is still in progress.
        while runner.state == ScriptRunnerState.CANCELING:
            time.sleep(0.2)
        assert runner.state == ScriptRunnerState.CANCELED
        assert runner.exit_code != 0
        time.sleep(1)  # Some time for the cancel callback to be invoked.
        expected_calls = [call(ActionState.RUNNING), call(ActionState.CANCELED)]
        state_callback.assert_has_calls(expected_calls)
        messages = collect_queue_messages(message_queue)
        # The application must not have reached this later log line.
        assert "Log from test 9" not in messages
@pytest.mark.usefixtures("message_queue", "queue_handler")
@pytest.mark.xfail(not is_posix(), reason="Signals not yet implemented for non-posix")
def test_run_with_time_limit(
    self,
    tmp_path: Path,
    message_queue: SimpleQueue,
    queue_handler: QueueHandler,
    python_exe: str,
) -> None:
    """A run that exceeds its time_limit is canceled by the runner and ends in TIMEOUT."""
    # GIVEN
    logger = build_logger(queue_handler)
    script_path = (Path(__file__).parent / "support_files" / "app_20s_run.py").resolve()
    one_second = timedelta(seconds=1)
    # The runner stays RUNNING until the limit is hit, then CANCELING until
    # the process has finally exited.
    in_flight_states = (ScriptRunnerState.RUNNING, ScriptRunnerState.CANCELING)
    with TerminatingRunner(logger=logger, session_working_directory=tmp_path) as runner:
        # WHEN
        runner._run([python_exe, str(script_path)], time_limit=one_second)

        # THEN
        while runner.state in in_flight_states:
            time.sleep(0.1)
        assert runner.state == ScriptRunnerState.TIMEOUT
        assert runner.exit_code != 0
        # The timeout must have gone through the runner's own cancel().
        assert cast(TerminatingRunner, runner)._cancel_called
        messages = collect_queue_messages(message_queue)
        # The application must not have reached this later log line.
        assert "Log from test 9" not in messages
@pytest.mark.usefixtures("message_queue", "queue_handler")
def test_cancel_notify(
    self,
    tmp_path: Path,
    message_queue: SimpleQueue,
    queue_handler: QueueHandler,
    python_exe: str,
) -> None:
    """A notify-style cancel signals the process first and only ends it after the grace period."""
    # GIVEN
    proc_id: Optional[int] = None
    logger = build_logger(queue_handler)
    script_path = (
        Path(__file__).parent / "support_files" / "app_20s_run_ignore_signal.py"
    ).resolve()
    cancel_info_path = tmp_path / "cancel_info.json"
    with NotifyingRunner(logger=logger, session_working_directory=tmp_path) as runner:
        runner._run([python_exe, str(script_path)])

        # WHEN
        warmup_seconds = 5 if is_windows() else 2
        time.sleep(warmup_seconds)  # Give the process a little time to do something
        now = datetime.now(timezone.utc)
        assert runner._process is not None
        assert runner._process._pid is not None
        proc_id = runner._process._pid
        runner.cancel(time_limit=timedelta(seconds=2))

        # THEN
        assert runner.state == ScriptRunnerState.CANCELING
        # Hold here while the cancelation is still in progress.
        while runner.state == ScriptRunnerState.CANCELING:
            time.sleep(0.1)
        # This should be CANCELED rather than TIMEOUT because this test is manually calling
        # the cancel() method rather than letting the action reach its runtime limit.
        assert (
            runner.state == ScriptRunnerState.CANCELED
        )  # TODO - This test is flaky. Sometimes, this is 'RUNNING'
        assert runner.exit_code != 0
        messages = collect_queue_messages(message_queue)
        assert "Trapped" in messages
        trapped_idx = messages.index("Trapped")
        exit_line = f"Process pid {proc_id} exited with code: {runner.exit_code} (unsigned) / {hex(runner.exit_code)} (hex)"
        process_exit_idx = messages.index(exit_line)
        # At least one more number must be logged between "Trapped" and the
        # exit line, showing the script was not terminated immediately.
        lines_after_trap = messages[trapped_idx + 1 : process_exit_idx]
        assert any(msg.isdigit() for msg in lines_after_trap)
        # Didn't get to the end
        assert "Log from test 9" not in messages

        # The notification file must exist and hold exactly one entry.
        assert os.path.exists(cancel_info_path)
        with open(cancel_info_path, "r") as fh:
            notification_data = json.loads(fh.read())
        assert len(notification_data) == 1
        assert "NotifyEnd" in notification_data
        notify_end = notification_data["NotifyEnd"]
        assert notify_end[-1] == "Z"
        # Stripping the Z removes timezone information. Need to ensure it's not interpreted as local
        time_end = datetime.fromisoformat(notify_end[:-1]).replace(tzinfo=timezone.utc)
        # Timestamp should be around 2s from cancel signal, but give a 1s window
        # for timing differences.
        delta_t = time_end - now
        assert timedelta(seconds=1) < delta_t < timedelta(seconds=3)
@pytest.mark.skipif(not is_posix(), reason="posix-only test")
@pytest.mark.xfail(
not has_posix_target_user(),
reason=POSIX_SET_TARGET_USER_ENV_VARS_MESSAGE,
)
@pytest.mark.requires_cap_kill
def test_cancel_notify_direct_signal_with_cap_kill(
    self,
    tmp_path: Path,
    message_queue: SimpleQueue,
    queue_handler: QueueHandler,
) -> None:
    """With CAP_KILL permitted, the runner signals the subprocess directly and leaves CAP_KILL's effective status unchanged.

    For Linux hosts where CAP_KILL is in the permitted (and possibly effective)
    capability set(s), the runner is expected to:
      1. directly signal the subprocess to notify it
      2. retain the status of CAP_KILL in the thread's effective capability set
    """
    # GIVEN
    logger = build_logger(queue_handler)
    from openjd.sessions._linux._capabilities import (
        _has_capability,
        _get_libcap,
        CAP_KILL,
        CapabilitySetType,
    )

    libcap = _get_libcap()
    assert libcap is not None, "Libcap not found"

    def cap_kill_is_effective() -> bool:
        # Query the current capabilities and report whether CAP_KILL is in
        # the effective set.
        caps = libcap.cap_get_proc()
        return _has_capability(
            libcap=libcap,
            caps=caps,
            capability=CAP_KILL,
            capability_set_type=CapabilitySetType.EFFECTIVE,
        )

    # Snapshot the status of CAP_KILL before the subprocess is notified.
    cap_kill_was_effective = cap_kill_is_effective()

    # Path to compiled C program that outputs the PID of the process sending the signal
    output_signal_sender_app_loc = (
        Path(__file__).parent / "support_files" / "output_signal_sender"
    ).resolve()
    with NotifyingRunner(logger=logger, session_working_directory=tmp_path) as runner:
        assert output_signal_sender_app_loc.exists(), "output_signal_sender is not compiled."
        runner._run([str(output_signal_sender_app_loc)])

        # WHEN
        warmup_seconds = 5 if is_windows() else 2
        time.sleep(warmup_seconds)  # Give the process a little time to do something
        runner.cancel(time_limit=timedelta(seconds=2))

        # THEN
        # Check for CANCELED up to ten times, one second apart.
        for _ in range(10):
            if runner.state == ScriptRunnerState.CANCELED:
                break
            time.sleep(1)
        else:
            # Terminate the subprocess
            runner.cancel()
            assert False, "output_signal_sender process did not exit when sent SIGTERM"
        assert runner.exit_code == 0

        # Collect stdout lines. Based on the code of output_signal_sender.c, only a single
        # line should be output with the PID of the process that sent the signal.
        # Extracting that line requires finding the preceding log line emitted by the runner,
        # then taking the following line and parsing it as an integer.
        messages = collect_queue_messages(message_queue)
        signal_log_prefix = 'INTERRUPT: Sending signal "term" to process '
        for idx, line in enumerate(messages):
            if line.startswith(signal_log_prefix):
                break
        else:
            assert False, "Could not find log line before stdout"
        signal_sender_pid = int(messages[idx + 1])
        current_pid = os.getpid()
        assert (
            current_pid == signal_sender_pid
        ), "The runner's subprocess was not directly signalled"

        # The presence/absence of CAP_KILL in the effective capability set
        # must be the same after Runner.cancel() as it was before.
        cap_kill_effective_after_cancel = cap_kill_is_effective()
        assert (
            cap_kill_was_effective == cap_kill_effective_after_cancel
        ), "CAP_KILL added/removed from effetive set and persisted after cancelation"
@pytest.mark.usefixtures("message_queue", "queue_handler")
def test_cancel_double_cancel_notify(
self,
tmp_path: Path,
message_queue: SimpleQueue,
queue_handler: QueueHandler,
python_exe: str,
) -> None:
# Test that NOTIFY_THEN_CANCEL can be called twice, and the second time will
# shrink the grace period
# GIVEN
logger = build_logger(queue_handler)
with NotifyingRunner(logger=logger, session_working_directory=tmp_path) as runner:
python_app_loc = (
Path(__file__).parent / "support_files" / "app_20s_run_ignore_signal.py"
).resolve()
runner._run([python_exe, str(python_app_loc)])
# WHEN
secs = 2 if not is_windows() else 5
time.sleep(secs) # Give the process a little time to do something
runner.cancel(time_limit=timedelta(seconds=15))
runner.cancel(time_limit=timedelta(seconds=1 if not is_windows() else 3))