diff --git a/libkirk/main.py b/libkirk/main.py index 97d1689..4c43c05 100644 --- a/libkirk/main.py +++ b/libkirk/main.py @@ -460,20 +460,12 @@ async def session_run() -> None: exit_code = RC_ERROR finally: try: - # at this point loop has been closed, so we can collect all - # tasks and cancel them - loop.run_until_complete( - # pyrefly: ignore[bad-argument-type] - asyncio.gather( - *[ - session.stop(), - libkirk.events.stop(), - ] - ) - ) - libkirk.cancel_tasks(loop) + loop.run_until_complete(session.stop()) except KeyboardInterrupt: - pass + loop.run_until_complete(session.stop()) + + libkirk.cancel_tasks(loop) + loop.run_until_complete(libkirk.events.stop()) parser.exit(exit_code) diff --git a/libkirk/scheduler.py b/libkirk/scheduler.py index ab9990c..d0bb634 100644 --- a/libkirk/scheduler.py +++ b/libkirk/scheduler.py @@ -9,6 +9,7 @@ import asyncio import logging import os +import signal import sys import time from typing import ( @@ -147,7 +148,7 @@ def __init__( self._timeout = 0.0 if timeout < 0.0 else timeout self._max_workers = 1 if max_workers < 1 else max_workers self._results = [] - self._stop = False + self._stop_cnt = 0 self._stopped = False self._running_tests_sem = asyncio.Semaphore(1) self._schedule_lock = asyncio.Lock() @@ -203,7 +204,12 @@ def stopped(self) -> bool: async def stop(self) -> None: self._logger.info("Stopping tests execution") - self._stop = True + self._stop_cnt += 1 + + if self._stop_cnt > 1: + # by stopping SUT first, we cause scheduler to complete + # current test immediatelly without waiting + await self._sut.stop() try: # we enter in the semaphore queue in order to get highest @@ -217,7 +223,7 @@ async def stop(self) -> None: async with self._schedule_lock: pass finally: - self._stop = False + self._stop_cnt = 0 self._stopped = True self._logger.info("All tests have been completed") @@ -227,7 +233,7 @@ async def _run_test(self, test: Test) -> None: Run a single test and populate the results array. """ async with self._running_tests_sem: - if self._stop: + if self._stop_cnt > 0: self._logger.info("Test '%s' has been stopped", test.name) return None @@ -294,6 +300,13 @@ async def _run_test(self, test: Test) -> None: "exec_time": exec_time, } + # we won't consider tests killed by kirk during forcibly stop, + # but only if they have been killed by an external application + # or kernel OOM + if test_data["returncode"] == -signal.SIGKILL and self._stop_cnt > 1: + self._logger.info("Test killed: %s", test.name) + return + results = await self._framework.read_result( test, test_data["stdout"], @@ -304,6 +317,12 @@ async def _run_test(self, test: Test) -> None: self._logger.debug("results=%s", results) self._results.append(results) + await libkirk.events.fire("test_completed", results) + await self._write_kmsg(test, results) + + self._logger.info("Test completed: %s", test.name) + self._logger.debug(results) + # raise kernel errors at the end so we can collect test results if status == self.KERNEL_TAINTED: await libkirk.events.fire("kernel_tainted", tainted_msg) @@ -317,12 +336,6 @@ async def _run_test(self, test: Test) -> None: await libkirk.events.fire("sut_not_responding") raise KernelTimeoutError() - await libkirk.events.fire("test_completed", results) - await self._write_kmsg(test, results) - - self._logger.info("Test completed: %s", test.name) - self._logger.debug(results) - async def _run_and_wait(self, tests: List[Test]) -> None: """ Run tests one after another. @@ -380,11 +393,10 @@ async def schedule(self, jobs: List[Any]) -> None: exc_name = err.__class__.__name__ self._logger.info("%s caught during tests execution", exc_name) - raise_exc = not self._stop async with self._running_tests_sem: pass - if raise_exc: + if self._stop_cnt == 0: self._logger.info("Propagating %s exception", exc_name) raise err diff --git a/libkirk/session.py b/libkirk/session.py index 06327d2..d24dcfe 100644 --- a/libkirk/session.py +++ b/libkirk/session.py @@ -375,6 +375,9 @@ async def stop(self) -> None: """ Stop the current session. """ + # we don't want to send session_stopped more than once + already_stopped = self._stop == True + self._stop = True try: await self._inner_stop() @@ -385,7 +388,9 @@ async def stop(self) -> None: async with self._exec_lock: pass finally: - await libkirk.events.fire("session_stopped") + if not already_stopped: + await libkirk.events.fire("session_stopped") + self._stop = False async def _schedule_once(self, suites_obj: List[Suite]) -> None: