Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ REDIS_URL=redis://redis:6379/0
GITHUB_TOKEN=
GITHUB_WEBHOOK_SECRET=

# Helius / Shyft / custom indexer → POST /api/webhooks/internal/chain-events
CHAIN_WEBHOOK_INDEXER_SECRET=

# Solana - defaults to devnet for safe local development
SOLANA_RPC_URL=https://api.devnet.solana.com

Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/anchor.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ jobs:
components: clippy, rustfmt

- name: Setup Solana
uses: metadaoproject/setup-solana@v1
uses: metadaoproject/setup-solana@v1.2
with:
solana-cli-version: ${{ env.SOLANA_VERSION }}

Expand Down Expand Up @@ -119,7 +119,7 @@ jobs:
toolchain: ${{ env.RUST_VERSION }}

- name: Setup Solana
uses: metadaoproject/setup-solana@v1
uses: metadaoproject/setup-solana@v1.2
with:
solana-cli-version: ${{ env.SOLANA_VERSION }}

Expand Down
1 change: 1 addition & 0 deletions backend/alembic/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

# Import all models so Alembic can detect them
from app.models.contributor import ContributorTable # noqa: F401
from app.models.webhook_delivery import WebhookDeliveryAttemptDB # noqa: F401

config = context.config

Expand Down
84 changes: 84 additions & 0 deletions backend/alembic/versions/006_webhook_delivery_attempts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
"""Webhook delivery attempt log for dashboard and retry visibility.

Revision ID: 006_webhook_delivery_attempts
Revises: 005_bounty_boosts
Create Date: 2026-03-23
"""

from typing import Sequence, Union

import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql

