In this asgi file I am using the tokens to return the users, and that works very fine. But in testing Django act like this asgi file not exists att all despite I already set it in the settings.py
#asgi.py
from urllib.parse import parse_qs
from channels.db import database_sync_to_async
from channels.routing import ProtocolTypeRouter, URLRouter
from django.contrib.auth.models import AnonymousUser
from django.core.asgi import get_asgi_application
# from jwt import decode
from rest_framework_simplejwt.tokens import UntypedToken
from Alerts.routing import websocket_urlpatterns
from Functions.debuging import Debugging
from users.models import User
@database_sync_to_async
def get_user(querys):
token = parse_qs(querys.decode("utf8"))['token'][0]
token_data = UntypedToken(token)
user_id = token_data["user_id"]
try:
return User.objects.get(id=user_id)
except User.DoesNotExist:
return AnonymousUser()
class QueryAuthMiddleware:
def __init__(self, app):
# Store the ASGI application we were passed
self.app = app
async def __call__(self, scope, receive, send):
print(scope) #<==🔴
scope['user'] = await get_user(scope["query_string"])
return await self.app(scope, receive, send)
application = ProtocolTypeRouter({
"http": get_asgi_application(),
"websocket": QueryAuthMiddleware(
URLRouter(
websocket_urlpatterns
)
),
})
when I run WebSocket in the front end and it connects I got the code line print(scope) #<==🔴 runs correctly. But, when I run testing this function and the whole asgi file don't run at all. However, I need the asgi file to handle the token in order to get the user and add it to the scope data.
#consumers.py
class Alerts(JsonWebsocketConsumer):
def connect(self):
# user = self.scope["user"]
Debugging(self.scope, color='blue')
# notifcation = return_notifcations(self.scope)
self.accept()
self.send(json.dumps({'message': 'data'}))
* in the test I got the message correctly and everything is just like expected except it doesn't return logined-in user
# tests.py
class MyTests(TestCase):
def setUp(self):
tests_setup_function(self)
async def test_my_consumer(self):
url = f"ws://localhost:8000/alerts/?token={self.token}"
communicator = WebsocketCommunicator(Alerts.as_asgi(), url)
connected, subprotocol = await communicator.connect()
response = await communicator.receive_from()
print([connected,subprotocol,response])
In this asgi file I am using the tokens to return the users, and that works very fine. But in testing Django act like this asgi file not exists att all despite I already set it in the settings.py
when I run WebSocket in the front end and it connects I got the code line
print(scope) #<==🔴runs correctly. But, when I run testing this function and the whole asgi file don't run at all. However, I need the asgi file to handle the token in order to get the user and add it to the scope data.