diff --git a/rele/__init__.py b/rele/__init__.py index 92e2aca..26b9864 100644 --- a/rele/__init__.py +++ b/rele/__init__.py @@ -1,4 +1,4 @@ -__version__ = "1.16.0b3" +__version__ = "1.16.0b4" try: import django diff --git a/rele/client.py b/rele/client.py index e7d0d6a..23462a7 100644 --- a/rele/client.py +++ b/rele/client.py @@ -190,6 +190,10 @@ def consume(self, subscription_name, callback, scheduler): subscription_path, callback=callback, scheduler=scheduler ) + def close(self): + """Close the SubscriberClient.""" + self._client.close() + class Publisher: """The Publisher Class diff --git a/rele/worker.py b/rele/worker.py index 9a87520..01766dd 100644 --- a/rele/worker.py +++ b/rele/worker.py @@ -125,7 +125,7 @@ def stop(self, signal=None, frame=None): This function has two purposes: - 1. Cancel all the futures created. + 1. Cancel all the futures created and terminate the subscriber. 2. And close all the database connections opened by Django. Even though we cancel the connections for every execution of the callback, we want to be sure @@ -138,10 +138,14 @@ def stop(self, signal=None, frame=None): :param frame: Needed for `signal.signal `_ # noqa """ run_middleware_hook("pre_worker_stop", self._subscriptions) + logger.debug(f"[stop] cancel all futures") for future in self._futures.values(): future.cancel() future.result() + logger.debug(f"[stop] close subscriber") + self._subscriber.close() + run_middleware_hook("post_worker_stop") sys.exit(0) diff --git a/tests/test_worker.py b/tests/test_worker.py index 3ce174b..fce4c93 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -113,6 +113,18 @@ def test_wait_forevers_for_custom_time_period_when_called_with_argument( mock_wait_forever.assert_called_once() + @patch.object(Worker, "_wait_forever") + def test_stop_cancels_futures_and_closes_subscriber( + self, mock_wait_forever, mock_consume, mock_create_subscription, worker + ): + worker.run_forever() + + with pytest.raises(SystemExit): + worker.stop() + + assert worker._futures[sub_stub]._state == FINISHED + assert worker._subscriber._client._closed is True + @patch("rele.contrib.django_db_middleware.db.connections.close_all") def test_stop_closes_db_connections(self, mock_db_close_all, config, worker): config.middleware = ["rele.contrib.DjangoDBMiddleware"]