# Alembic revision identifiers: this migration (006) chains directly after
# 005_bounty_boosts; no branch labels or cross-revision dependencies.
revision: str = "006_webhook_delivery_attempts"
down_revision: Union[str, None] = "005_bounty_boosts"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Create ``webhook_delivery_attempts`` plus its four lookup indexes.

    The table logs one row per delivery attempt (including retries) so the
    dashboard can show per-webhook health; rows cascade-delete with the
    owning row in ``contributor_webhooks``.
    """
    op.create_table(
        "webhook_delivery_attempts",
        sa.Column("id", postgresql.UUID(as_uuid=True), primary_key=True),
        sa.Column(
            "webhook_id",
            postgresql.UUID(as_uuid=True),
            sa.ForeignKey("contributor_webhooks.id", ondelete="CASCADE"),
            nullable=False,
        ),
        sa.Column("batch_id", postgresql.UUID(as_uuid=True), nullable=True),
        sa.Column("delivery_mode", sa.String(16), nullable=False),
        sa.Column(
            "event_types", postgresql.JSONB(astext_type=sa.Text()), nullable=False
        ),
        sa.Column("attempt_number", sa.Integer(), nullable=False),
        sa.Column("success", sa.Boolean(), nullable=False),
        sa.Column("http_status", sa.Integer(), nullable=True),
        sa.Column("error_message", sa.Text(), nullable=True),
        sa.Column(
            "created_at",
            sa.DateTime(timezone=True),
            nullable=False,
            server_default=sa.text("now()"),
        ),
    )
    # Single-column indexes for direct lookups, plus a composite index that
    # serves the "recent attempts for one webhook" dashboard query.
    index_specs = (
        ("ix_webhook_delivery_attempts_webhook_id", ["webhook_id"]),
        ("ix_webhook_delivery_attempts_batch_id", ["batch_id"]),
        ("ix_webhook_delivery_attempts_created_at", ["created_at"]),
        ("ix_webhook_delivery_webhook_created", ["webhook_id", "created_at"]),
    )
    for index_name, columns in index_specs:
        op.create_index(index_name, "webhook_delivery_attempts", columns)


def downgrade() -> None:
    """Drop the delivery-attempt indexes (reverse of creation order), then the table."""
    for index_name in (
        "ix_webhook_delivery_webhook_created",
        "ix_webhook_delivery_attempts_created_at",
        "ix_webhook_delivery_attempts_batch_id",
        "ix_webhook_delivery_attempts_webhook_id",
    ):
        op.drop_index(index_name, table_name="webhook_delivery_attempts")
    op.drop_table("webhook_delivery_attempts")
112 changes: 112 additions & 0 deletions backend/app/api/chain_webhook_indexer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
"""Ingest endpoint for Helius / Shyft (or any indexer) to push normalized chain events."""

from __future__ import annotations

import hmac
import logging
import os
import uuid
from datetime import datetime, timezone
from typing import Annotated, Any, Optional

from fastapi import APIRouter, Header, HTTPException, status
from pydantic import BaseModel, Field, field_validator

from app.models.contributor_webhook import ON_CHAIN_WEBHOOK_EVENTS
from app.services.chain_webhook_batcher import chain_webhook_batcher

logger = logging.getLogger(__name__)

# Included in main.py under the "/api" prefix, so the effective ingest path
# is /api/webhooks/internal/chain-events.
router = APIRouter(prefix="/webhooks/internal", tags=["chain-webhook-indexer"])


class ChainEventIngestBody(BaseModel):
    """Payload from an indexer after parsing on-chain logs.

    Structured fields are validated at the API boundary: ``block_time`` must
    parse as ISO-8601 (it is forwarded verbatim into outbound webhook
    payloads) and ``notify_user_id`` must be a valid UUID string (batch
    delivery coerces it to UUID later — failing here returns 422 instead of
    a post-202 delivery error).
    """

    event: str = Field(
        ...,
        description="One of the on-chain webhook event types (e.g. escrow.locked).",
    )
    transaction_signature: str = Field(
        ...,
        min_length=32,
        max_length=128,
        description="Solana transaction signature (base58).",
    )
    slot: int = Field(..., ge=0, description="Slot containing the transaction.")
    block_time: Optional[str] = Field(
        None,
        description="ISO-8601 UTC timestamp from the block (optional).",
    )
    accounts: dict[str, Any] = Field(
        default_factory=dict,
        description="Relevant account pubkeys and parsed fields (indexer-specific).",
    )
    bounty_id: str = Field(
        default="",
        description="Optional bounty correlation id when applicable.",
    )
    extra: dict[str, Any] = Field(
        default_factory=dict,
        description="Additional fields merged into the webhook ``data`` object.",
    )
    notify_user_id: Optional[str] = Field(
        None,
        description=(
            "If set, only webhooks owned by this user UUID receive the event. "
            "If omitted, all active subscriber URLs receive batched deliveries."
        ),
    )

    @field_validator("block_time")
    @classmethod
    def block_time_must_be_iso8601(cls, v: Optional[str]) -> Optional[str]:
        """Reject timestamps that are not parseable ISO-8601."""
        if v is None:
            return v
        try:
            # fromisoformat does not accept a trailing "Z" on older Pythons;
            # normalize it to an explicit UTC offset before parsing.
            datetime.fromisoformat(v.replace("Z", "+00:00"))
        except ValueError as exc:
            raise ValueError("block_time must be an ISO-8601 timestamp") from exc
        return v

    @field_validator("notify_user_id")
    @classmethod
    def notify_user_id_must_be_uuid(cls, v: Optional[str]) -> Optional[str]:
        """Reject recipient ids that are not valid UUID strings."""
        if v is None:
            return v
        try:
            uuid.UUID(v)
        except (ValueError, AttributeError, TypeError) as exc:
            raise ValueError("notify_user_id must be a valid UUID") from exc
        return v
Comment on lines +35 to +57
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

The ingest schema is not enforced for several structured fields.

Lines 35-57 accept block_time and notify_user_id as plain strings, so malformed timestamps and non-UUID values pass validation. Lines 101-106 then forward block_time verbatim into outbound payloads, and backend/app/services/contributor_webhook_service.py Lines 241-242 only coerce notify_user_id to UUID during batch delivery, after this endpoint has already returned 202. The merge at Lines 101-102 also lets extra.accounts overwrite the reserved data.accounts object, which breaks the documented event shape.

As per coding guidelines, backend/**: Python FastAPI backend. Analyze thoroughly: Input validation and SQL injection vectors; Error handling and edge case coverage; API contract consistency with spec.

Also applies to: 101-111

Comment on lines +51 to +57
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

One shared indexer key can broadcast arbitrary events to every subscriber.

Lines 69-81 treat the indexer credential as a single bearer secret, and Lines 51-57 make recipient scoping optional. In backend/app/services/contributor_webhook_service.py Lines 238-245, a missing notify_user_id becomes an unscoped query over every active webhook. That means any caller holding the indexer secret can synthesize arbitrary on-chain events and fan them out platform-wide, rather than being constrained to a specific source or subscriber set.

As per coding guidelines, backend/**: Python FastAPI backend. Analyze thoroughly: Authentication/authorization gaps.

Also applies to: 69-81, 89-112

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/api/chain_webhook_indexer.py` around lines 51 - 57, The indexer
credential is currently treated as a single bearer secret and notify_user_id
(Field notify_user_id) is optional, allowing any holder of the indexer secret to
broadcast to all active webhooks; fix this by requiring scoping and enforcing
authorization: make notify_user_id required in the indexer payload or, if
optional, validate the indexer credential against an allowlist of user UUIDs and
reject requests with no notify_user_id; update the chain_webhook_indexer.py
logic that parses the credential (the bearer/indexer auth handling around the
code that reads notify_user_id) to enforce the requirement and add explicit
authorization checks, and modify contributor_webhook_service.py (the code path
in the function that queries active webhooks around Lines 238-245) so that it
never performs an unscoped query when notify_user_id is missing—return 400/403
instead or scope the query to allowed user_ids tied to the indexer credential.


@field_validator("event")
@classmethod
def event_must_be_on_chain(cls, v: str) -> str:
    """Accept only the on-chain webhook event types; otherwise raise ValueError."""
    if v in ON_CHAIN_WEBHOOK_EVENTS:
        return v
    allowed = ", ".join(sorted(ON_CHAIN_WEBHOOK_EVENTS))
    raise ValueError(f"event must be one of: {allowed}")


def _verify_indexer_key(x_chain_indexer_key: str | None) -> None:
    """Authenticate an indexer request against ``CHAIN_WEBHOOK_INDEXER_SECRET``.

    Args:
        x_chain_indexer_key: Value of the ``X-Chain-Indexer-Key`` header, or
            ``None`` when the header was absent.

    Raises:
        HTTPException: 503 when the shared secret is unconfigured (ingest is
            disabled rather than left open), 401 when the header is missing
            or does not match.
    """
    expected = os.getenv("CHAIN_WEBHOOK_INDEXER_SECRET", "").strip()
    if not expected:
        logger.warning("CHAIN_WEBHOOK_INDEXER_SECRET unset — rejecting indexer ingest")
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Chain indexer ingest is not configured",
        )
    # Constant-time comparison: a plain `!=` on a secret leaks match-prefix
    # length via response timing.
    if not x_chain_indexer_key or not hmac.compare_digest(
        x_chain_indexer_key, expected
    ):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid indexer credentials",
        )


@router.post(
    "/chain-events",
    status_code=status.HTTP_202_ACCEPTED,
    summary="Enqueue an on-chain event for batched webhook delivery",
)
async def ingest_chain_event(
    body: ChainEventIngestBody,
    x_chain_indexer_key: Annotated[str | None, Header()] = None,
) -> dict[str, str]:
    """Accept a normalized chain event from Helius, Shyft, or an in-house indexer.

    Events are queued and delivered in batches every five seconds to reduce HTTP
    traffic. Authenticate with header ``X-Chain-Indexer-Key`` matching env
    ``CHAIN_WEBHOOK_INDEXER_SECRET``.
    """
    _verify_indexer_key(x_chain_indexer_key)

    # Fall back to the ingest time when the indexer omitted a block timestamp.
    ts = body.block_time or datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    # ``accounts`` is a reserved key in the delivery payload: merge ``extra``
    # first so an indexer-supplied extra["accounts"] cannot clobber it.
    data: dict[str, Any] = {**body.extra, "accounts": body.accounts}
    event_dict = {
        "event": body.event,
        "bounty_id": body.bounty_id,
        "timestamp": ts,
        "data": data,
        "transaction_signature": body.transaction_signature,
        "slot": body.slot,
    }
    await chain_webhook_batcher.enqueue(event_dict, body.notify_user_id)
    return {"status": "accepted", "delivery": "batched"}
45 changes: 45 additions & 0 deletions backend/app/api/contributor_webhooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
POST /api/webhooks/register Register a new webhook URL
DELETE /api/webhooks/{id} Unregister a webhook
GET /api/webhooks List registered webhooks for the caller
GET /api/webhooks/delivery-stats Delivery health + retry history (dashboard)
POST /api/webhooks/test Send a signed test event to your endpoints
"""

