Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ jobs:
fail-fast: false

matrix:
python-version: ["3.10", "3.11", "3.12", "3.13"]
python-version: ["3.10", "3.11", "3.12", "3.13", "3.14"]
postgres-version: ["13", "14", "15", "16", "17"]
os: [ubuntu-24.04]

Expand Down
5 changes: 3 additions & 2 deletions pgqueuer/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import asyncio
import dataclasses
import functools
import inspect
import random
from abc import ABC, abstractmethod
from datetime import datetime, timedelta, timezone
Expand Down Expand Up @@ -49,8 +50,8 @@ def is_async_callable(obj: object) -> bool:
while isinstance(obj, functools.partial):
obj = obj.func

return asyncio.iscoroutinefunction(obj) or (
callable(obj) and asyncio.iscoroutinefunction(obj.__call__)
return inspect.iscoroutinefunction(obj) or (
callable(obj) and inspect.iscoroutinefunction(getattr(obj, "__call__", None))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm sorry to ask a dummy question, is the getattr necessary here?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prob. not, unsure what i was thinking there.

Copy link
Owner Author

@janbjorge janbjorge Oct 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Owner Author

@janbjorge janbjorge Oct 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the back and forth, i've been writing loads of tests to dig here see.

But the gist of it; Yes, getattr is needed. inspect.iscoroutinefunction(obj) only works for actual async function objects. Instances with async call return False there, so you must look at obj.call. Using getattr(obj, "call", None) avoids an AttributeError and cleanly checks the bound method. Without it you’d misclassify async callable instances or need a try/except.

EDIT;
Make sure to check look at these test cases; https://github.com/janbjorge/pgqueuer/pull/473/files#diff-998ed263122511bfcb910e08f8830fd62a83287a5b2aec5a40e351f5ba5d47adR536

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for your reply, I didn't aware of that

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for asking @trim21 it lead me down an crazy rabithole that i enjoyed!

)


Expand Down
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ classifiers = [
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
"Programming Language :: Python :: 3.14",
"Programming Language :: Python",
"Topic :: Database",
"Topic :: Software Development :: Libraries :: Python Modules",
Expand All @@ -46,7 +47,7 @@ dependencies = [
"pydantic>=2.0.0",
"tabulate>=0.9.0",
"typer>=0.15.1",
"uvloop>=0.21.0; sys_platform != 'win32'",
"uvloop>=0.22.0; sys_platform != 'win32'",
"async-timeout>=5.0.1",
]

Expand Down
50 changes: 46 additions & 4 deletions test/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
import asyncio
import os
import uuid
from typing import AsyncGenerator
from urllib.parse import urlparse, urlunparse
from typing import Any, AsyncGenerator
from urllib.parse import quote, urlparse, urlunparse

import asyncpg
import psycopg
Expand All @@ -19,11 +19,53 @@
except ModuleNotFoundError:
uvloop = None # type: ignore[assignment]

from testcontainers.postgres import PostgresContainer
from testcontainers.core.container import DockerContainer
from testcontainers.core.wait_strategies import LogMessageWaitStrategy

from pgqueuer.db import SyncPsycopgDriver


class PGQueuerPostgresContainer(DockerContainer):
"""Postgres container with modern wait strategy support."""

def __init__(
self,
image: str = "postgres:latest",
port: int = 5432,
username: str | None = None,
password: str | None = None,
dbname: str | None = None,
*,
driver: str | None = "psycopg2",
**kwargs: Any,
) -> None:
super().__init__(image=image, **kwargs)
self.port = port
self.username = username or os.environ.get("POSTGRES_USER", "test")
self.password = password or os.environ.get("POSTGRES_PASSWORD", "test")
self.dbname = dbname or os.environ.get("POSTGRES_DB", "test")
self.driver_suffix = f"+{driver}" if driver else ""

self.with_exposed_ports(port)
self.with_env("POSTGRES_USER", self.username)
self.with_env("POSTGRES_PASSWORD", self.password)
self.with_env("POSTGRES_DB", self.dbname)
self.waiting_for(LogMessageWaitStrategy("database system is ready to accept connections"))

def get_connection_url(self, host: str | None = None) -> str:
if self._container is None:
msg = "container has not been started"
raise RuntimeError(msg)

host = host or self.get_container_host_ip()
port = self.get_exposed_port(self.port)
quoted_password = quote(self.password, safe=" +")
return (
f"postgresql{self.driver_suffix}://{self.username}:{quoted_password}"
f"@{host}:{port}/{self.dbname}"
)


@pytest.fixture(scope="session", autouse=True)
def event_loop_policy() -> asyncio.AbstractEventLoopPolicy:
"""Provide uvloop if available; fallback to default policy."""
Expand Down Expand Up @@ -63,7 +105,7 @@ async def postgres_container() -> AsyncGenerator[str, None]:
] + (["-c", "vacuum_buffer_usage_limit=8MB"] if int(postgres_version) >= 16 else [])

container = (
PostgresContainer(f"postgres:{postgres_version}", driver=None)
PGQueuerPostgresContainer(f"postgres:{postgres_version}", driver=None)
.with_command(commands)
.with_kwargs(tmpfs={"/var/lib/pg/data": "rw"})
.with_envs(PGDATA="/var/lib/pg/data")
Expand Down
7 changes: 6 additions & 1 deletion test/test_factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,11 @@ async def test_run_factory_with_invalid_input() -> None:
"""
invalid_input: str = "not a valid input"

with pytest.raises(TypeError, match="object str can't be used in 'await' expression"):
with pytest.raises(TypeError) as exc_info:
async with run_factory(invalid_input): # type: ignore[arg-type]
pass # This line should not be reached

assert str(exc_info.value) in {
"object str can't be used in 'await' expression",
"'str' object can't be awaited",
}
Loading