|
| 1 | +""" |
| 2 | +Image upload hardening tests (QW-01). |
| 3 | +
|
| 4 | +Covers: |
| 5 | +- Extension allowlist: .php / .exe / .svg rejected |
| 6 | +- Magic bytes validation: mismatched content rejected |
| 7 | +- Size cap: file exceeding _MAX_IMAGE_BYTES returns 413 |
| 8 | +- Valid uploads: .jpg, .png, .gif, .webp accepted |
| 9 | +""" |
| 10 | +import io |
| 11 | +import os |
| 12 | + |
| 13 | +import httpx |
| 14 | +import pytest |
| 15 | + |
| 16 | +BASE_URL = os.getenv("AGENTCHATBUS_BASE_URL", "http://127.0.0.1:39766") |
| 17 | + |
| 18 | +# Real magic bytes for each supported format |
| 19 | +_JPEG_MAGIC = b"\xff\xd8\xff" + b"\x00" * 10 |
| 20 | +_PNG_MAGIC = b"\x89PNG\r\n\x1a\n" + b"\x00" * 10 |
| 21 | +_GIF_MAGIC = b"GIF89a" + b"\x00" * 10 |
| 22 | +_WEBP_MAGIC = b"RIFF" + b"\x00\x00\x00\x00" + b"WEBP" + b"\x00" * 10 |
| 23 | + |
| 24 | + |
| 25 | +def _build_client() -> httpx.Client: |
| 26 | + return httpx.Client(base_url=BASE_URL, timeout=10) |
| 27 | + |
| 28 | + |
| 29 | +def _require_server_or_skip(client: httpx.Client) -> None: |
| 30 | + try: |
| 31 | + resp = client.get("/api/threads") |
| 32 | + if resp.status_code < 500: |
| 33 | + return |
| 34 | + except Exception: |
| 35 | + pass |
| 36 | + pytest.skip(f"AgentChatBus server is not reachable at {BASE_URL}") |
| 37 | + |
| 38 | + |
| 39 | +def _upload(client: httpx.Client, filename: str, content: bytes, content_type: str = "image/jpeg"): |
| 40 | + return client.post( |
| 41 | + "/api/upload/image", |
| 42 | + files={"file": (filename, io.BytesIO(content), content_type)}, |
| 43 | + ) |
| 44 | + |
| 45 | + |
| 46 | +# ─── Extension allowlist ───────────────────────────────────────────────────── |
| 47 | + |
| 48 | +def test_upload_php_rejected(): |
| 49 | + """.php extension must be rejected regardless of content.""" |
| 50 | + with _build_client() as client: |
| 51 | + _require_server_or_skip(client) |
| 52 | + r = _upload(client, "shell.php", _JPEG_MAGIC, "image/jpeg") |
| 53 | + assert r.status_code == 400, f"Expected 400, got {r.status_code}: {r.text}" |
| 54 | + assert "Unsupported" in r.text or "type" in r.text.lower() |
| 55 | + |
| 56 | + |
| 57 | +def test_upload_exe_rejected(): |
| 58 | + """.exe extension must be rejected.""" |
| 59 | + with _build_client() as client: |
| 60 | + _require_server_or_skip(client) |
| 61 | + r = _upload(client, "malware.exe", b"MZ\x90\x00", "application/octet-stream") |
| 62 | + assert r.status_code == 400, f"Expected 400, got {r.status_code}: {r.text}" |
| 63 | + |
| 64 | + |
| 65 | +def test_upload_svg_rejected(): |
| 66 | + """.svg is excluded — can embed scripts (XSS vector).""" |
| 67 | + with _build_client() as client: |
| 68 | + _require_server_or_skip(client) |
| 69 | + r = _upload(client, "xss.svg", b"<svg><script>alert(1)</script></svg>", "image/svg+xml") |
| 70 | + assert r.status_code == 400, f"Expected 400, got {r.status_code}: {r.text}" |
| 71 | + |
| 72 | + |
| 73 | +# ─── Magic bytes validation ─────────────────────────────────────────────────── |
| 74 | + |
| 75 | +def test_upload_wrong_magic_bytes_rejected(): |
| 76 | + """A .jpg file whose content starts with PNG magic bytes must be rejected.""" |
| 77 | + with _build_client() as client: |
| 78 | + _require_server_or_skip(client) |
| 79 | + r = _upload(client, "fake.jpg", _PNG_MAGIC, "image/jpeg") |
| 80 | + assert r.status_code == 400, f"Expected 400, got {r.status_code}: {r.text}" |
| 81 | + assert "content" in r.text.lower() or "match" in r.text.lower() |
| 82 | + |
| 83 | + |
| 84 | +def test_upload_renamed_text_as_jpg_rejected(): |
| 85 | + """A plain text file renamed to .jpg must be rejected (no valid magic bytes).""" |
| 86 | + with _build_client() as client: |
| 87 | + _require_server_or_skip(client) |
| 88 | + r = _upload(client, "not-an-image.jpg", b"Hello world, I am definitely not a JPEG", "image/jpeg") |
| 89 | + assert r.status_code == 400, f"Expected 400, got {r.status_code}: {r.text}" |
| 90 | + |
| 91 | + |
| 92 | +# ─── Size cap ──────────────────────────────────────────────────────────────── |
| 93 | + |
| 94 | +def test_upload_oversized_file_rejected(): |
| 95 | + """File larger than MAX_IMAGE_BYTES must return 413.""" |
| 96 | + with _build_client() as client: |
| 97 | + _require_server_or_skip(client) |
| 98 | + max_bytes = int(os.getenv("AGENTCHATBUS_MAX_IMAGE_BYTES", str(5 * 1024 * 1024))) |
| 99 | + # Build a fake JPEG that exceeds the limit |
| 100 | + oversized = _JPEG_MAGIC + b"\x00" * (max_bytes + 1024) |
| 101 | + r = _upload(client, "huge.jpg", oversized, "image/jpeg") |
| 102 | + assert r.status_code == 413, f"Expected 413, got {r.status_code}: {r.text}" |
| 103 | + |
| 104 | + |
| 105 | +# ─── Valid uploads ──────────────────────────────────────────────────────────── |
| 106 | + |
| 107 | +def test_upload_valid_jpeg_accepted(): |
| 108 | + """A valid JPEG (correct magic bytes, allowed ext) must return 200 with a URL.""" |
| 109 | + with _build_client() as client: |
| 110 | + _require_server_or_skip(client) |
| 111 | + r = _upload(client, "photo.jpg", _JPEG_MAGIC, "image/jpeg") |
| 112 | + assert r.status_code == 200, f"Expected 200, got {r.status_code}: {r.text}" |
| 113 | + data = r.json() |
| 114 | + assert "url" in data |
| 115 | + assert data["url"].startswith("/static/uploads/") |
| 116 | + assert data["url"].endswith(".jpg") |
| 117 | + |
| 118 | + |
| 119 | +def test_upload_valid_png_accepted(): |
| 120 | + """A valid PNG must be accepted.""" |
| 121 | + with _build_client() as client: |
| 122 | + _require_server_or_skip(client) |
| 123 | + r = _upload(client, "screenshot.png", _PNG_MAGIC, "image/png") |
| 124 | + assert r.status_code == 200, f"Expected 200, got {r.status_code}: {r.text}" |
| 125 | + assert r.json()["url"].endswith(".png") |
| 126 | + |
| 127 | + |
| 128 | +def test_upload_valid_gif_accepted(): |
| 129 | + """A valid GIF must be accepted.""" |
| 130 | + with _build_client() as client: |
| 131 | + _require_server_or_skip(client) |
| 132 | + r = _upload(client, "anim.gif", _GIF_MAGIC, "image/gif") |
| 133 | + assert r.status_code == 200, f"Expected 200, got {r.status_code}: {r.text}" |
| 134 | + assert r.json()["url"].endswith(".gif") |
| 135 | + |
| 136 | + |
| 137 | +def test_upload_valid_webp_accepted(): |
| 138 | + """A valid WebP must be accepted.""" |
| 139 | + with _build_client() as client: |
| 140 | + _require_server_or_skip(client) |
| 141 | + r = _upload(client, "modern.webp", _WEBP_MAGIC, "image/webp") |
| 142 | + assert r.status_code == 200, f"Expected 200, got {r.status_code}: {r.text}" |
| 143 | + assert r.json()["url"].endswith(".webp") |
| 144 | + |
| 145 | + |
| 146 | +def test_upload_no_file_rejected(): |
| 147 | + """POST without a file must return 400.""" |
| 148 | + with _build_client() as client: |
| 149 | + _require_server_or_skip(client) |
| 150 | + r = client.post("/api/upload/image") |
| 151 | + assert r.status_code in (400, 422), f"Expected 400/422, got {r.status_code}: {r.text}" |
0 commit comments