Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
55586c4
Add server bug tests that expose JSON parsing crashes and validation …
gwbischof Jul 1, 2025
f83d6b8
Add comprehensive edge case tests with timeout protection to discover…
gwbischof Jul 1, 2025
b19a691
Uncomment hanging WebSocket tests and add timeout protection to preve…
gwbischof Jul 1, 2025
a512d08
Add TODO comments to identify tests that expose server crashes or han…
gwbischof Jul 1, 2025
1453d46
Refine test_server_bugs.py to focus on 12 actionable server bugs incl…
gwbischof Jul 7, 2025
ad14419
Remove pytest.raises patterns and add 8 new bug tests to expand test …
gwbischof Jul 7, 2025
48326f4
Remove 2 passing tests that show correct server behavior to focus tes…
gwbischof Jul 7, 2025
bfc56e0
Reorganize bug tests into 5 focused test files grouped by server func…
gwbischof Jul 7, 2025
9c9dc74
Fix server crash by adding JSON parsing error handling to /close endp…
gwbischof Jul 7, 2025
cb53697
Add configurable resource limits to prevent memory exhaustion and DoS…
gwbischof Jul 7, 2025
f212ec8
Remove Redis manipulation tests that bypass API validation to focus o…
gwbischof Jul 7, 2025
5b346e1
Add test references to server code fixes for better traceability betw…
gwbischof Jul 7, 2025
0b402f4
Improve JSON error handling to only catch JSON-specific exceptions fo…
gwbischof Jul 7, 2025
a7a9e17
Sanitize JSON error messages to prevent leaking implementation detail…
gwbischof Jul 7, 2025
7f82c1f
making some progress
gwbischof Jul 7, 2025
8cadb32
Remove untested WebSocket frame size limits to follow test-driven dev…
gwbischof Jul 7, 2025
95bbdc7
moving along
gwbischof Jul 7, 2025
7112820
ruff
gwbischof Jul 7, 2025
a319113
cleanup
gwbischof Jul 7, 2025
c66f737
don't use the middleware because it is only needed for the upload end…
gwbischof Jul 7, 2025
a9c7025
Add payload size validation and JSON error handling to prevent server…
gwbischof Jul 7, 2025
90ae9ff
Replace manual JSON parsing with Pydantic model for cleaner validation
gwbischof Jul 8, 2025
0eb0c3b
clean up
gwbischof Jul 8, 2025
28eea12
touch ups
gwbischof Jul 8, 2025
6211736
remove the close_connection body
gwbischof Jul 9, 2025
f189940
make close_connection a delete endpoint
gwbischof Jul 10, 2025
d95f65d
add close connection tests
gwbischof Jul 10, 2025
7c0a96c
test websocket connection to non-existent node
gwbischof Jul 10, 2025
8212b36
return 404 if node not streamable
gwbischof Jul 11, 2025
f662241
clean up the test
gwbischof Jul 11, 2025
ea431da
touch ups
gwbischof Jul 11, 2025
57c94be
touch ups
gwbischof Jul 11, 2025
cd31947
touch up
gwbischof Jul 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 28 additions & 17 deletions server.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import numpy as np
import uvicorn
from pydantic_settings import BaseSettings
from fastapi import FastAPI, WebSocket, Request, WebSocketDisconnect
from fastapi import FastAPI, WebSocket, Request, WebSocketDisconnect, HTTPException
from datetime import datetime
import msgpack
import asyncio
Expand All @@ -15,6 +15,7 @@
class Settings(BaseSettings):
    """Runtime configuration for the server (pydantic BaseSettings)."""

    # Connection URL for the Redis backend; defaults to db 0 on localhost.
    redis_url: str = "redis://localhost:6379/0"
    # Time-to-live applied to stored keys, in seconds.
    ttl: int = 60 * 60  # 1 hour
    # Upper bound on an accepted upload body; larger requests are rejected
    # with HTTP 413 by the append endpoint.
    max_payload_size: int = 16 * 1024 * 1024  # 16MB max payload


def build_app(settings: Settings):
Expand Down Expand Up @@ -45,21 +46,20 @@ async def create():
await redis_client.setnx(f"seq_num:{node_id}", 0)
return {"node_id": node_id}

@app.delete("/upload/{node_id}", status_code=204)
async def close(node_id: str):
    "Declare that a dataset is done streaming."

    # Drop the per-node sequence counter; without it the node is treated
    # as no longer accepting appends.
    await redis_client.delete(f"seq_num:{node_id}")
    # TODO: Shorten TTL on all extant data for this node.
    return None

@app.post("/upload/{node_id}")
async def append(node_id, request: Request):
"Append data to a dataset."

# Check request body size limit
# Tell good-faith clients that their request is too big.
# Fix for: test_large_data_resource.py::test_large_data_resource_limits
headers = request.headers
content_length = headers.get("content-length")
if content_length and int(content_length) > settings.max_payload_size:
raise HTTPException(status_code=413, detail="Payload too large")