import logging
Expand All @@ -18,6 +20,7 @@
from app.auth import get_current_user_id
from app.database import get_db
from app.models.contributor_webhook import (
WebhookDeliveryDashboard,
WebhookListResponse,
WebhookRegisterRequest,
WebhookResponse,
Expand Down Expand Up @@ -116,3 +119,45 @@ async def list_webhooks(
service = ContributorWebhookService(db)
items = await service.list_for_user(user_id)
return WebhookListResponse(items=items, total=len(items))


# ── dashboard + test ────────────────────────────────────────────────────────


@router.get(
    "/delivery-stats",
    response_model=WebhookDeliveryDashboard,
    summary="Webhook delivery health",
    description=(
        "Returns attempt counts, failure rate over the last 7 days, last endpoint "
        "status, and recent per-attempt rows (including retries) for the caller's "
        "registered webhooks."
    ),
)
async def webhook_delivery_stats(
    user_id: str = Depends(get_current_user_id),
    db: AsyncSession = Depends(get_db),
) -> WebhookDeliveryDashboard:
    """Return the caller's webhook delivery dashboard payload."""
    svc = ContributorWebhookService(db)
    dashboard = await svc.delivery_dashboard(user_id)
    return WebhookDeliveryDashboard(**dashboard)


@router.post(
    "/test",
    summary="Send a test webhook",
    description=(
        "Dispatches a signed ``webhook.test`` payload immediately to every active "
        "webhook registered by the caller (not batched)."
    ),
    responses={
        400: {
            "model": ErrorResponse,
            "description": "webhook.test event type is not configured",
        },
        401: {"model": ErrorResponse, "description": "Authentication required"},
    },
)
async def webhook_test_delivery(
    user_id: str = Depends(get_current_user_id),
    db: AsyncSession = Depends(get_db),
) -> dict[str, str | int]:
    """Fire an immediate signed test event to every active webhook of the caller.

    Returns a summary dict with the number of endpoints notified. NOTE(review):
    ``dispatch_test_event`` reportedly raises ValueError when ``webhook.test``
    is not in the configured event set; the global ValueError handler maps that
    to a 400 response, which is documented above — confirm against the service.
    """
    service = ContributorWebhookService(db)
    notified = await service.dispatch_test_event(user_id)
    return {"status": "completed", "endpoints_notified": notified}
Comment on lines +146 to +163
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Consider adding explicit error handling for dispatch_test_event.

The service's dispatch_test_event method raises ValueError if "webhook.test" is not configured in WEBHOOK_EVENTS. While the global value_error_handler in main.py catches this and returns 400, consider documenting this in the responses dict or handling it explicitly for clarity.

Additionally, consider adding a response_model for the return type to ensure consistent API documentation in OpenAPI.

💡 Optional improvement
 `@router.post`(
     "/test",
     summary="Send a test webhook",
     description=(
         "Dispatches a signed ``webhook.test`` payload immediately to every active "
         "webhook registered by the caller (not batched)."
     ),
     responses={
+        400: {"model": ErrorResponse, "description": "webhook.test event not configured"},
     },
 )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/api/contributor_webhooks.py` around lines 146 - 163, Add explicit
handling and documentation for errors and the response model in the
webhook_test_delivery endpoint: update the endpoint to declare a response_model
(e.g., a pydantic model with status and endpoints_notified) and add the
ValueError to the responses mapping (e.g., 400: {"model": ErrorResponse,
"description": "..."}), or wrap the call to
ContributorWebhookService.dispatch_test_event in a try/except that converts
ValueError into an HTTPException(status_code=400) so OpenAPI and runtime
behavior are explicit; reference the webhook_test_delivery function and the
dispatch_test_event method when making these edits.

1 change: 1 addition & 0 deletions backend/app/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ async def init_db() -> None:
from app.models.lifecycle import BountyLifecycleLogDB # noqa: F401
from app.models.escrow import EscrowTable, EscrowLedgerTable # noqa: F401
from app.models.boost import BountyBoostTable # noqa: F401
from app.models.webhook_delivery import WebhookDeliveryAttemptDB # noqa: F401

# NOTE: create_all is idempotent (skips existing tables). For
# production schema changes use ``alembic upgrade head`` instead.
Expand Down
6 changes: 6 additions & 0 deletions backend/app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@
from app.database import init_db, close_db
from app.api.og import router as og_router
from app.api.contributor_webhooks import router as contributor_webhooks_router
from app.api.chain_webhook_indexer import router as chain_webhook_indexer_router
from app.services.chain_webhook_batcher import chain_webhook_batcher
from app.api.siws import router as siws_router
from app.middleware.security import SecurityHeadersMiddleware
from app.middleware.sanitization import InputSanitizationMiddleware
Expand Down Expand Up @@ -111,6 +113,7 @@ async def lifespan(app: FastAPI):
)

await init_db()
await chain_webhook_batcher.start()
await ws_manager.init()

# Hydrate in-memory caches from PostgreSQL (source of truth)
Expand Down Expand Up @@ -170,6 +173,8 @@ async def lifespan(app: FastAPI):

yield

await chain_webhook_batcher.stop()

Comment on lines +176 to +177
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Consider wrapping stop() in try/except for graceful shutdown.

If chain_webhook_batcher.stop() raises an exception, subsequent cleanup steps (task cancellations, close_redis, close_db) will not execute, potentially leaving resources unreleased.

💡 Suggested improvement
-    await chain_webhook_batcher.stop()
+    try:
+        await chain_webhook_batcher.stop()
+    except Exception as exc:
+        logger.error("chain_webhook_batcher shutdown error: %s", exc)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
await chain_webhook_batcher.stop()
try:
await chain_webhook_batcher.stop()
except Exception as exc:
logger.error("chain_webhook_batcher shutdown error: %s", exc)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/main.py` around lines 176 - 177, Wrap the await
chain_webhook_batcher.stop() call in a try/except so any exception from
chain_webhook_batcher.stop() is caught and logged (use logger.exception or
process_logger.error with the exception) and does not abort the rest of
shutdown; ensure task cancellation and calls to close_redis and close_db still
run (move them into a finally block or simply run them after the except) so
resources are always released even if chain_webhook_batcher.stop() fails.

# Shutdown: Cancel background tasks, close connections, then database
sync_task.cancel()
auto_approve_task.cancel()
Expand Down Expand Up @@ -410,6 +415,7 @@ async def value_error_handler(request: Request, exc: ValueError):
# Open Graph previews: /og/*
app.include_router(og_router)
app.include_router(contributor_webhooks_router, prefix="/api")
app.include_router(chain_webhook_indexer_router, prefix="/api")
app.include_router(siws_router, prefix="/api")

# System Health: /health, Prometheus: /metrics
Expand Down
Loading
Loading