From 46c0389a0194f5411dad4ceb46f6ac89182adfdc Mon Sep 17 00:00:00 2001 From: Kai Schlamp Date: Sun, 2 Feb 2025 23:16:48 +0000 Subject: [PATCH 1/8] Add middleware option to worker --- procrastinate/app.py | 11 ++++++++++- procrastinate/middleware.py | 18 ++++++++++++++++++ procrastinate/worker.py | 5 ++++- 3 files changed, 32 insertions(+), 2 deletions(-) create mode 100644 procrastinate/middleware.py diff --git a/procrastinate/app.py b/procrastinate/app.py index ac2d50c35..f68858ee4 100644 --- a/procrastinate/app.py +++ b/procrastinate/app.py @@ -13,7 +13,15 @@ from typing_extensions import NotRequired, Unpack -from procrastinate import blueprints, exceptions, jobs, manager, schema, utils +from procrastinate import ( + blueprints, + exceptions, + jobs, + manager, + middleware, + schema, + utils, +) from procrastinate import connector as connector_module if TYPE_CHECKING: @@ -34,6 +42,7 @@ class WorkerOptions(TypedDict): delete_jobs: NotRequired[str | jobs.DeleteJobCondition] additional_context: NotRequired[dict[str, Any]] install_signal_handlers: NotRequired[bool] + middleware: NotRequired[middleware.Middleware] class App(blueprints.Blueprint): diff --git a/procrastinate/middleware.py b/procrastinate/middleware.py new file mode 100644 index 000000000..8b92e6ee0 --- /dev/null +++ b/procrastinate/middleware.py @@ -0,0 +1,18 @@ +from __future__ import annotations + +from collections.abc import Awaitable +from typing import TYPE_CHECKING, Callable + +from procrastinate import job_context + +if TYPE_CHECKING: + from procrastinate import worker + +ProcessTask = Callable[..., Awaitable] +Middleware = Callable[[ProcessTask, job_context.JobContext, "worker.Worker"], Awaitable] + + +async def default_middleware( + process_task: ProcessTask, context: job_context.JobContext, worker: worker.Worker +): + return await process_task() diff --git a/procrastinate/worker.py b/procrastinate/worker.py index f4227d9d8..a693efde6 100644 --- a/procrastinate/worker.py +++ b/procrastinate/worker.py @@ -14,6 +14,7 @@ exceptions, job_context, jobs, + middleware, periodic, retry, signals, @@ -45,6 +46,7 @@ def __init__( delete_jobs: str | jobs.DeleteJobCondition | None = None, additional_context: dict[str, Any] | None = None, install_signal_handlers: bool = True, + middleware: middleware.Middleware = middleware.default_middleware, ): self.app = app self.queues = queues @@ -61,6 +63,7 @@ def __init__( ) or jobs.DeleteJobCondition.NEVER self.additional_context = additional_context self.install_signal_handlers = install_signal_handlers + self.middleware = middleware if self.worker_name: self.logger = logger.getChild(self.worker_name) @@ -251,7 +254,7 @@ async def ensure_async() -> Callable[..., Awaitable]: return task_result - job_result.result = await ensure_async() + job_result.result = await self.middleware(ensure_async, context, self) except BaseException as e: exc_info = e From ab76856dea832b882da30093787b9dfcf95d2f0f Mon Sep 17 00:00:00 2001 From: Kai Schlamp Date: Mon, 3 Feb 2025 00:08:09 +0000 Subject: [PATCH 2/8] Test a custom worker middleware --- tests/unit/test_worker.py | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/tests/unit/test_worker.py b/tests/unit/test_worker.py index 3d88d0626..d997eafef 100644 --- a/tests/unit/test_worker.py +++ b/tests/unit/test_worker.py @@ -736,3 +736,28 @@ async def test_run_no_signal_handlers(worker, kill_own_pid): await asyncio.sleep(0.01) # Test that handlers are NOT installed kill_own_pid(signal=signal.SIGINT) + + +async def test_worker_middleware(app: App): + @app.task() + async def task_func(): + return 42 + + await task_func.defer_async() + + middleware_called = False + + async def custom_middleware(process_task, context, worker): + nonlocal middleware_called + middleware_called = True + assert isinstance(context, JobContext) + assert isinstance(worker, Worker) + worker.stop() + result = await process_task() + assert result == 42 + return result + + worker = Worker(app, wait=True, middleware=custom_middleware) + await worker.run() + + assert middleware_called From faf5df06f714bd59b9b62e63075fb99d591d40d7 Mon Sep 17 00:00:00 2001 From: Kai Schlamp Date: Mon, 3 Feb 2025 00:32:41 +0000 Subject: [PATCH 3/8] Add some documentation for the middleware --- docs/howto/advanced/middleware.md | 36 +++++++++++++++++++++++++------ procrastinate/app.py | 3 +++ 2 files changed, 33 insertions(+), 6 deletions(-) diff --git a/docs/howto/advanced/middleware.md b/docs/howto/advanced/middleware.md index f0717f98b..ca168c2e1 100644 --- a/docs/howto/advanced/middleware.md +++ b/docs/howto/advanced/middleware.md @@ -1,12 +1,38 @@ -# Add a task middleware +# Add a middleware to the worker -As of today, Procrastinate has no specific way of ensuring a piece of code runs -before or after every job. That being said, you can always decide to use +Procrastinate allows you to add a middleware to the worker. This middleware +wraps the execution of every task, and can be used to add custom behavior before +or after the task is executed. + +```python +def custom_middleware(process_task, context, worker): + # do something before the task is executed + result = await process_task() + # do something after the task is executed + return result + +app.run_worker(middleware=custom_middleware) +``` + +The middleware is a coroutine that takes three arguments: +- `process_task`: a coroutine that runs the task +- `context`: a `JobContext` object that contains information about the job +- `worker`: the worker that runs the job + +The middleware should await `process_task` to run the task and return the result. + +The `worker` instance can be used to stop the worker from the middleware by +calling `worker.stop()`. This will stop the worker after the jobs currently being +processed are done. + +# Add a middleware to a specific task + +If you only want to wrap a specific task with a middleware you can use your own decorator instead of `@app.task` and have this decorator implement the actions you need and delegate the rest to `@app.task`. It might look like this: -``` +```python import functools def task(original_func=None, **kwargs): @@ -26,5 +52,3 @@ def task(original_func=None, **kwargs): return wrap(original_func) ``` - -Then, define all of your tasks using this `@task` decorator. diff --git a/procrastinate/app.py b/procrastinate/app.py index f68858ee4..3c72ed2ab 100644 --- a/procrastinate/app.py +++ b/procrastinate/app.py @@ -325,6 +325,9 @@ async def run_worker_async(self, **kwargs: Unpack[WorkerOptions]) -> None: worker. Use ``False`` if you want to handle signals yourself (e.g. if you run the work as an async task in a bigger application) (defaults to ``True``) + middleware: ``Optional[Middleware]`` + A coroutine that can be used to wrap the task execution. The default middleware + just awaits the task and returns the result. """ self.perform_import_paths() worker = self._worker(**kwargs) From 268d49396975c4b7b726fb83fe933223ff6f7248 Mon Sep 17 00:00:00 2001 From: Kai Schlamp Date: Tue, 18 Feb 2025 00:12:46 +0000 Subject: [PATCH 4/8] Add middleware option to task --- procrastinate/app.py | 2 +- procrastinate/blueprints.py | 10 ++++- procrastinate/middleware.py | 33 ++++++++++++--- procrastinate/tasks.py | 14 ++++++- procrastinate/worker.py | 30 +++++++++++--- tests/unit/test_middleware.py | 77 +++++++++++++++++++++++++++++++++++ tests/unit/test_worker.py | 25 ------------ 7 files changed, 152 insertions(+), 39 deletions(-) create mode 100644 tests/unit/test_middleware.py diff --git a/procrastinate/app.py b/procrastinate/app.py index 3c72ed2ab..ea0d3e0be 100644 --- a/procrastinate/app.py +++ b/procrastinate/app.py @@ -42,7 +42,7 @@ class WorkerOptions(TypedDict): delete_jobs: NotRequired[str | jobs.DeleteJobCondition] additional_context: NotRequired[dict[str, Any]] install_signal_handlers: NotRequired[bool] - middleware: NotRequired[middleware.Middleware] + middleware: NotRequired[middleware.WorkerMiddleware] class App(blueprints.Blueprint): diff --git a/procrastinate/blueprints.py b/procrastinate/blueprints.py index e39619189..67a1f72f8 100644 --- a/procrastinate/blueprints.py +++ b/procrastinate/blueprints.py @@ -7,7 +7,7 @@ from typing_extensions import Concatenate, ParamSpec, TypeVar, Unpack -from procrastinate import exceptions, jobs, periodic, retry, utils +from procrastinate import exceptions, jobs, middleware, periodic, retry, utils from procrastinate.job_context import JobContext if TYPE_CHECKING: @@ -211,6 +211,7 @@ def task( priority: int = jobs.DEFAULT_PRIORITY, lock: str | None = None, queueing_lock: str | None = None, + middleware: middleware.ProcessTask[R] | None = None, ) -> Callable[[Callable[P, R]], Task[P, R, P]]: """Declare a function as a task. This method is meant to be used as a decorator Parameters @@ -249,6 +250,10 @@ def task( Default is no retry. pass_context : Passes the task execution context in the task as first + middleware : + A function that can be used to wrap the task execution. The default middleware + just calls the task function and returns its result. If the task is async, + the middleware should be async too and return an awaitable. """ ... @@ -265,6 +270,7 @@ def task( priority: int = jobs.DEFAULT_PRIORITY, lock: str | None = None, queueing_lock: str | None = None, + middleware: middleware.ProcessTask[R] | None = None, ) -> Callable[ [Callable[Concatenate[JobContext, P], R]], Task[Concatenate[JobContext, P], R, P], @@ -299,6 +305,7 @@ def task( priority: int = jobs.DEFAULT_PRIORITY, lock: str | None = None, queueing_lock: str | None = None, + middleware: middleware.ProcessTask[R] | None = None, ): from procrastinate.tasks import Task @@ -329,6 +336,7 @@ def _wrap(func: Callable[P, R]) -> Task[P, R, P]: aliases=aliases, retry=retry, pass_context=pass_context, + middleware=middleware, ) self._register_task(task) diff --git a/procrastinate/middleware.py b/procrastinate/middleware.py index 8b92e6ee0..2f0283845 100644 --- a/procrastinate/middleware.py +++ b/procrastinate/middleware.py @@ -1,18 +1,41 @@ from __future__ import annotations from collections.abc import Awaitable -from typing import TYPE_CHECKING, Callable +from typing import TYPE_CHECKING, Callable, TypeVar from procrastinate import job_context +R = TypeVar("R") + if TYPE_CHECKING: from procrastinate import worker -ProcessTask = Callable[..., Awaitable] -Middleware = Callable[[ProcessTask, job_context.JobContext, "worker.Worker"], Awaitable] +ProcessTask = Callable[..., R] +WorkerMiddleware = Callable[ + [ProcessTask[Awaitable], job_context.JobContext, "worker.Worker"], Awaitable +] +TaskMiddleware = Callable[[ProcessTask[R], job_context.JobContext, "worker.Worker"], R] + + +async def default_worker_middleware( + process_task: ProcessTask, + context: job_context.JobContext, + worker: worker.Worker, +): + return await process_task() -async def default_middleware( - process_task: ProcessTask, context: job_context.JobContext, worker: worker.Worker +async def default_async_task_middleware( + process_task: ProcessTask, + context: job_context.JobContext, + worker: worker.Worker, ): return await process_task() + + +def default_sync_task_middleware( + process_task: ProcessTask, + context: job_context.JobContext, + worker: worker.Worker, +): + return process_task() diff --git a/procrastinate/tasks.py b/procrastinate/tasks.py index 876da1c69..923f55054 100644 --- a/procrastinate/tasks.py +++ b/procrastinate/tasks.py @@ -1,6 +1,7 @@ from __future__ import annotations import datetime +import inspect import logging from typing import Callable, Generic, TypedDict, cast @@ -8,6 +9,7 @@ from procrastinate import app as app_module from procrastinate import blueprints, exceptions, jobs, manager, types, utils +from procrastinate import middleware as middleware_module from procrastinate import retry as retry_module logger = logging.getLogger(__name__) @@ -18,7 +20,7 @@ R = TypeVar("R") -class ConfigureTaskOptions(TypedDict): +class ConfigureTaskOptions(TypedDict, Generic[R]): lock: NotRequired[str | None] queueing_lock: NotRequired[str | None] task_kwargs: NotRequired[types.JSONDict | None] @@ -26,6 +28,7 @@ class ConfigureTaskOptions(TypedDict): schedule_in: NotRequired[types.TimeDeltaParams | None] queue: NotRequired[str | None] priority: NotRequired[int | None] + middleware: NotRequired[middleware_module.TaskMiddleware[R] | None] def configure_task( @@ -85,6 +88,7 @@ def __init__( priority: int = jobs.DEFAULT_PRIORITY, lock: str | None = None, queueing_lock: str | None = None, + middleware: middleware_module.TaskMiddleware[R] | None = None, ): #: Default queue to send deferred jobs to. The queue can be overridden #: when a job is deferred. @@ -113,6 +117,14 @@ def __init__( #: Default queueing lock. The queuing lock can be overridden when a job #: is deferred. self.queueing_lock: str | None = queueing_lock + #: Middleware to be used when the task is executed. + if middleware is not None: + self.middleware = middleware + else: + if inspect.iscoroutinefunction(func): + self.middleware = middleware_module.default_async_task_middleware + else: + self.middleware = middleware_module.default_sync_task_middleware def add_namespace(self, namespace: str) -> None: """ diff --git a/procrastinate/worker.py b/procrastinate/worker.py index a693efde6..d48c81ea9 100644 --- a/procrastinate/worker.py +++ b/procrastinate/worker.py @@ -46,7 +46,7 @@ def __init__( delete_jobs: str | jobs.DeleteJobCondition | None = None, additional_context: dict[str, Any] | None = None, install_signal_handlers: bool = True, - middleware: middleware.Middleware = middleware.default_middleware, + middleware: middleware.WorkerMiddleware = middleware.default_worker_middleware, ): self.app = app self.queues = queues @@ -233,14 +233,32 @@ async def _process_job(self, context: job_context.JobContext): exc_info: bool | BaseException = False async def ensure_async() -> Callable[..., Awaitable]: - await_func: Callable[..., Awaitable] + job_args = [context] if task.pass_context else [] if inspect.iscoroutinefunction(task.func): - await_func = task + + async def run_task_async(): + return await task.func(*job_args, **job.task_kwargs) + + wrapped_middleware = functools.partial( + task.middleware, + run_task_async, + context, + self, + ) else: - await_func = functools.partial(utils.sync_to_async, task) - job_args = [context] if task.pass_context else [] - task_result = await await_func(*job_args, **job.task_kwargs) + def run_task_sync(): + return task(*job_args, **job.task_kwargs) + + wrapped_middleware = functools.partial( + utils.sync_to_async, + task.middleware, + run_task_sync, + context, + self, + ) + + task_result = await wrapped_middleware() # In some cases, the task function might be a synchronous function # that returns an awaitable without actually being a # coroutinefunction. In that case, in the await above, we haven't diff --git a/tests/unit/test_middleware.py b/tests/unit/test_middleware.py new file mode 100644 index 000000000..1248afa22 --- /dev/null +++ b/tests/unit/test_middleware.py @@ -0,0 +1,77 @@ +from __future__ import annotations + +from procrastinate.app import App +from procrastinate.job_context import JobContext +from procrastinate.worker import Worker + + +async def test_worker_middleware(app: App): + @app.task() + async def task_func(): + return 42 + + await task_func.defer_async() + + middleware_called = False + + async def custom_worker_middleware(process_task, context, worker): + assert isinstance(context, JobContext) + assert isinstance(worker, Worker) + worker.stop() + result = await process_task() + assert result == 42 + nonlocal middleware_called + middleware_called = True + return result + + await app.run_worker_async(wait=True, middleware=custom_worker_middleware) + + assert middleware_called + + +async def test_sync_task_middleware(app: App): + middleware_called = False + + def sync_task_middleware(process_task, context, worker): + assert isinstance(context, JobContext) + assert isinstance(worker, Worker) + worker.stop() + result = process_task() + assert result == 42 + nonlocal middleware_called + middleware_called = True + return result + + @app.task(middleware=sync_task_middleware) + def my_task(a): + return a + + await my_task.defer_async(a=42) + + await app.run_worker_async(wait=True) + + assert middleware_called + + +async def test_async_task_middleware(app: App): + middleware_called = False + + async def async_task_middleware(process_task, context, worker): + assert isinstance(context, JobContext) + assert isinstance(worker, Worker) + worker.stop() + result = await process_task() + assert result == 42 + nonlocal middleware_called + middleware_called = True + return result + + @app.task(middleware=async_task_middleware) + async def my_task(a): + return a + + await my_task.defer_async(a=42) + + await app.run_worker_async(wait=True) + + assert middleware_called diff --git a/tests/unit/test_worker.py b/tests/unit/test_worker.py index d997eafef..3d88d0626 100644 --- a/tests/unit/test_worker.py +++ b/tests/unit/test_worker.py @@ -736,28 +736,3 @@ async def test_run_no_signal_handlers(worker, kill_own_pid): await asyncio.sleep(0.01) # Test that handlers are NOT installed kill_own_pid(signal=signal.SIGINT) - - -async def test_worker_middleware(app: App): - @app.task() - async def task_func(): - return 42 - - await task_func.defer_async() - - middleware_called = False - - async def custom_middleware(process_task, context, worker): - nonlocal middleware_called - middleware_called = True - assert isinstance(context, JobContext) - assert isinstance(worker, Worker) - worker.stop() - result = await process_task() - assert result == 42 - return result - - worker = Worker(app, wait=True, middleware=custom_middleware) - await worker.run() - - assert middleware_called From 4de14b6efc7c5937114bab5d74d697b814efa0cd Mon Sep 17 00:00:00 2001 From: Kai Schlamp Date: Tue, 18 Feb 2025 00:32:21 +0000 Subject: [PATCH 5/8] For Python 3.9 use TypedDict from typing_extensions --- procrastinate/tasks.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/procrastinate/tasks.py b/procrastinate/tasks.py index 923f55054..50432743b 100644 --- a/procrastinate/tasks.py +++ b/procrastinate/tasks.py @@ -3,9 +3,9 @@ import datetime import inspect import logging -from typing import Callable, Generic, TypedDict, cast +from typing import Callable, Generic, cast -from typing_extensions import NotRequired, ParamSpec, TypeVar, Unpack +from typing_extensions import NotRequired, ParamSpec, TypedDict, TypeVar, Unpack from procrastinate import app as app_module from procrastinate import blueprints, exceptions, jobs, manager, types, utils From 6a067ae898ecc9600afebfb809c1686b98656b2a Mon Sep 17 00:00:00 2001 From: Kai Schlamp Date: Thu, 20 Feb 2025 21:57:25 +0000 Subject: [PATCH 6/8] Don't make task middleware configurable --- procrastinate/blueprints.py | 10 +++++----- procrastinate/tasks.py | 7 +++---- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/procrastinate/blueprints.py b/procrastinate/blueprints.py index 67a1f72f8..ed7f5fbbf 100644 --- a/procrastinate/blueprints.py +++ b/procrastinate/blueprints.py @@ -211,7 +211,7 @@ def task( priority: int = jobs.DEFAULT_PRIORITY, lock: str | None = None, queueing_lock: str | None = None, - middleware: middleware.ProcessTask[R] | None = None, + middleware: middleware.TaskMiddleware[R] | None = None, ) -> Callable[[Callable[P, R]], Task[P, R, P]]: """Declare a function as a task. This method is meant to be used as a decorator Parameters @@ -252,8 +252,8 @@ def task( Passes the task execution context in the task as first middleware : A function that can be used to wrap the task execution. The default middleware - just calls the task function and returns its result. If the task is async, - the middleware should be async too and return an awaitable. + just calls the task function and returns its result. If the task is a coroutine, + the middleware should also be a coroutine. """ ... @@ -270,7 +270,7 @@ def task( priority: int = jobs.DEFAULT_PRIORITY, lock: str | None = None, queueing_lock: str | None = None, - middleware: middleware.ProcessTask[R] | None = None, + middleware: middleware.TaskMiddleware[R] | None = None, ) -> Callable[ [Callable[Concatenate[JobContext, P], R]], Task[Concatenate[JobContext, P], R, P], @@ -305,7 +305,7 @@ def task( priority: int = jobs.DEFAULT_PRIORITY, lock: str | None = None, queueing_lock: str | None = None, - middleware: middleware.ProcessTask[R] | None = None, + middleware: middleware.TaskMiddleware[R] | None = None, ): from procrastinate.tasks import Task diff --git a/procrastinate/tasks.py b/procrastinate/tasks.py index 50432743b..ba4cd1131 100644 --- a/procrastinate/tasks.py +++ b/procrastinate/tasks.py @@ -3,9 +3,9 @@ import datetime import inspect import logging -from typing import Callable, Generic, cast +from typing import Callable, Generic, TypedDict, cast -from typing_extensions import NotRequired, ParamSpec, TypedDict, TypeVar, Unpack +from typing_extensions import NotRequired, ParamSpec, TypeVar, Unpack from procrastinate import app as app_module from procrastinate import blueprints, exceptions, jobs, manager, types, utils @@ -20,7 +20,7 @@ R = TypeVar("R") -class ConfigureTaskOptions(TypedDict, Generic[R]): +class ConfigureTaskOptions(TypedDict): lock: NotRequired[str | None] queueing_lock: NotRequired[str | None] task_kwargs: NotRequired[types.JSONDict | None] @@ -28,7 +28,6 @@ class ConfigureTaskOptions(TypedDict, Generic[R]): schedule_in: NotRequired[types.TimeDeltaParams | None] queue: NotRequired[str | None] priority: NotRequired[int | None] - middleware: NotRequired[middleware_module.TaskMiddleware[R] | None] def configure_task( From 00a3977d07a84ddde681b7ddae16042304a5a92c Mon Sep 17 00:00:00 2001 From: Kai Schlamp Date: Thu, 20 Feb 2025 23:06:37 +0000 Subject: [PATCH 7/8] Improve documentation --- docs/howto/advanced/middleware.md | 80 ++++++++++++++++++++----------- procrastinate/blueprints.py | 5 +- procrastinate/middleware.py | 2 +- 3 files changed, 55 insertions(+), 32 deletions(-) diff --git a/docs/howto/advanced/middleware.md b/docs/howto/advanced/middleware.md index ca168c2e1..aeebba6f2 100644 --- a/docs/howto/advanced/middleware.md +++ b/docs/howto/advanced/middleware.md @@ -1,54 +1,76 @@ -# Add a middleware to the worker +# Middleware for workers and tasks -Procrastinate allows you to add a middleware to the worker. This middleware -wraps the execution of every task, and can be used to add custom behavior before -or after the task is executed. +## Worker middleware + +Procrastinate lets you extend a worker's behavior by adding a custom middleware. +This middleware wraps the execution of every task assigned to the worker, allowing +you to execute custom logic before and after each task runs. You might use it to +log task activity, measure performance, or handle errors consistently. ```python def custom_middleware(process_task, context, worker): - # do something before the task is executed + # Execute any logic before the task runs result = await process_task() - # do something after the task is executed + # Execute any logic after the task runs return result app.run_worker(middleware=custom_middleware) ``` -The middleware is a coroutine that takes three arguments: -- `process_task`: a coroutine that runs the task +The middleware is coroutine that takes three arguments: +- `process_task`: a coroutine (without arguments) that runs the task - `context`: a `JobContext` object that contains information about the job - `worker`: the worker that runs the job The middleware should await `process_task` to run the task and return the result. +:::{note} The `worker` instance can be used to stop the worker from the middleware by calling `worker.stop()`. This will stop the worker after the jobs currently being -processed are done. +processed are finished. +::: -# Add a middleware to a specific task +:::{warning} +When the middleware is called, the job is already fetched from the database and +in `doing` state. After the `process_task` coroutine is called the job is still +in `doing` state and will be updated after the middleware returns. +::: -If you only want to wrap a specific task with a middleware you can use -your own decorator instead of `@app.task` and have this decorator -implement the actions you need and delegate the rest to `@app.task`. -It might look like this: +## Task middleware + +You can also add a middleware to a specific task. This middleware will only wrap +the execution of this specific task then. -```python -import functools -def task(original_func=None, **kwargs): - def wrap(func): - def new_func(*job_args, **job_kwargs): - # This is the custom part - log_something() - result = func(*job_args, **job_kwargs) - log_something_else() - return result +:::{note} +For a sync task, the middleware should be a sync function. For an async task, the +middleware should be a coroutine. +::: - wrapped_func = functools.update_wrapper(new_func, func, updated=()) - return app.task(**kwargs)(wrapped_func) +```python +# for a sync task +def custom_sync_middleware(process_task, context, worker): + # do something at the beginning of the task + result = process_task() + # do something at the end of the task + return result - if not original_func: - return wrap +@app.task(middleware=custom_sync_middleware) +def my_task(): + ... - return wrap(original_func) +# or for an async task +async def custom_async_middleware(process_task, context, worker): + # do something at the beginning of the task + result = await process_task() + # do something at the end of the task + return result + +@app.task(middleware=custom_async_middleware) +async def my_task(): + ... ``` + +::: {warning} +Just like with worker middleware, when the task middleware is executed, the job has already been fetched from the database and is in `doing` state. The final state of the job will only be updated once the middleware has fully completed its execution. +::: diff --git a/procrastinate/blueprints.py b/procrastinate/blueprints.py index ed7f5fbbf..714fc41f5 100644 --- a/procrastinate/blueprints.py +++ b/procrastinate/blueprints.py @@ -252,8 +252,9 @@ def task( Passes the task execution context in the task as first middleware : A function that can be used to wrap the task execution. The default middleware - just calls the task function and returns its result. If the task is a coroutine, - the middleware should also be a coroutine. + just calls the task function and returns its result. If the task is synchronous, + the middleware must also be a sync function. If the task is async, the middleware + must be async, too. """ ... diff --git a/procrastinate/middleware.py b/procrastinate/middleware.py index 2f0283845..9530347bc 100644 --- a/procrastinate/middleware.py +++ b/procrastinate/middleware.py @@ -10,7 +10,7 @@ if TYPE_CHECKING: from procrastinate import worker -ProcessTask = Callable[..., R] +ProcessTask = Callable[[], R] WorkerMiddleware = Callable[ [ProcessTask[Awaitable], job_context.JobContext, "worker.Worker"], Awaitable ] From 5e31e64b82e94378edeb424538cd0101a865a0b4 Mon Sep 17 00:00:00 2001 From: Kai Schlamp Date: Fri, 21 Feb 2025 17:57:50 +0000 Subject: [PATCH 8/8] Improve documentation --- docs/howto/advanced/middleware.md | 74 +++++++++++++++---------------- 1 file changed, 37 insertions(+), 37 deletions(-) diff --git a/docs/howto/advanced/middleware.md b/docs/howto/advanced/middleware.md index aeebba6f2..35f6d32c1 100644 --- a/docs/howto/advanced/middleware.md +++ b/docs/howto/advanced/middleware.md @@ -1,76 +1,76 @@ # Middleware for workers and tasks -## Worker middleware - -Procrastinate lets you extend a worker's behavior by adding a custom middleware. -This middleware wraps the execution of every task assigned to the worker, allowing -you to execute custom logic before and after each task runs. You might use it to -log task activity, measure performance, or handle errors consistently. - -```python -def custom_middleware(process_task, context, worker): - # Execute any logic before the task runs - result = await process_task() - # Execute any logic after the task runs - return result +Procrastinate lets you add middleware to workers and tasks. Middleware is a +function that wraps the execution of a task, allowing you to execute custom +logic before and after the task runs. You might use it to log task activity, +measure performance, or handle errors consistently. -app.run_worker(middleware=custom_middleware) -``` - -The middleware is coroutine that takes three arguments: -- `process_task`: a coroutine (without arguments) that runs the task +A middleware is a function or coroutine (see examples below) that takes three arguments: +- `process_task`: a function resp. coroutine (without arguments) that runs the task - `context`: a `JobContext` object that contains information about the job - `worker`: the worker that runs the job -The middleware should await `process_task` to run the task and return the result. +The middleware should call resp. await `process_task` to run the task and then return the +result. :::{note} -The `worker` instance can be used to stop the worker from the middleware by +The `worker` instance can be used to stop the worker from within the middleware by calling `worker.stop()`. This will stop the worker after the jobs currently being -processed are finished. +processed by the worker are finished. ::: :::{warning} -When the middleware is called, the job is already fetched from the database and -in `doing` state. After the `process_task` coroutine is called the job is still -in `doing` state and will be updated after the middleware returns. +When the middleware is called, the job was already fetched from the database and +is in `doing` state. After `process_task` the job is still in `doing` state and will +be updated to its final state after the middleware returns. ::: +## Worker middleware + +To add a middleware to a worker, pass a middleware coroutine to the `run_worker` or +`run_worker_async` method. The middleware will wrap the execution of all tasks +run by this worker. + +```python +async def custom_worker_middleware(process_task, context, worker): + # Execute any logic before the task is processed + result = await process_task() + # Execute any logic after the task is processed + return result + +app.run_worker(middleware=custom_middleware) +``` + ## Task middleware You can also add a middleware to a specific task. This middleware will only wrap -the execution of this specific task then. - +the execution of this task then. :::{note} -For a sync task, the middleware should be a sync function. For an async task, the +For a sync task, the middleware must be a sync function, and for an async task, the middleware should be a coroutine. ::: ```python -# for a sync task +# middleware of a sync task def custom_sync_middleware(process_task, context, worker): - # do something at the beginning of the task + # Execute any logic before the task is processed result = process_task() - # do something at the end of the task + # Execute any logic after the task is processed return result @app.task(middleware=custom_sync_middleware) def my_task(): ... -# or for an async task +# or middleware of an async task async def custom_async_middleware(process_task, context, worker): - # do something at the beginning of the task + # Execute any logic before the task is processed result = await process_task() - # do something at the end of the task + # Execute any logic after the task is processed return result @app.task(middleware=custom_async_middleware) async def my_task(): ... ``` - -::: {warning} -Just like with worker middleware, when the task middleware is executed, the job has already been fetched from the database and is in `doing` state. The final state of the job will only be updated once the middleware has fully completed its execution. -:::