diff --git a/.github/workflows/build_deploy.yml b/.github/workflows/build_deploy.yml deleted file mode 100644 index e2f9a5b..0000000 --- a/.github/workflows/build_deploy.yml +++ /dev/null @@ -1,69 +0,0 @@ -name: Build - -on: - pull_request: - release: - types: - - published - -jobs: - build_wheel: - name: Build wheel - runs-on: ubuntu-latest - - steps: - - uses: actions/checkout@v2 - # Include all history and tags - with: - fetch-depth: 0 - - - uses: actions/setup-python@v2 - name: Install Python - with: - python-version: '3.8' - - - name: Build wheel - run: | - python -m pip install wheel - python -m pip wheel -w dist . - - - uses: actions/upload-artifact@v2 - with: - path: dist/*.whl - - build_sdist: - name: Build source distribution - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v2 - # Include all history and tags - with: - fetch-depth: 0 - - - uses: actions/setup-python@v2 - name: Install Python - with: - python-version: '3.8' - - - name: Build sdist - run: | - python setup.py sdist - - - uses: actions/upload-artifact@v2 - with: - path: dist/*.tar.gz - - upload_pypi: - needs: [build_wheel, build_sdist] - runs-on: ubuntu-latest - if: github.event_name == 'release' && github.event.action == 'published' - steps: - - uses: actions/download-artifact@v2 - with: - name: artifact - path: dist - - - uses: pypa/gh-action-pypi-publish@master - with: - user: __token__ - password: ${{ secrets.PYPI_TOKEN }} diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ec0bc40..735569d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,63 +1,37 @@ name: CI -on: [push, pull_request] + +on: + push: + pull_request: + jobs: - black: + lint: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 - uses: actions/setup-python@v5 - with: - python-version: "3.9" - - run: pip install riot==0.19.0 - - run: riot -v run -s black -- --check . - mypy: - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v4 - - uses: actions/setup-python@v5 - with: - python-version: "3.9" - - run: pip install riot==0.19.0 - - run: riot -v run mypy - flake8: - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v4 - - uses: actions/setup-python@v5 - with: - python-version: "3.9" - - run: pip install riot==0.19.0 - - run: riot -v run flake8 + - run: pip install uv + - run: uv venv + - run: uv pip install --requirement pyproject.toml --all-extras + - run: .venv/bin/ruff format --check . + - run: .venv/bin/ruff check . + - run: .venv/bin/mypy asyncio_connection_pool + test: + runs-on: ubuntu-latest strategy: matrix: - os: [ubuntu-latest, macos-latest] python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"] - runs-on: ${{ matrix.os }} steps: - uses: actions/checkout@v4 - - name: Setup Python - uses: actions/setup-python@v5 + - uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} - - name: install riot - run: pip install riot==0.19.0 - - name: run tests - run: riot -v run --python=${{ matrix.python-version }} test -- --cov=asyncio_connection_pool --cov-branch --cov-config=.coveragerc - - name: install coverage - run: pip install coverage - - name: upload coverage - uses: coverallsapp/github-action@v2 - with: - parallel: true - flag-name: run ${{ join(matrix.*, ' - ') }} - finish-coveralls: - needs: test - if: ${{ always() }} - runs-on: ubuntu-latest - steps: - - name: Coveralls Finished - uses: coverallsapp/github-action@v2 - with: - parallel-finished: true - allow-empty: true + - run: pip install uv + - run: uv venv + - run: uv pip install --editable ".[dev,datadog,aioredis]" + - run: .venv/bin/pytest --cov=asyncio_connection_pool --cov-branch --cov-config=.coveragerc + - run: .venv/bin/codecov + if: matrix.python-version == '3.11' + env: + CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml new file mode 100644 index 0000000..1d96a27 --- /dev/null +++ b/.github/workflows/release.yml @@ -0,0 +1,25 @@ +name: Release + +on: + release: + types: + - created + +jobs: + build: + runs-on: ubuntu-latest + permissions: + # IMPORTANT: this permission is mandatory for trusted publishing + id-token: write + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + with: + python-version: "3.12" + - run: pip install uv + - run: uv venv + - run: uv pip install --requirement pyproject.toml + - run: uv pip install setuptools setuptools-scm wheel build + - run: .venv/bin/python -m build --no-isolation + - name: Publish package distributions to PyPI + uses: pypa/gh-action-pypi-publish@release/v1 diff --git a/README.md b/README.md index 12f3e56..3988b41 100644 --- a/README.md +++ b/README.md @@ -1,11 +1,13 @@ # asyncio-connection-pool -[![GitHub Workflow Status (main)](https://img.shields.io/github/workflow/status/fellowinsights/asyncio-connection-pool/CI/main?style=flat)][main CI] +[![GitHub Workflow Status (main)](https://img.shields.io/github/actions/workflow/status/fellowapp/asyncio-connection-pool/ci.yml?branch=main&style=flat)][main CI] [![PyPI](https://img.shields.io/pypi/v/asyncio-connection-pool?style=flat)][package] [![PyPI - Python Version](https://img.shields.io/pypi/pyversions/asyncio-connection-pool?style=flat)][package] +[![codecov](https://codecov.io/gh/fellowapp/asyncio-connection-pool/graph/badge.svg?token=F3D4D9EG6M)](https://codecov.io/gh/fellowapp/asyncio-connection-pool) +[![License](https://img.shields.io/pypi/l/prosemirror.svg?style=flat)](https://github.com/fellowapp/asyncio-connection-pool/blob/master/LICENSE.md) [![Fellow Careers](https://img.shields.io/badge/fellow.app-hiring-576cf7.svg?style=flat)](https://fellow.app/careers/) -[main CI]: https://github.com/fellowinsights/asyncio-connection-pool/actions?query=workflow%3ACI+branch%3Amain +[main CI]: https://github.com/fellowapp/asyncio-connection-pool/actions?query=workflow%3ACI+branch%3Amain [package]: https://pypi.org/project/asyncio-connection-pool/ This is a generic, high-throughput, optionally-burstable pool for asyncio. @@ -21,10 +23,11 @@ Some cool features: - The contents of the pool can be anything; just implement a `ConnectionStrategy`. -[^1]: Theoretically, there is an implicit "lock" that is held while an asyncio - task is executing. No other task can execute until the current task - yields (since it's cooperative multitasking), so any operations during - that time are atomic. +[^1]: + Theoretically, there is an implicit "lock" that is held while an asyncio + task is executing. No other task can execute until the current task + yields (since it's cooperative multitasking), so any operations during + that time are atomic. ## Why? @@ -36,10 +39,8 @@ We also thought it would be nice if we didn't need to keep many connections open when they weren't needed, but still have the ability to make more when they are required. - ## API - ### `asyncio_connection_pool.ConnectionPool` This is the implementation of the pool. It is generic over a type of @@ -57,7 +58,6 @@ pool = ConnectionPool(strategy=my_strategy, max_size=15) The constructor can optionally be passed an integer as `burst_limit`. This allows the pool to open more connections than `max_size` temporarily. - #### `@asynccontextmanager async def get_connection(self) -> AsyncIterator[Conn]` This method is the only way to get a connection from the pool. It is expected @@ -77,13 +77,11 @@ are available, the caller will yield to the event loop. When the block is exited, the connection will be returned to the pool. - ### `asyncio_connection_pool.ConnectionStrategy` This is an abstract class that defines the interface of the object passed as `strategy`. A subclass _must_ implement the following methods: - #### `async def create_connection(self) -> Awaitable[Conn]` This method is called to create a new connection to the resource. This happens @@ -96,7 +94,6 @@ the pool, and in most cases will be stored in the pool to be re-used later. If this method raises an exception, it will bubble up to the frame where `ConnectionPool.get_connection()` was called. - #### `def connection_is_closed(self, conn: Conn) -> bool` This method is called to check if a connection is no longer able to be used. @@ -111,7 +108,6 @@ exception is suppressed unless it is not a `BaseException`, like `asyncio.CancelledError`. It is the responsibility of the `ConnectionStrategy` implementation to avoid leaking a connection in this case. - #### `async def close_connection(self, conn: Conn)` This method is called to close a connection. This occurs when the pool has @@ -122,8 +118,7 @@ If this method raises an exception, the connection is assumed to be closed and the exception bubbles to the caller of `ConnectionPool.get_connection().__aexit__` (usually an `async with` block). - -## Integrations with 3rd-party libraries +## Integrations with 3rd-party libraries This package includes support for [`ddtrace`][ddtrace]/[`datadog`][datadog] and for [`aioredis`][aioredis] (<2.0.0). @@ -142,14 +137,12 @@ arguments of the base class, supports: - Optional `extra_tags` argument: Additional tags to provide to all metrics (strings in a `"key:value"` format) - ### `asyncio_connection_pool.contrib.aioredis.RedisConnectionStrategy` This class implements the `ConnectionStrategy` abstract methods, using `aioredis.Redis` objects as connections. The constructor takes arbitrary arguments and forwards them to `aioredis.create_redis`. - ## How is this safe without locks? I encourage you to read the [source](https://github.com/fellowinsights/asyncio-connection-pool/blob/master/asyncio_connection_pool/__init__.py) diff --git a/asyncio_connection_pool/__init__.py b/asyncio_connection_pool/__init__.py index 025f41c..f452d4d 100644 --- a/asyncio_connection_pool/__init__.py +++ b/asyncio_connection_pool/__init__.py @@ -1,6 +1,6 @@ -from abc import ABC, abstractmethod import asyncio import inspect +from abc import ABC, abstractmethod from contextlib import asynccontextmanager from typing import AsyncIterator, Awaitable, Generic, Optional, TypeVar @@ -10,16 +10,13 @@ class ConnectionStrategy(ABC, Generic[Conn]): @abstractmethod - async def make_connection(self) -> Conn: - ... + async def make_connection(self) -> Conn: ... @abstractmethod - def connection_is_closed(self, conn: Conn) -> bool: - ... + def connection_is_closed(self, conn: Conn) -> bool: ... @abstractmethod - async def close_connection(self, conn: Conn) -> None: - ... + async def close_connection(self, conn: Conn) -> None: ... async def _close_connection_compat( @@ -60,18 +57,19 @@ def __init__( *, strategy: ConnectionStrategy[Conn], max_size: int, - burst_limit: Optional[int] = None + burst_limit: Optional[int] = None, ) -> None: self._loop = asyncio.get_event_loop() self.strategy = strategy self.max_size = max_size self.burst_limit = burst_limit if burst_limit is not None and burst_limit < max_size: - raise ValueError("burst_limit must be greater than or equal to max_size") + msg = "burst_limit must be greater than or equal to max_size" + raise ValueError(msg) self.in_use = 0 self.currently_allocating = 0 self.currently_deallocating = 0 - self.available: "asyncio.Queue[Conn]" = asyncio.Queue(maxsize=self.max_size) + self.available: asyncio.Queue[Conn] = asyncio.Queue(maxsize=self.max_size) @property def _total(self) -> int: @@ -110,7 +108,7 @@ def _get_conn(self) -> Awaitable[Conn]: # Incidentally, awaiting a done Future doesn't involve yielding to # the event loop; it's more like getting the next value from a # generator. - fut: "asyncio.Future[Conn]" = self._loop.create_future() + fut: asyncio.Future[Conn] = self._loop.create_future() fut.set_result(self.available.get_nowait()) self.in_use += 1 return fut diff --git a/asyncio_connection_pool/contrib/aioredis.py b/asyncio_connection_pool/contrib/aioredis.py deleted file mode 100644 index 01df105..0000000 --- a/asyncio_connection_pool/contrib/aioredis.py +++ /dev/null @@ -1,20 +0,0 @@ -import aioredis -from functools import partial -from asyncio_connection_pool import ConnectionStrategy - -__all__ = ("RedisConnectionStrategy",) - - -class RedisConnectionStrategy(ConnectionStrategy[aioredis.Redis]): # type: ignore - def __init__(self, *args, **kwargs): - self._create_redis = partial(aioredis.create_redis, *args, **kwargs) - - async def make_connection(self): - return await self._create_redis() - - def connection_is_closed(self, conn): - return conn.closed - - async def close_connection(self, conn): - conn.close() - await conn.wait_closed() diff --git a/asyncio_connection_pool/contrib/datadog.py b/asyncio_connection_pool/contrib/datadog.py index e8438fe..57ac49e 100644 --- a/asyncio_connection_pool/contrib/datadog.py +++ b/asyncio_connection_pool/contrib/datadog.py @@ -1,7 +1,9 @@ -from contextlib import asynccontextmanager, AsyncExitStack -from datadog import statsd +from contextlib import AsyncExitStack, asynccontextmanager +from typing import Any, AsyncIterator, Awaitable, Coroutine, TypeVar + +from datadog.dogstatsd.base import statsd from ddtrace import tracer -from typing import AsyncIterator, TypeVar + from asyncio_connection_pool import ConnectionPool as _ConnectionPool __all__ = ("ConnectionPool",) @@ -18,13 +20,13 @@ def __init__(self, service_name, *args, extra_tags=None, **kwargs): self._extra_tags = extra_tags or [] self._loop.call_soon(self._periodically_send_metrics) - def _periodically_send_metrics(self): + def _periodically_send_metrics(self) -> None: try: self._record_pressure() finally: self._loop.call_later(60, self._periodically_send_metrics) - def _record_pressure(self): + def _record_pressure(self) -> None: statsd.gauge( f"{self._service_name}.pool.total_connections", self._total, @@ -74,7 +76,7 @@ def _record_pressure(self): tags=self._extra_tags, ) - def _record_connection_acquiring(self, value=0): + def _record_connection_acquiring(self, value: int = 0) -> None: self._connections_acquiring += value statsd.gauge( @@ -83,13 +85,13 @@ def _record_connection_acquiring(self, value=0): tags=self._extra_tags, ) - def _connection_maker(self): + def _connection_maker(self) -> Coroutine[Any, Any, Conn]: statsd.increment( f"{self._service_name}.pool.getting_connection", - tags=self._extra_tags + ["method:new"], + tags=[*self._extra_tags, "method:new"], ) - async def connection_maker(self): + async def connection_maker(self) -> Conn: with tracer.trace( f"{self._service_name}.pool._create_new_connection", service=self._service_name, @@ -98,13 +100,13 @@ async def connection_maker(self): return connection_maker(self) - def _connection_waiter(self): + def _connection_waiter(self) -> Coroutine[Any, Any, Conn]: statsd.increment( f"{self._service_name}.pool.getting_connection", - tags=self._extra_tags + ["method:wait"], + tags=[*self._extra_tags, "method:wait"], ) - async def connection_waiter(self): + async def connection_waiter(self) -> Conn: with tracer.trace( f"{self._service_name}.pool._wait_for_connection", service=self._service_name, @@ -113,11 +115,11 @@ async def connection_waiter(self): return connection_waiter(self) - def _get_conn(self): + def _get_conn(self) -> Awaitable[Conn]: if not self.available.empty(): statsd.increment( f"{self._service_name}.pool.getting_connection", - tags=self._extra_tags + ["method:available"], + tags=[*self._extra_tags, "method:available"], ) return super()._get_conn() diff --git a/mypy.ini b/mypy.ini deleted file mode 100644 index 256909f..0000000 --- a/mypy.ini +++ /dev/null @@ -1,18 +0,0 @@ -[mypy] -ignore_missing_imports = True -check_untyped_defs = True -disallow_any_unimported = True -disallow_any_decorated = True -disallow_any_generics = True -disallow_subclassing_any = True -disallow_incomplete_defs = False -disallow_untyped_decorators = True -no_implicit_optional = True -strict_optional = True -warn_redundant_casts = True -warn_unused_ignores = True -warn_return_any = True -warn_unreachable = True -implicit_reexport = False -strict_equality = True -pretty = True diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..a383b99 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,68 @@ +[build-system] +requires = ["setuptools", "setuptools-scm"] +build-backend = "setuptools.build_meta" + +[project] +name = "asyncio-connection-pool" +dynamic = ["version"] +description = "A high-throughput, optionally-burstable pool free of explicit locking" +readme = "README.md" +requires-python = ">=3.8" +authors = [{ name = "Patrick Gingras", email = "775.pg.12@gmail.com" }] +license = { text = "BSD-3-Clause" } +dependencies = [] + +[project.optional-dependencies] +datadog = ["ddtrace", "datadog"] +dev = [ + "codecov~=2.1", + "mypy~=1.9", + "pytest~=8.1", + "pytest-asyncio~=0.23", + "pytest-cov~=5.0", + "ruff~=0.4", +] + +[tool.setuptools_scm] + +[tool.ruff.lint] +select = [ + "B", + "COM", + "E", + "EM", + "F", + "I", + "I", + "N", + "PT", + "RSE", + "RUF", + "SIM", + "UP", + "W", +] +ignore = ["COM812"] +preview = true + +[tool.ruff.format] +preview = true + +[tool.mypy] +ignore_missing_imports = true +check_untyped_defs = true +disallow_any_unimported = true +disallow_any_decorated = true +disallow_any_generics = true +disallow_subclassing_any = true +disallow_incomplete_defs = false +disallow_untyped_decorators = true +no_implicit_optional = true +strict_optional = true +warn_redundant_casts = true +warn_unused_ignores = true +warn_return_any = true +warn_unreachable = true +implicit_reexport = false +strict_equality = true +pretty = true diff --git a/riotfile.py b/riotfile.py deleted file mode 100644 index 215be63..0000000 --- a/riotfile.py +++ /dev/null @@ -1,46 +0,0 @@ -from riot import Venv, latest - -venv = Venv( - pys=3, - venvs=[ - Venv( - pys=["3.8", "3.9", "3.10", "3.11", "3.12"], - name="test", - command="pytest {cmdargs}", - pkgs={ - "pytest": latest, - "pytest-asyncio": latest, - "pytest-cov": latest, - # extras_require - "ddtrace": latest, - "datadog": latest, - "aioredis": latest, - }, - ), - Venv( - name="mypy", - command="mypy asyncio_connection_pool", - pkgs={ - "mypy": "==1.1.1", - }, - ), - Venv( - pkgs={"black": "==23.1.0"}, - venvs=[ - Venv( - name="fmt", - command=r"black --exclude '/\.riot/' .", - ), - Venv( - name="black", - command=r"black --exclude '/\.riot/' {cmdargs}", - ), - ], - ), - Venv( - name="flake8", - pkgs={"flake8": "==6.0.0"}, - command="flake8 test asyncio_connection_pool", - ), - ], -) diff --git a/setup.py b/setup.py deleted file mode 100644 index 7cf079c..0000000 --- a/setup.py +++ /dev/null @@ -1,38 +0,0 @@ -from setuptools import setup, find_packages - -with open("README.md", "r") as f: - long_description = f.read() - -setup( - name="asyncio-connection-pool", - description="A high-throughput, optionally-burstable pool free of explicit locking", - url="https://github.com/fellowinsights/asyncio-connection-pool", - author="Patrick Gingras <775.pg.12@gmail.com>", - author_email="775.pg.12@gmail.com", - classifiers=[ - "Programming Language :: Python", - "Programming Language :: Python :: 3.8", - "Programming Language :: Python :: 3.9", - "Programming Language :: Python :: 3.10", - "Programming Language :: Python :: 3.11", - "Programming Language :: Python :: 3.12", - "License :: OSI Approved :: BSD License", - ], - keywords="asyncio", - long_description=long_description, - long_description_content_type="text/markdown", - packages=find_packages( - include=["asyncio_connection_pool", "asyncio_connection_pool.*"] - ), - package_data={"asyncio_connection_pool": ["py.typed"]}, - python_requires=">=3.8", - install_requires=[], - tests_require=["riot"], - extras_require={ - "datadog": ["ddtrace", "datadog"], - "aioredis": ["aioredis"], - }, - setup_requires=["setuptools_scm"], - use_scm_version=True, - zip_safe=False, # for mypy support -) diff --git a/test/test_pool.py b/test/test_pool.py index 564747b..915b0bd 100644 --- a/test/test_pool.py +++ b/test/test_pool.py @@ -1,11 +1,14 @@ import asyncio +import random +from contextlib import asynccontextmanager +from functools import partial + import pytest + from asyncio_connection_pool import ConnectionPool, ConnectionStrategy from asyncio_connection_pool.contrib.datadog import ( ConnectionPool as TracingConnectionPool, ) -from contextlib import asynccontextmanager -from functools import partial @pytest.fixture( @@ -17,8 +20,6 @@ def pool_cls(request): class RandomIntStrategy(ConnectionStrategy[int]): async def make_connection(self): - import random - return random.randint(0, 10000) def connection_is_closed(self, conn): @@ -28,13 +29,16 @@ def close_connection(self, conn): pass -def test_valid_burst_limit(pool_cls): +@pytest.mark.asyncio() +async def test_valid_burst_limit(pool_cls): # noqa: RUF029 """Test that invalid burst_limit values cause errors (only at construction time)""" strategy = RandomIntStrategy() pool_cls(strategy=strategy, max_size=100, burst_limit=None) pool_cls(strategy=strategy, max_size=100, burst_limit=100) pool_cls(strategy=strategy, max_size=100, burst_limit=101) - with pytest.raises(ValueError): + with pytest.raises( + ValueError, match="burst_limit must be greater than or equal to max_size" + ): pool_cls(strategy=strategy, max_size=100, burst_limit=99) @@ -60,7 +64,7 @@ async def inc(self): self.n -= 1 -@pytest.mark.asyncio +@pytest.mark.asyncio() async def test_concurrent_get_connection(pool_cls): """Test handling several connection requests in a short time.""" @@ -70,9 +74,8 @@ async def test_concurrent_get_connection(pool_cls): stop = asyncio.Event() async def connection_holder(): - async with pool.get_connection(): - async with counter.inc(): - await stop.wait() + async with pool.get_connection(), counter.inc(): + await stop.wait() coros = [asyncio.create_task(connection_holder()) for _ in range(nworkers)] await counter.wait() @@ -89,7 +92,7 @@ async def connection_holder(): ), f"{nworkers} connections should be allocated" -@pytest.mark.asyncio +@pytest.mark.asyncio() async def test_currently_allocating(pool_cls): """Test that currently_allocating is accurate.""" @@ -112,10 +115,8 @@ def close_connection(self, conn): ev2 = asyncio.Event() async def worker(): - async with counter.inc(): - async with pool.get_connection(): - async with counter2.inc(): - await ev2.wait() + async with counter.inc(), pool.get_connection(), counter2.inc(): + await ev2.wait() coros = [asyncio.create_task(worker()) for _ in range(nworkers)] await counter.wait() @@ -126,17 +127,17 @@ async def worker(): ), f"{nworkers} workers are waiting for a connection" ev.set() # allow the workers to get their connections await counter2.wait() - assert ( - pool.currently_allocating == 0 and pool.in_use == nworkers - ), "all workers should have their connections now" + assert pool.currently_allocating == 0 + assert pool.in_use == nworkers, "all workers should have their connections now" ev2.set() await asyncio.gather(*coros) + assert pool.in_use == 0 assert ( - pool.in_use == 0 and pool.available.qsize() == nworkers + pool.available.qsize() == nworkers ), "all workers should have returned their connections" -@pytest.mark.asyncio +@pytest.mark.asyncio() async def test_burst(pool_cls): """Test that bursting works when enabled and doesn't when not.""" @@ -151,9 +152,8 @@ def close_connection(self, conn): pool = pool_cls(strategy=Strategy(), max_size=5) async def worker(counter, ev): - async with pool.get_connection(): - async with counter.inc(): - await ev.wait() # hold the connection until we say so + async with pool.get_connection(), counter.inc(): + await ev.wait() # hold the connection until we say so # Use up the normal max_size of the pool main_event = asyncio.Event() @@ -206,7 +206,7 @@ async def waiting_worker(): ), "Workers should return their connections to the pool" -@pytest.mark.asyncio +@pytest.mark.asyncio() async def test_stale_connections(pool_cls): """Test that the pool doesn't hand out closed connections.""" @@ -248,7 +248,7 @@ async def worker(): ), "Make sure connections closed by consumers are not given back out" -@pytest.mark.asyncio +@pytest.mark.asyncio() async def test_handling_cancellederror(): making_connection = asyncio.Event()