diff --git a/Dockerfile b/Dockerfile index ce46224d..5362a7b3 100644 --- a/Dockerfile +++ b/Dockerfile @@ -26,4 +26,4 @@ ADD .git /opt/tulius/.git RUN python manage.py compilemessages ADD gunicorn.conf.py /opt/tulius/gunicorn.conf.py -CMD [ "gunicorn" ] +CMD python -m django_h2 diff --git a/README.md b/README.md index 32ef4811..d6a1ec62 100644 --- a/README.md +++ b/README.md @@ -63,10 +63,11 @@ Repo for http://tulius.com project. ```sql CREATE DATABASE tulius_prod; CREATE DATABASE tulius_qa; - CREATE DATABASE sentry; - GRANT ALL ON tulius_prod.* TO tulius_prod@'%' IDENTIFIED BY 'tulius prod password'; - GRANT ALL ON tulius_qa.* TO tulius_qa@'%' IDENTIFIED BY 'tulius qa password'; - GRANT ALL ON sentry.* TO sentry@'%' IDENTIFIED BY 'sentry'; + CREATE USER 'tulius_prod'@'%' IDENTIFIED BY 'tulius prod password'; + GRANT ALL ON tulius_prod.* TO tulius_prod@'%'; + CREATE USER 'tulius_qa'@'%' IDENTIFIED BY 'tulius qa password'; + GRANT ALL ON tulius_qa.* TO tulius_qa@'%'; + flush privileges; ``` If you have SQL backup file, placed to mysql data folder: ```bash diff --git a/gunicorn.conf.py b/gunicorn.conf.py index c0df240f..b0b49f99 100644 --- a/gunicorn.conf.py +++ b/gunicorn.conf.py @@ -1,10 +1,9 @@ bind = ['0.0.0.0:7000'] workers = 2 -worker_class = 'uvicorn.workers.UvicornWorker' +django_settings = 'settings_production,settings' max_requests = 500 keepalive = 60 preload_app = True -wsgi_app = "asgi:application" accesslog = '-' disable_redirect_access_to_syslog = True errorlog = '-' diff --git a/requirements.txt b/requirements.txt index acfde1e9..43e08104 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,8 +9,8 @@ pyyaml == 6.0 ua-parser == 0.10.0 # sentry -sentry-sdk == 1.15.0 -redis == 4.5.4 +sentry-sdk == 1.29.2 +redis == 5.0.1 requests == 2.28.2 elasticsearch8 == 8.6.2 @@ -31,8 +31,10 @@ coverage==6.5.0 coveralls==3.3.1 pytest-asyncio==0.20.3 -gunicorn == 20.1.0 +gunicorn == 21.2.0 uvicorn == 0.20.0 websockets == 10.4 +h2 +django_h2 @ 
git+https://github.com/kozzztik/django_h2.git@324edc8 portalocker == 2.7.0 \ No newline at end of file diff --git a/scripts/tulius/dev/docker-compose.yml b/scripts/tulius/dev/docker-compose.yml index d4232e0a..186b0e53 100644 --- a/scripts/tulius/dev/docker-compose.yml +++ b/scripts/tulius/dev/docker-compose.yml @@ -43,9 +43,5 @@ services: networks: tuliusnet: - driver: bridge - ipam: - config: - - subnet: 10.5.0.0/16 name: tuliusnet external: true diff --git a/scripts/tulius/docker-compose.yml b/scripts/tulius/docker-compose.yml index 48800295..c9a38086 100644 --- a/scripts/tulius/docker-compose.yml +++ b/scripts/tulius/docker-compose.yml @@ -9,7 +9,7 @@ services: container_name: tulius_redis mysql: - image: mysql/mysql-server:5.7.22 + image: mysql:8.1.0 restart: always networks: tuliusnet: diff --git a/scripts/tulius/local/docker-compose.yml b/scripts/tulius/local/docker-compose.yml index dadcfd15..49a82461 100644 --- a/scripts/tulius/local/docker-compose.yml +++ b/scripts/tulius/local/docker-compose.yml @@ -11,7 +11,7 @@ services: - 127.0.0.1:6379:6379 mysql: - image: mysql/mysql-server:5.7.22 + image: mysql:8.1.0 restart: always networks: tuliusnet: @@ -69,13 +69,12 @@ services: ipv4_address: 10.5.0.20 environment: TULIUS_ENV: local_docker - ports: - - 7000:7000 volumes: - ../../../data/media:/opt/tulius/data/media - ../../../data/mail:/opt/tulius/data/mail - - ../../../data/indexing:/opt/tulius/data/indexing - ../../../settings_production.py:/opt/tulius/settings_production.py + - ../../../../django_h2/:/opt/tulius/django_h2 + command: bash -c "pip install -e /opt/tulius/django_h2 && python -m django_h2 --serve_static" mem_limit: 512M logging: driver: json-file @@ -83,6 +82,18 @@ services: max-size: "100m" max-file: "10" + nginx: + image: nginx + networks: + tuliusnet: + ipv4_address: 10.5.0.4 + ports: + - "127.0.0.1:8080:80" + volumes: + - ./nginx.conf:/etc/nginx/conf.d/tulius.conf:ro + - ../../../data/media:/opt/tulius/data/media + command: ['nginx', '-g', 
'daemon off;'] + networks: tuliusnet: driver: bridge diff --git a/scripts/tulius/local/nginx.conf b/scripts/tulius/local/nginx.conf new file mode 100644 index 00000000..37543a53 --- /dev/null +++ b/scripts/tulius/local/nginx.conf @@ -0,0 +1,22 @@ +server { + listen 80; + server_name 127.0.0.1; + charset utf-8; + + # max upload size + client_max_body_size 75M; + + location /media { + alias /opt/tulius/data/media; + } + + # Finally, send all non-media requests to the Django server. + location / { + grpc_set_header Host $http_host; + grpc_set_header User-Agent $http_user_agent; + grpc_set_header X-Real-IP $remote_addr; + grpc_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + grpc_set_header X-Forwarded-Proto $scheme; + grpc_pass grpc://10.5.0.20:7000; + } +} \ No newline at end of file diff --git a/scripts/tulius/master/docker-compose.yml b/scripts/tulius/master/docker-compose.yml index 115a33e3..96656b9f 100644 --- a/scripts/tulius/master/docker-compose.yml +++ b/scripts/tulius/master/docker-compose.yml @@ -39,9 +39,5 @@ services: networks: tuliusnet: - driver: bridge - ipam: - config: - - subnet: 10.5.0.0/16 name: tuliusnet external: true diff --git a/scripts/tulius/mysql.cnf b/scripts/tulius/mysql.cnf index 7ba19455..34ef6b97 100644 --- a/scripts/tulius/mysql.cnf +++ b/scripts/tulius/mysql.cnf @@ -23,11 +23,6 @@ thread_cache_size = 16 max_connections = 250 table_open_cache = 1000 #thread_concurrency = 10 -# -# * Query Cache Configuration -# -query_cache_limit = 1M -query_cache_size = 100M # InnoDB innodb_flush_log_at_trx_commit = 0 innodb_buffer_pool_size = 300M diff --git a/scripts/tulius/nginx/tulius.conf b/scripts/tulius/nginx/tulius.conf deleted file mode 100644 index ceb93f43..00000000 --- a/scripts/tulius/nginx/tulius.conf +++ /dev/null @@ -1,106 +0,0 @@ -server { - listen 80; - server_name www.tulius.com; - return 302 http://tulius.com$request_uri; - - listen 443 ssl; # managed by Certbot - ssl_certificate 
/etc/letsencrypt/live/svn.milana.co-de.org/fullchain.pem; # managed by Certbot - ssl_certificate_key /etc/letsencrypt/live/svn.milana.co-de.org/privkey.pem; # managed by Certbot - ssl_session_cache shared:le_nginx_SSL:1m; # managed by Certbot - ssl_session_timeout 1440m; # managed by Certbot - - ssl_protocols TLSv1 TLSv1.1 TLSv1.2; # managed by Certbot - ssl_prefer_server_ciphers on; # managed by Certbot - - ssl_ciphers "ECDHE-ECDSA-AES128-GCM-SHA256 ECDHE-ECDSA-AES256-GCM-SHA384 ECDHE-ECDSA-AES128-SHA ECDHE-ECDSA-AES256-SHA ECDHE-ECDSA-AES128-SHA256 ECDHE-ECDSA-AES256-SHA384 ECDHE-RSA-AES128-GCM-SHA256 ECDHE-RSA-AES256-GCM-SHA384 ECDHE-RSA-AES128-SHA ECDHE-RSA-AES128-SHA256 ECDHE-RSA-AES256-SHA384 DHE-RSA-AES128-GCM-SHA256 DHE-RSA-AES256-GCM-SHA384 DHE-RSA-AES128-SHA DHE-RSA-AES256-SHA DHE-RSA-AES128-SHA256 DHE-RSA-AES256-SHA256 EDH-RSA-DES-CBC3-SHA"; # managed by Certbot -} - -server { - listen 80; - server_name test.tulius.com; - server_name test.tulius.co-de.org; - server_name dev.tulius.co-de.org; - server_name dev.tulius.kozzz.ru; -# if ($remote_addr != 178.236.141.69) { -# return 503; -# } - location / { - proxy_set_header Host $host; - proxy_pass http://127.0.0.1:8080; - } - location /ws/ { - proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; - proxy_set_header Host $host; - proxy_http_version 1.1; - proxy_set_header Upgrade $http_upgrade; - proxy_set_header Connection "upgrade"; - proxy_pass http://127.0.0.1:8080; - } - location /ws_new/ { - proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; - proxy_set_header Host $host; - proxy_http_version 1.1; - proxy_set_header Upgrade $http_upgrade; - proxy_set_header Connection "upgrade"; - proxy_pass http://127.0.0.1:8080; - } - listen 443 ssl; # managed by Certbot - ssl_certificate /etc/letsencrypt/live/svn.milana.co-de.org/fullchain.pem; # managed by Certbot - ssl_certificate_key /etc/letsencrypt/live/svn.milana.co-de.org/privkey.pem; # managed by Certbot - - ssl_session_cache 
shared:le_nginx_SSL:1m; # managed by Certbot - ssl_session_timeout 1440m; # managed by Certbot - - ssl_protocols TLSv1 TLSv1.1 TLSv1.2; # managed by Certbot - ssl_prefer_server_ciphers on; # managed by Certbot - - ssl_ciphers "ECDHE-ECDSA-AES128-GCM-SHA256 ECDHE-ECDSA-AES256-GCM-SHA384 ECDHE-ECDSA-AES128-SHA ECDHE-ECDSA-AES256-SHA ECDHE-ECDSA-AES128-SHA256 ECDHE-ECDSA-AES256-SHA384 ECDHE-RSA-AES128-GCM-SHA256 ECDHE-RSA-AES256-GCM-SHA384 ECDHE-RSA-AES128-SHA ECDHE-RSA-AES128-SHA256 ECDHE-RSA-AES256-SHA384 DHE-RSA-AES128-GCM-SHA256 DHE-RSA-AES256-GCM-SHA384DHE-RSA-AES128-SHA DHE-RSA-AES256-SHA DHE-RSA-AES128-SHA256 DHE-RSA-AES256-SHA256 EDH-RSA-DES-CBC3-SHA"; # managed by Certbot -} - -server { - listen 80; - server_name kibana.test.tulius.co-de.org; - server_name kibana.tulius.co-de.org; - - location / { - proxy_set_header Host $host; - proxy_pass http://127.0.0.1:8080; - auth_basic "Restricted"; - auth_basic_user_file /etc/nginx/htpasswd; - } -} - -server { - listen 80; - listen 443 ssl; - server_name tulius.com; - server_name tulius.co-de.org; - server_name master.tulius.co-de.org; - ssl_certificate /etc/letsencrypt/live/svn.milana.co-de.org/fullchain.pem; # managed by Certbot - ssl_certificate_key /etc/letsencrypt/live/svn.milana.co-de.org/privkey.pem; # managed by Certbot - -# if ($remote_addr != 178.236.141.69) { -# return 503; -# } - - location / { - proxy_set_header Host $host; - proxy_pass http://127.0.0.1:8080; - } - location /ws/ { - proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; - proxy_set_header Host $host; - proxy_http_version 1.1; - proxy_set_header Upgrade $http_upgrade; - proxy_set_header Connection "upgrade"; - proxy_pass http://127.0.0.1:8080; - } - location /ws_new/ { - proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; - proxy_set_header Host $host; - proxy_http_version 1.1; - proxy_set_header Upgrade $http_upgrade; - proxy_set_header Connection "upgrade"; - proxy_pass http://127.0.0.1:8080; - } -} diff --git 
a/scripts/tulius/nginx_dev.conf b/scripts/tulius/nginx_dev.conf index 6d268b23..2f0d1e94 100644 --- a/scripts/tulius/nginx_dev.conf +++ b/scripts/tulius/nginx_dev.conf @@ -12,6 +12,7 @@ server { listen 80; + # listen 443 ssl http2; # the domain name it will serve for server_name test.tulius.com; server_name test.tulius.co-de.org; @@ -43,15 +44,12 @@ } # Finally, send all non-media requests to the Django server. location / { - proxy_http_version 1.1; # websockets not work over http2 :( - proxy_set_header Host $host; - proxy_set_header User-Agent $http_user_agent; - proxy_set_header X-Real-IP $remote_addr; - proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; - proxy_set_header X-Forwarded-Proto $scheme; - proxy_set_header Upgrade $http_upgrade; - proxy_set_header Connection "upgrade"; - proxy_pass http://10.5.0.20:7000; + grpc_set_header Host $http_host; + grpc_set_header User-Agent $http_user_agent; + grpc_set_header X-Real-IP $remote_addr; + grpc_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + grpc_set_header X-Forwarded-Proto $scheme; + grpc_pass grpc://10.5.0.20:7000; } } @@ -61,7 +59,7 @@ server { charset utf-8; location / { proxy_pass http://10.5.0.31:5601; - proxy_set_header Host $host; + proxy_set_header Host $http_host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header X-Forwarded-Proto $scheme; diff --git a/scripts/tulius/nginx_production.conf b/scripts/tulius/nginx_production.conf index 4bd660f2..43e84fc2 100644 --- a/scripts/tulius/nginx_production.conf +++ b/scripts/tulius/nginx_production.conf @@ -9,6 +9,7 @@ server { listen 80; + # listen 443 ssl http2; server_name tulius.com; server_name tulius.co-de.org; server_name master.tulius.co-de.org; @@ -38,15 +39,12 @@ # Finally, send all non-media requests to the Django server. 
location / { - proxy_http_version 1.1; # websockets not work over http2 :( - proxy_set_header Host $host; - proxy_set_header User-Agent $http_user_agent; - proxy_set_header X-Real-IP $remote_addr; - proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; - proxy_set_header X-Forwarded-Proto $scheme; - proxy_set_header Upgrade $http_upgrade; - proxy_set_header Connection "upgrade"; - proxy_pass http://10.5.0.10:7000; + grpc_set_header Host $host; + grpc_set_header User-Agent $http_user_agent; + grpc_set_header X-Real-IP $remote_addr; + grpc_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + grpc_set_header X-Forwarded-Proto $scheme; + grpc_pass grpc://10.5.0.10:7000; } } diff --git a/settings.py b/settings.py index 94126f13..2083b9ca 100644 --- a/settings.py +++ b/settings.py @@ -92,7 +92,7 @@ 'tulius.events.EventsConfig', 'tulius.vk', 'tulius.counters', - 'tulius.websockets.WebsocketsConfig', + 'django_h2.DjangoH2Config', ) MIDDLEWARE = ( @@ -196,7 +196,7 @@ LOGGING = { 'version': 1, - 'disable_existing_loggers': True, + 'disable_existing_loggers': False, 'filters': { 'require_debug_false': { '()': 'django.utils.log.RequireDebugFalse' @@ -257,6 +257,8 @@ 'level': 'DEBUG' if env in ['dev', 'local_docker'] else 'WARNING', 'propagate': True, }, + 'hpack': {'level': 'WARNING'}, + 'flake8': {'level': 'ERROR'}, }, 'root': { 'handlers': @@ -336,7 +338,7 @@ ] RAVEN_CONFIG = { - 'integrations': [DjangoIntegration()], + 'integrations': [DjangoIntegration(signals_spans=False)], 'send_default_pii': True, } diff --git a/tests/conftest.py b/tests/conftest.py index 48f2d27b..e4874ebd 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,7 +1,9 @@ import os +from weakref import ref import pytest import django +from django.db.backends.signals import connection_created from django.test import client as django_client from django.test import utils @@ -106,10 +108,25 @@ def pytest_sessionstart(session): ) +connections_history = [] + + +@connection_created.connect +def 
new_db_connection(sender, connection, **kwargs): + connections_history.append(ref(connection)) + + @pytest.hookimpl(trylast=True) def pytest_sessionfinish(session, exitstatus): db_cfg = getattr(session, 'django_db_cfg') if db_cfg and not session.config.option.keep_db: + # mysql can stuck if there are open connections to DB while dropping it + # So close all of them, including opened in async thread pool. + alive = [x() for x in connections_history] + for conn in alive: + if conn: + conn.inc_thread_sharing() # for async connections + conn.close() utils.teardown_databases( db_cfg, verbosity=session.config.option.verbose) # pylint: disable=import-outside-toplevel diff --git a/tests/core/elastic/test_elastic_indexer.py b/tests/core/elastic/test_elastic_indexer.py index 6a221975..a7026d7b 100644 --- a/tests/core/elastic/test_elastic_indexer.py +++ b/tests/core/elastic/test_elastic_indexer.py @@ -38,8 +38,7 @@ def test_index_to_closed_indexer(indexer_config): indexer_config['PACK_SIZE'] = 1 with indexing.ElasticIndexer(indexer_config, autostart=False) as indexer: pass - with pytest.raises(indexing.ClosedException): - indexer.index({}) + indexer.index({}) def test_indexing_queue_if_transport_is_slow(indexer_config): @@ -82,7 +81,7 @@ def test_closing_module(): assert indexer == indexing.get_indexer() # gives same indexer indexing.close() indexing.close() # can do twice nothing happen - assert indexer != indexing.get_indexer() # gives new after close + assert indexer == indexing.get_indexer() # gives same after close def test_closing_indexer_twice(indexer_config): diff --git a/tests/forum/comments/test_comments_ws.py b/tests/forum/comments/test_comments_ws.py index a4b651e7..144ca5e6 100644 --- a/tests/forum/comments/test_comments_ws.py +++ b/tests/forum/comments/test_comments_ws.py @@ -1,103 +1,78 @@ import json import pytest - +from django_h2.test_client import AsyncClient from tulius.forum.threads import models -from tulius.websockets.asgi.utils import AsyncClient 
@pytest.mark.asyncio -async def test_comments_ws(thread, superuser, admin, user): - wss = {} - clients = {} +async def test_comments_sse(thread, user, admin): + user_client = AsyncClient() + admin_client = AsyncClient() + user_client.cookies = user.cookies + admin_client.cookies = admin.cookies + sse = await user_client.sse(thread['url'] + 'comments_sse/') + assert sse.status_code == 200 try: - for u in [superuser, admin, user]: - clients[u] = client = AsyncClient() - client.cookies = u.cookies - wss[u] = await client.ws('/api/ws/') - response = await client.get(thread['url']) - assert response.status_code == 200 - await wss[user].send_text(json.dumps({ - 'action': 'subscribe_comments', 'id': thread['id']})) + response = await user_client.get(thread['url']) + assert response.status_code == 200 # post comment to thread - response = await clients[admin].post( + response = await admin_client.post( thread['url'] + 'comments_page/', { 'reply_id': thread['first_comment_id'], 'title': 'ho ho ho', 'body': 'happy new year', 'media': {}, - }) - assert response.status_code == 200 - data = await wss[user].receive_text() - data = json.loads(data) - assert data['.namespaced'] == 'thread_comments' - assert data['.action'] == 'new_comment' - assert data['parent_id'] == thread['id'] - assert wss[admin].send_queue == [] - assert wss[superuser].send_queue == [] - # again, but unsubscribe - await wss[user].send_text(json.dumps({ - 'action': 'unsubscribe_comments', 'id': thread['id']})) - await wss[superuser].send_text(json.dumps({ - 'action': 'subscribe_comments', 'id': thread['id']})) - response = await clients[admin].post( - thread['url'] + 'comments_page/', { - 'reply_id': thread['first_comment_id'], - 'title': 'ho ho ho', 'body': 'happy new year2', - 'media': {}, - }) + }, + content_type='application/json' + ) assert response.status_code == 200 - data = await wss[superuser].receive_text() - data = json.loads(data) - assert data['.namespaced'] == 'thread_comments' + event = await 
sse.events.__anext__() + assert event['event'] == 'thread_comments' + data = json.loads(event['data']) assert data['.action'] == 'new_comment' assert data['parent_id'] == thread['id'] - assert wss[user].send_queue == [] - assert wss[admin].send_queue == [] finally: - for u in wss.values(): - u.close() + sse.close() @pytest.mark.asyncio -async def test_comments_ws_no_rights(room_group, superuser, admin, user): - client = AsyncClient() - client.cookies = admin.cookies - response = await client.put( - room_group['url'], { +async def test_comments_sse_no_rights(room_group, superuser, admin, user): + admin_client = AsyncClient() + admin_client.cookies = admin.cookies + response = await admin_client.put( + room_group['url'], + { 'title': 'thread', 'body': 'thread description', 'room': False, 'default_rights': models.NO_ACCESS, - 'granted_rights': [], 'important': False, 'media': {}}) + 'granted_rights': [], 'important': False, 'media': {} + }, + content_type='application/json' + ) assert response.status_code == 200 thread = response.json() - wss = {} - clients = {} + user_client = AsyncClient() + user_client.cookies = user.cookies + response = await user_client.sse(thread['url'] + 'comments_sse/') + assert response.status_code == 403 + superuser_client = AsyncClient() + superuser_client.cookies = superuser.cookies + sse = await superuser_client.sse(thread['url'] + 'comments_sse/') + assert sse.status_code == 200 try: - for u in [superuser, admin, user]: - clients[u] = client = AsyncClient() - client.cookies = u.cookies - wss[u] = await client.ws('/api/ws/') - response = await client.get(thread['url']) - if u is user: - assert response.status_code == 403 - else: - assert response.status_code == 200 - await wss[u].send_text(json.dumps({ - 'action': 'subscribe_comments', 'id': thread['id']})) - # post comment to thread - response = await clients[admin].post( - thread['url'] + 'comments_page/', { + response = await admin_client.post( + thread['url'] + 'comments_page/', + { 
'reply_id': thread['first_comment_id'], 'title': 'ho ho ho', 'body': 'happy new year', 'media': {}, - }) + }, + content_type='application/json' + ) assert response.status_code == 200 - for u in [superuser, admin]: - data = await wss[u].receive_text() - data = json.loads(data) - assert data['.namespaced'] == 'thread_comments' - assert data['.action'] == 'new_comment' - assert data['parent_id'] == thread['id'] - assert wss[user].send_queue == [] + event = await sse.events.__anext__() + assert event['event'] == 'thread_comments' + data = json.loads(event['data']) + assert data['.action'] == 'new_comment' + assert data['parent_id'] == thread['id'] finally: - for u in wss.values(): - u.close() + sse.close() diff --git a/tests/tulius/test_pm.py b/tests/tulius/test_pm.py index 31c9271c..4ac3901d 100644 --- a/tests/tulius/test_pm.py +++ b/tests/tulius/test_pm.py @@ -1,66 +1,29 @@ -import pytest +import json +import pytest from django import urls -from tulius.websockets.asgi.utils import AsyncClient -from django.test import client as django_client - - -@pytest.mark.asyncio -async def test_comments_ws_old(superuser, admin, user): - wss = {} - clients = {} - try: - for u in [superuser, admin, user]: - clients[u] = client = AsyncClient() - client.cookies = u.cookies - wss[u] = await client.ws('/api/ws/old/') - # check is ready - await wss[user].send_text("ping") - data = await wss[user].receive_text(timeout=10) - assert data == 'ping/answer' - # post comment to thread - response = await clients[admin].post( - urls.reverse('pm:to_user', args=[user.user.pk]), - {'body': 'hello'}, - content_type=django_client.MULTIPART_CONTENT - ) - assert response.status_code == 302 - assert response.url == urls.reverse('pm:to_user', args=[user.user.pk]) - data = await wss[user].receive_text(timeout=30) - assert data.startswith('new_pm ') - assert wss[superuser].send_queue == [] - assert wss[admin].send_queue == [] - finally: - for u in wss.values(): - u.close() +from django_h2.test_client 
import AsyncClient @pytest.mark.asyncio -async def test_comments_ws_json(superuser, admin, user): - wss = {} - clients = {} +async def test_pm_sse(admin, user): + admin_client = AsyncClient() + admin_client.cookies = admin.cookies + user_client = AsyncClient() + user_client.cookies = user.cookies + sse = await user_client.sse('/api/sse/') + assert sse.status_code == 200 try: - for u in [superuser, admin, user]: - clients[u] = client = AsyncClient() - client.cookies = u.cookies - wss[u] = await client.ws('/api/ws/') - # check is ready - await wss[user].send_json({'action': 'ping', 'data': 'ping'}) - data = await wss[user].receive_json(timeout=10) - assert data['data'] == 'ping/answer' - # post comment to thread - response = await clients[admin].post( + response = await admin_client.post( urls.reverse('pm:to_user', args=[user.user.pk]), {'body': 'hello'}, - content_type=django_client.MULTIPART_CONTENT ) assert response.status_code == 302 assert response.url == urls.reverse('pm:to_user', args=[user.user.pk]) - data = await wss[user].receive_json(timeout=30) + event = await sse.events.__anext__() + assert event['event'] == 'pm' + data = json.loads(event['data']) assert data['.action'] == 'new_pm' - assert wss[superuser].send_queue == [] - assert wss[admin].send_queue == [] finally: - for u in wss.values(): - u.close() + sse.close() diff --git a/tests/websockets/test_asgi_handler.py b/tests/websockets/test_asgi_handler.py deleted file mode 100644 index e8255070..00000000 --- a/tests/websockets/test_asgi_handler.py +++ /dev/null @@ -1,205 +0,0 @@ -import asyncio -from unittest import mock - -import pytest -from django import urls -from django import http -from django.test import override_settings -from django.core import signals -from django.core.handlers import asgi as dj_asgi - -from tulius.websockets.asgi import asgi_handler -from tulius.websockets.asgi import utils -from tulius.websockets.asgi import connections -from tulius.websockets.asgi import websocket - - -def 
setup_module(_): - connections.ConnectionHandler.monkey_patch() - - -@pytest.mark.asyncio -async def test_unknown_scope_type(): - handler = asgi_handler.ASGIHandler() - with pytest.raises(ValueError): - await handler({'type': 'foobar'}, None, None) - - -class ASGIContext(utils.BaseASGIContext): - async def send(self, data): - await self._internal_send(data) - - async def receive(self): - return await self._internal_read() - - -@pytest.mark.asyncio -async def test_lifespan(): - handler = asgi_handler.ASGIHandler() - context = ASGIContext(handler, {'type': 'lifespan'}) - task = asyncio.create_task(context.run()) - await context.send({'type': 'lifespan.startup'}) - response = await context.receive() - assert response['type'] == 'lifespan.startup.complete' - # test shutdown - await context.send({'type': 'lifespan.shutdown'}) - response = await context.receive() - assert response['type'] == 'lifespan.shutdown.complete' - # check task closed - await task - await context.closed - - -@pytest.mark.asyncio -async def test_body_abort(): - handler = asgi_handler.ASGIHandler() - context = ASGIContext(handler, {'type': 'http'}) - await context.send({'type': 'http.disconnect'}) - await context.run() # nothing fails - await context.closed - - -@pytest.mark.asyncio -async def test_bad_scope(): - handler = asgi_handler.ASGIHandler() - context = ASGIContext(handler, {'type': 'http'}) - await context.send({ - 'type': 'http.request', 'body': b'foo', 'more_body': False, - }) - await context.run() - with pytest.raises(KeyError): - await context.closed - - -class BadRequest(dj_asgi.ASGIRequest): - def __init__(self, scope, body_file): - raise dj_asgi.RequestDataTooBig() - - -@pytest.mark.asyncio -async def test_too_large_request(): - handler = utils.TestASGIHandler() - handler.request_class = BadRequest - context = utils.ASGIRequest(handler, {'type': 'http'}) - await context.run() # nothing fails - assert context.response.status_code == 413 - - -def test_application(): - handler = 
asgi_handler.get_asgi_application() - assert isinstance(handler, asgi_handler.ASGIHandler) - - -def _test_file_view(_): - return http.FileResponse(streaming_content=[b'123']) - - -@websocket.websocket_view -def _ws_exc_view(_, __): - raise ValueError() - - -@websocket.websocket_view -def _ws_cancel_view(_, __): - raise asyncio.CancelledError() - - -@websocket.websocket_view -async def _ws_ping(_, ws: websocket.WebSocket): - while True: - data = await ws.receive_text() - await ws.send_text(data + '_pong') - - -class UrlConf: - urlpatterns = [ - urls.re_path(r'^file_response/$', _test_file_view), - urls.re_path(r'^ws_exc/$', _ws_exc_view), - urls.re_path(r'^ws_cancel/$', _ws_cancel_view), - urls.re_path(r'^ws_ping/$', _ws_ping), - ] - - -@override_settings(ROOT_URLCONF=UrlConf) -@pytest.mark.asyncio -async def test_ping_pong(): - client = utils.AsyncClient() - ws: utils.ASGIWebsocket = await client.ws('/ws_ping/') - assert ws.connected.result() is True - await ws.send_text('ping') - result = await ws.receive_text() - assert result == 'ping_pong' - ws.close() - assert ws.closed.done() - - -@override_settings(ROOT_URLCONF=UrlConf) -@pytest.mark.asyncio -async def test_file_chunks(): - class ChunkedHandler(utils.TestASGIHandler): - chunk_size = 1 - client = utils.AsyncClient() - client.handler.handler_class = ChunkedHandler - response = await client.get('/file_response/') - assert response.block_size == 1 - assert response.streaming is True - - -@pytest.mark.asyncio -async def test_websocket_on_http(): - client = utils.AsyncClient() - ws = await client.ws('/file_response/') - assert ws.connected.result() is False - assert ws.closed.done() - - -@pytest.mark.asyncio -async def test_websocket_bad_request(): - class BadRequestHandler(utils.TestASGIHandler): - request_class = BadRequest - client = utils.AsyncClient() - client.handler.handler_class = BadRequestHandler - ws = await client.ws('/file_response/') - assert ws.connected.result() is False - assert ws.closed.done() - 
- -@override_settings(ROOT_URLCONF=UrlConf) -@pytest.mark.asyncio -async def test_websocket_exc(): - client = utils.AsyncClient(raise_request_exception=False) - future = asyncio.Future() - - def receiver(**kwargs): - future.set_result(None) - - signals.got_request_exception.connect(receiver) - ws = None - try: - ws = await client.ws('/ws_exc/') - assert ws.connected.result() is True - await asyncio.wait([future, ws.closed], timeout=10) - assert ws.closed.result() is None - assert future.result() is None - finally: - signals.got_request_exception.disconnect(receiver) - if ws: - ws.close() - - -@override_settings(ROOT_URLCONF=UrlConf) -@pytest.mark.asyncio -async def test_websocket_timeout(): - client = utils.AsyncClient(raise_request_exception=False) - receiver = mock.MagicMock() - signals.got_request_exception.connect(receiver) - ws = None - try: - ws = await client.ws('/ws_cancel/') - assert ws.connected.result() is True - assert ws.closed.result() is None - assert not receiver.called - finally: - signals.got_request_exception.disconnect(receiver) - if ws: - ws.close() diff --git a/tests/websockets/test_connections.py b/tests/websockets/test_connections.py deleted file mode 100644 index b51cd0ad..00000000 --- a/tests/websockets/test_connections.py +++ /dev/null @@ -1,171 +0,0 @@ -import asyncio -import json -import logging -from unittest import mock - -from django.test import override_settings -from django import urls -from django import http -from django import db -from django.db import transaction -import pytest -from asgiref.sync import sync_to_async - -from tulius.websockets.asgi import websocket -from tulius.websockets.asgi.utils import AsyncClient -from tulius.websockets.asgi import connections - - -def _test_view(_, pk): - with db.connection.cursor() as cursor: - cursor.execute("SELECT %s", [pk]) - row = cursor.fetchone() - return http.JsonResponse({ - 'value': row[0], - 'connection': id(cursor.connection) - }) - - -@websocket.websocket_view -async def 
_test_web_socket_view(_, ws: websocket.WebSocket): - def set_variable(pk): - transaction.set_autocommit(False) - with db.connection.cursor() as cursor: - cursor.execute("SET @tmp = %s", [pk]) - - def get_variable(): - with db.connection.cursor() as cursor: - cursor.execute("SELECT @tmp") - row = cursor.fetchone() - return row[0] - - while True: - try: - text = await ws.receive_text() - if text == "@GET": - result = await sync_to_async( - get_variable, thread_sensitive=False)() - await ws.send_text(result) - else: - await sync_to_async(set_variable, thread_sensitive=False)(text) - except Exception as e: - logging.exception(e) - raise e - - -class UrlConf: - urlpatterns = [ - urls.re_path(r'^test1/(?P\d+)/$', _test_view), - urls.re_path(r'^ws/$', _test_web_socket_view), - ] - - -@override_settings(ROOT_URLCONF=UrlConf) -def test_wsgi_connection_reusage(client): - response = client.get('/test1/1/') - data = json.loads(response.content) - assert data['value'] == '1' - conn1 = data['connection'] - response = client.get('/test1/2/') - data = json.loads(response.content) - assert data['value'] == '2' - conn2 = data['connection'] - assert conn1 == conn2 - - -@override_settings(ROOT_URLCONF=UrlConf) -@pytest.mark.asyncio -async def test_connection_reusage(): - client = AsyncClient() - await sync_to_async(db.connections.close_all)() - response = await client.get('/test1/1/') - data = json.loads(response.content) - assert data['value'] == '1' - conn1 = data['connection'] - response = await client.get('/test1/2/') - data = json.loads(response.content) - assert data['value'] == '2' - conn2 = data['connection'] - assert conn1 == conn2 - - -@override_settings(ROOT_URLCONF=UrlConf) -@pytest.mark.asyncio -async def test_single_websocket(): - client = AsyncClient() - async with await client.ws('/ws/') as ws: - await ws.send_text('1') - await ws.send_text('@GET') - result = await ws.receive_text() - assert result == '1' - - -@override_settings(ROOT_URLCONF=UrlConf) 
-@pytest.mark.asyncio -async def test_parallel_websockets(): - client = AsyncClient() - wss = [] - for i in range(10): - ws = await client.ws('/ws/') - await ws.__aenter__() - wss.append(ws) - try: - for i, ws in enumerate(wss): - await ws.send_text(str(i)) - for ws in reversed(wss): - await ws.send_text('@GET') - results = [] - for ws in wss: - results.append(await ws.receive_text(timeout=10)) - finally: - for ws in wss: - ws.close() - assert results == [str(i) for i in range(10)] - - -@override_settings(ROOT_URLCONF=UrlConf) -@pytest.mark.asyncio -async def test_parallel_views(): - client = AsyncClient() - wss = [] - for i in range(5): - ws = await client.ws('/ws/') - await ws.__aenter__() - wss.append(ws) - try: - views = [client.get(f'/test1/{i + len(wss)}/') for i in range(5)] - for i, ws in enumerate(wss): - await ws.send_text(str(i)) - for ws in reversed(wss): - await ws.send_text('@GET') - tasks = [ws.receive_text() for ws in wss] + views - results = await asyncio.gather(*tasks) - wss_results = results[:len(wss)] - views = results[len(wss):] - views_results = [json.loads(r.content)['value'] for r in views] - results = wss_results + views_results - finally: - for ws in wss: - ws.close() - assert results == [str(i) for i in range(10)] - - -def test_missing_connection(): - handler = connections.ConnectionHandler() - with pytest.raises(handler.exception_class): - handler['foobar'] - - -def test_closing_connections_in_pool(): - handler = connections.ConnectionHandler() - handler['default'] = conn = mock.MagicMock() - handler.close_context() - handler.close_all() - assert conn.close.called - - -def test_closing_connections_in_context(): - handler = connections.ConnectionHandler() - handler['default'] = conn = mock.MagicMock() - handler.close_all() - assert conn.close.called diff --git a/tests/websockets/test_websocket.py b/tests/websockets/test_websocket.py deleted file mode 100644 index 17934f0e..00000000 --- a/tests/websockets/test_websocket.py +++ /dev/null @@ 
-1,265 +0,0 @@ -import pytest -from django.core import exceptions - -from tulius.websockets.asgi import websocket - - -@pytest.mark.asyncio -async def test_accept_ws_twice(): - async def _receive(): - return {'type': 'websocket.connect'} - - async def _send(_): - pass - - ws = websocket.WebSocket(_receive, _send) - await ws.accept(websocket.HttpResponseUpgrade(handler=None)) - with pytest.raises(websocket.WSProtoException) as exc: - await ws.accept(websocket.HttpResponseUpgrade(handler=None)) - assert exc.value.args[0] == 'Websocket can be accepted only once' - - -@pytest.mark.asyncio -async def test_accept_headers(): - data = [] - - async def _receive(): - return {'type': 'websocket.connect'} - - async def _send(message): - data.append(message) - - response = websocket.HttpResponseUpgrade(handler=None) - response.headers['Foo'] = 'Bar' - ws = websocket.WebSocket(_receive, _send) - await ws.accept(response) - assert len(data) == 1 - headers = data[0]['headers'] - assert len(headers) == 2 - assert headers[1] == (b'foo', b'Bar') # lower cased binary - - -@pytest.mark.asyncio -async def test_operation_without_accept(): - data = [] - - async def _send(message): - data.append(message) - - ws = websocket.WebSocket(None, _send) - with pytest.raises(websocket.WSProtoException) as exc: - await ws.send({}) - assert exc.value.args[0] == 'Websocket needs to be accepted before send' - with pytest.raises(websocket.WSProtoException) as exc: - await ws.receive() - assert exc.value.args[0] == \ - 'Websocket needs to be accepted before receive data' - assert not data - await ws.close() - assert ws.closed - assert len(data) == 1 - assert data[0]['type'] == 'websocket.close' - - -@pytest.mark.asyncio -async def test_operation_on_closed_ws(): - data = [] - - async def _receive(): - return {'type': 'websocket.connect'} - - async def _send(message): - data.append(message) - - ws = websocket.WebSocket(_receive, _send) - await ws.accept(websocket.HttpResponseUpgrade(handler=None)) - await 
ws.close() - - with pytest.raises(websocket.WSProtoException) as exc: - await ws.send({}) - assert exc.value.args[0] == 'WebSocket is closed' - with pytest.raises(websocket.WSProtoException) as exc: - await ws.receive() - assert exc.value.args[0] == 'WebSocket is closed' - assert ws.closed - assert len(data) == 2 - assert data[0]['type'] == 'websocket.accept' - assert data[1]['type'] == 'websocket.close' - - -@pytest.mark.asyncio -async def test_read_unknown_message(): - in_data = [{'type': 'websocket.connect'}, {'type': 'websocket.foo'}] - - async def _receive(): - return in_data.pop(0) - - async def _send(_): - pass - - ws = websocket.WebSocket(_receive, _send) - await ws.accept(websocket.HttpResponseUpgrade(handler=None)) - with pytest.raises(websocket.WSProtoException) as exc: - await ws.receive() - assert exc.value.args[0].startswith('Wrong websocket receive message type') - - -@pytest.mark.asyncio -async def test_read_on_close(): - in_data = [ - {'type': 'websocket.receive', 'text': 'foo'}, - {'type': 'websocket.disconnect'}, - ] - - async def _receive(): - return in_data.pop(0) - - async def _send(_): - pass - - ws = websocket.WebSocket(_receive, _send) - await ws.accept(websocket.HttpResponseUpgrade(handler=None)) - message = await ws.receive_text() - assert message == 'foo' - with pytest.raises(exceptions.RequestAborted): - await ws.receive_text() - - -@pytest.mark.asyncio -async def test_read_json_binary_and_text(): - in_data = [ - {'type': 'websocket.receive', 'text': '{"foo": "bar"}'}, - {'type': 'websocket.receive', 'bytes': b'{"bar": "foo"}'}, - ] - - async def _receive(): - return in_data.pop(0) - - async def _send(_): - pass - - ws = websocket.WebSocket(_receive, _send) - await ws.accept(websocket.HttpResponseUpgrade(handler=None)) - message = await ws.receive_json() - assert message == {'foo': 'bar'} - message = await ws.receive_json() - assert message == {'bar': 'foo'} - - -@pytest.mark.asyncio -async def test_read_text_binary_and_text(): - 
in_data = [ - {'type': 'websocket.receive', 'text': 'foo'}, - {'type': 'websocket.receive', 'bytes': b'bar'}, - ] - - async def _receive(): - return in_data.pop(0) - - async def _send(_): - pass - - ws = websocket.WebSocket(_receive, _send) - await ws.accept(websocket.HttpResponseUpgrade(handler=None)) - message = await ws.receive_text() - assert message == 'foo' - message = await ws.receive_text() - assert message == 'bar' - - -@pytest.mark.asyncio -async def test_read_bytes_binary_and_text(): - in_data = [ - {'type': 'websocket.receive', 'text': 'foo'}, - {'type': 'websocket.receive', 'bytes': b'bar'}, - ] - - async def _receive(): - return in_data.pop(0) - - async def _send(_): - pass - - ws = websocket.WebSocket(_receive, _send) - await ws.accept(websocket.HttpResponseUpgrade(handler=None)) - message = await ws.receive_bytes() - assert message == b'foo' - message = await ws.receive_bytes() - assert message == b'bar' - - -@pytest.mark.asyncio -async def test_send_json(): - data = [] - - async def _receive(): - return {'type': 'websocket.connect'} - - async def _send(message): - data.append(message) - ws = websocket.WebSocket(_receive, _send) - await ws.accept(websocket.HttpResponseUpgrade(handler=None)) - await ws.send_json({'foo': 'bar'}) - assert len(data) == 2 - assert data[0]['type'] == 'websocket.accept' - assert data[1]['type'] == 'websocket.send' - assert data[1]['text'] == '{"foo": "bar"}' - - -@pytest.mark.asyncio -async def test_send_text(): - data = [] - - async def _receive(): - return {'type': 'websocket.connect'} - - async def _send(message): - data.append(message) - ws = websocket.WebSocket(_receive, _send) - await ws.accept(websocket.HttpResponseUpgrade(handler=None)) - with pytest.raises(websocket.WSProtoException): - await ws.send_text(b'bar') - await ws.send_text('foo') - assert len(data) == 2 - assert data[0]['type'] == 'websocket.accept' - assert data[1]['type'] == 'websocket.send' - assert data[1]['text'] == 'foo' - - -@pytest.mark.asyncio 
-async def test_send_bytes(): - data = [] - - async def _receive(): - return {'type': 'websocket.connect'} - - async def _send(message): - data.append(message) - ws = websocket.WebSocket(_receive, _send) - await ws.accept(websocket.HttpResponseUpgrade(handler=None)) - with pytest.raises(websocket.WSProtoException): - await ws.send_bytes('bar') - await ws.send_bytes(b'foo') - assert len(data) == 2 - assert data[0]['type'] == 'websocket.accept' - assert data[1]['type'] == 'websocket.send' - assert data[1]['bytes'] == b'foo' - - -@pytest.mark.asyncio -async def test_accept_wrong_order(): - """ Some servers send connect on handshake finish. """ - data = [] - - async def _receive(): - if not data: - raise ValueError() - return {'type': 'websocket.connect'} - - async def _send(message): - data.append(message) - - ws = websocket.WebSocket(_receive, _send) - await ws.accept(websocket.HttpResponseUpgrade(handler=None)) - assert len(data) == 1 - assert data[0]['type'] == 'websocket.accept' diff --git a/tulius/core/elastic/indexing.py b/tulius/core/elastic/indexing.py index 873a282c..b4013f4e 100644 --- a/tulius/core/elastic/indexing.py +++ b/tulius/core/elastic/indexing.py @@ -17,10 +17,6 @@ logger = logging.getLogger('deferred_indexing') -class ClosedException(Exception): - pass - - class ElasticIndexer: def __init__(self, config, autostart=True): self.config = config.copy() @@ -78,7 +74,7 @@ def _write_to_dead_queue(self, exc, data): def index(self, data): if self._closing: - raise ClosedException() + return with self._lock: if not self._work_files: self._work_files.append(files.WorkFile( @@ -274,10 +270,8 @@ def get_indexer() -> ElasticIndexer: def close(*_): - global _indexer # pylint: disable=global-statement if _indexer: _indexer.close() - _indexer = None signal.signal(signal.SIGINT, close) diff --git a/tulius/core/sse.py b/tulius/core/sse.py new file mode 100644 index 00000000..490f86c7 --- /dev/null +++ b/tulius/core/sse.py @@ -0,0 +1,64 @@ +import json +import 
logging + +from django_h2.sse import SSEResponse +from django.conf import settings +import redis +from redis import asyncio as aioredis + +logger = logging.getLogger('sse') + + +CHANNEL_PUBLIC = 'public' +CHANNEL_USER = 'user-{}' + + +class RedisChannel: + def __init__(self, user, channel_names, request): + self.channel_names = channel_names + self.response = SSEResponse(request, handler=self.handler()) + self.user = user + + async def handler(self): + redis_client = aioredis.from_url(settings.REDIS_LOCATION) + pubsub = redis_client.pubsub() + try: + await pubsub.subscribe( + **{ + f'{settings.ENV}_{name}': self._get_message + for name in self.channel_names + } + ) + await pubsub.run( + exception_handler=self._pubsub_exc_handler, poll_timeout=30) + finally: + await pubsub.aclose() + + @staticmethod + def _pubsub_exc_handler(e, *args): + logging.exception(e) + + async def _get_message(self, message): + logger.debug('User %s message %s', self.user, message) + message = json.loads(message['data']) + direct = message.pop('.direct') + name = message.pop('.namespaced') + pk = message.pop('id', None) + if direct: + await self.response.send_event( + name, json.dumps(message), event_id=pk) + + +def publish_message(channel, message): + redis_client = redis.Redis( + settings.REDIS_CONNECTION['host'], + settings.REDIS_CONNECTION['port'], + db=settings.REDIS_CONNECTION['db'] + ) + return redis_client.publish( + f'{settings.ENV}_{channel}', json.dumps(message) + ) + + +def publish_message_to_user(user_id, message): + return publish_message(CHANNEL_USER.format(user_id), message) diff --git a/tulius/forum/comments/views.py b/tulius/forum/comments/views.py index 722bc4ce..5c82f6f9 100644 --- a/tulius/forum/comments/views.py +++ b/tulius/forum/comments/views.py @@ -7,6 +7,7 @@ from django.utils import timezone from tulius.core.ckeditor import html_converter +from tulius.core import sse from tulius.forum.comments import models from tulius.forum.threads import views from 
tulius.forum.threads import models as thread_models @@ -14,7 +15,8 @@ from tulius.forum.comments import pagination from tulius.forum.comments import signals as comment_signals from tulius.forum.comments import mutations -from tulius.websockets import publisher + +THREAD_COMMENTS_CHANNEL = 'forum_thread_comments_{thread_id}' @dispatch.receiver(thread_signals.to_json_as_item, sender=thread_models.Thread) @@ -51,6 +53,8 @@ def comments_query(self): class CommentsPageAPI(CommentsBase): + comments_channel = THREAD_COMMENTS_CHANNEL + def get_context_data(self, **kwargs): self.get_parent_thread(**kwargs) page_num = kwargs.get('page') or int(self.request.GET.get('page', 1)) @@ -124,8 +128,18 @@ def post(self, request, **kwargs): self.obj.save() transaction.commit() page = comment.page - publisher.notify_thread_about_new_comment( - self, self.obj, comment, page) + sse.publish_message( + self.comments_channel.format(thread_id=self.obj.id), + { + '.direct': True, + '.action': 'new_comment', + '.namespaced': 'thread_comments', + 'id': comment.pk, + 'parent_id': self.obj.id, + 'url': comment.get_absolute_url(), + 'page': page, + } + ) else: page = self.pages_count(self.obj) return self.get_context_data(page=page, **kwargs) @@ -221,5 +235,18 @@ def post(self, request, **kwargs): return self.comment.to_json(self.user, detailed=True) +class CommentsSubscription(views.BaseThreadView): + channel_template = THREAD_COMMENTS_CHANNEL + + def get(self, request, *args, **kwargs): + self.get_parent_thread(**kwargs) + channel = sse.RedisChannel( + request.user, + [self.channel_template.format(thread_id=self.obj.pk)], + request + ) + return channel.response + + thread_signals.on_update.connect( CommentAPI.on_thread_update, sender=thread_models.Thread) diff --git a/tulius/forum/elastic_search/tasks.py b/tulius/forum/elastic_search/tasks.py index 9c114745..7c75cd05 100644 --- a/tulius/forum/elastic_search/tasks.py +++ b/tulius/forum/elastic_search/tasks.py @@ -2,10 +2,9 @@ from django import 
apps from django.conf import settings +from tulius.core import sse from tulius.forum.elastic_search import models from tulius.forum.elastic_search import mapping -from tulius.websockets import publisher -from tulius.websockets import consts class ReindexAll(models.ReindexQuery): @@ -21,14 +20,16 @@ def progress(self, counter, all_count): 'count': counter, 'all_count': all_count } - publisher.publish_message( - consts.CHANNEL_USER.format(self.user_id), { + sse.publish_message_to_user( + self.user_id, + { '.direct': True, '.action': 'index_all_elastic_search', '.namespaced': 'task_update', 'finished': False, 'counters': self.counters - }) + } + ) @shared_task(track_started=True) @@ -39,14 +40,16 @@ def reindex_all_entities(user_id): mapping.rebuild_index(model) ReindexAll(f'{app_name}.{model_name}', user_id, counters)( model.objects.all()) - publisher.publish_message( - consts.CHANNEL_USER.format(user_id), { + sse.publish_message_to_user( + user_id, + { '.direct': True, '.action': 'index_all_elastic_search', '.namespaced': 'task_update', 'finished': True, 'counters': counters - }) + } + ) class ReindexComments(models.ReindexQuery): @@ -60,15 +63,17 @@ def progress(self, counter, all_count): if not self.reported: self.reported = True self.counters['comments'] += all_count - publisher.publish_message( - consts.CHANNEL_USER.format(self.user_id), { + sse.publish_message_to_user( + self.user_id, + { '.direct': True, '.action': 'index_forum_elastic_search', '.namespaced': 'task_update', 'finished': False, 'threads': self.counters['threads'], 'comments': self.counters['comments'], - }) + } + ) @shared_task(track_started=True) @@ -89,15 +94,17 @@ def reindex_forum(app_label, model_name, parent_id, user_id): counters['threads'] += 1 if not thread.room: ReindexComments(user_id, counters)(thread.comments) - publisher.publish_message( - consts.CHANNEL_USER.format(user_id), { + sse.publish_message_to_user( + user_id, + { '.direct': True, '.action': 'index_forum_elastic_search', 
'.namespaced': 'task_update', 'finished': True, 'threads': counters['threads'], 'comments': counters['comments'], - }) + } + ) @shared_task(track_started=True) diff --git a/tulius/forum/static/forum/components/comments.js b/tulius/forum/static/forum/components/comments.js index 1f4cce2f..a922c51c 100644 --- a/tulius/forum/static/forum/components/comments.js +++ b/tulius/forum/static/forum/components/comments.js @@ -15,6 +15,7 @@ export default LazyComponent('forum_thread_comments', { delete_comment_message: '', old_page: null, old_thread: null, + sse: null, } }, computed: { @@ -31,18 +32,18 @@ export default LazyComponent('forum_thread_comments', { }, methods: { subscribe_comments() { - if (this.thread.id) - this.$root.$socket.sendObj({action: 'subscribe_comments', id: this.thread.id}); - }, - unsubscribe_comments() { if (this.thread.id) { - this.$root.$socket.sendObj({action: 'unsubscribe_comments', id: this.thread.id}) + this.sse = new EventSource(this.thread.url + 'comments_sse/'); + this.sse.addEventListener("thread_comments", (event) => { + const content = JSON.parse(event.data); + if (content['.action'] == 'new_comment') + this.new_comment_event(content); + }); } - delete this.$options.sockets.onmessage }, - websock_message(msg) { - var data = JSON.parse(msg.data); - if ((data['.namespaced'] != 'thread_comments') || (data.parent_id != this.thread.id)) return; + new_comment_event(data) { + if (data.parent_id != this.thread.id) + return; if (data.page > this.pagination.pages_count) { this.pagination.pages_count = this.pagination.pages_count + 1; this.pagination.pages.push(this.pagination.pages_count); @@ -100,7 +101,6 @@ export default LazyComponent('forum_thread_comments', { this.update_likes(); }, load_api(pk, page) { - this.unsubscribe_comments(); this.cleanup_reply_form(); if (this.$parent.$refs.reply_form) this.$parent.$refs.reply_form.hide(); @@ -204,11 +204,6 @@ export default LazyComponent('forum_thread_comments', { }, }, mounted() { - 
this.$parent.$options.sockets.onopen = this.subscribe_comments; - this.$parent.$options.sockets.onmessage = this.websock_message; this.load_api(this.thread.id, this.$route.query['page'] || 1) }, - beforeDestroy() { - this.unsubscribe_comments(); - }, }) diff --git a/tulius/forum/threads/mutations.py b/tulius/forum/threads/mutations.py index 0cec4c45..ee62f7f0 100644 --- a/tulius/forum/threads/mutations.py +++ b/tulius/forum/threads/mutations.py @@ -1,8 +1,8 @@ from django.utils import timezone +from tulius.core import sse from tulius.forum.threads import signals from tulius.forum.threads import models -from tulius.websockets import publisher class Mutation: @@ -247,7 +247,15 @@ def __init__(self, thread, user=None, result=None): def update_result(self, instance): self.result['threads'] = self.result.get('threads', 0) + 1 if self.user: - publisher.notify_user_about_fixes(self.user, self.result) + sse.publish_message_to_user( + self.user.id, + { + '.direct': True, + '.action': 'fixes_update', + '.namespaced': 'fixes_update', + 'data': self.result, + } + ) def process_thread(self, instance): instance.threads_count.cleanup() diff --git a/tulius/forum/urls.py b/tulius/forum/urls.py index 62da509d..38164cac 100644 --- a/tulius/forum/urls.py +++ b/tulius/forum/urls.py @@ -52,6 +52,9 @@ urls.re_path( r'^thread/(?P\d+)/comments_page/$', comments_api.CommentsPageAPI.as_view(), name='comments_page'), + urls.re_path( + r'^thread/(?P\d+)/comments_sse/$', + comments_api.CommentsSubscription.as_view(), name='comments_sse'), urls.re_path( r'^thread/(?P\d+)/read_mark/$', read_marks.ReadmarkAPI.as_view(), name='thread_readmark'), diff --git a/tulius/gameforum/comments/views.py b/tulius/gameforum/comments/views.py index 8231464e..2046c838 100644 --- a/tulius/gameforum/comments/views.py +++ b/tulius/gameforum/comments/views.py @@ -14,6 +14,9 @@ from tulius.gameforum.rights import mutations as rights_mutations +THREAD_COMMENTS_CHANNEL = 'gameforum_thread_comments_{thread_id}' + + 
base_mutations.on_mutation(rights_mutations.UpdateRights)( mutations.FixCountersOnRights) @@ -127,6 +130,8 @@ def update_role_comments_count(role_id, value): class CommentsPageAPI(comments.CommentsPageAPI, CommentsBase): + comments_channel = THREAD_COMMENTS_CHANNEL + @classmethod def create_comment(cls, thread, user, data): comment = super().create_comment(thread, user, data) @@ -171,3 +176,8 @@ def update_comment(cls, comment, user, data, preview): thread_signals.on_update.connect( CommentAPI.on_thread_update, sender=thread_models.Thread) + + +class CommentsSubscription( + comments.CommentsSubscription, threads.BaseThreadAPI): + channel_template = THREAD_COMMENTS_CHANNEL diff --git a/tulius/gameforum/urls.py b/tulius/gameforum/urls.py index cdd64940..f019eb10 100644 --- a/tulius/gameforum/urls.py +++ b/tulius/gameforum/urls.py @@ -40,6 +40,9 @@ urls.re_path( r'^variation/(?P\d+)/thread/(?P\d+)/comments_page/$', comments.CommentsPageAPI.as_view(), name='comments_page'), + urls.re_path( + r'^variation/(?P\d+)/thread/(?P\d+)/comments_sse/$', + comments.CommentsSubscription.as_view(), name='comments_sse'), urls.re_path( r'^variation/(?P\d+)/thread/(?P\d+)/read_mark/$', read_marks.ReadmarkAPI.as_view(), name='thread_readmark'), diff --git a/tulius/pm/views.py b/tulius/pm/views.py index 6aa648e0..a58fbdad 100644 --- a/tulius/pm/views.py +++ b/tulius/pm/views.py @@ -5,7 +5,7 @@ from django.utils import decorators from django.contrib.auth import decorators as auth_decorators -from tulius.websockets import publisher +from tulius.core import sse from .forms import PrivateMessageForm from .models import PrivateTalking, PrivateMessage @@ -63,7 +63,14 @@ def post(self, request, *args, **kwargs): request.user, self.object, data=request.POST or None) if form.is_valid(): m = form.save() - publisher.publish_message_to_user( - self.object, publisher.consts.USER_NEW_PM, m.pk) + sse.publish_message_to_user( + self.object.pk, + { + '.direct': True, + '.action': 'new_pm', + 
'.namespaced': 'pm', + 'id': m.pk, + } + ) return http.HttpResponseRedirect( urls.reverse('pm:to_user', args=(self.object.pk,))) diff --git a/tulius/static/app/app.js b/tulius/static/app/app.js index 57e4cca7..82919dac 100644 --- a/tulius/static/app/app.js +++ b/tulius/static/app/app.js @@ -12,19 +12,6 @@ const router = new VueRouter({ routes: routes, }) -function production_url() { - var schema = window.location.protocol == 'https:' ? 'wss://' : 'ws://'; - return schema + window.location.host + '/api/ws/'; -} - -var websockets_url = production_url(); - -Vue.use(VueNativeSock.default, websockets_url, { - reconnection: true, - reconnectionDelay: 3000, - format: 'json' -}); - var app = new Vue({ el: '#vue_app', router: router, diff --git a/tulius/static/common/components/main_menu.js b/tulius/static/common/components/main_menu.js index 5082d900..7ea25a79 100644 --- a/tulius/static/common/components/main_menu.js +++ b/tulius/static/common/components/main_menu.js @@ -5,6 +5,7 @@ export default LazyComponent('main_menu', { return { articles: [], show: '', + sse: null, } }, methods: { @@ -29,11 +30,11 @@ export default LazyComponent('main_menu', { this.articles = response.data.pages; }).catch(error => this.$parent.add_message(error, "error")) .then(() => {}); - this.$options.sockets.onmessage = (msg) => { - var data = JSON.parse(msg.data) - if (data['.namespaced'] == 'pm') { + this.sse = new EventSource('/api/sse/'); + this.sse.addEventListener("pm", (event) => { + const content = JSON.parse(event.data); + if (content['.action'] == 'new_pm') this.$root.user.not_readed_messages = true; - } - } + }); } }) diff --git a/tulius/static/common/components/sse.js b/tulius/static/common/components/sse.js new file mode 100644 index 00000000..a6b69356 --- /dev/null +++ b/tulius/static/common/components/sse.js @@ -0,0 +1,10 @@ +jQuery(document).ready(function($) { + const evtSource = new EventSource("/api/sse/"); + evtSource.addEventListener("pm", (event) => { + const content = 
JSON.parse(event.data) + if (content['.action'] == 'new_pm') { + $('.new_messages').addClass('active'); + $('.new_messages').attr('title', "У вас новое сообщение!") + } + }); +}); diff --git a/tulius/templates/base.haml b/tulius/templates/base.haml index 9fab8a43..bba0148e 100755 --- a/tulius/templates/base.haml +++ b/tulius/templates/base.haml @@ -16,17 +16,7 @@ %script{type: 'text/javascript', src: '{{ STATIC_URL }}common/js/jquery.history.js'} %script{type: 'text/javascript', src: '{{ STATIC_URL }}common/jquery-lightbox/js/lightbox.js'} %script{type: 'text/javascript', src: '{{ STATIC_URL }}common/js/autocomplete-select.js'} - %script{type: 'text/javascript', src: '{{ STATIC_URL }}websockets/ws4redis.js'} - - %script{type: 'text/javascript', src: '{{ STATIC_URL }}websockets/websockets.js'} + %script{type: 'text/javascript', src: '{{ STATIC_URL }}common/components/sse.js'} %script{type: 'text/javascript', src: '{{ STATIC_URL }}tulius/js/menu.js'} %script{type: 'text/javascript', src: '{{ STATIC_URL }}common/js/jquery.formset.js'} diff --git a/tulius/urls.py b/tulius/urls.py index 22d20526..d751c25c 100755 --- a/tulius/urls.py +++ b/tulius/urls.py @@ -105,9 +105,7 @@ def trigger_error(_): urls.re_path( r'^counters/', urls.include( 'tulius.counters.urls', namespace='counters')), - urls.re_path( - r'^api/ws/', urls.include( - 'tulius.websockets.urls', namespace='websockets')), + urls.re_path(r'^api/sse/$', views.sse_channel_view, name='sse_channel'), ] handler404 = 'tulius.views.error404' diff --git a/tulius/views.py b/tulius/views.py index 96a5cf24..17b9a752 100755 --- a/tulius/views.py +++ b/tulius/views.py @@ -10,6 +10,7 @@ from djfw.news import models as news from djfw.flatpages import models as flatpage_models +from tulius.core import sse from tulius.profile import views as profile_views from tulius.games import models as games from tulius import celery @@ -85,3 +86,11 @@ def get(request, **_kwargs): 'stats': celery.app.control.inspect().stats() or {}, 
'active': active, }) + + +def sse_channel_view(request): + names = [sse.CHANNEL_PUBLIC] + if request.user.is_authenticated: + names.append(sse.CHANNEL_USER.format(request.user.id)) + channel = sse.RedisChannel(request.user, names, request) + return channel.response diff --git a/tulius/websockets/__init__.py b/tulius/websockets/__init__.py deleted file mode 100644 index 134084d1..00000000 --- a/tulius/websockets/__init__.py +++ /dev/null @@ -1,12 +0,0 @@ -from django.apps import AppConfig - -from tulius.websockets import runserver - - -class WebsocketsConfig(AppConfig): - name = 'tulius.websockets' - label = 'websockets' - verbose_name = 'websockets' - - def ready(self): - runserver.patch() diff --git a/tulius/websockets/asgi/__init__.py b/tulius/websockets/asgi/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/tulius/websockets/asgi/asgi_handler.py b/tulius/websockets/asgi/asgi_handler.py deleted file mode 100644 index 12ff5531..00000000 --- a/tulius/websockets/asgi/asgi_handler.py +++ /dev/null @@ -1,137 +0,0 @@ -import asyncio -import sys -import tempfile - -import django -from django import db -from django.core import signals -from django.core.handlers import asgi as dj_asgi -from django.conf import settings -from django import urls -from django.utils import log -from django.core.handlers import exception -from django.core import exceptions - -from tulius.websockets.asgi import connections -from tulius.websockets.asgi import websocket - - -def handle_exception(request, exc): - signals.got_request_exception.send(sender=None, request=request) - response = exception.handle_uncaught_exception( - request, urls.get_resolver(urls.get_urlconf()), sys.exc_info() - ) - log.log_response( - "%s: %s", response.reason_phrase, request.path, - response=response, request=request, exception=exc) - - -class ASGIHandler(dj_asgi.ASGIHandler): - context_pool = None - - async def __call__(self, scope, receive, send): - """ - Async entrypoint - parses the 
request and hands off to get_response. - """ - if scope['type'] not in ['http', 'websocket', 'lifespan']: - raise ValueError( - "Django can only handle ASGI/HTTP connections, not %s." - % scope["type"] - ) - - async with dj_asgi.ThreadSensitiveContext(): - try: - await self.handle(scope, receive, send) - finally: - db.connections.close_context() - - async def handle(self, scope, receive, send): - """ - Handles the ASGI request. Called via the __call__ method. - """ - if scope['type'] == 'lifespan': - return await self.handle_lifespan(scope, receive, send) - # for backward capability. In websocket there is no "method" in scope - if scope['type'] == 'websocket': - scope['method'] = 'GET' - message = await receive() - if message["type"] != "websocket.connect": - return - # pylint: disable=consider-using-with - body_file = tempfile.SpooledTemporaryFile( - max_size=settings.FILE_UPLOAD_MAX_MEMORY_SIZE, mode="w+b") - else: - # Receive the HTTP request body as a stream object. - try: - body_file = await self.read_body(receive) - except dj_asgi.RequestAborted: - return - # Request is complete and can be served. - try: - dj_asgi.set_script_prefix(self.get_script_prefix(scope)) - await dj_asgi.sync_to_async( - dj_asgi.signals.request_started.send, thread_sensitive=True - )(sender=self.__class__, scope=scope) - # Get the request and check for basic issues. - request, error_response = self.create_request(scope, body_file) - if request is None: - await self.handle_response( - request, error_response, scope, receive, send) - return - # Get the response, using the async mode of BaseHandler. - response = await self.get_response_async(request) - response._handler_class = self.__class__ - finally: - body_file.close() - # Increase chunk size on file responses (ASGI servers handles low-level - # chunking). - if isinstance(response, dj_asgi.FileResponse): - response.block_size = self.chunk_size - # Send the response. 
- await self.handle_response(request, response, scope, receive, send) - - # pylint: disable=too-many-arguments - async def handle_response(self, request, response, scope, receive, send): - if scope['type'] == 'websocket': - if isinstance(response, websocket.HttpResponseUpgrade): - ws = websocket.WebSocket(receive, send) - await ws.accept(response) - try: - try: - await response.handler(ws) - except (asyncio.CancelledError, exceptions.RequestAborted): - pass - except Exception as exc: - await dj_asgi.sync_to_async( - handle_exception, thread_sensitive=False - )(request, exc) - finally: - if not ws.closed: - await ws.close() - else: - await send({'type': 'websocket.close'}) - else: - await self.send_response(response, send) - - @staticmethod - async def handle_lifespan(scope, receive, send): - while True: - message = await receive() - if message['type'] == 'lifespan.startup': - # Do some startup here! - await send({'type': 'lifespan.startup.complete'}) - elif message['type'] == 'lifespan.shutdown': - # Do some shutdown here! - await send({'type': 'lifespan.shutdown.complete'}) - return - - -def get_asgi_application(): - """ - The public interface to Django's ASGI support. Return an ASGI 3 callable. - Avoids making django.core.handlers.ASGIHandler a public API, in case the - internal implementation changes or moves in the future. 
- """ - django.setup(set_prefix=False) - connections.ConnectionHandler.monkey_patch() - return ASGIHandler() diff --git a/tulius/websockets/asgi/connections.py b/tulius/websockets/asgi/connections.py deleted file mode 100644 index 5c5b448a..00000000 --- a/tulius/websockets/asgi/connections.py +++ /dev/null @@ -1,89 +0,0 @@ -import threading -import functools - -from asgiref.local import Local - -from django import db -from django.db import utils - - -class ConnectionHandler(utils.ConnectionHandler): - def __init__(self, settings=None): - super().__init__(settings) - self._connection_pools = {} - self._lock = threading.Lock() - - @classmethod - def monkey_patch(cls): - """ - Django db connections does not really support asgi, so it doesn't - correctly close connections and have no support of connection pools - needed for async. - - We patch connections object in place, as it can be already imported - somewhere. - """ - if getattr(db.connections, '_lock', None): - return - db.connections._connection_pools = {} - db.connections._lock = threading.Lock() - db.connections._connections = Local(False) - old_cls = db.connections.__class__ - old_cls.__getitem__ = cls.__getitem__ - old_cls.close_context = cls.close_context - # old_cls.close_old_connections = cls.close_old_connections - db.connections.close_all = functools.partial( - cls.close_all, db.connections) - - def __getitem__(self, alias): - try: - return getattr(self._connections, alias) - except AttributeError as exc: - if alias not in self.settings: - raise self.exception_class( - f"The connection '{alias}' doesn't exist." 
- ) from exc - with self._lock: - pool = self._connection_pools.setdefault(alias, []) - conn = None - while pool: - conn = pool.pop() - conn.close_if_unusable_or_obsolete() - if conn.connection: - break - conn = None - if conn is None: - conn = self.create_connection(alias) - conn.inc_thread_sharing() # be thread free - setattr(self._connections, alias, conn) - return conn - - # def close_old_connections(self, **kwargs): - # with self._lock: - # for alias in self: - # new_pool = [] - # pool = self._connection_pools.get(alias, []) - # for conn in pool: - # conn.close_if_unusable_or_obsolete() - # if conn.health_check_done: - # new_pool.append(conn) - # self._connection_pools[alias] = new_pool - - def close_context(self): - with self._lock: - for alias in self: - if hasattr(self._connections, alias): - conn = getattr(self._connections, alias) - delattr(self._connections, alias) - pool = self._connection_pools.setdefault(alias, []) - pool.append(conn) - - def close_all(self): - # not super because of monkey patching - utils.ConnectionHandler.close_all(self) - with self._lock: - for alias in self: - pool = self._connection_pools.get(alias, []) - while pool: - conn = pool.pop() - conn.close() diff --git a/tulius/websockets/asgi/utils.py b/tulius/websockets/asgi/utils.py deleted file mode 100644 index ffcef396..00000000 --- a/tulius/websockets/asgi/utils.py +++ /dev/null @@ -1,240 +0,0 @@ -import json - -import asyncio - -from django.test import client as django_client - -from tulius.websockets.asgi import asgi_handler -from tulius.websockets.asgi import connections - - -class ASGIRequest: - def __init__(self, app, scope): - self.app = app - self.scope = scope - if "_body_file" in scope: - self.body_file = scope.pop("_body_file") - else: - self.body_file = django_client.FakePayload("") - self.response = None - self._request_body_send = False - self.request = None - scope['_test_asgi_context'] = self - self.asgi_request = None - - async def run(self): - await 
self.app(self.scope, self._receive, self._send) - - async def _send(self, data): - data.asgi_request = self.request - self.response = data - - async def _receive(self): - if self._request_body_send: - return {'type': 'http.disconnect'} - self._request_body_send = True - return { - 'type': 'http.request', - 'body': self.body_file.read(), - 'more_body': False, - } - - -class BaseASGIContext(ASGIRequest): - def __init__(self, app, scope): - super().__init__(app, scope) - self.send_queue = [] - self.receive_queue = [] - self._send_futures = [] - self._receive_futures = [] - self.closed = asyncio.Future() - - def close(self, exc=None): - if self.closed.done(): - return - if exc: - self.closed.set_exception(exc) - else: - self.closed.set_result(None) - for future in self._send_futures: - future.cancel() - for future in self._receive_futures: - future.cancel() - - async def __aenter__(self): - return self - - async def __aexit__(self, exc_type, exc_val, exc_tb): - self.close() - - async def run(self): - try: - await super().run() - except Exception as e: - self.close(e) - else: - self.close() - - async def _send(self, data): - if self._receive_futures: - self._receive_futures.pop(0).set_result(data) - else: - self.receive_queue.append(data) - - async def _receive(self): - if self.closed.done(): - raise asyncio.CancelledError() - if self.send_queue: - return self.send_queue.pop(0) - future = asyncio.Future() - self._send_futures.append(future) - await future - return future.result() - - async def _internal_send(self, message): - if self.closed.done(): - raise asyncio.CancelledError() - if self._send_futures: - self._send_futures.pop(0).set_result(message) - else: - self.send_queue.append(message) - - async def _internal_read(self, timeout=60): - if self.closed.done(): - raise asyncio.CancelledError() - if self.receive_queue: - return self.receive_queue.pop(0) - future = asyncio.Future() - self._receive_futures.append(future) - await asyncio.wait([future], timeout=timeout) 
- if future.done(): - return future.result() - self._receive_futures.remove(future) - raise asyncio.TimeoutError() - - -class ASGIWebsocket(BaseASGIContext): - def __init__(self, app, scope): - super().__init__(app, scope) - self.connected = asyncio.Future() - self.cookies = None - - async def _send(self, data): - if data["type"] == "websocket.accept": - self.connected.set_result(True) - self.asgi_request = self.request - elif data["type"] == "websocket.close": - self.close() - elif data["type"] == "websocket.send": - await super()._send(data) - - async def _receive(self): - if not self._request_body_send: - self._request_body_send = True - return {'type': 'websocket.connect'} - return await super()._receive() - - async def send_text(self, text): - await self._internal_send({"type": "websocket.receive", "text": text}) - - async def receive_text(self, timeout=60): - message = await self._internal_read(timeout=timeout) - return message.get('bytes') or message.get('text') - - async def send_json(self, data): - await self._internal_send({ - "type": "websocket.receive", "text": json.dumps(data)}) - - async def receive_json(self, timeout=60): - message = await self._internal_read(timeout=timeout) - data = message.get('bytes') or message.get('text') - return json.loads(data) - - def close(self, exc=None): - if not self.connected.done(): - if exc: - self.connected.set_exception(exc) - else: - self.connected.set_result(False) - while self.receive_queue: - f = self.receive_queue.pop() - f.set_result({'type': 'websocket.disconnect'}) - super().close(exc) - - -class TestASGIHandler(asgi_handler.ASGIHandler): - def __init__(self, enforce_csrf_checks=True): - self.enforce_csrf_checks = enforce_csrf_checks - super().__init__() - - async def send_response(self, response, send): - await send(response) - - def create_request(self, scope, body_file): - request, error_response = super().create_request(scope, body_file) - if request: - request._dont_enforce_csrf_checks = not scope.get( 
- '_enforce_csrf_checks') - scope['_test_asgi_context'].request = request - return request, error_response - - -class AsyncClientHandler: - handler_class = TestASGIHandler - - def __init__(self, enforce_csrf_checks=True): - self.enforce_csrf_checks = enforce_csrf_checks - connections.ConnectionHandler.monkey_patch() - - async def __call__(self, scope): - app = self.handler_class(self.enforce_csrf_checks) - if scope['type'] == 'websocket': - context = ASGIWebsocket(app, scope) - asyncio.create_task(context.run()) - await context.connected - return context - context = ASGIRequest(app, scope) - await asyncio.create_task(context.run()) - return context.response - - -class AsyncClient(django_client.AsyncClient): - def __init__( - self, enforce_csrf_checks=False, raise_request_exception=True, - **defaults): - super().__init__( - enforce_csrf_checks=enforce_csrf_checks, - raise_request_exception=raise_request_exception, **defaults) - self.handler = AsyncClientHandler(enforce_csrf_checks) - - def ws(self, path, secure=False, **extra): - return self.generic("WS", path, secure=secure, **extra) - - def _base_scope(self, **request): - scope = super()._base_scope(**request) - if request['method'] == 'WS': - scope['type'] = 'websocket' - scope['scheme'] = 'ws' - scope.pop('method') - return scope - - # pylint: disable=too-many-arguments - def post( - self, path, data=None, - content_type=None, - secure=False, **extra): - if isinstance(data, dict) and content_type is None: - content_type = 'application/json' - else: - content_type = content_type or django_client.MULTIPART_CONTENT - return super().post( - path, data, content_type=content_type, secure=secure, **extra) - - # pylint: disable=too-many-arguments - def put( - self, path, data='', content_type='application/octet-stream', - secure=False, **extra): - if isinstance(data, dict): - content_type = 'application/json' - return super().put( - path, data=data, content_type=content_type, secure=secure, **extra) diff --git 
a/tulius/websockets/asgi/websocket.py b/tulius/websockets/asgi/websocket.py deleted file mode 100644 index 45300d41..00000000 --- a/tulius/websockets/asgi/websocket.py +++ /dev/null @@ -1,120 +0,0 @@ -import json -import typing as t -import functools - -from django import http -from django.core import exceptions - - -class HttpResponseUpgrade(http.HttpResponse): - status_code = 101 - handler = None - sub_protocol = None - - def __init__(self, handler, *args, sub_protocol: str = None, **kwargs): - super().__init__(*args, **kwargs) - self.handler = handler - self.sub_protocol = sub_protocol - - -class WSProtoException(Exception): - pass - - -class WebSocket: - def __init__(self, receive, send): - self._receive = receive - self._send = send - self._accepted = False - self.closed = False - - async def accept(self, response: HttpResponseUpgrade): - if self._accepted: - raise WSProtoException('Websocket can be accepted only once') - # By ASGI spec we should receive 'websocket.connect' first. But some - # server send it after 'accept'. Communication is async, so just send - # our part of handshake first - it covers both cases. 
- response_headers = [] - for header, value in response.items(): - if isinstance(header, str): - header = header.encode("ascii") - if isinstance(value, str): - value = value.encode("latin1") - response_headers.append((bytes(header).lower(), bytes(value))) - await self._send({ - 'type': 'websocket.accept', - 'subprotocol': response.sub_protocol, - 'headers': response_headers - }) - self._accepted = True - - async def close(self, code: int = 1000): - await self._send({'type': 'websocket.close', 'code': code}) - self.closed = True - - async def send(self, message: t.Mapping): - if not self._accepted: - raise WSProtoException( - 'Websocket needs to be accepted before send') - if self.closed: - raise WSProtoException('WebSocket is closed') - await self._send(message) - - async def receive(self) -> t.Mapping: - if not self._accepted: - raise WSProtoException( - 'Websocket needs to be accepted before receive data') - if self.closed: - raise WSProtoException('WebSocket is closed') - - message = await self._receive() - if message['type'] == 'websocket.disconnect': - self.closed = True - raise exceptions.RequestAborted() - if message['type'] != 'websocket.receive': - raise WSProtoException( - 'Wrong websocket receive message type: %s' % message['type']) - return message - - async def receive_json(self): - message = await self.receive() - if 'text' in message: - return json.loads(message['text']) - return json.loads(message['bytes'].decode()) - - async def receive_text(self) -> str: - message = await self.receive() - if 'text' in message: - return message['text'] - return message['bytes'].decode() - - async def receive_bytes(self) -> bytes: - message = await self.receive() - if 'text' in message: - return message['text'].encode() - return message['bytes'] - - async def send_json(self, data, **dumps_kwargs): - await self.send({ - 'type': 'websocket.send', - 'text': json.dumps(data, **dumps_kwargs) - }) - - async def send_text(self, text: str): - if not isinstance(text, str): - 
raise WSProtoException('Only text can be send, not %s' % text) - await self.send({'type': 'websocket.send', 'text': text}) - - async def send_bytes(self, text: bytes): - if not isinstance(text, bytes): - raise WSProtoException('Only bytes can be send, not %s' % text) - await self.send({'type': 'websocket.send', 'bytes': text}) - - -def websocket_view(func): - @functools.wraps(func) - def wrapper(request, *args, **kwargs): - async def websocket_handler(ws: WebSocket): - return await func(request, ws, *args, **kwargs) - return HttpResponseUpgrade(handler=websocket_handler) - return wrapper diff --git a/tulius/websockets/consts.py b/tulius/websockets/consts.py deleted file mode 100644 index ae4ce1e4..00000000 --- a/tulius/websockets/consts.py +++ /dev/null @@ -1,14 +0,0 @@ -from django.conf import settings - -CHANNEL_PUBLIC = 'public' -CHANNEL_USER = 'user-{}' - -USER_NEW_PM = 'new_pm' -USER_NEW_GAME_INVITATION = 'new_game_inv' - -THREAD_COMMENTS_NEW_COMMENT = 'new_comment' -THREAD_COMMENTS_CHANNEL = 'forum_thread_comments_{thread_id}' - - -def make_channel_name(channel): - return '{}_{}'.format(settings.ENV, channel) diff --git a/tulius/websockets/publisher.py b/tulius/websockets/publisher.py deleted file mode 100644 index 02e79611..00000000 --- a/tulius/websockets/publisher.py +++ /dev/null @@ -1,50 +0,0 @@ -import json - -import redis -from django.conf import settings - -from tulius.websockets import consts - - -def publish_message(channel, message): - redis_client = redis.Redis( - settings.REDIS_CONNECTION['host'], - settings.REDIS_CONNECTION['port'], - db=settings.REDIS_CONNECTION['db'] - ) - return redis_client.publish( - consts.make_channel_name(channel), json.dumps(message) - ) - - -def publish_message_to_user(user, action, pk): - return publish_message( - consts.CHANNEL_USER.format(user.id), { - '.direct': True, - '.action': 'new_pm', - '.namespaced': 'pm', - 'id': pk, - }) - - -def notify_user_about_fixes(user, data): - return publish_message( - 
consts.CHANNEL_USER.format(user.id), { - '.direct': True, - '.action': 'fixes_update', - '.namespaced': 'fixes_update', - 'data': data, - }) - - -def notify_thread_about_new_comment(sender, thread, comment, page): - return publish_message( - consts.THREAD_COMMENTS_CHANNEL.format(thread_id=thread.id), - { - '.direct': True, - '.action': 'new_comment', - 'id': comment.id, - 'parent_id': thread.id, - 'url': comment.get_absolute_url(), - 'page': page, - }) diff --git a/tulius/websockets/runserver.py b/tulius/websockets/runserver.py deleted file mode 100644 index 653759b2..00000000 --- a/tulius/websockets/runserver.py +++ /dev/null @@ -1,150 +0,0 @@ -import asyncio -import logging - -from aiohttp import web -from aiohttp.http import WSMsgType -from django.contrib.staticfiles import handlers as static_handlers -from django.core.management.commands import runserver as dj_runserver -import uvicorn - -from tulius.websockets.asgi import asgi_handler - -logger = logging.getLogger('django.server') - - -class ASGIRequestHandler: - _response_start = None - _response_body = None - _ws = None - - def __init__(self, request): - self.body_read = False - self.request = request - if request.headers.get('Upgrade') == 'websocket': - connection_type = 'websocket' - else: - connection_type = 'http' - self.scope = { - 'type': connection_type, - 'root_path': '', - 'path': request.path, - 'raw_path': request.raw_path, - 'method': request.method, - 'query_string': request.query_string, - 'client': request._transport_peername, - 'server': ('host', 0), - 'headers': [(n.lower(), v) for n, v in request.raw_headers], - } - - async def receive(self): - if not self.body_read: - self.body_read = True - data = await self.request.read() - if self.scope['type'] == 'websocket': - msg_type = 'websocket.connect' - else: - msg_type = 'body' - return { - 'type': msg_type, - 'body': data, - } - if self._ws is not None: - msg = await self._ws.receive() - if msg.type in ( - WSMsgType.CLOSE, WSMsgType.CLOSING, 
WSMsgType.CLOSED): - return {'type': 'websocket.disconnect'} - return { - 'type': 'websocket.receive', - 'text': msg.data, - } - return {'type': 'body end'} - - async def send(self, context): - if context['type'] == 'http.response.start': - self._response_start = context - elif context['type'] == 'http.response.body': - if not self._response_body: - self._response_body = [] - if 'body' in context: - self._response_body.append(context['body']) - elif context['type'] == 'websocket.accept': - self._ws = web.WebSocketResponse() - await self._ws.prepare(self.request) - elif context['type'] == 'websocket.send': - await self._ws.send_str(context['text']) - elif context['type'] == 'websocket.close': - await self._ws.close() - else: - raise NotImplementedError() - - async def handle(self, application): - try: - await application(self.scope, self.receive, self.send) - if self._ws is not None: - return self._ws - return web.Response( - status=self._response_start['status'], - headers=[ - (n.decode('ascii'), v.decode('latin1')) - for n, v in self._response_start['headers']], - body=b''.join(self._response_body) - ) - except Exception as e: - logger.exception(e) - finally: - if self._response_start: - status_code = self._response_start.get('status', 500) - else: - status_code = 500 - if status_code >= 500: - level = logger.error - elif status_code >= 400: - level = logger.warning - else: - level = logger.info - - level('%s %s', self.request.path, status_code) - - -class ASGIServer: - def __init__(self, server_address, handler, ipv6): - self.app = web.Application() - self.asgi_application = asgi_handler.get_asgi_application() - self.server_address = server_address - self.app.add_routes([ - web.get('/{tail:.*}', self.aiohttp_handler), - web.post('/{tail:.*}', self.aiohttp_handler), - web.put('/{tail:.*}', self.aiohttp_handler), - web.options('/{tail:.*}', self.aiohttp_handler), - ]) - - def set_app(self, wsgi_handler): - if isinstance(wsgi_handler, 
static_handlers.StaticFilesHandler): - self.asgi_application = static_handlers.ASGIStaticFilesHandler( - self.asgi_application - ) - - def serve_forever(self): - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - uvicorn.run( - self.asgi_application, - host=self.server_address[0], - port=self.server_address[1], - workers=1, - log_level="info") - - # web.run_app( - # self.app, - # host=self.server_address[0], - # port=self.server_address[1], - # print=None, - # ) - - async def aiohttp_handler(self, request): - handler = ASGIRequestHandler(request) - return await handler.handle(self.asgi_application) - - -def patch(): - dj_runserver.Command.server_cls = ASGIServer diff --git a/tulius/websockets/static/websockets/websockets.js b/tulius/websockets/static/websockets/websockets.js deleted file mode 100644 index a4e6fdd8..00000000 --- a/tulius/websockets/static/websockets/websockets.js +++ /dev/null @@ -1,37 +0,0 @@ -jQuery(document).ready(function($) { - var ws4redis = WS4Redis({ - uri: websocket_uri, - connecting: on_connecting, - connected: on_connected, - receive_message: receiveMessage, - disconnected: on_disconnected, - heartbeat_msg: '' - }); - - // attach this function to an event handler on your site - function sendMessage() { - ws4redis.send_message('A message'); - } - - function on_connecting() { - console.log('Websocket is connecting...'); - } - - function on_connected() { - ws4redis.send_message('Hello'); - } - - function on_disconnected(evt) { - console.log('Websocket was disconnected: ' + JSON.stringify(evt)); - } - - // receive a message though the websocket from the server - function receiveMessage(msg) { - console.log('Message from Websocket: ' + msg); - var kind = msg.split(' ', 1)[0]; - if (kind == 'new_pm') { - $('.new_messages').addClass('active'); - $('.new_messages').attr('title', "У вас новое сообщение!") - } - } -}); diff --git a/tulius/websockets/static/websockets/ws4redis.js b/tulius/websockets/static/websockets/ws4redis.js 
deleted file mode 100644 index 6d973920..00000000 --- a/tulius/websockets/static/websockets/ws4redis.js +++ /dev/null @@ -1,167 +0,0 @@ -/** - * options.uri - > The Websocket URI - * options.connected -> Callback called after the websocket is connected. - * options.connecting -> Callback called when the websocket is connecting. - * options.disconnected -> Callback called after the websocket is disconnected. - * options.receive_message -> Callback called when a message is received from the websocket. - * options.heartbeat_msg -> String to identify the heartbeat message. - * $ -> JQuery instance. - */ -function WS4Redis(options, $) { - 'use strict'; - var opts, ws, deferred, timer, attempts = 1, must_reconnect = true; - var heartbeat_interval = null, missed_heartbeats = 0; - - if (this === undefined) - return new WS4Redis(options, $); - if (options.uri === undefined) - throw new Error('No Websocket URI in options'); - if ($ === undefined) - $ = jQuery; - opts = $.extend({ heartbeat_msg: null }, options); - connect(opts.uri); - - function connect(uri) { - try { - if (ws && (is_connecting() || is_connected())) { - console.log("Websocket is connecting or already connected."); - return; - } - - if ($.type(opts.connecting) === 'function') { - opts.connecting(); - } - - console.log("Connecting to " + uri + " ..."); - deferred = $.Deferred(); - ws = new WebSocket(uri); - ws.onopen = on_open; - ws.onmessage = on_message; - ws.onerror = on_error; - ws.onclose = on_close; - timer = null; - } catch (err) { - try_to_reconnect(); - deferred.reject(new Error(err)); - } - } - - function try_to_reconnect() { - if (must_reconnect && !timer) { - // try to reconnect - console.log('Reconnecting...'); - var interval = generate_inteval(attempts); - timer = setTimeout(function() { - attempts++; - connect(ws.url); - }, interval); - } - } - - function send_heartbeat() { - try { - missed_heartbeats++; - if (missed_heartbeats > 3) - throw new Error("Too many missed heartbeats."); - 
ws.send(opts.heartbeat_msg); - } catch(e) { - clearInterval(heartbeat_interval); - heartbeat_interval = null; - console.warn("Closing connection. Reason: " + e.message); - if ( !is_closing() && !is_closed() ) { - ws.close(); - } - } - } - - function on_open() { - console.log('Connected!'); - // new connection, reset attemps counter - attempts = 1; - deferred.resolve(); - if (opts.heartbeat_msg && heartbeat_interval === null) { - missed_heartbeats = 0; - heartbeat_interval = setInterval(send_heartbeat, 5000); - } - if ($.type(opts.connected) === 'function') { - opts.connected(); - } - } - - function on_close(evt) { - console.log("Connection closed!"); - if ($.type(opts.disconnected) === 'function') { - opts.disconnected(evt); - } - try_to_reconnect(); - } - - function on_error(evt) { - console.error("Websocket connection is broken!"); - deferred.reject(new Error(evt)); - } - - function on_message(evt) { - if (opts.heartbeat_msg && evt.data === opts.heartbeat_msg) { - // reset the counter for missed heartbeats - missed_heartbeats = 0; - } else if ($.type(opts.receive_message) === 'function') { - return opts.receive_message(evt.data); - } - } - - // this code is borrowed from http://blog.johnryding.com/post/78544969349/ - // - // Generate an interval that is randomly between 0 and 2^k - 1, where k is - // the number of connection attmpts, with a maximum interval of 30 seconds, - // so it starts at 0 - 1 seconds and maxes out at 0 - 30 seconds - function generate_inteval(k) { - var maxInterval = (Math.pow(2, k) - 1) * 1000; - - // If the generated interval is more than 30 seconds, truncate it down to 30 seconds. 
- if (maxInterval > 30*1000) { - maxInterval = 30*1000; - } - - // generate the interval to a random number between 0 and the maxInterval determined from above - return Math.random() * maxInterval; - } - - this.send_message = function(message) { - ws.send(message); - }; - - this.get_state = function() { - return ws.readyState; - }; - - function is_connecting() { - return ws && ws.readyState === 0; - } - - function is_connected() { - return ws && ws.readyState === 1; - } - - function is_closing() { - return ws && ws.readyState === 2; - } - - function is_closed() { - return ws && ws.readyState === 3; - } - - - this.close = function () { - clearInterval(heartbeat_interval); - must_reconnect = false; - if (!is_closing() || !is_closed()) { - ws.close(); - } - } - - this.is_connecting = is_connecting; - this.is_connected = is_connected; - this.is_closing = is_closing; - this.is_closed = is_closed; -} diff --git a/tulius/websockets/urls.py b/tulius/websockets/urls.py deleted file mode 100644 index 54dbfd53..00000000 --- a/tulius/websockets/urls.py +++ /dev/null @@ -1,13 +0,0 @@ -from django import urls - -from tulius.websockets import views - - -app_name = 'tulius.websockets' - - -urlpatterns = [ - urls.re_path(r'^$', views.web_socket_view, name='ws'), - urls.re_path( - r'^old/$', views.web_socket_view, {'json_format': False}, name='old'), -] diff --git a/tulius/websockets/user_session.py b/tulius/websockets/user_session.py deleted file mode 100644 index 4e218b57..00000000 --- a/tulius/websockets/user_session.py +++ /dev/null @@ -1,167 +0,0 @@ -import asyncio -import json -import logging -import inspect - -from django.conf import settings -from django.contrib import auth -from redis import asyncio as aioredis -from asgiref.sync import sync_to_async - -from tulius.forum import const as forum_const -from tulius.websockets import consts - -logger = logging.getLogger('async_app') - - -class PubSub(aioredis.client.PubSub): - async def run( - self, - *, - 
exception_handler=None, - poll_timeout: float = 1.0, - ) -> None: - for channel, handler in self.channels.items(): - if handler is None: - raise aioredis.PubSubError( - f"Channel: '{channel}' has no handler registered") - for pattern, handler in self.patterns.items(): - if handler is None: - raise aioredis.PubSubError( - f"Pattern: '{pattern}' has no handler registered") - - await self.connect() - while True: - try: - await self.get_message( - ignore_subscribe_messages=True, timeout=poll_timeout - ) - except (asyncio.CancelledError, GeneratorExit): - raise - except BaseException as e: - if exception_handler is None: - raise - res = exception_handler(e, self) - if inspect.isawaitable(res): - await res - # Ensure that other tasks on the event loop get a chance to run - # if we didn't have to block for I/O anywhere. - await asyncio.sleep(0) - - -class UserSession: - def __init__(self, request, ws, redis_cache, json_format): - self.request = request - self.ws = ws - self.redis = None - self.user_id = None - self._redis_cache = redis_cache - self.json = json_format - self.user = None - self.pubsub = None - self.pubsub_task = None - - def cache_key(self, value): - return self._redis_cache.make_key(value) - - async def auth(self): - user = await sync_to_async( - auth.get_user, thread_sensitive=False)(self.request) - self.user = user - self.user_id = self.user.pk - - @staticmethod - def _pubsub_exc_handler(e, *args): - logging.exception(e) - - async def subscribe_channel(self, name, func): - logger.debug( - 'subscribe channel %s %s', self.user_id, name) - await self.pubsub.subscribe(**{consts.make_channel_name(name): func}) - - async def public_channel(self, message): - pass - - async def user_channel(self, message): - logger.debug('User %s message %s', self.user_id, message) - message = json.loads(message['data']) - direct = message.pop('.direct') - if direct: - if self.json: - await self.ws.send_json(message) - else: - await self.ws.send_text("new_pm 
{}".format(message['id'])) - - async def thread_comments_channel(self, message): - logger.debug('User %s message %s', self.user_id, message) - message = json.loads(message['data']) - direct = message.pop('.direct') - if direct: - message['.namespaced'] = 'thread_comments' - await self.ws.send_json(message) - - async def action_subscribe_comments(self, data): - thread_id = data['id'] - async with self.redis.client() as client: - rights = await client.get( - self.cache_key( - forum_const.USER_THREAD_RIGHTS.format( - user_id=self.user_id, thread_id=thread_id))) - if not rights: - return - await self.subscribe_channel( - consts.THREAD_COMMENTS_CHANNEL.format(thread_id=thread_id), - self.thread_comments_channel - ) - - async def action_unsubscribe_comments(self, data): - logging.debug('unsubscribe %s %s', self.user_id, data['id']) - await self.pubsub.unsubscribe(consts.make_channel_name( - consts.THREAD_COMMENTS_CHANNEL.format(thread_id=data['id']))) - - async def action_ping(self, data): - logger.debug('User %s message %s', self.user_id, data) - await self.ws.send_json({ - '.direct': True, - '.namespaced': 'session', - '.action': 'ping', - 'data': data['data'] + '/answer', - }) - - async def process(self): - await self.auth() - logger.info('User %s logged in', self.user_id) - self.redis = aioredis.from_url(settings.REDIS_LOCATION) - self.pubsub = PubSub(self.redis.connection_pool) - await self.subscribe_channel( - consts.CHANNEL_PUBLIC, self.public_channel) - self.pubsub_task = asyncio.create_task( - self.pubsub.run( - exception_handler=self._pubsub_exc_handler, poll_timeout=30)) - try: - if self.user_id: - await self.subscribe_channel( - consts.CHANNEL_USER.format(self.user_id), - self.user_channel) - while True: - if self.json: - data = await self.ws.receive_json() - else: - data = await self.ws.receive_text() - if self.json: - method = getattr( - self, 'action_' + data.get('action', 'empty'), None) - if method: - await method(data) - else: - await 
self.ws.send_text(data + '/answer') - finally: - logger.info('User %s closed', self.user_id) - await self.close() - - async def close(self): - if self.pubsub_task: - self.pubsub_task.cancel() - self.pubsub = None - if self.redis: - await self.redis.close() diff --git a/tulius/websockets/views.py b/tulius/websockets/views.py deleted file mode 100644 index aa80d3fe..00000000 --- a/tulius/websockets/views.py +++ /dev/null @@ -1,23 +0,0 @@ -from django.core.cache.backends.redis import RedisCache -from django.conf import settings -from django.db import transaction - -from tulius.websockets import user_session -from tulius.websockets.asgi import websocket - -params = settings.CACHES['default'].copy() -if params['BACKEND'] != 'django.core.cache.backends.redis.RedisCache': - raise NotImplementedError() -redis_location = params.pop('LOCATION') -redis_cache = RedisCache(redis_location, params) - - -@transaction.non_atomic_requests -@websocket.websocket_view -async def web_socket_view(request, ws: websocket.WebSocket, json_format=True): - session = user_session.UserSession( - request, ws, redis_cache, json_format=json_format) - try: - await session.process() - finally: - await session.close()