-
Notifications
You must be signed in to change notification settings - Fork 55
Implement full stack admin dashboard #644
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. Weβll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,376 @@ | ||
| """Tests for the admin dashboard API (/api/admin/*). | ||
|
|
||
| All tests run against an in-memory SQLite database with AUTH_ENABLED=false | ||
| so the normal auth middleware is a no-op. Admin auth is tested separately | ||
| by supplying / omitting the ADMIN_API_KEY Bearer token. | ||
| """ | ||
|
|
||
| import os | ||
|
|
||
| os.environ.setdefault("DATABASE_URL", "sqlite+aiosqlite:///:memory:") | ||
| os.environ.setdefault("SECRET_KEY", "test-secret-key-for-ci") | ||
|
|
||
| import pytest | ||
| from fastapi import FastAPI | ||
| from fastapi.testclient import TestClient | ||
|
|
||
| import app.api.admin as admin_module | ||
| from app.api.admin import router as admin_router | ||
| from app.models.bounty import BountyDB, BountyStatus, BountyTier, SubmissionRecord | ||
| from app.services import bounty_service, contributor_service | ||
|
|
||
| # --------------------------------------------------------------------------- | ||
| # Constants | ||
| # --------------------------------------------------------------------------- | ||
|
|
||
| TEST_API_KEY = "test-admin-key-abc" | ||
| AUTH_HEADER = {"Authorization": f"Bearer {TEST_API_KEY}"} | ||
| BAD_AUTH = {"Authorization": "Bearer wrong-key"} | ||
|
|
||
|
|
||
| # --------------------------------------------------------------------------- | ||
| # Fixtures | ||
| # --------------------------------------------------------------------------- | ||
|
|
||
| @pytest.fixture(autouse=True) | ||
| def patch_admin_key(monkeypatch): | ||
| """Inject test API key into the admin module before each test.""" | ||
| monkeypatch.setattr(admin_module, "ADMIN_API_KEY", TEST_API_KEY) | ||
|
|
||
|
|
||
| @pytest.fixture() | ||
| def client(): | ||
| """Create a fresh TestClient with only the admin router mounted.""" | ||
| app = FastAPI() | ||
| app.include_router(admin_router) | ||
| return TestClient(app, raise_server_exceptions=True) | ||
|
|
||
|
|
||
| @pytest.fixture(autouse=True) | ||
| def clear_stores(): | ||
| """Reset in-memory stores and audit log between tests.""" | ||
| bounty_service._bounty_store.clear() | ||
| contributor_service._store.clear() | ||
| admin_module._audit_log.clear() | ||
| yield | ||
| bounty_service._bounty_store.clear() | ||
| contributor_service._store.clear() | ||
| admin_module._audit_log.clear() | ||
|
|
||
|
|
||
| def _make_bounty(bid="b1", title="Fix bug", status=BountyStatus.OPEN, reward=500.0): | ||
| """Insert a minimal BountyDB into the in-memory store.""" | ||
| from datetime import datetime, timezone, timedelta | ||
| bounty = BountyDB( | ||
| id=bid, | ||
| title=title, | ||
| description="A test bounty", | ||
| tier=BountyTier.T1, | ||
| required_skills=[], | ||
| reward_amount=reward, | ||
| created_by="creator-1", | ||
| deadline=(datetime.now(timezone.utc) + timedelta(days=7)).isoformat(), | ||
| status=status, | ||
| created_at=datetime.now(timezone.utc), | ||
| updated_at=datetime.now(timezone.utc), | ||
| ) | ||
| bounty_service._bounty_store[bid] = bounty | ||
| return bounty | ||
|
|
||
|
|
||
| def _make_contributor(cid="c1", username="alice", banned=False): | ||
| """Insert a minimal contributor into the in-memory store.""" | ||
| from app.models.contributor import ContributorDB | ||
| c = ContributorDB( | ||
| id=cid, | ||
| username=username, | ||
| display_name=username.capitalize(), | ||
| skills=[], | ||
| badges=[], | ||
| reputation_score=10.0, | ||
| total_bounties_completed=2, | ||
| total_earnings=1000.0, | ||
| ) | ||
| c.is_banned = banned | ||
| contributor_service._store[cid] = c | ||
| return c | ||
|
|
||
|
|
||
| # --------------------------------------------------------------------------- | ||
| # Auth tests | ||
| # --------------------------------------------------------------------------- | ||
|
|
||
| class TestAdminAuth: | ||
| def test_unauthenticated_request_returns_401(self, client): | ||
| resp = client.get("/api/admin/overview") | ||
| assert resp.status_code == 401 | ||
|
|
||
| def test_wrong_key_returns_403(self, client): | ||
| resp = client.get("/api/admin/overview", headers=BAD_AUTH) | ||
| assert resp.status_code == 403 | ||
|
|
||
| def test_no_api_key_configured_returns_503(self, client, monkeypatch): | ||
| monkeypatch.setattr(admin_module, "ADMIN_API_KEY", "") | ||
| resp = client.get("/api/admin/overview", headers=AUTH_HEADER) | ||
| assert resp.status_code == 503 | ||
|
|
||
| def test_correct_key_allows_access(self, client): | ||
| resp = client.get("/api/admin/overview", headers=AUTH_HEADER) | ||
| assert resp.status_code == 200 | ||
|
|
||
|
|
||
| # --------------------------------------------------------------------------- | ||
| # Overview | ||
| # --------------------------------------------------------------------------- | ||
|
|
||
| class TestOverview: | ||
| def test_returns_zero_counts_on_empty_stores(self, client): | ||
| data = client.get("/api/admin/overview", headers=AUTH_HEADER).json() | ||
| assert data["total_bounties"] == 0 | ||
| assert data["total_contributors"] == 0 | ||
| assert data["total_fndry_paid"] == 0 | ||
|
|
||
| def test_counts_open_and_completed_bounties(self, client): | ||
| _make_bounty("b1", status=BountyStatus.OPEN) | ||
| _make_bounty("b2", status=BountyStatus.COMPLETED, reward=1000.0) | ||
| _make_bounty("b3", status=BountyStatus.PAID, reward=500.0) | ||
|
|
||
| data = client.get("/api/admin/overview", headers=AUTH_HEADER).json() | ||
| assert data["total_bounties"] == 3 | ||
| assert data["open_bounties"] == 1 | ||
| assert data["completed_bounties"] == 2 | ||
| assert data["total_fndry_paid"] == 1500.0 | ||
|
|
||
| def test_counts_banned_contributors(self, client): | ||
| _make_contributor("c1", "alice", banned=False) | ||
| _make_contributor("c2", "bob", banned=True) | ||
|
|
||
| data = client.get("/api/admin/overview", headers=AUTH_HEADER).json() | ||
| assert data["total_contributors"] == 2 | ||
| assert data["active_contributors"] == 1 | ||
| assert data["banned_contributors"] == 1 | ||
|
|
||
|
|
||
| # --------------------------------------------------------------------------- | ||
| # Bounty management | ||
| # --------------------------------------------------------------------------- | ||
|
|
||
| class TestBountyManagement: | ||
| def test_list_bounties_empty(self, client): | ||
| data = client.get("/api/admin/bounties", headers=AUTH_HEADER).json() | ||
| assert data["total"] == 0 | ||
| assert data["items"] == [] | ||
|
|
||
| def test_list_bounties_pagination(self, client): | ||
| for i in range(5): | ||
| _make_bounty(f"b{i}", title=f"Bounty {i}") | ||
|
|
||
| data = client.get("/api/admin/bounties?page=1&per_page=3", headers=AUTH_HEADER).json() | ||
| assert len(data["items"]) == 3 | ||
| assert data["total"] == 5 | ||
| assert data["page"] == 1 | ||
|
|
||
| def test_list_bounties_search_filter(self, client): | ||
| _make_bounty("b1", title="Fix the login bug") | ||
| _make_bounty("b2", title="Add dark mode") | ||
|
|
||
| data = client.get("/api/admin/bounties?search=login", headers=AUTH_HEADER).json() | ||
| assert data["total"] == 1 | ||
| assert data["items"][0]["id"] == "b1" | ||
|
|
||
| def test_list_bounties_status_filter(self, client): | ||
| _make_bounty("b1", status=BountyStatus.OPEN) | ||
| _make_bounty("b2", status=BountyStatus.COMPLETED) | ||
|
|
||
| data = client.get("/api/admin/bounties?status=open", headers=AUTH_HEADER).json() | ||
| assert data["total"] == 1 | ||
| assert data["items"][0]["id"] == "b1" | ||
|
|
||
| def test_update_bounty_status(self, client): | ||
| _make_bounty("b1", status=BountyStatus.OPEN) | ||
| resp = client.patch( | ||
| "/api/admin/bounties/b1", | ||
| headers=AUTH_HEADER, | ||
| json={"status": "completed"}, | ||
| ) | ||
| assert resp.status_code == 200 | ||
| assert bounty_service._bounty_store["b1"].status == "completed" | ||
|
|
||
| def test_update_bounty_reward(self, client): | ||
| _make_bounty("b1", reward=500.0) | ||
| resp = client.patch( | ||
| "/api/admin/bounties/b1", | ||
| headers=AUTH_HEADER, | ||
| json={"reward_amount": 1500.0}, | ||
| ) | ||
| assert resp.status_code == 200 | ||
| assert bounty_service._bounty_store["b1"].reward_amount == 1500.0 | ||
|
|
||
| def test_update_nonexistent_bounty_404(self, client): | ||
| resp = client.patch( | ||
| "/api/admin/bounties/missing", | ||
| headers=AUTH_HEADER, | ||
| json={"status": "cancelled"}, | ||
| ) | ||
| assert resp.status_code == 404 | ||
|
|
||
| def test_update_with_no_changes_400(self, client): | ||
| _make_bounty("b1") | ||
| resp = client.patch("/api/admin/bounties/b1", headers=AUTH_HEADER, json={}) | ||
| assert resp.status_code == 400 | ||
|
|
||
| def test_close_bounty(self, client): | ||
| _make_bounty("b1", status=BountyStatus.IN_PROGRESS) | ||
| resp = client.post("/api/admin/bounties/b1/close", headers=AUTH_HEADER) | ||
| assert resp.status_code == 200 | ||
| assert bounty_service._bounty_store["b1"].status == BountyStatus.CANCELLED | ||
|
|
||
| def test_close_nonexistent_bounty_404(self, client): | ||
| resp = client.post("/api/admin/bounties/missing/close", headers=AUTH_HEADER) | ||
| assert resp.status_code == 404 | ||
|
|
||
| def test_close_bounty_writes_audit_log(self, client): | ||
| _make_bounty("b1") | ||
| client.post("/api/admin/bounties/b1/close", headers=AUTH_HEADER) | ||
| assert any(e["event"] == "admin_bounty_closed" for e in admin_module._audit_log) | ||
|
|
||
|
|
||
| # --------------------------------------------------------------------------- | ||
| # Contributor management | ||
| # --------------------------------------------------------------------------- | ||
|
|
||
| class TestContributorManagement: | ||
| def test_list_contributors_empty(self, client): | ||
| data = client.get("/api/admin/contributors", headers=AUTH_HEADER).json() | ||
| assert data["total"] == 0 | ||
|
|
||
| def test_list_contributors_banned_filter(self, client): | ||
| _make_contributor("c1", "alice", banned=False) | ||
| _make_contributor("c2", "bob", banned=True) | ||
|
|
||
| data = client.get("/api/admin/contributors?is_banned=true", headers=AUTH_HEADER).json() | ||
| assert data["total"] == 1 | ||
| assert data["items"][0]["username"] == "bob" | ||
|
|
||
| def test_ban_contributor(self, client): | ||
| _make_contributor("c1", "alice", banned=False) | ||
| resp = client.post( | ||
| "/api/admin/contributors/c1/ban", | ||
| headers=AUTH_HEADER, | ||
| json={"reason": "Spam submissions violating policy"}, | ||
| ) | ||
| assert resp.status_code == 200 | ||
| assert contributor_service._store["c1"].is_banned is True | ||
|
|
||
| def test_ban_requires_reason(self, client): | ||
| _make_contributor("c1") | ||
| resp = client.post( | ||
| "/api/admin/contributors/c1/ban", | ||
| headers=AUTH_HEADER, | ||
| json={"reason": "ok"}, # too short (<5 chars) | ||
| ) | ||
| assert resp.status_code == 422 | ||
|
|
||
| def test_unban_contributor(self, client): | ||
| _make_contributor("c1", "alice", banned=True) | ||
| resp = client.post("/api/admin/contributors/c1/unban", headers=AUTH_HEADER) | ||
| assert resp.status_code == 200 | ||
| assert contributor_service._store["c1"].is_banned is False | ||
|
|
||
| def test_ban_nonexistent_contributor_404(self, client): | ||
| resp = client.post( | ||
| "/api/admin/contributors/missing/ban", | ||
| headers=AUTH_HEADER, | ||
| json={"reason": "Test reason here"}, | ||
| ) | ||
| assert resp.status_code == 404 | ||
|
|
||
| def test_ban_writes_audit_entry(self, client): | ||
| _make_contributor("c1", "alice") | ||
| client.post( | ||
| "/api/admin/contributors/c1/ban", | ||
| headers=AUTH_HEADER, | ||
| json={"reason": "Policy violation reason"}, | ||
| ) | ||
| assert any(e["event"] == "admin_contributor_banned" for e in admin_module._audit_log) | ||
|
|
||
|
|
||
| # --------------------------------------------------------------------------- | ||
| # Review pipeline | ||
| # --------------------------------------------------------------------------- | ||
|
|
||
| class TestReviewPipeline: | ||
| def test_empty_pipeline(self, client): | ||
| data = client.get("/api/admin/reviews/pipeline", headers=AUTH_HEADER).json() | ||
| assert data["total_active"] == 0 | ||
| assert data["pass_rate"] == 0.0 | ||
| assert data["avg_score"] == 0.0 | ||
|
|
||
|
Comment on lines
+302
to
+308
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. π§Ή Nitpick | π΅ Trivial Missing test coverage for system health endpoint. The Consider adding: class TestSystemHealth:
def test_returns_healthy_status(self, client):
data = client.get("/api/admin/system/health", headers=AUTH_HEADER).json()
assert data["status"] in ("healthy", "degraded")
assert "uptime_seconds" in data
assert "services" in dataπ€ Prompt for AI Agents |
||
|
|
||
| # --------------------------------------------------------------------------- | ||
| # Financial | ||
| # --------------------------------------------------------------------------- | ||
|
|
||
| class TestFinancial: | ||
| def test_overview_zero_on_empty(self, client): | ||
| data = client.get("/api/admin/financial/overview", headers=AUTH_HEADER).json() | ||
| assert data["total_fndry_distributed"] == 0.0 | ||
| assert data["total_paid_bounties"] == 0 | ||
|
|
||
| def test_overview_sums_paid_bounties(self, client): | ||
| _make_bounty("b1", status=BountyStatus.PAID, reward=1000.0) | ||
| _make_bounty("b2", status=BountyStatus.COMPLETED, reward=500.0) | ||
| _make_bounty("b3", status=BountyStatus.OPEN, reward=200.0) | ||
|
|
||
| data = client.get("/api/admin/financial/overview", headers=AUTH_HEADER).json() | ||
| assert data["total_fndry_distributed"] == 1500.0 | ||
| assert data["total_paid_bounties"] == 2 | ||
|
|
||
| def test_payout_history_pagination(self, client): | ||
| for i in range(5): | ||
| _make_bounty(f"b{i}", status=BountyStatus.PAID, reward=100.0) | ||
|
|
||
| data = client.get("/api/admin/financial/payouts?page=1&per_page=3", headers=AUTH_HEADER).json() | ||
| assert len(data["items"]) == 3 | ||
| assert data["total"] == 5 | ||
|
|
||
|
|
||
| # --------------------------------------------------------------------------- | ||
| # Audit log | ||
| # --------------------------------------------------------------------------- | ||
|
|
||
| class TestAuditLog: | ||
| def test_empty_audit_log(self, client): | ||
| data = client.get("/api/admin/audit-log", headers=AUTH_HEADER).json() | ||
| assert data["entries"] == [] | ||
| assert data["total"] == 0 | ||
|
|
||
| def test_audit_log_populated_by_actions(self, client): | ||
| _make_bounty("b1") | ||
| client.post("/api/admin/bounties/b1/close", headers=AUTH_HEADER) | ||
|
|
||
| data = client.get("/api/admin/audit-log", headers=AUTH_HEADER).json() | ||
| assert data["total"] >= 1 | ||
| events = [e["event"] for e in data["entries"]] | ||
| assert "admin_bounty_closed" in events | ||
|
|
||
| def test_audit_log_event_filter(self, client): | ||
| _make_bounty("b1") | ||
| _make_contributor("c1", "alice") | ||
| client.post("/api/admin/bounties/b1/close", headers=AUTH_HEADER) | ||
| client.post( | ||
| "/api/admin/contributors/c1/ban", | ||
| headers=AUTH_HEADER, | ||
| json={"reason": "Spamming the platform"}, | ||
| ) | ||
|
|
||
| data = client.get("/api/admin/audit-log?event=banned", headers=AUTH_HEADER).json() | ||
| assert all("banned" in e["event"] for e in data["entries"]) | ||
|
|
||
| def test_audit_log_limit(self, client): | ||
| for i in range(10): | ||
| _make_bounty(f"b{i}") | ||
| client.post(f"/api/admin/bounties/b{i}/close", headers=AUTH_HEADER) | ||
|
|
||
| data = client.get("/api/admin/audit-log?limit=5", headers=AUTH_HEADER).json() | ||
| assert len(data["entries"]) <= 5 | ||
Uh oh!
There was an error while loading. Please reload this page.