From 8b8ccb4812fce05404908109942fbe5878178ed6 Mon Sep 17 00:00:00 2001 From: Samuel Cormier-Iijima Date: Mon, 3 Jun 2024 10:33:48 -0400 Subject: [PATCH] wip --- .github/workflows/ci.yml | 72 +++++++-------------- asyncio_connection_pool/__init__.py | 16 ++--- asyncio_connection_pool/contrib/aioredis.py | 10 +-- asyncio_connection_pool/contrib/datadog.py | 50 +++++++------- mypy.ini | 18 ------ pyproject.toml | 62 ++++++++++++++++++ riotfile.py | 46 ------------- setup.py | 38 ----------- test/test_pool.py | 50 +++++++------- 9 files changed, 146 insertions(+), 216 deletions(-) delete mode 100644 mypy.ini create mode 100644 pyproject.toml delete mode 100644 riotfile.py delete mode 100644 setup.py diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ec0bc40..dcf569f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,63 +1,37 @@ name: CI -on: [push, pull_request] + +on: + push: + pull_request: + jobs: - black: + lint: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 - uses: actions/setup-python@v5 - with: - python-version: "3.9" - - run: pip install riot==0.19.0 - - run: riot -v run -s black -- --check . - mypy: - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v4 - - uses: actions/setup-python@v5 - with: - python-version: "3.9" - - run: pip install riot==0.19.0 - - run: riot -v run mypy - flake8: - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v4 - - uses: actions/setup-python@v5 - with: - python-version: "3.9" - - run: pip install riot==0.19.0 - - run: riot -v run flake8 + - run: pip install uv + - run: uv venv + - run: uv pip install --requirement pyproject.toml --all-extras + - run: .venv/bin/ruff format --check . + - run: .venv/bin/ruff check . + - run: .venv/bin/mypy asyncio_connection_pool + test: + runs-on: ubuntu-latest strategy: matrix: - os: [ubuntu-latest, macos-latest] python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"] - runs-on: ${{ matrix.os }} steps: - uses: actions/checkout@v4 - - name: Setup Python - uses: actions/setup-python@v5 + - uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} - - name: install riot - run: pip install riot==0.19.0 - - name: run tests - run: riot -v run --python=${{ matrix.python-version }} test -- --cov=asyncio_connection_pool --cov-branch --cov-config=.coveragerc - - name: install coverage - run: pip install coverage - - name: upload coverage - uses: coverallsapp/github-action@v2 - with: - parallel: true - flag-name: run ${{ join(matrix.*, ' - ') }} - finish-coveralls: - needs: test - if: ${{ always() }} - runs-on: ubuntu-latest - steps: - - name: Coveralls Finished - uses: coverallsapp/github-action@v2 - with: - parallel-finished: true - allow-empty: true + - run: pip install uv + - run: uv venv + - run: uv pip install --requirement pyproject.toml --all-extras + - run: .venv/bin/pytest --cov=asyncio_connection_pool --cov-branch --cov-config=.coveragerc + - run: .venv/bin/codecov + if: matrix.python-version == '3.11' + env: + CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} diff --git a/asyncio_connection_pool/__init__.py b/asyncio_connection_pool/__init__.py index 025f41c..087f5d6 100644 --- a/asyncio_connection_pool/__init__.py +++ b/asyncio_connection_pool/__init__.py @@ -1,6 +1,6 @@ -from abc import ABC, abstractmethod import asyncio import inspect +from abc import ABC, abstractmethod from contextlib import asynccontextmanager from typing import AsyncIterator, Awaitable, Generic, Optional, TypeVar @@ -10,16 +10,13 @@ class ConnectionStrategy(ABC, Generic[Conn]): @abstractmethod - async def make_connection(self) -> Conn: - ... + async def make_connection(self) -> Conn: ... @abstractmethod - def connection_is_closed(self, conn: Conn) -> bool: - ... + def connection_is_closed(self, conn: Conn) -> bool: ... @abstractmethod - async def close_connection(self, conn: Conn) -> None: - ... + async def close_connection(self, conn: Conn) -> None: ... async def _close_connection_compat( @@ -60,14 +57,15 @@ def __init__( *, strategy: ConnectionStrategy[Conn], max_size: int, - burst_limit: Optional[int] = None + burst_limit: Optional[int] = None, ) -> None: self._loop = asyncio.get_event_loop() self.strategy = strategy self.max_size = max_size self.burst_limit = burst_limit if burst_limit is not None and burst_limit < max_size: - raise ValueError("burst_limit must be greater than or equal to max_size") + msg = "burst_limit must be greater than or equal to max_size" + raise ValueError(msg) self.in_use = 0 self.currently_allocating = 0 self.currently_deallocating = 0 diff --git a/asyncio_connection_pool/contrib/aioredis.py b/asyncio_connection_pool/contrib/aioredis.py index 01df105..eb959a9 100644 --- a/asyncio_connection_pool/contrib/aioredis.py +++ b/asyncio_connection_pool/contrib/aioredis.py @@ -1,15 +1,17 @@ -import aioredis from functools import partial + +from redis import asyncio as aioredis + from asyncio_connection_pool import ConnectionStrategy __all__ = ("RedisConnectionStrategy",) -class RedisConnectionStrategy(ConnectionStrategy[aioredis.Redis]): # type: ignore +class RedisConnectionStrategy(ConnectionStrategy[aioredis.Redis]): def __init__(self, *args, **kwargs): - self._create_redis = partial(aioredis.create_redis, *args, **kwargs) + self._create_redis = partial(aioredis.Redis, *args, **kwargs) - async def make_connection(self): + async def make_connection(self) -> aioredis.Redis: return await self._create_redis() def connection_is_closed(self, conn): diff --git a/asyncio_connection_pool/contrib/datadog.py b/asyncio_connection_pool/contrib/datadog.py index e8438fe..14ef530 100644 --- a/asyncio_connection_pool/contrib/datadog.py +++ b/asyncio_connection_pool/contrib/datadog.py @@ -1,7 +1,9 @@ -from contextlib import asynccontextmanager, AsyncExitStack +from contextlib import AsyncExitStack, asynccontextmanager +from typing import AsyncIterator, Awaitable, TypeVar + from datadog import statsd from ddtrace import tracer -from typing import AsyncIterator, TypeVar + from asyncio_connection_pool import ConnectionPool as _ConnectionPool __all__ = ("ConnectionPool",) @@ -18,13 +20,13 @@ def __init__(self, service_name, *args, extra_tags=None, **kwargs): self._extra_tags = extra_tags or [] self._loop.call_soon(self._periodically_send_metrics) - def _periodically_send_metrics(self): + def _periodically_send_metrics(self) -> None: try: self._record_pressure() finally: self._loop.call_later(60, self._periodically_send_metrics) - def _record_pressure(self): + def _record_pressure(self) -> None: statsd.gauge( f"{self._service_name}.pool.total_connections", self._total, @@ -74,7 +76,7 @@ def _record_pressure(self): tags=self._extra_tags, ) - def _record_connection_acquiring(self, value=0): + def _record_connection_acquiring(self, value: int = 0) -> None: self._connections_acquiring += value statsd.gauge( @@ -83,41 +85,35 @@ def _record_connection_acquiring(self, value=0): tags=self._extra_tags, ) - def _connection_maker(self): + async def _connection_maker(self) -> Conn: statsd.increment( f"{self._service_name}.pool.getting_connection", - tags=self._extra_tags + ["method:new"], + tags=[*self._extra_tags, "method:new"], ) - async def connection_maker(self): - with tracer.trace( - f"{self._service_name}.pool._create_new_connection", - service=self._service_name, - ): - return await super()._connection_maker() - - return connection_maker(self) + with tracer.trace( + f"{self._service_name}.pool._create_new_connection", + service=self._service_name, + ): + return await super()._connection_maker() - def _connection_waiter(self): + async def _connection_waiter(self) -> Conn: statsd.increment( f"{self._service_name}.pool.getting_connection", - tags=self._extra_tags + ["method:wait"], + tags=[*self._extra_tags, "method:wait"], ) - async def connection_waiter(self): - with tracer.trace( - f"{self._service_name}.pool._wait_for_connection", - service=self._service_name, - ): - return await super()._connection_waiter() - - return connection_waiter(self) + with tracer.trace( + f"{self._service_name}.pool._wait_for_connection", + service=self._service_name, + ): + return await super()._connection_waiter() - def _get_conn(self): + def _get_conn(self) -> Awaitable[Conn]: if not self.available.empty(): statsd.increment( f"{self._service_name}.pool.getting_connection", - tags=self._extra_tags + ["method:available"], + tags=[*self._extra_tags, "method:available"], ) return super()._get_conn() diff --git a/mypy.ini b/mypy.ini deleted file mode 100644 index 256909f..0000000 --- a/mypy.ini +++ /dev/null @@ -1,18 +0,0 @@ -[mypy] -ignore_missing_imports = True -check_untyped_defs = True -disallow_any_unimported = True -disallow_any_decorated = True -disallow_any_generics = True -disallow_subclassing_any = True -disallow_incomplete_defs = False -disallow_untyped_decorators = True -no_implicit_optional = True -strict_optional = True -warn_redundant_casts = True -warn_unused_ignores = True -warn_return_any = True -warn_unreachable = True -implicit_reexport = False -strict_equality = True -pretty = True diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..dc1025b --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,62 @@ +[build-system] +requires = ["setuptools", "setuptools-scm"] +build-backend = "setuptools.build_meta" + +[project] +name = "asyncio-connection-pool" +dynamic = ["version"] +description = "A high-throughput, optionally-burstable pool free of explicit locking" +readme = "README.md" +requires-python = ">=3.8" +authors = [{ name = "Patrick Gingras", email = "775.pg.12@gmail.com" }] +license = { text = "BSD-3-Clause" } +dependencies = [] + +[project.optional-dependencies] +datadog = ["ddtrace", "datadog"] +aioredis = ["redis>=4.2"] +dev = ["mypy~=1.9", "pytest~=8.1", "pytest-asyncio~=0.23", "ruff~=0.4"] + +[tool.setuptools_scm] + +[tool.ruff.lint] +select = [ + "B", + "COM", + "E", + "EM", + "F", + "I", + "I", + "N", + "PT", + "RSE", + "RUF", + "SIM", + "UP", + "W", +] +ignore = ["COM812"] +preview = true + +[tool.ruff.format] +preview = true + +[tool.mypy] +ignore_missing_imports = true +check_untyped_defs = true +disallow_any_unimported = true +disallow_any_decorated = true +disallow_any_generics = true +disallow_subclassing_any = true +disallow_incomplete_defs = false +disallow_untyped_decorators = true +no_implicit_optional = true +strict_optional = true +warn_redundant_casts = true +warn_unused_ignores = true +warn_return_any = true +warn_unreachable = true +implicit_reexport = false +strict_equality = true +pretty = true diff --git a/riotfile.py b/riotfile.py deleted file mode 100644 index 215be63..0000000 --- a/riotfile.py +++ /dev/null @@ -1,46 +0,0 @@ -from riot import Venv, latest - -venv = Venv( - pys=3, - venvs=[ - Venv( - pys=["3.8", "3.9", "3.10", "3.11", "3.12"], - name="test", - command="pytest {cmdargs}", - pkgs={ - "pytest": latest, - "pytest-asyncio": latest, - "pytest-cov": latest, - # extras_require - "ddtrace": latest, - "datadog": latest, - "aioredis": latest, - }, - ), - Venv( - name="mypy", - command="mypy asyncio_connection_pool", - pkgs={ - "mypy": "==1.1.1", - }, - ), - Venv( - pkgs={"black": "==23.1.0"}, - venvs=[ - Venv( - name="fmt", - command=r"black --exclude '/\.riot/' .", - ), - Venv( - name="black", - command=r"black --exclude '/\.riot/' {cmdargs}", - ), - ], - ), - Venv( - name="flake8", - pkgs={"flake8": "==6.0.0"}, - command="flake8 test asyncio_connection_pool", - ), - ], -) diff --git a/setup.py b/setup.py deleted file mode 100644 index 7cf079c..0000000 --- a/setup.py +++ /dev/null @@ -1,38 +0,0 @@ -from setuptools import setup, find_packages - -with open("README.md", "r") as f: - long_description = f.read() - -setup( - name="asyncio-connection-pool", - description="A high-throughput, optionally-burstable pool free of explicit locking", - url="https://github.com/fellowinsights/asyncio-connection-pool", - author="Patrick Gingras <775.pg.12@gmail.com>", - author_email="775.pg.12@gmail.com", - classifiers=[ - "Programming Language :: Python", - "Programming Language :: Python :: 3.8", - "Programming Language :: Python :: 3.9", - "Programming Language :: Python :: 3.10", - "Programming Language :: Python :: 3.11", - "Programming Language :: Python :: 3.12", - "License :: OSI Approved :: BSD License", - ], - keywords="asyncio", - long_description=long_description, - long_description_content_type="text/markdown", - packages=find_packages( - include=["asyncio_connection_pool", "asyncio_connection_pool.*"] - ), - package_data={"asyncio_connection_pool": ["py.typed"]}, - python_requires=">=3.8", - install_requires=[], - tests_require=["riot"], - extras_require={ - "datadog": ["ddtrace", "datadog"], - "aioredis": ["aioredis"], - }, - setup_requires=["setuptools_scm"], - use_scm_version=True, - zip_safe=False, # for mypy support -) diff --git a/test/test_pool.py b/test/test_pool.py index 564747b..d6f468b 100644 --- a/test/test_pool.py +++ b/test/test_pool.py @@ -1,11 +1,14 @@ import asyncio +import random +from contextlib import asynccontextmanager +from functools import partial + import pytest + from asyncio_connection_pool import ConnectionPool, ConnectionStrategy from asyncio_connection_pool.contrib.datadog import ( ConnectionPool as TracingConnectionPool, ) -from contextlib import asynccontextmanager -from functools import partial @pytest.fixture( @@ -17,8 +20,6 @@ def pool_cls(request): class RandomIntStrategy(ConnectionStrategy[int]): async def make_connection(self): - import random - return random.randint(0, 10000) def connection_is_closed(self, conn): @@ -28,13 +29,16 @@ def close_connection(self, conn): pass -def test_valid_burst_limit(pool_cls): +@pytest.mark.asyncio() +async def test_valid_burst_limit(pool_cls): # noqa: RUF029 """Test that invalid burst_limit values cause errors (only at construction time)""" strategy = RandomIntStrategy() pool_cls(strategy=strategy, max_size=100, burst_limit=None) pool_cls(strategy=strategy, max_size=100, burst_limit=100) pool_cls(strategy=strategy, max_size=100, burst_limit=101) - with pytest.raises(ValueError): + with pytest.raises( + ValueError, match="burst_limit must be greater than or equal to max_size" + ): pool_cls(strategy=strategy, max_size=100, burst_limit=99) @@ -60,7 +64,7 @@ async def inc(self): self.n -= 1 -@pytest.mark.asyncio +@pytest.mark.asyncio() async def test_concurrent_get_connection(pool_cls): """Test handling several connection requests in a short time.""" @@ -70,9 +74,8 @@ async def test_concurrent_get_connection(pool_cls): stop = asyncio.Event() async def connection_holder(): - async with pool.get_connection(): - async with counter.inc(): - await stop.wait() + async with pool.get_connection(), counter.inc(): + await stop.wait() coros = [asyncio.create_task(connection_holder()) for _ in range(nworkers)] await counter.wait() @@ -89,7 +92,7 @@ async def connection_holder(): ), f"{nworkers} connections should be allocated" -@pytest.mark.asyncio +@pytest.mark.asyncio() async def test_currently_allocating(pool_cls): """Test that currently_allocating is accurate.""" @@ -112,10 +115,8 @@ def close_connection(self, conn): ev2 = asyncio.Event() async def worker(): - async with counter.inc(): - async with pool.get_connection(): - async with counter2.inc(): - await ev2.wait() + async with counter.inc(), pool.get_connection(), counter2.inc(): + await ev2.wait() coros = [asyncio.create_task(worker()) for _ in range(nworkers)] await counter.wait() @@ -126,17 +127,17 @@ async def worker(): ), f"{nworkers} workers are waiting for a connection" ev.set() # allow the workers to get their connections await counter2.wait() - assert ( - pool.currently_allocating == 0 and pool.in_use == nworkers - ), "all workers should have their connections now" + assert pool.currently_allocating == 0 + assert pool.in_use == nworkers, "all workers should have their connections now" ev2.set() await asyncio.gather(*coros) + assert pool.currently_allocating == 0 assert ( - pool.in_use == 0 and pool.available.qsize() == nworkers + pool.available.qsize() == nworkers ), "all workers should have returned their connections" -@pytest.mark.asyncio +@pytest.mark.asyncio() async def test_burst(pool_cls): """Test that bursting works when enabled and doesn't when not.""" @@ -151,9 +152,8 @@ def close_connection(self, conn): pool = pool_cls(strategy=Strategy(), max_size=5) async def worker(counter, ev): - async with pool.get_connection(): - async with counter.inc(): - await ev.wait() # hold the connection until we say so + async with pool.get_connection(), counter.inc(): + await ev.wait() # hold the connection until we say so # Use up the normal max_size of the pool main_event = asyncio.Event() @@ -206,7 +206,7 @@ async def waiting_worker(): ), "Workers should return their connections to the pool" -@pytest.mark.asyncio +@pytest.mark.asyncio() async def test_stale_connections(pool_cls): """Test that the pool doesn't hand out closed connections.""" @@ -248,7 +248,7 @@ async def worker(): ), "Make sure connections closed by consumers are not given back out" -@pytest.mark.asyncio +@pytest.mark.asyncio() async def test_handling_cancellederror(): making_connection = asyncio.Event()