# get data from request body
binary_data = await request.body()
headers = request.headers
metadata = {
"timestamp": datetime.now().isoformat(),
}
Expand All @@ -85,16 +85,23 @@ async def append(node_id, request: Request):
# TODO: Implement two-way communication with subscribe, unsubscribe, flow control.
# @app.websocket("/stream/many")

@app.post("/close/{node_id}")
@app.delete("/close/{node_id}")
async def close_connection(node_id: str, request: Request):
# Parse the JSON body
body = await request.json()
headers = request.headers

reason = body.get("reason", None)

metadata = {"timestamp": datetime.now().isoformat(), "reason": reason}
# Check the node status.
# ttl returns -2 if the key does not exist.
# ttl returns -1 if the key exists but has no associated expire.
# ttl greater than 0 means that it is marked to expire.
node_ttl = await redis_client.ttl(f"seq_num:{node_id}")
if node_ttl > 0:
raise HTTPException(status_code=404, detail=f"Node expiring in {node_ttl} seconds")
if node_ttl == -2:
raise HTTPException(status_code=404, detail="Node not found")

metadata = {"timestamp": datetime.now().isoformat()}
metadata.setdefault("Content-Type", headers.get("Content-Type"))

# Increment the counter for this node.
seq_num = await redis_client.incr(f"seq_num:{node_id}")

Expand All @@ -109,12 +116,12 @@ async def close_connection(node_id: str, request: Request):
},
)
pipeline.expire(f"data:{node_id}:{seq_num}", settings.ttl)
pipeline.expire(f"seq_num:{node_id}", settings.ttl)
pipeline.publish(f"notify:{node_id}", seq_num)
await pipeline.execute()

return {
"status": f"Connection for node {node_id} is now closed.",
"reason": reason,
}

@app.websocket("/stream/single/{node_id}") # one-way communcation
Expand All @@ -124,6 +131,10 @@ async def websocket_endpoint(
envelope_format: str = "json",
seq_num: Optional[int] = None,
):
# Check if the node is streamable before accepting the websocket connection
if not await redis_client.exists(f"seq_num:{node_id}"):
raise HTTPException(status_code=404, detail="Node not found")

await websocket.accept(
headers=[(b"x-server-host", socket.gethostname().encode())]
)
Expand Down
3 changes: 1 addition & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ def client():
"""Fixture providing TestClient following ws-tests pattern."""
settings = Settings(redis_url="redis://localhost:6379/0", ttl=60 * 60)
app = build_app(settings)

with TestClient(app) as client:
yield client

37 changes: 37 additions & 0 deletions tests/test_close_endpoint.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
"""
Tests for the close endpoint.
"""

def test_close_connection_success(client):
    """Closing an existing node succeeds once, then 404s on a repeat close."""
    # Create a node to work with.
    create_resp = client.post("/upload")
    assert create_resp.status_code == 200
    node_id = create_resp.json()["node_id"]

    # Stream one chunk of data to it.
    upload_resp = client.post(
        f"/upload/{node_id}",
        content=b"test data",
        headers={"Content-Type": "application/octet-stream"},
    )
    assert upload_resp.status_code == 200

    # First close succeeds and reports the node by id.
    close_resp = client.delete(f"/close/{node_id}")
    assert close_resp.status_code == 200
    expected_status = f"Connection for node {node_id} is now closed."
    assert close_resp.json()["status"] == expected_status

    # A second close finds nothing left to close.
    assert client.delete(f"/close/{node_id}").status_code == 404


def test_close_connection_not_found(client):
    """Closing a node that was never created yields a 404 with a clear detail."""
    missing_node_id = "definitely_non_existent_node_99999999"

    resp = client.delete(f"/close/{missing_node_id}")
    assert resp.status_code == 404
    assert resp.json()["detail"] == "Node not found"

22 changes: 22 additions & 0 deletions tests/test_large_data_resource.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
"""
Tests for large data handling and resource limit bugs.
"""


def test_large_data_resource_limits(client):
    """Uploads above the configured payload cap are rejected with HTTP 413."""

    # Create a target node.
    create_resp = client.post("/upload")
    assert create_resp.status_code == 200
    node_id = create_resp.json()["node_id"]

    # A 20MB body deliberately exceeds the server's 16MB default limit.
    oversized = bytes(20 * 1024 * 1024)
    resp = client.post(
        f"/upload/{node_id}",
        content=oversized,
        headers={"Content-Type": "application/octet-stream"},
    )
    # The server should refuse it with 413 Payload Too Large.
    assert resp.status_code == 413
    assert "Payload too large" in resp.json()["detail"]
10 changes: 10 additions & 0 deletions tests/test_websocket_timing.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,16 @@
import numpy as np


def test_websocket_connection_to_non_existent_node(client):
    """The stream handshake for an unknown node fails with HTTP 404."""
    missing_node_id = "definitely_non_existent_websocket_node_99999999"

    # A plain GET reaches the websocket route's pre-accept existence check,
    # so the handshake is rejected before any upgrade happens.
    resp = client.get(f"/stream/single/{missing_node_id}")
    assert resp.status_code == 404


def test_subscribe_immediately_after_creation_websockets(client):
"""Client that subscribes immediately after node creation sees all updates in order."""
# Create node
Expand Down