Group messages not processed in real-time when sent from the same async consumer #2025

Open
rasca opened this issue Jul 6, 2023 · 6 comments

rasca commented Jul 6, 2023

Description:
I have encountered an issue where group messages sent from the receive method of an AsyncWebsocketConsumer are not processed in real time by that same consumer when it is itself subscribed to the group. Instead, the messages accumulate and are processed together once the receive call for the initial message has completed. This behavior is unexpected and not explained in the documentation.

Steps to Reproduce:

  1. Create an async consumer that subscribes to a group.
  2. In the receive method of the consumer, perform a long-running process.
  3. Within the long-running process, call group_send to send messages to the same group the consumer is subscribed to.
  4. Observe that the group messages are not processed until the initial message's long-running process is completed.
  5. Other consumers subscribed to the same group do receive the messages in real time.

Expected Behaviour:
When a group message is sent from an async consumer using group_send, it should be processed in real time, even if the receiver is the same consumer.

Actual Behaviour:
The group messages sent from an async consumer are accumulated and processed together after the completion of the initial message's long-running process. This causes delays in message delivery and affects the real-time nature of the application. Other consumers in the same group receive the messages as they are emitted.

Environment:
django-channels==4.0.0
channels-redis==4.1.0
Tested with Daphne and Uvicorn

Additional Information:
I have noticed that if I pass the consumer instance to the async callback and call send instead of group_send, the messages are received correctly by the client, but other consumers subscribed to the group do not receive the messages.

Expected Resolution:
If the current behaviour is intentional and part of the channels architecture, it should be documented to help developers understand and work around this limitation. However, if this behaviour is unintended, I believe it should be treated as a bug and addressed in a future release.

Use case:
Our use case is a chat application with an LLM (large language model) that streams the tokens of its responses. When a user sends a message, the consumer performs a long-running process of generating tokens using the LLM. As each token is generated, it needs to be sent to all the users in the chat room in real time. Currently, everyone except the user who triggered the LLM receives the tokens one at a time.
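
Editor's sketch, not something proposed in this thread: one commonly used way to keep a consumer responsive during long-running work is to start that work as a background task, so the handler returns right away and the consumer can go back to reading messages. The toy model below uses only asyncio; `run_consumer`, `generate_tokens`, the queue and the message shapes are all hypothetical stand-ins and not the Channels API, and it assumes the consumer awaits one handler at a time.

```python
import asyncio


async def generate_tokens(inbox, log, tokens):
    """Hypothetical stand-in for the LLM: 'group_send's each token as it is produced."""
    for token in tokens:
        log.append(f"generated {token}")
        await inbox.put({"type": "group.message", "text": token})
        await asyncio.sleep(0.001)  # stand-in for model latency; lets the loop run
    await inbox.put({"type": "stop"})


async def run_consumer(inbox, log):
    """Toy dispatch loop (an assumption) that awaits one handler at a time."""
    background = set()  # keep references so tasks are not garbage-collected
    while True:
        message = await inbox.get()
        if message["type"] == "stop":
            break
        if message["type"] == "receive":
            # Start the slow work in the background and return immediately.
            task = asyncio.create_task(generate_tokens(inbox, log, ["a", "b"]))
            background.add(task)
            task.add_done_callback(background.discard)
            log.append("receive returned")
        elif message["type"] == "group.message":
            log.append(f"delivered {message['text']}")


async def main():
    inbox = asyncio.Queue()
    log = []
    await inbox.put({"type": "receive"})
    await run_consumer(inbox, log)
    return log


log = asyncio.run(main())
print(log)
```

In this model each token is delivered before the next one is generated, because the handler no longer occupies the loop. Whether the same approach fits a real consumer depends on how errors and disconnects during the background task should be handled.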

Please let me know if any further information or code samples are needed.

Member

carltongibson commented Jul 6, 2023

I'd say this is maybe #1924

@carltongibson
Member

A small (failing) test case using just channels would be useful to clarify the desired change.

Author

rasca commented Jul 6, 2023

Let me try and write something. Would a test with asyncio.sleep(1) be acceptable?

rasca added a commit to rasca/channels that referenced this issue Jul 6, 2023
Author

rasca commented Jul 6, 2023

@carltongibson I hacked something together quickly. I'm pretty sure the test is not elegant and is in the wrong place, but it showcases the issue. Some guidance would be welcome.

The messages should arrive as 1, 2, 3 and not as 3, 1, 2.
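
Editor's sketch: the 3, 1, 2 ordering can be reproduced without Channels by a toy dispatch loop that awaits each handler before reading the next message. That Channels dispatches this way is an assumption here, and the queue, the message shapes and `run_consumer` are hypothetical stand-ins, not the test from the referenced commit.

```python
import asyncio


async def run_consumer(inbox, log):
    """Toy dispatch loop: handles one message at a time, awaiting each handler."""
    while True:
        message = await inbox.get()
        try:
            if message["type"] == "stop":
                return
            if message["type"] == "receive":
                # Long-running handler that "group_send"s to itself while working.
                for text in ("1", "2"):
                    await inbox.put({"type": "group.message", "text": text})
                    await asyncio.sleep(0.01)  # stand-in for slow work
                log.append("3")  # written directly at the end of the handler
            elif message["type"] == "group.message":
                log.append(message["text"])
        finally:
            inbox.task_done()


async def main():
    inbox = asyncio.Queue()
    log = []
    await inbox.put({"type": "receive"})
    consumer = asyncio.create_task(run_consumer(inbox, log))
    await inbox.join()  # wait until every queued message has been handled
    await inbox.put({"type": "stop"})
    await consumer
    return log


result = asyncio.run(main())
print(result)
```

The queued messages "1" and "2" are only handled after the handler that queued them has finished, so "3" lands first and the result is `['3', '1', '2']`.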


MaZZly commented Nov 22, 2023

We also encounter this, which happens both in the Django runserver and with gunicorn+uvicorn workers...

When we call self.channel_layer.group_send() every client in the group, except the one that triggered the group_send(), gets the message instantly.

The client that triggered the event only gets the message after the consumer function finishes (which can be a long-running function). As mentioned by @rasca, calling self.send() sends the message instantly.

As a workaround for this we added extra code to "send it instantly", and then a decorator to handle not sending duplicate events to the same client again:

from functools import wraps

from channels.generic.websocket import AsyncJsonWebsocketConsumer


class RoomConsumer(AsyncJsonWebsocketConsumer):
    async def _send_group(self, data, send_instantly_function=None):
        """If send_instantly_function is defined, call it directly and set `skip_for_channel_name` on the data sent to the group."""
        send_data = {}
        if send_instantly_function:
            await send_instantly_function(data)
            send_data["skip_for_channel_name"] = self.channel_name
        send_data.update(data)
        await self.channel_layer.group_send(self.room_group_name, send_data)

    def skip_duplicate_send(func):
        """Decorator to skip sending message to client if it has already been done in `_send_group()`."""
        async def dummy_func():
            pass

        @wraps(func)
        def _decorator(self, *args, **kwargs):
            if args[0].get("skip_for_channel_name") == self.channel_name:
                return dummy_func()
            return func(self, *args, **kwargs)

        return _decorator

    @skip_duplicate_send
    async def send_room_list(self, event):
        """Send updated list of rooms to clients."""
        await self.send_json({"type": "room-list", "rooms": event["rooms"]})

    async def add_room(self, data):
        rooms = handle_add_room_and_get_list_of_rooms()
        await self._send_group({"type": "send_room_list", "rooms": rooms}, self.send_room_list)
        # Other things that take unknown time and might update the state of rooms
        rooms = call_external_apis_etc_and_get_list_of_rooms()
        await self._send_group({"type": "send_room_list", "rooms": rooms}, self.send_room_list)
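
Editor's sketch: the de-duplication idea above can be exercised in isolation. This is a simplified variant that uses an async wrapper instead of the dummy coroutine, and `FakeConsumer` and the channel names are hypothetical stand-ins so the example runs without Channels.

```python
import asyncio
from functools import wraps


def skip_duplicate_send(func):
    """Skip the handler when the event is flagged for this consumer's channel."""
    @wraps(func)
    async def wrapper(self, event, *args, **kwargs):
        if event.get("skip_for_channel_name") == self.channel_name:
            return None  # already sent directly, so drop the group copy
        return await func(self, event, *args, **kwargs)
    return wrapper


class FakeConsumer:
    """Hypothetical stand-in for a consumer; records what would be sent."""

    def __init__(self, channel_name):
        self.channel_name = channel_name
        self.sent = []

    @skip_duplicate_send
    async def send_room_list(self, event):
        self.sent.append({"type": "room-list", "rooms": event["rooms"]})


async def main():
    sender = FakeConsumer("chan-A")
    other = FakeConsumer("chan-B")
    # Group event flagged as already delivered to the sender's channel.
    event = {
        "type": "send_room_list",
        "rooms": ["lobby"],
        "skip_for_channel_name": "chan-A",
    }
    for consumer in (sender, other):
        await consumer.send_room_list(event)
    return sender.sent, other.sent


sender_sent, other_sent = asyncio.run(main())
print(sender_sent, other_sent)
```

The sender drops the flagged group copy, while the other consumer still receives the room list.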


MaZZly commented Oct 30, 2024

We did some refactoring that made the "Update room list" logic reusable from outside the consumer as well and then we realized that it works fine and in the correct order without having to do any skip_for_channel_name magic...

So e.g. the following works and does it in the correct order:

from channels.generic.websocket import AsyncJsonWebsocketConsumer
from channels.layers import get_channel_layer

channel_layer = get_channel_layer()


async def send_updated_room_list(group_id) -> None:
    """Send an updated list of rooms to the specified group."""
    await channel_layer.group_send(
        f"group_{group_id}",
        {"type": "send_room_list", "rooms": [room.to_json() for room in Room.objects.all()]},
    )


class RoomConsumer(AsyncJsonWebsocketConsumer):
    async def add_room(self, data):
        rooms = handle_add_room_and_get_list_of_rooms()
        await send_updated_room_list(self.group_id)
        # Other things that take unknown time and might update the state of rooms
        rooms = call_external_apis_etc_and_get_list_of_rooms()
        await send_updated_room_list(self.group_id)

So it seems the problem lies somewhere with the self.channel_layer.group_send() call inside the consumer...
