Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Events are not received when dispatch() occurs in an asyncio.Task #34

Open
smithk86 opened this issue Aug 28, 2022 · 5 comments
Open

Events are not received when dispatch() occurs in an asyncio.Task #34

smithk86 opened this issue Aug 28, 2022 · 5 comments

Comments

@smithk86
Copy link
Contributor

smithk86 commented Aug 28, 2022

I banged my head against the wall with this one for a while before I found the problem.

Steps to reproduce:

  1. Request is made to FastAPI which dispatches a starting X process. The request immediately returns a payload letting the client know the asynchronous process has started.
  2. The actual work is then done in an asyncio.Task which then dispatches a X process is complete afterwords.
  3. Any dispatched events in the Task are never received

What is actually happening:

The issue is on dispatcher.py line 57. Starting in Python 3.7, asyncio.Task copies the current context from contextvars into the Task. When line 57 is reached, the code is told the event will be dispatched in the middleware as part of the request since the request was active at the time the Task was created. In actuality, these events end up in the void as they should have been dispatched via _dispatch_as_task.

For now, anywhere an event needs to be dispatch within a Task, I import fastapi_events.in_req_res_cycle into the code and run in_req_res_cycle.set(None). This forces _dispatch() to process these events via _dispatch_as_task.

Edit: updated the link to include the specific commit revision

@melvinkcx
Copy link
Owner

Hi @smithk86, thank you for reporting the issue and for your patience. I've been thinking about this issue.

When you said the actual work being done in an asyncio.Task, do you mean you're create a task manually using asyncio.create_task()?

@melvinkcx
Copy link
Owner

Do you know if there is any good way to tell when contextvars are being copied? Would you mind sharing it or pointing me to the resource? 🙏 thanks

@smithk86
Copy link
Contributor Author

Below is a brief example of the route handler I was working on when I found this issue. The goal was to have the request start a long-running process but not have the client wait on the work being done. The results of the work would be published as an event.

To answer your question, the context (with contextvars) are being copied when asyncio.create_task() is run. The context is used for the duration of the coroutine being called. Because of this, fastapi-events believes it is executing within the context of a request when the request is actually done.

Cheers
Kyle

def add_data(data: DataModel):
    """
    This function may take a while to complete
    """
    dispatch("add_data_start", payload={"message": "starting"})
    try:
        await database.insert(data)
        await app.reload_data()
    except:
        dispatch("add_data_error", payload={"message": "an error occurred"})
    else:
        dispatch("add_data_done", payload={"message": "complete"})


@app.post("/addData")
async def uuid(
    data: DataModel,
):
    """
    This request will return immediately for the client and then do the work in the background.
    Events will be dispatched within the function/task once the work is complete.
    """
    asyncio.create_task(add_data(data))
    return {"detail": "add_data has been started"}

@melvinkcx
Copy link
Owner

thank you for the code sample. I can't think of a good general solution so far.
I would imagine a function that wraps asyncio.create_task that would also unset is_req_res_cycle at the same time, but that's not the best solution. 🤔

@smithk86
Copy link
Contributor Author

@melvinkcx Setting in_req_res_cycle to False is how I am currently getting around this and, while a bit hacky, it does work quite well.

My wrapper for dispatch() looks something like this:

@contextmanager
def _force_fastapi_events_dispatch_as_task() -> Generator:
    token: ContextVarToken = in_req_res_cycle.set(False)
    yield
    in_req_res_cycle.reset(token)


def dispatch(
    event_name: Union[str, Enum],
    payload: Optional[Any] = None,
    validate_payload: bool = True,
    payload_schema_cls_dict_args: Optional[Dict[str, Any]] = None,
    payload_schema_registry: Optional[BaseEventPayloadSchemaRegistry] = None,
    middleware_id: Optional[int] = None
    as_task: bool = False,
) -> None:
    if as_task:
        with _force_fastapi_events_dispatch_as_task():
            fastapi_events_dispatch(
                event_name=event_name,
                payload=payload,
                validate_payload=validate_payload,
                payload_schema_cls_dict_args=payload_schema_cls_dict_args,
                payload_schema_registry=payload_schema_registry,
            )
    else:
        fastapi_events_dispatch(
            event_name=event_name,
            payload=payload,
            validate_payload=validate_payload,
            payload_schema_cls_dict_args=payload_schema_cls_dict_args,
            payload_schema_registry=payload_schema_registry,
        )

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants