|
| 1 | +# Websockets for interactive communication |
| 2 | + |
| 3 | +[Websockets](https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API) permit synchronous interactive communication between a user's browser and a server. A series of messages are sent between each end, triggering response events. |
| 4 | + |
| 5 | +While Websockets support multi-user sessions, this documentation is mainly focused on a single-user session. |
| 6 | + |
| 7 | +--- |
| 8 | + |
| 9 | +1. [Getting started](getting-started.md) |
| 10 | +2. [Development and installation](development-guide.md) |
| 11 | +3. [Deployment for production](deployment-guide.md) |
| 12 | +4. [Authentication and magic tokens](authentication-guide.md) |
| 13 | +5. [Websockets for interactive communication](websocket-guide.md) |
| 14 | + |
| 15 | +--- |
| 16 | +## Contents |
| 17 | + |
| 18 | +- [Why Websockets?](#why-websockets) |
| 19 | +- [High-level architecture and workflow](#high-level-architecture-and-workflow) |
| 20 | +- [Requirements](#requirements) |
| 21 | +- [Setting up the Nuxt `frontend`](#setting-up-the-nuxt-frontend) |
| 22 | +- [Setting up the FastAPI `backend`](#setting-up-the-fastapi-backend) |
| 23 | + |
| 24 | +## Why Websockets? |
| 25 | + |
| 26 | +Web applications sessions are not persistent. You can maintain state at the back- _or_ frontend, but not both simultaneously. It's great that `Nuxt Pinia` allows your browser to store what you've been doing, but that will need to be communicated via your API to the backend on every page change. |
| 27 | + |
| 28 | +There are ways around this, such as automatically polling the API (known as [long polling](https://ably.com/topic/long-polling)), but Websockets create a bidirectional session between the front- and backends, allowing you to run a synchronous interactive process. |
| 29 | + |
| 30 | +This is great for interactive chat services, or where your app offers complex software which is impractical (or outright impossible) to run in a browser. |
| 31 | + |
| 32 | +Depending on the popularity of your app, concurrency may become a problem. Stateless polling means you can have millions of users on relatively limited infrastructure, because they load information, and then read it. People are using your app simultaneously, not polling your infrastructure simultaneously. |
| 33 | + |
| 34 | +Websockets are active sessions, so be sparing with the amount of information you send back and forth. |
| 35 | + |
| 36 | +## High-level architecture and workflow |
| 37 | + |
| 38 | +The following general conditions apply: |
| 39 | + |
| 40 | +- If a session requires user-authentication, then you need to manually perform this. You may be used to FastAPI routes handling this for you normally, but Websockets don't. |
| 41 | +- Messages sent between front- and backend are `JSON-encoded`. If you use variables that aren't easily stringified (Pydantic, for example, doesn't automatically stringify UUIDs), you'll need to deliberately check for this. |
| 42 | + |
| 43 | +Here's how the workflow tends to play out: |
| 44 | + |
| 45 | +- `frontend` - initialise a socket by opening a session and sending an initial payload `request`, which may include a user token (if authentication is required). |
| 46 | +- `backend` - receive a socket `request`, validate the session (user or other), send a `response` to the `frontend`, and then enter a `while True` loop to keep the session open and keep listening for `requests`. |
| 47 | +- `frontend` - each `response` triggers an event, updating the view. |
| 48 | +- `frontend` - send an instruction to close the socket when the user ends the sessions or, as fallback, when the user changes the page. |
| 49 | +- `backend` - can also end the session based on user activity. |
| 50 | + |
| 51 | +If the user is joining a multi-user session, then each update to the session is communicated to all users. |
| 52 | + |
| 53 | +## Requirements |
| 54 | + |
| 55 | +- [FastAPI](https://fastapi.tiangolo.com/advanced/websockets/) requires the installation of the WebSockets library: |
| 56 | + |
| 57 | +``` |
| 58 | +pip install websockets |
| 59 | +``` |
| 60 | + |
| 61 | +- There are multiple JavaScript libraries for Websockets, but I like [WebSocketAs Promised](https://github.com/vitalets/websocket-as-promised#readme): |
| 62 | + |
| 63 | +``` |
| 64 | +yarn install websocket-as-promised |
| 65 | +``` |
| 66 | + |
| 67 | +## Setting up the Nuxt `frontend` |
| 68 | + |
| 69 | +The API `backend` is reached at `ws://localhost/api/v1` (or `wss://<your-domain>/api/v1` in production). |
| 70 | + |
| 71 | +Create an appropriate `websocketAPI.ts` file: |
| 72 | + |
| 73 | +``` |
| 74 | +import WebSocketAsPromised from "websocket-as-promised" |
| 75 | +import { apiCore } from "./core" |
| 76 | +
|
| 77 | +export const apiSockets = { |
| 78 | + socketRequest() { |
| 79 | + return new WebSocketAsPromised(`${apiCore.wsurl()}/socket`, { |
| 80 | + packMessage: (data) => JSON.stringify(data), |
| 81 | + unpackMessage: (data) => JSON.parse(data as string), |
| 82 | + }) |
| 83 | + }, |
| 84 | +} |
| 85 | +``` |
| 86 | + |
| 87 | +Then, in the relevant `page.ts` (or component), you create a `websocket` variable (`wsp`) and attach a `watcher` to it so that you can respond to events as it is updated: |
| 88 | + |
| 89 | +``` |
| 90 | +<script setup lang="ts"> |
| 91 | + import WebSocketAsPromised from "websocket-as-promised" |
| 92 | + import { IKeyable, ISocketRequest, ISocketResponse } from "@/interfaces" |
| 93 | + import { apiSockets } from "@/api" |
| 94 | + import { useAuthStore } from "@/stores" |
| 95 | +
|
| 96 | + // SETUP |
| 97 | + let wsp = {} as WebSocketAsPromised |
| 98 | + const authStore = useAuthStore() |
| 99 | + const streaming = ref(false) |
| 100 | +
|
| 101 | + onMounted(async () => { |
| 102 | + await authStore.refreshTokens() |
| 103 | + await initialiseSocket() |
| 104 | + }) |
| 105 | +
|
| 106 | + async function initialiseSocket() { |
| 107 | + wsp = apiSockets.socketRequest() |
| 108 | + await wsp.open() |
| 109 | + wsp.onUnpackedMessage.addListener((data) => watchResponseSocket(data)) |
| 110 | + // Deliberately sending a user token to authenticate the session |
| 111 | + const jsonData: IKeyable = { |
| 112 | + token: authStore.authTokens.token |
| 113 | + } |
| 114 | + wsp.sendPacked(jsonData) |
| 115 | + streaming.value = true |
| 116 | + } |
| 117 | + |
| 118 | + function closeSocket() { |
| 119 | + wsp.onUnpackedMessage.removeListener((data) => watchResponseSocket(data)) |
| 120 | + if (streaming.value) { |
| 121 | + wsp.close() |
| 122 | + streaming.value = false |
| 123 | + } |
| 124 | + } |
| 125 | + |
| 126 | + onBeforeRouteLeave((to, from, next) => { |
| 127 | + closeSocket() |
| 128 | + next() |
| 129 | + }) |
| 130 | +
|
| 131 | + // SESSION |
| 132 | + async function watchResponseSocket(response: ISocketResponse) { |
| 133 | + // response: { state, data, error } |
| 134 | + console.log("response: ", response.state, response.data) |
| 135 | + switch (response.state) { |
| 136 | + case "initialised": |
| 137 | + // User session is authenticated and you can now launch the process |
| 138 | + await watchRequestSocket({ |
| 139 | + state: "startThings", |
| 140 | + data: {} |
| 141 | + }) |
| 142 | + break |
| 143 | + case "somethingHappened": |
| 144 | + // Do stuff |
| 145 | + break |
| 146 | + case "somethingElse": |
| 147 | + // Do some other stuff |
| 148 | + break |
| 149 | + case "error": |
| 150 | + toast.addNotice({ |
| 151 | + title: "Some error", |
| 152 | + content: `Error: ${response.error}`, |
| 153 | + icon: "error" |
| 154 | + }) |
| 155 | + break |
| 156 | + } |
| 157 | + } |
| 158 | + |
| 159 | + async function watchRequestSocket(request: ISocketRequest) { |
| 160 | + // request: { state, data } |
| 161 | + switch (request.state) { |
| 162 | + case "something": |
| 163 | + // Request-specific options |
| 164 | + break |
| 165 | + default: |
| 166 | + sendSocketRequest(request.state, request.data) |
| 167 | + } |
| 168 | + } |
| 169 | + |
| 170 | + function sendSocketRequest(state: string, data: IKeyable) { |
| 171 | + try { |
| 172 | + const payload: ISocketRequest = { |
| 173 | + state, |
| 174 | + data |
| 175 | + } |
| 176 | + wsp.sendPacked(payload) |
| 177 | + } catch (e) { |
| 178 | + console.log(e) |
| 179 | + // closeSocket() |
| 180 | + } |
| 181 | + } |
| 182 | +</script> |
| 183 | +``` |
| 184 | + |
| 185 | +The `response` is a `switch` statement which identifies and responds to the appropriate event. |
| 186 | + |
| 187 | +## Setting up the FastAPI `backend` |
| 188 | + |
| 189 | +At the `backend` you already have a [sockets.py](https://github.com/whythawk/full-stack-fastapi-postgresql/blob/0.8.2/%7B%7Bcookiecutter.project_slug%7D%7D/backend/app/app/api/sockets.py) which handles serialising and deserialising the Websocket requests and responses. Now you create a route in `/endpoints`: |
| 190 | + |
| 191 | +``` |
| 192 | +from fastapi import APIRouter, Depends, WebSocket, HTTPException, WebSocketException |
| 193 | +from starlette.websockets import WebSocketDisconnect |
| 194 | +from websockets.exceptions import ConnectionClosedError |
| 195 | +from app import crud, models, schema_types, schemas |
| 196 | +from app.api import deps, sockets |
| 197 | +
|
| 198 | +router = APIRouter() |
| 199 | +
|
| 200 | +@router.websocket("/socket") |
| 201 | +async def some_websocket_session(*, db: Session = Depends(deps.get_db), websocket: WebSocket): |
| 202 | + current_user = None |
| 203 | + initialised = False |
| 204 | + success = False |
| 205 | + # 1. Open the socket and validate current user |
| 206 | + await websocket.accept() |
| 207 | + request = await sockets.receive_request(websocket=websocket) |
| 208 | + response = {"state": "error", "error": "Could not validate credentials."} |
| 209 | + if request.get("token"): |
| 210 | + try: |
| 211 | + current_user = deps.get_active_websocket_user(db=db, token=request["token"]) |
| 212 | + response = {"state": "initialised", "data": {}} |
| 213 | + except ValidationError: |
| 214 | + pass |
| 215 | + success = await sockets.send_response(websocket=websocket, response=response) |
| 216 | + if response["state"] == "initialised" and success: |
| 217 | + try: |
| 218 | + while True and success: |
| 219 | + # LOOP ################################################################# |
| 220 | + request = await sockets.receive_request(websocket=websocket) |
| 221 | + if not request: |
| 222 | + break |
| 223 | + state = request.get("state") |
| 224 | + data = request.get("data", {}) |
| 225 | + data = sockets.sanitize_data_request(data) |
| 226 | + response = {"state": state} |
| 227 | + try: |
| 228 | + # ALL THE STATES ################################################### |
| 229 | + if state == "startThings": |
| 230 | + # Do some stuff |
| 231 | + data = {"something": "yes, something"} |
| 232 | + response["data"] = data |
| 233 | + initialised = True |
| 234 | + # SAVE AND CLOSE THE SESSION ####################################### |
| 235 | + if state == "save" and initialised: |
| 236 | + # This will close the socket, if it succeeds |
| 237 | + response["data"] = {} |
| 238 | + break |
| 239 | + except ValidationError as e: |
| 240 | + response = {"state": "error", "error": e} |
| 241 | + success = await sockets.send_response(websocket=websocket, response=response) |
| 242 | + # LOOP ################################################################# |
| 243 | + except (WebSocketDisconnect, WebSocketException, ConnectionClosedError) as e: |
| 244 | + response = {"state": "error", "error": e} |
| 245 | + try: |
| 246 | + await sockets.send_response(websocket=websocket, response=response) |
| 247 | + await websocket.close(code=1000) |
| 248 | + except (WebSocketDisconnect, ConnectionClosedError, RuntimeError, WebSocketException): |
| 249 | + pass |
| 250 | +``` |
| 251 | + |
| 252 | +And that's - very simplistically - basically it. |
0 commit comments