Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions procrastinate/contrib/django/django_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,18 +166,19 @@ def get_worker_connector(self) -> connector.BaseAsyncConnector:
"""
alias = settings.settings.DATABASE_ALIAS

kwargs, pool_kwargs = utils.connector_params(alias)
if utils.package_is_installed("psycopg") and utils.package_is_version(
"psycopg", 3
):
from procrastinate import psycopg_connector

return psycopg_connector.PsycopgConnector(
kwargs=utils.connector_params(alias)
kwargs=kwargs, pool_kwargs=pool_kwargs
)
if utils.package_is_installed("aiopg"):
from procrastinate.contrib.aiopg import aiopg_connector

return aiopg_connector.AiopgConnector(**utils.connector_params(alias))
return aiopg_connector.AiopgConnector(**kwargs)

raise django_exceptions.ImproperlyConfigured(
"You must install either psycopg(3) or aiopg to use "
Expand Down
6 changes: 4 additions & 2 deletions procrastinate/contrib/django/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from django.db import connections


def connector_params(alias: str = "default") -> dict[str, Any]:
def connector_params(alias: str = "default") -> tuple[dict[str, Any], dict[str, Any]]:
"""
Returns parameters for in a format that is suitable to be passed to a
connector constructor (see `howto/django`).
Expand All @@ -27,7 +27,9 @@ def connector_params(alias: str = "default") -> dict[str, Any]:
params = wrapper.get_connection_params()
params.pop("cursor_factory", None)
params.pop("context", None)
return params
return params, wrapper.settings_dict.get("OPTIONS", {}).get(
"pool", {}
) or {} # During tests pool is False {"pool": False}


def package_is_installed(name: str) -> bool:
Expand Down
15 changes: 11 additions & 4 deletions procrastinate/psycopg_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ def __init__(
pool_factory: Callable[
..., psycopg_pool.AsyncConnectionPool
] = psycopg_pool.AsyncConnectionPool,
pool_kwargs: dict[str, Any] | None = None,
**kwargs: Any,
):
"""
Expand Down Expand Up @@ -93,6 +94,7 @@ def __init__(
self._json_loads = json_loads
self._json_dumps = json_dumps
self._pool_args = kwargs
self._pool_kwargs = pool_kwargs
self._sync_connector: connector.BaseConnector | None = None

def get_sync_connector(self) -> connector.BaseConnector:
Expand Down Expand Up @@ -140,24 +142,29 @@ async def open_async(
self._pool_externally_set = True
self._async_pool = pool
else:
self._async_pool = await self._create_pool(self._pool_args)
self._async_pool = await self._create_pool(
self._pool_args, self._pool_kwargs
)

await self._async_pool.open(wait=True) # type: ignore

@wrap_exceptions()
async def _create_pool(
self,
pool_args: dict[str, Any],
self, pool_args: dict[str, Any], pool_kwargs: dict[str, Any] | None
) -> psycopg_pool.AsyncConnectionPool:
pool_kwargs = pool_kwargs if pool_kwargs is not None else {}
kwargs = pool_args.pop("kwargs", {})
pool_kwargs = pool_kwargs | pool_args
return self._pool_factory(
**pool_args,
kwargs=kwargs,
# Not specifying open=False raises a warning and will be deprecated.
# It makes sense, as we can't really make async I/Os in a constructor.
open=False,
# Enables a check that will ensure the connections returned when
# using the pool are still alive. If they have been closed by the
# database, they will be seamlessly replaced by a new connection.
check=psycopg_pool.AsyncConnectionPool.check_connection,
**pool_kwargs,
)

@wrap_exceptions()
Expand Down
12 changes: 9 additions & 3 deletions procrastinate/sync_psycopg_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ def __init__(
*,
json_dumps: Callable | None = None,
json_loads: Callable | None = None,
pool_kwargs: dict[str, Any] | None = None,
**kwargs: Any,
):
"""
Expand Down Expand Up @@ -87,6 +88,7 @@ def __init__(
self._json_loads = json_loads
self._json_dumps = json_dumps
self._pool_args = kwargs
self._pool_kwargs = pool_kwargs

def get_sync_connector(self) -> connector.BaseConnector:
return self
Expand All @@ -111,21 +113,25 @@ def open(self, pool: psycopg_pool.ConnectionPool | None = None) -> None:
self._pool_externally_set = True
self._pool = pool
else:
self._pool = self._create_pool(self._pool_args)
self._pool = self._create_pool(self._pool_args, self._pool_kwargs)
self._pool.open(wait=True)

@staticmethod
@wrap_exceptions()
def _create_pool(pool_args: dict[str, Any]) -> psycopg_pool.ConnectionPool:
def _create_pool(
pool_args: dict[str, Any], pool_kwargs: dict[str, Any] | None
) -> psycopg_pool.ConnectionPool:
pool_kwargs = pool_kwargs if pool_kwargs is not None else {}
pool = psycopg_pool.ConnectionPool(
**pool_args,
kwargs=pool_args.get("kwargs", {}),
# Not specifying open=False raises a warning and will be deprecated.
# It makes sense, as we can't really make async I/Os in a constructor.
open=False,
# Enables a check that will ensure the connections returned when
# using the pool are still alive. If they have been closed by the
# database, they will be seamlessly replaced by a new connection.
check=psycopg_pool.ConnectionPool.check_connection,
**pool_kwargs,
)
return pool

Expand Down
1 change: 1 addition & 0 deletions tests/acceptance/django_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
"ENGINE": "django.db.backends.postgresql",
"NAME": os.environ.get("PGDATABASE", "procrastinate"),
"TEST": {"NAME": "procrastinate_django_test"},
"OPTIONS": {"pool": {"min_size": 1, "max_size": 4}},
},
}
INSTALLED_APPS = [
Expand Down
8 changes: 8 additions & 0 deletions tests/unit/contrib/django/test_django_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from procrastinate import exceptions
from procrastinate.contrib.django import django_connector as django_connector_module
from procrastinate.psycopg_connector import PsycopgConnector


def test_wrap_exceptions__no_cause():
Expand All @@ -18,3 +19,10 @@ def test_wrap_exceptions__with_cause():
with pytest.raises(exceptions.ConnectorException):
with django_connector_module.wrap_exceptions():
raise django_db.DatabaseError from psycopg_errors.Error()


def test_get_worker_connector():
django_connector = django_connector_module.DjangoConnector()
worker_connector = django_connector.get_worker_connector()
assert isinstance(worker_connector, PsycopgConnector)
assert worker_connector._pool_kwargs == {"min_size": 1, "max_size": 4}
4 changes: 3 additions & 1 deletion tests/unit/contrib/django/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@


def test_connector_params():
assert "dbname" in utils.connector_params()
params, pool_params = utils.connector_params()
assert "dbname" in params
assert pool_params == {"min_size": 1, "max_size": 4}


@pytest.mark.parametrize(
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/test_psycopg_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ async def test_open_async_no_pool_specified(mocker, connector):

await connector.open_async()

assert connector._create_pool.call_count == 1
connector._create_pool.assert_called_once_with({}, None)
assert connector._async_pool.open.await_count == 1


Expand Down
2 changes: 1 addition & 1 deletion tests/unit/test_sync_psycopg_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def test_open_no_pool_specified(mock_create_pool):
connector.open()

assert connector._pool_externally_set is False
mock_create_pool.assert_called_once_with(connector._pool_args)
mock_create_pool.assert_called_once_with(connector._pool_args, None)


def test_open_pool_argument_specified(mock_create_pool, mocker):
Expand Down
Loading