diff --git a/app/src/App.tsx b/app/src/App.tsx index f0dc5942..7286797d 100644 --- a/app/src/App.tsx +++ b/app/src/App.tsx @@ -16,6 +16,7 @@ import NotFound from "./pages/NotFound"; import { Landing } from "./pages/Landing"; import ProtectedRoute from "./components/auth/ProtectedRoute"; import Account from "./pages/Account"; +import Jobs from "./pages/Jobs"; const queryClient = new QueryClient({ defaultOptions: { @@ -83,6 +84,14 @@ const App = () => ( } /> + + + + } + /> ({ + useToast: () => ({ toast: toastMock }), +})); + +jest.mock('@/components/ui/button', () => ({ + Button: ({ children, ...props }: React.PropsWithChildren & React.ButtonHTMLAttributes) => ( + + ), +})); +jest.mock('@/components/ui/input', () => ({ + Input: ({ ...props }: React.InputHTMLAttributes) => , +})); +jest.mock('@/components/ui/label', () => ({ + Label: ({ children, ...props }: React.PropsWithChildren & React.LabelHTMLAttributes) => ( + + ), +})); +jest.mock('@/components/ui/badge', () => ({ + Badge: ({ children, className, ...props }: React.PropsWithChildren & React.HTMLAttributes) => ( + {children} + ), +})); +jest.mock('@/components/ui/dialog', () => ({ + Dialog: ({ children }: React.PropsWithChildren) =>
{children}
, + DialogContent: ({ children }: React.PropsWithChildren) =>
{children}
, + DialogHeader: ({ children }: React.PropsWithChildren) =>
{children}
, + DialogTitle: ({ children }: React.PropsWithChildren) =>
{children}
, + DialogDescription: ({ children }: React.PropsWithChildren) =>
{children}
, + DialogTrigger: ({ children }: React.PropsWithChildren) =>
{children}
, + DialogFooter: ({ children }: React.PropsWithChildren) =>
{children}
, +})); + +const listJobsMock = jest.fn(); +const createJobMock = jest.fn(); +const retryJobMock = jest.fn(); +const getJobStatsMock = jest.fn(); +const getDeadLetterQueueMock = jest.fn(); +jest.mock('@/api/jobs', () => ({ + listJobs: (...args: unknown[]) => listJobsMock(...args), + createJob: (...args: unknown[]) => createJobMock(...args), + retryJob: (...args: unknown[]) => retryJobMock(...args), + getJobStats: (...args: unknown[]) => getJobStatsMock(...args), + getDeadLetterQueue: (...args: unknown[]) => getDeadLetterQueueMock(...args), +})); + +const SAMPLE_JOB = { + id: 1, + user_id: 1, + name: 'Sync bank data', + job_type: 'DATA_SYNC' as const, + status: 'COMPLETED' as const, + attempts: 1, + max_retries: 5, + last_error: null, + payload: '{"source":"plaid"}', + result: '{"records_synced":42}', + scheduled_at: '2025-01-01T00:00:00', + started_at: '2025-01-01T00:00:01', + completed_at: '2025-01-01T00:00:02', + next_retry_at: null, + created_at: '2025-01-01T00:00:00', + updated_at: '2025-01-01T00:00:02', +}; + +const FAILED_JOB = { + ...SAMPLE_JOB, + id: 2, + name: 'Failed report', + job_type: 'REPORT_GENERATION' as const, + status: 'FAILED' as const, + attempts: 3, + last_error: 'Connection timeout', + completed_at: null, + result: null, + next_retry_at: '2025-01-01T00:01:00', +}; + +const SAMPLE_STATS = { + pending: 2, + running: 1, + completed: 10, + failed: 3, + dead: 1, + total: 17, +}; + +describe('Jobs monitor integration', () => { + beforeEach(() => { + jest.clearAllMocks(); + listJobsMock.mockResolvedValue({ + jobs: [SAMPLE_JOB, FAILED_JOB], + total: 2, + page: 1, + per_page: 20, + }); + getJobStatsMock.mockResolvedValue(SAMPLE_STATS); + createJobMock.mockResolvedValue(SAMPLE_JOB); + retryJobMock.mockResolvedValue({ ...FAILED_JOB, status: 'COMPLETED', attempts: 4 }); + getDeadLetterQueueMock.mockResolvedValue([]); + }); + + it('renders page title and stats cards', async () => { + render(); + await waitFor(() => 
expect(getJobStatsMock).toHaveBeenCalled()); + expect(screen.getByText(/background jobs/i)).toBeInTheDocument(); + expect(screen.getByText('17')).toBeInTheDocument(); // total + expect(screen.getByText('10')).toBeInTheDocument(); // completed + }); + + it('renders job list with status badges', async () => { + render(); + await waitFor(() => expect(listJobsMock).toHaveBeenCalled()); + expect(screen.getByText('Sync bank data')).toBeInTheDocument(); + expect(screen.getByText('Failed report')).toBeInTheDocument(); + expect(screen.getByText('Completed')).toBeInTheDocument(); + expect(screen.getByText('Failed')).toBeInTheDocument(); + }); + + it('shows retry button for failed jobs', async () => { + render(); + await waitFor(() => expect(listJobsMock).toHaveBeenCalled()); + const retryButtons = screen.getAllByRole('button', { name: /retry/i }); + expect(retryButtons.length).toBeGreaterThanOrEqual(1); + }); + + it('retries a failed job', async () => { + render(); + await waitFor(() => expect(listJobsMock).toHaveBeenCalled()); + const retryButtons = screen.getAllByRole('button', { name: /retry/i }); + await userEvent.click(retryButtons[0]); + await waitFor(() => expect(retryJobMock).toHaveBeenCalledWith(2)); + expect(toastMock).toHaveBeenCalledWith( + expect.objectContaining({ title: 'Job retried' }), + ); + }); + + it('shows error message for failed jobs', async () => { + render(); + await waitFor(() => expect(listJobsMock).toHaveBeenCalled()); + expect(screen.getByText('Connection timeout')).toBeInTheDocument(); + }); + + it('displays attempt counter', async () => { + render(); + await waitFor(() => expect(listJobsMock).toHaveBeenCalled()); + expect(screen.getByText('Attempt 1/5')).toBeInTheDocument(); + expect(screen.getByText('Attempt 3/5')).toBeInTheDocument(); + }); + + it('shows empty state when no jobs', async () => { + listJobsMock.mockResolvedValue({ jobs: [], total: 0, page: 1, per_page: 20 }); + getJobStatsMock.mockResolvedValue({ ...SAMPLE_STATS, total: 0 }); + 
render(); + await waitFor(() => expect(listJobsMock).toHaveBeenCalled()); + expect(screen.getByText(/no jobs found/i)).toBeInTheDocument(); + }); +}); diff --git a/app/src/api/jobs.ts b/app/src/api/jobs.ts new file mode 100644 index 00000000..45fde3ea --- /dev/null +++ b/app/src/api/jobs.ts @@ -0,0 +1,82 @@ +import { api } from './client'; + +export type JobStatus = 'PENDING' | 'RUNNING' | 'COMPLETED' | 'FAILED' | 'DEAD'; +export type JobType = 'DATA_SYNC' | 'REPORT_GENERATION' | 'EMAIL_NOTIFICATION'; + +export type Job = { + id: number; + user_id: number; + name: string; + job_type: JobType; + status: JobStatus; + attempts: number; + max_retries: number; + last_error: string | null; + payload: string | null; + result: string | null; + scheduled_at: string | null; + started_at: string | null; + completed_at: string | null; + next_retry_at: string | null; + created_at: string | null; + updated_at: string | null; +}; + +export type JobListResponse = { + jobs: Job[]; + total: number; + page: number; + per_page: number; +}; + +export type JobStats = { + pending: number; + running: number; + completed: number; + failed: number; + dead: number; + total: number; +}; + +export type JobCreate = { + name: string; + job_type: JobType; + payload?: Record; + max_retries?: number; + scheduled_at?: string; +}; + +export async function listJobs(params?: { + status?: JobStatus; + job_type?: JobType; + page?: number; + per_page?: number; +}): Promise { + const searchParams = new URLSearchParams(); + if (params?.status) searchParams.set('status', params.status); + if (params?.job_type) searchParams.set('job_type', params.job_type); + if (params?.page) searchParams.set('page', String(params.page)); + if (params?.per_page) searchParams.set('per_page', String(params.per_page)); + const qs = searchParams.toString(); + return api(`/jobs${qs ? 
`?${qs}` : ''}`); +} + +export async function createJob(payload: JobCreate): Promise { + return api('/jobs', { method: 'POST', body: payload }); +} + +export async function getJob(id: number): Promise { + return api(`/jobs/${id}`); +} + +export async function retryJob(id: number): Promise { + return api(`/jobs/${id}/retry`, { method: 'POST' }); +} + +export async function getJobStats(): Promise { + return api('/jobs/stats'); +} + +export async function getDeadLetterQueue(): Promise { + return api('/jobs/dead-letter'); +} diff --git a/app/src/components/layout/Navbar.tsx b/app/src/components/layout/Navbar.tsx index c7593b70..0a816057 100644 --- a/app/src/components/layout/Navbar.tsx +++ b/app/src/components/layout/Navbar.tsx @@ -13,6 +13,7 @@ const navigation = [ { name: 'Reminders', href: '/reminders' }, { name: 'Expenses', href: '/expenses' }, { name: 'Analytics', href: '/analytics' }, + { name: 'Jobs', href: '/jobs' }, ]; export function Navbar() { diff --git a/app/src/pages/Jobs.tsx b/app/src/pages/Jobs.tsx new file mode 100644 index 00000000..48da9564 --- /dev/null +++ b/app/src/pages/Jobs.tsx @@ -0,0 +1,431 @@ +import { useCallback, useEffect, useState } from 'react'; +import { Button } from '@/components/ui/button'; +import { Badge } from '@/components/ui/badge'; +import { Input } from '@/components/ui/input'; +import { Label } from '@/components/ui/label'; +import { + Dialog, + DialogContent, + DialogDescription, + DialogFooter, + DialogHeader, + DialogTitle, + DialogTrigger, +} from '@/components/ui/dialog'; +import { useToast } from '@/hooks/use-toast'; +import { + listJobs, + createJob, + retryJob, + getJobStats, + getDeadLetterQueue, + type Job, + type JobStatus, + type JobType, + type JobStats, + type JobCreate, +} from '@/api/jobs'; + +const STATUS_STYLES: Record = { + PENDING: { + label: 'Pending', + className: 'bg-yellow-100 text-yellow-800 border-yellow-200', + }, + RUNNING: { + label: 'Running', + className: 'bg-blue-100 text-blue-800 
border-blue-200', + }, + COMPLETED: { + label: 'Completed', + className: 'bg-green-100 text-green-800 border-green-200', + }, + FAILED: { + label: 'Failed', + className: 'bg-red-100 text-red-800 border-red-200', + }, + DEAD: { + label: 'Dead Letter', + className: 'bg-gray-800 text-gray-100 border-gray-700', + }, +}; + +const JOB_TYPE_LABELS: Record = { + DATA_SYNC: 'Data Sync', + REPORT_GENERATION: 'Report Generation', + EMAIL_NOTIFICATION: 'Email Notification', +}; + +function StatusBadge({ status }: { status: JobStatus }) { + const style = STATUS_STYLES[status] ?? STATUS_STYLES.PENDING; + return ( + + {style.label} + + ); +} + +function StatsCard({ label, count, active }: { label: string; count: number; active?: boolean }) { + return ( + + ); +} + +function TimelineEntry({ job }: { job: Job }) { + const parsePayload = (raw: string | null) => { + if (!raw) return null; + try { + return JSON.parse(raw); + } catch { + return null; + } + }; + + const payload = parsePayload(job.payload); + const result = parsePayload(job.result); + + return ( +
+
+
+
+ {job.name} + + + {JOB_TYPE_LABELS[job.job_type] ?? job.job_type} + +
+
+ Attempt {job.attempts}/{job.max_retries} + {job.created_at && Created {new Date(job.created_at).toLocaleString()}} + {job.completed_at && Completed {new Date(job.completed_at).toLocaleString()}} + {job.next_retry_at && job.status === 'FAILED' && ( + Retry at {new Date(job.next_retry_at).toLocaleString()} + )} +
+ {job.last_error && ( +
+ {job.last_error} +
+ )} + {payload && ( +
+ + Payload + +
+              {JSON.stringify(payload, null, 2)}
+            
+
+ )} + {result && ( +
+ + Result + +
+              {JSON.stringify(result, null, 2)}
+            
+
+ )} +
+
+ ); +} + +export function Jobs() { + const { toast } = useToast(); + const getErrorMessage = (error: unknown, fallback: string) => + error instanceof Error ? error.message : fallback; + + const [jobs, setJobs] = useState([]); + const [stats, setStats] = useState(null); + const [loading, setLoading] = useState(true); + const [statusFilter, setStatusFilter] = useState(''); + const [typeFilter, setTypeFilter] = useState(''); + const [page, setPage] = useState(1); + const [total, setTotal] = useState(0); + const perPage = 20; + + // New job dialog state + const [dialogOpen, setDialogOpen] = useState(false); + const [newName, setNewName] = useState(''); + const [newType, setNewType] = useState('DATA_SYNC'); + const [newMaxRetries, setNewMaxRetries] = useState('5'); + const [saving, setSaving] = useState(false); + + const refresh = useCallback(async () => { + setLoading(true); + try { + const [jobsRes, statsRes] = await Promise.all([ + listJobs({ + status: statusFilter || undefined, + job_type: typeFilter || undefined, + page, + per_page: perPage, + }), + getJobStats(), + ]); + setJobs(jobsRes.jobs); + setTotal(jobsRes.total); + setStats(statsRes); + } catch (error: unknown) { + toast({ + title: 'Failed to load jobs', + description: getErrorMessage(error, 'Please try again.'), + }); + } finally { + setLoading(false); + } + }, [statusFilter, typeFilter, page, toast]); + + useEffect(() => { + void refresh(); + }, [refresh]); + + async function handleCreate() { + if (!newName.trim()) return; + setSaving(true); + try { + const payload: JobCreate = { + name: newName.trim(), + job_type: newType, + max_retries: Math.max(0, parseInt(newMaxRetries, 10) || 5), + }; + await createJob(payload); + toast({ title: 'Job created' }); + setDialogOpen(false); + setNewName(''); + setNewMaxRetries('5'); + void refresh(); + } catch (error: unknown) { + toast({ + title: 'Failed to create job', + description: getErrorMessage(error, 'Please try again.'), + }); + } finally { + setSaving(false); 
+ } + } + + async function handleRetry(jobId: number) { + try { + const updated = await retryJob(jobId); + toast({ + title: 'Job retried', + description: `Status: ${updated.status}`, + }); + void refresh(); + } catch (error: unknown) { + toast({ + title: 'Failed to retry job', + description: getErrorMessage(error, 'Please try again.'), + }); + } + } + + const totalPages = Math.ceil(total / perPage); + + return ( +
+
+
+
+

Background Jobs

+

+ Monitor, retry, and manage background tasks with resilient retry logic. +

+
+
+ + + + + + + + Create Background Job + + Enqueue a new job for background processing with automatic retry. + + +
+
+ + setNewName(e.target.value)} + placeholder="Sync bank transactions" + /> +
+
+
+ + +
+
+ + setNewMaxRetries(e.target.value)} + /> +
+
+
+ + + + +
+
+
+
+
+ + {/* Stats cards */} + {stats && ( +
+
{ setStatusFilter(''); setPage(1); }}> + +
+
{ setStatusFilter('PENDING'); setPage(1); }}> + +
+
{ setStatusFilter('RUNNING'); setPage(1); }}> + +
+
{ setStatusFilter('COMPLETED'); setPage(1); }}> + +
+
{ setStatusFilter('FAILED'); setPage(1); }}> + +
+
{ setStatusFilter('DEAD'); setPage(1); }}> + +
+
+ )} + + {/* Filters */} +
+
+
+ + +
+ +
+
+ + {/* Job list as timeline */} + {loading ? ( +
Loading...
+ ) : jobs.length === 0 ? ( +
+ No jobs found. Create one to get started. +
+ ) : ( +
+

Job History

+
+ {jobs.map((job) => ( +
+ + {(job.status === 'FAILED' || job.status === 'DEAD') && ( +
+ +
+ )} +
+ ))} +
+ + {/* Pagination */} + {totalPages > 1 && ( +
+ + + Page {page} of {totalPages} ({total} total) + + +
+ )} +
+ )} +
+ ); +} + +export default Jobs; diff --git a/packages/backend/app/db/schema.sql b/packages/backend/app/db/schema.sql index 410189de..f4ba028d 100644 --- a/packages/backend/app/db/schema.sql +++ b/packages/backend/app/db/schema.sql @@ -123,3 +123,25 @@ CREATE TABLE IF NOT EXISTS audit_logs ( action VARCHAR(100) NOT NULL, created_at TIMESTAMP NOT NULL DEFAULT NOW() ); + +CREATE TABLE IF NOT EXISTS background_jobs ( + id SERIAL PRIMARY KEY, + user_id INT NOT NULL REFERENCES users(id) ON DELETE CASCADE, + name VARCHAR(200) NOT NULL, + job_type VARCHAR(30) NOT NULL, + status VARCHAR(20) NOT NULL DEFAULT 'PENDING', + attempts INT NOT NULL DEFAULT 0, + max_retries INT NOT NULL DEFAULT 5, + last_error TEXT, + payload TEXT, + result TEXT, + scheduled_at TIMESTAMP, + started_at TIMESTAMP, + completed_at TIMESTAMP, + next_retry_at TIMESTAMP, + created_at TIMESTAMP NOT NULL DEFAULT NOW(), + updated_at TIMESTAMP NOT NULL DEFAULT NOW() +); +CREATE INDEX IF NOT EXISTS idx_background_jobs_user_status ON background_jobs(user_id, status); +CREATE INDEX IF NOT EXISTS idx_background_jobs_pending ON background_jobs(status, scheduled_at) + WHERE status IN ('PENDING', 'FAILED'); diff --git a/packages/backend/app/models.py b/packages/backend/app/models.py index 64d44810..19a29064 100644 --- a/packages/backend/app/models.py +++ b/packages/backend/app/models.py @@ -1,6 +1,6 @@ from datetime import datetime, date from enum import Enum -from sqlalchemy import Enum as SAEnum +from sqlalchemy import Enum as SAEnum, Text from .extensions import db @@ -133,3 +133,61 @@ class AuditLog(db.Model): user_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=True) action = db.Column(db.String(100), nullable=False) created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False) + + +class JobStatus(str, Enum): + PENDING = "PENDING" + RUNNING = "RUNNING" + COMPLETED = "COMPLETED" + FAILED = "FAILED" + DEAD = "DEAD" + + +class JobType(str, Enum): + DATA_SYNC = "DATA_SYNC" + 
REPORT_GENERATION = "REPORT_GENERATION" + EMAIL_NOTIFICATION = "EMAIL_NOTIFICATION" + + +class BackgroundJob(db.Model): + __tablename__ = "background_jobs" + id = db.Column(db.Integer, primary_key=True) + user_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=False) + name = db.Column(db.String(200), nullable=False) + job_type = db.Column(db.String(30), nullable=False) + status = db.Column( + db.String(20), default=JobStatus.PENDING.value, nullable=False + ) + attempts = db.Column(db.Integer, default=0, nullable=False) + max_retries = db.Column(db.Integer, default=5, nullable=False) + last_error = db.Column(Text, nullable=True) + payload = db.Column(Text, nullable=True) # JSON-encoded job payload + result = db.Column(Text, nullable=True) # JSON-encoded result + scheduled_at = db.Column(db.DateTime, nullable=True) + started_at = db.Column(db.DateTime, nullable=True) + completed_at = db.Column(db.DateTime, nullable=True) + next_retry_at = db.Column(db.DateTime, nullable=True) + created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False) + updated_at = db.Column( + db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False + ) + + def to_dict(self) -> dict: + return { + "id": self.id, + "user_id": self.user_id, + "name": self.name, + "job_type": self.job_type, + "status": self.status, + "attempts": self.attempts, + "max_retries": self.max_retries, + "last_error": self.last_error, + "payload": self.payload, + "result": self.result, + "scheduled_at": self.scheduled_at.isoformat() if self.scheduled_at else None, + "started_at": self.started_at.isoformat() if self.started_at else None, + "completed_at": self.completed_at.isoformat() if self.completed_at else None, + "next_retry_at": self.next_retry_at.isoformat() if self.next_retry_at else None, + "created_at": self.created_at.isoformat() if self.created_at else None, + "updated_at": self.updated_at.isoformat() if self.updated_at else None, + } diff --git 
a/packages/backend/app/observability.py b/packages/backend/app/observability.py index 0c2a6bf7..d65c272c 100644 --- a/packages/backend/app/observability.py +++ b/packages/backend/app/observability.py @@ -60,6 +60,12 @@ def __init__(self) -> None: ["event", "channel", "status"], registry=self.registry, ) + self.job_events_total = Counter( + "finmind_job_events_total", + "Background job lifecycle events.", + ["event", "job_type", "status"], + registry=self.registry, + ) def observe_http_request( self, method: str, endpoint: str, status_code: int, duration_seconds: float @@ -79,6 +85,13 @@ def record_reminder_event( event=event, channel=channel, status=status ).inc() + def record_job_event( + self, event: str, job_type: str, status: str = "ok" + ) -> None: + self.job_events_total.labels( + event=event, job_type=job_type, status=status + ).inc() + def metrics_response(self) -> Response: if self.multiprocess_enabled: registry = CollectorRegistry() @@ -137,3 +150,9 @@ def track_reminder_event(event: str, channel: str, status: str = "ok") -> None: obs = current_app.extensions.get("observability") if obs: obs.record_reminder_event(event=event, channel=channel, status=status) + + +def track_job_event(event: str, job_type: str, status: str = "ok") -> None: + obs = current_app.extensions.get("observability") + if obs: + obs.record_job_event(event=event, job_type=job_type, status=status) diff --git a/packages/backend/app/routes/__init__.py b/packages/backend/app/routes/__init__.py index f13b0f89..8d585f25 100644 --- a/packages/backend/app/routes/__init__.py +++ b/packages/backend/app/routes/__init__.py @@ -7,6 +7,7 @@ from .categories import bp as categories_bp from .docs import bp as docs_bp from .dashboard import bp as dashboard_bp +from .jobs import bp as jobs_bp def register_routes(app: Flask): @@ -18,3 +19,4 @@ def register_routes(app: Flask): app.register_blueprint(categories_bp, url_prefix="/categories") app.register_blueprint(docs_bp, url_prefix="/docs") 
app.register_blueprint(dashboard_bp, url_prefix="/dashboard") + app.register_blueprint(jobs_bp, url_prefix="/jobs") diff --git a/packages/backend/app/routes/jobs.py b/packages/backend/app/routes/jobs.py new file mode 100644 index 00000000..250dea5b --- /dev/null +++ b/packages/backend/app/routes/jobs.py @@ -0,0 +1,169 @@ +"""REST API routes for background job management. + +Endpoints: + GET /jobs - List jobs for the authenticated user + POST /jobs - Create (enqueue) a new job + GET /jobs/ - Get a single job by ID + POST /jobs//retry - Manually retry a failed/dead job + GET /jobs/stats - Aggregate job status counts + GET /jobs/dead-letter - List dead-letter (permanently failed) jobs +""" + +from datetime import datetime +from flask import Blueprint, jsonify, request +from flask_jwt_extended import jwt_required, get_jwt_identity +from ..extensions import db +from ..models import BackgroundJob, JobStatus, JobType +from ..services.job_queue import enqueue_job, retry_job, execute_job +import logging + +bp = Blueprint("jobs", __name__) +logger = logging.getLogger("finmind.jobs.routes") + + +@bp.get("") +@jwt_required() +def list_jobs(): + """List all background jobs for the authenticated user.""" + uid = int(get_jwt_identity()) + status_filter = request.args.get("status") + job_type_filter = request.args.get("job_type") + page = max(int(request.args.get("page", 1)), 1) + per_page = min(int(request.args.get("per_page", 50)), 100) + + query = db.session.query(BackgroundJob).filter_by(user_id=uid) + if status_filter: + query = query.filter_by(status=status_filter.upper()) + if job_type_filter: + query = query.filter_by(job_type=job_type_filter.upper()) + + total = query.count() + items = ( + query.order_by(BackgroundJob.created_at.desc()) + .offset((page - 1) * per_page) + .limit(per_page) + .all() + ) + + return jsonify( + { + "jobs": [j.to_dict() for j in items], + "total": total, + "page": page, + "per_page": per_page, + } + ) + + +@bp.post("") +@jwt_required() +def 
create_job(): + """Enqueue a new background job.""" + uid = int(get_jwt_identity()) + data = request.get_json() or {} + + name = data.get("name") + job_type = data.get("job_type") + if not name or not job_type: + return jsonify(error="name and job_type are required"), 400 + + valid_types = {e.value for e in JobType} + if job_type.upper() not in valid_types: + return ( + jsonify(error=f"Invalid job_type. Must be one of: {', '.join(sorted(valid_types))}"), + 400, + ) + + payload = data.get("payload", {}) + max_retries = data.get("max_retries", 5) + if not isinstance(max_retries, int) or max_retries < 0: + return jsonify(error="max_retries must be a non-negative integer"), 400 + + scheduled_at = None + if data.get("scheduled_at"): + try: + scheduled_at = datetime.fromisoformat(data["scheduled_at"]) + except (ValueError, TypeError): + return jsonify(error="scheduled_at must be a valid ISO datetime"), 400 + + job = enqueue_job( + user_id=uid, + name=name, + job_type=job_type.upper(), + payload=payload if isinstance(payload, dict) else {}, + max_retries=max_retries, + scheduled_at=scheduled_at, + ) + logger.info("Created job id=%s type=%s user=%s", job.id, job.job_type, uid) + return jsonify(job.to_dict()), 201 + + +@bp.get("/") +@jwt_required() +def get_job(job_id: int): + """Get a single job by ID.""" + uid = int(get_jwt_identity()) + job = db.session.get(BackgroundJob, job_id) + if not job or job.user_id != uid: + return jsonify(error="not found"), 404 + return jsonify(job.to_dict()) + + +@bp.post("//retry") +@jwt_required() +def retry_job_route(job_id: int): + """Manually retry a failed or dead job.""" + uid = int(get_jwt_identity()) + job = db.session.get(BackgroundJob, job_id) + if not job or job.user_id != uid: + return jsonify(error="not found"), 404 + + if job.status not in (JobStatus.FAILED.value, JobStatus.DEAD.value): + return ( + jsonify(error=f"Cannot retry job in status {job.status}"), + 400, + ) + + job = retry_job(job) + # Execute immediately in-band for 
manual retries + execute_job(job) + logger.info("Manual retry job id=%s, new status=%s", job.id, job.status) + return jsonify(job.to_dict()) + + +@bp.get("/stats") +@jwt_required() +def job_stats(): + """Get aggregate job status counts for the authenticated user.""" + uid = int(get_jwt_identity()) + rows = ( + db.session.query(BackgroundJob.status, db.func.count(BackgroundJob.id)) + .filter_by(user_id=uid) + .group_by(BackgroundJob.status) + .all() + ) + counts = {status: count for status, count in rows} + return jsonify( + { + "pending": counts.get(JobStatus.PENDING.value, 0), + "running": counts.get(JobStatus.RUNNING.value, 0), + "completed": counts.get(JobStatus.COMPLETED.value, 0), + "failed": counts.get(JobStatus.FAILED.value, 0), + "dead": counts.get(JobStatus.DEAD.value, 0), + "total": sum(counts.values()), + } + ) + + +@bp.get("/dead-letter") +@jwt_required() +def dead_letter_queue(): + """List all permanently failed (dead) jobs for the authenticated user.""" + uid = int(get_jwt_identity()) + items = ( + db.session.query(BackgroundJob) + .filter_by(user_id=uid, status=JobStatus.DEAD.value) + .order_by(BackgroundJob.updated_at.desc()) + .all() + ) + return jsonify([j.to_dict() for j in items]) diff --git a/packages/backend/app/services/job_queue.py b/packages/backend/app/services/job_queue.py new file mode 100644 index 00000000..40247b84 --- /dev/null +++ b/packages/backend/app/services/job_queue.py @@ -0,0 +1,235 @@ +"""Background job queue with exponential backoff retry and dead letter queue. + +Provides an in-process job executor that: + - Picks up PENDING / retryable jobs and runs registered handlers. + - Implements exponential back-off (base 2) with jitter. + - Moves permanently failed jobs to DEAD status (dead letter queue). + - Exposes helpers for the REST layer to enqueue, retry, and query jobs. 
+""" + +from __future__ import annotations + +import json +import logging +import random +import threading +import time +from datetime import datetime, timedelta +from typing import Any, Callable + +from ..extensions import db +from ..models import BackgroundJob, JobStatus, JobType + +logger = logging.getLogger("finmind.jobs") + +# --------------------------------------------------------------------------- +# Handler registry +# --------------------------------------------------------------------------- +JobHandler = Callable[[dict[str, Any]], dict[str, Any] | None] + +_handlers: dict[str, JobHandler] = {} + + +def register_handler(job_type: str, handler: JobHandler) -> None: + """Register an execution handler for a job type.""" + _handlers[job_type] = handler + + +# --------------------------------------------------------------------------- +# Job lifecycle helpers +# --------------------------------------------------------------------------- + +def enqueue_job( + *, + user_id: int, + name: str, + job_type: str, + payload: dict[str, Any] | None = None, + max_retries: int = 5, + scheduled_at: datetime | None = None, +) -> BackgroundJob: + """Create a new background job in PENDING state.""" + if job_type not in {e.value for e in JobType}: + raise ValueError(f"Unknown job type: {job_type}") + + job = BackgroundJob( + user_id=user_id, + name=name, + job_type=job_type, + status=JobStatus.PENDING.value, + payload=json.dumps(payload) if payload else None, + max_retries=max_retries, + scheduled_at=scheduled_at or datetime.utcnow(), + ) + db.session.add(job) + db.session.commit() + logger.info("Enqueued job id=%s type=%s user=%s", job.id, job_type, user_id) + return job + + +def _compute_next_retry(attempts: int) -> datetime: + """Exponential back-off: 2^attempts seconds + uniform jitter [0, 2s].""" + delay = (2 ** attempts) + random.uniform(0, 2) # noqa: S311 + return datetime.utcnow() + timedelta(seconds=delay) + + +def execute_job(job: BackgroundJob) -> None: + """Run a 
single job through its registered handler.""" + handler = _handlers.get(job.job_type) + if handler is None: + job.status = JobStatus.DEAD.value + job.last_error = f"No handler registered for job type: {job.job_type}" + db.session.commit() + logger.error("No handler for job id=%s type=%s", job.id, job.job_type) + return + + job.status = JobStatus.RUNNING.value + job.started_at = datetime.utcnow() + job.attempts += 1 + db.session.commit() + + try: + payload = json.loads(job.payload) if job.payload else {} + result = handler(payload) + job.status = JobStatus.COMPLETED.value + job.completed_at = datetime.utcnow() + job.result = json.dumps(result) if result else None + job.last_error = None + db.session.commit() + logger.info( + "Job id=%s completed on attempt %s", job.id, job.attempts + ) + except Exception as exc: + job.last_error = str(exc) + if job.attempts >= job.max_retries: + # Move to dead letter queue + job.status = JobStatus.DEAD.value + logger.warning( + "Job id=%s moved to dead letter queue after %s attempts: %s", + job.id, + job.attempts, + exc, + ) + else: + job.status = JobStatus.FAILED.value + job.next_retry_at = _compute_next_retry(job.attempts) + logger.info( + "Job id=%s failed (attempt %s/%s), retry at %s: %s", + job.id, + job.attempts, + job.max_retries, + job.next_retry_at, + exc, + ) + db.session.commit() + + +def retry_job(job: BackgroundJob) -> BackgroundJob: + """Manually re-queue a FAILED or DEAD job for immediate retry.""" + if job.status not in (JobStatus.FAILED.value, JobStatus.DEAD.value): + raise ValueError( + f"Cannot retry job in status {job.status}; only FAILED or DEAD jobs can be retried" + ) + job.status = JobStatus.PENDING.value + job.next_retry_at = None + # Keep current attempts count so history is preserved + db.session.commit() + logger.info("Manually retried job id=%s", job.id) + return job + + +# --------------------------------------------------------------------------- +# Built-in job handlers (pluggable stubs with real 
structure)
+# ---------------------------------------------------------------------------
+
+def _handle_data_sync(payload: dict[str, Any]) -> dict[str, Any]:
+    """Synchronise financial data from external sources."""
+    logger.info("Running data sync with payload: %s", payload)
+    source = payload.get("source", "bank_api")
+    # In production this would call external APIs. Simulate work:
+    time.sleep(0.05)
+    return {"source": source, "records_synced": 0, "status": "ok"}
+
+
+def _handle_report_generation(payload: dict[str, Any]) -> dict[str, Any]:
+    """Generate a financial report (expense summary, analytics, etc.)."""
+    logger.info("Generating report with payload: %s", payload)
+    report_type = payload.get("report_type", "monthly_summary")
+    time.sleep(0.05)
+    return {"report_type": report_type, "status": "generated"}
+
+
+def _handle_email_notification(payload: dict[str, Any]) -> dict[str, Any]:
+    """Send an email notification (bill due, budget alert, etc.)."""
+    logger.info("Sending email notification with payload: %s", payload)
+    recipient = payload.get("recipient", "user")
+    subject = payload.get("subject", "FinMind Notification")
+    time.sleep(0.05)
+    return {"recipient": recipient, "subject": subject, "status": "sent"}
+
+
+# Register built-in handlers
+register_handler(JobType.DATA_SYNC.value, _handle_data_sync)
+register_handler(JobType.REPORT_GENERATION.value, _handle_report_generation)
+register_handler(JobType.EMAIL_NOTIFICATION.value, _handle_email_notification)
+
+
+# ---------------------------------------------------------------------------
+# Background worker thread (process pending & retryable jobs)
+# ---------------------------------------------------------------------------
+
+class JobWorker:
+    """Lightweight background worker that polls for runnable jobs."""
+
+    def __init__(self, app, interval: float = 5.0) -> None:
+        self._app = app
+        self._interval = interval
+        self._stop_event = threading.Event()
+        self._thread: threading.Thread | None = None
+
+    def start(self) -> None:
+        if self._thread and self._thread.is_alive():
+            return
+        self._stop_event.clear()
+        self._thread = threading.Thread(target=self._loop, daemon=True)  # daemon: never blocks interpreter exit
+        self._thread.start()
+        logger.info("Job worker started (interval=%ss)", self._interval)
+
+    def stop(self) -> None:
+        self._stop_event.set()
+        if self._thread:
+            self._thread.join(timeout=10)
+        logger.info("Job worker stopped")
+
+    def _loop(self) -> None:
+        while not self._stop_event.is_set():
+            try:
+                with self._app.app_context():
+                    self._process_pending()
+            except Exception:
+                logger.exception("Job worker iteration failed")
+            self._stop_event.wait(self._interval)  # interruptible sleep; returns early when stop() is called
+
+    def _process_pending(self) -> None:
+        now = datetime.utcnow()  # naive UTC — assumed to match how *_at columns are stored; TODO confirm
+        jobs = (
+            db.session.query(BackgroundJob)
+            .filter(
+                BackgroundJob.status.in_(
+                    [JobStatus.PENDING.value, JobStatus.FAILED.value]  # runnable states
+                ),
+                db.or_(
+                    BackgroundJob.scheduled_at <= now,
+                    BackgroundJob.scheduled_at.is_(None),  # unscheduled = due immediately
+                ),
+                db.or_(
+                    BackgroundJob.next_retry_at <= now,
+                    BackgroundJob.next_retry_at.is_(None),  # no backoff gate set
+                ),
+            )
+            .order_by(BackgroundJob.created_at)
+            .limit(10)  # process at most 10 due jobs per polling tick
+            .all()
+        )
+        for job in jobs:
+            execute_job(job)
diff --git a/packages/backend/tests/test_jobs.py b/packages/backend/tests/test_jobs.py
new file mode 100644
index 00000000..45351025
--- /dev/null
+++ b/packages/backend/tests/test_jobs.py
@@ -0,0 +1,345 @@
+"""Tests for the background job retry and monitoring system."""
+
+import json
+
+
+def test_create_job(client, auth_header):
+    r = client.post(
+        "/jobs",
+        json={
+            "name": "Sync bank data",
+            "job_type": "DATA_SYNC",
+            "payload": {"source": "plaid"},
+        },
+        headers=auth_header,
+    )
+    assert r.status_code == 201
+    data = r.get_json()
+    assert data["name"] == "Sync bank data"
+    assert data["job_type"] == "DATA_SYNC"
+    assert data["status"] == "PENDING"
+    assert data["attempts"] == 0
+    assert data["max_retries"] == 5
+    assert data["id"] is not None
+
+
+def test_create_job_missing_fields(client, auth_header):
+    r = client.post("/jobs", json={"name": "test"}, headers=auth_header)
+    assert r.status_code == 400
+    assert "required" in r.get_json()["error"].lower()
+
+
+def test_create_job_invalid_type(client, auth_header):
+    r = client.post(
+        "/jobs",
+        json={"name": "test", "job_type": "INVALID_TYPE"},
+        headers=auth_header,
+    )
+    assert r.status_code == 400
+    assert "Invalid job_type" in r.get_json()["error"]
+
+
+def test_list_jobs(client, auth_header):
+    # Create a few jobs
+    for name in ["Job A", "Job B"]:
+        client.post(
+            "/jobs",
+            json={"name": name, "job_type": "DATA_SYNC"},
+            headers=auth_header,
+        )
+
+    r = client.get("/jobs", headers=auth_header)
+    assert r.status_code == 200
+    data = r.get_json()
+    assert data["total"] >= 2
+    assert len(data["jobs"]) >= 2
+
+
+def test_list_jobs_filter_by_status(client, auth_header):
+    client.post(
+        "/jobs",
+        json={"name": "Filter test", "job_type": "DATA_SYNC"},
+        headers=auth_header,
+    )
+
+    r = client.get("/jobs?status=PENDING", headers=auth_header)
+    assert r.status_code == 200
+    data = r.get_json()
+    for job in data["jobs"]:
+        assert job["status"] == "PENDING"
+
+
+def test_list_jobs_filter_by_job_type(client, auth_header):
+    client.post(
+        "/jobs",
+        json={"name": "Report job", "job_type": "REPORT_GENERATION"},
+        headers=auth_header,
+    )
+
+    r = client.get("/jobs?job_type=REPORT_GENERATION", headers=auth_header)
+    assert r.status_code == 200
+    data = r.get_json()
+    for job in data["jobs"]:
+        assert job["job_type"] == "REPORT_GENERATION"
+
+
+def test_get_job(client, auth_header):
+    r = client.post(
+        "/jobs",
+        json={"name": "Get test", "job_type": "EMAIL_NOTIFICATION"},
+        headers=auth_header,
+    )
+    job_id = r.get_json()["id"]
+
+    r = client.get(f"/jobs/{job_id}", headers=auth_header)
+    assert r.status_code == 200
+    assert r.get_json()["id"] == job_id
+
+
+def test_get_job_not_found(client, auth_header):
+    r = client.get("/jobs/99999", headers=auth_header)
+    assert r.status_code == 404
+
+
+def test_retry_failed_job(client, auth_header):
+    # Create and then manually fail a job via direct model manipulation
+    from app.extensions import db
+    from app.models import BackgroundJob, JobStatus
+
+    r = client.post(
+        "/jobs",
+        json={
+            "name": "Will fail",
+            "job_type": "DATA_SYNC",
+            "payload": {"source": "test"},
+        },
+        headers=auth_header,
+    )
+    job_id = r.get_json()["id"]
+
+    # Manually mark as failed
+    with client.application.app_context():
+        job = db.session.get(BackgroundJob, job_id)
+        job.status = JobStatus.FAILED.value
+        job.attempts = 2
+        job.last_error = "Simulated failure"
+        db.session.commit()
+
+    # Retry it
+    r = client.post(f"/jobs/{job_id}/retry", headers=auth_header)
+    assert r.status_code == 200
+    data = r.get_json()
+    # After retry+execute, it should be COMPLETED (handler succeeds)
+    assert data["status"] == "COMPLETED"
+    assert data["attempts"] == 3  # incremented by execute
+
+
+def test_retry_completed_job_fails(client, auth_header):
+    from app.extensions import db
+    from app.models import BackgroundJob, JobStatus
+
+    r = client.post(
+        "/jobs",
+        json={"name": "Completed job", "job_type": "DATA_SYNC"},
+        headers=auth_header,
+    )
+    job_id = r.get_json()["id"]
+
+    with client.application.app_context():
+        job = db.session.get(BackgroundJob, job_id)
+        job.status = JobStatus.COMPLETED.value
+        db.session.commit()
+
+    r = client.post(f"/jobs/{job_id}/retry", headers=auth_header)
+    assert r.status_code == 400
+    assert "Cannot retry" in r.get_json()["error"]
+
+
+def test_retry_dead_job(client, auth_header):
+    from app.extensions import db
+    from app.models import BackgroundJob, JobStatus
+
+    r = client.post(
+        "/jobs",
+        json={"name": "Dead job", "job_type": "REPORT_GENERATION"},
+        headers=auth_header,
+    )
+    job_id = r.get_json()["id"]
+
+    with client.application.app_context():
+        job = db.session.get(BackgroundJob, job_id)
+        job.status = JobStatus.DEAD.value
+        job.attempts = 5
+        job.last_error = "Max retries exceeded"
+        db.session.commit()
+
+    r = client.post(f"/jobs/{job_id}/retry", headers=auth_header)
+    assert r.status_code == 200
+    data = r.get_json()
+    assert data["status"] == "COMPLETED"
+
+
+def test_job_stats(client, auth_header):
+    # Create a couple of jobs
+    client.post(
+        "/jobs",
+        json={"name": "Stats A", "job_type": "DATA_SYNC"},
+        headers=auth_header,
+    )
+    client.post(
+        "/jobs",
+        json={"name": "Stats B", "job_type": "EMAIL_NOTIFICATION"},
+        headers=auth_header,
+    )
+
+    r = client.get("/jobs/stats", headers=auth_header)
+    assert r.status_code == 200
+    data = r.get_json()
+    assert "pending" in data
+    assert "running" in data
+    assert "completed" in data
+    assert "failed" in data
+    assert "dead" in data
+    assert "total" in data
+    assert data["total"] >= 2
+
+
+def test_dead_letter_queue(client, auth_header):
+    from app.extensions import db
+    from app.models import BackgroundJob, JobStatus
+
+    r = client.post(
+        "/jobs",
+        json={"name": "DLQ test", "job_type": "DATA_SYNC"},
+        headers=auth_header,
+    )
+    job_id = r.get_json()["id"]
+
+    with client.application.app_context():
+        job = db.session.get(BackgroundJob, job_id)
+        job.status = JobStatus.DEAD.value
+        job.last_error = "Permanent failure"
+        db.session.commit()
+
+    r = client.get("/jobs/dead-letter", headers=auth_header)
+    assert r.status_code == 200
+    data = r.get_json()
+    dead_ids = [j["id"] for j in data]
+    assert job_id in dead_ids
+
+
+def test_exponential_backoff():
+    """Test that the backoff calculation produces increasing delays."""
+    from app.services.job_queue import _compute_next_retry
+    from datetime import datetime
+
+    delays = []
+    for attempt in range(1, 6):
+        before = datetime.utcnow()
+        next_retry = _compute_next_retry(attempt)
+        delay = (next_retry - before).total_seconds()
+        delays.append(delay)
+
+    # Each delay should be roughly >= 2^attempt (minus jitter noise)
+    for i, delay in enumerate(delays):
+        expected_min = 2 ** (i + 1)
+        # Allow some tolerance for jitter
+        assert delay >= expected_min - 0.5, (
+            f"Attempt {i + 1}: delay {delay}s < expected min {expected_min}s"
+        )
+
+    # Delays should be strictly increasing on average
+    assert delays[-1] > delays[0]
+
+
+def test_execute_job_moves_to_dead_after_max_retries(client, auth_header):
+    """Job should move to DEAD status when attempts >= max_retries."""
+    from app.extensions import db
+    from app.models import BackgroundJob, JobStatus
+    from app.services.job_queue import execute_job, register_handler
+
+    # Register a handler that always fails
+    def _always_fail(payload):
+        raise RuntimeError("Always fails")
+
+    register_handler("DATA_SYNC", _always_fail)
+
+    try:
+        r = client.post(
+            "/jobs",
+            json={
+                "name": "Max retry test",
+                "job_type": "DATA_SYNC",
+                "max_retries": 1,
+            },
+            headers=auth_header,
+        )
+        job_id = r.get_json()["id"]
+
+        with client.application.app_context():
+            job = db.session.get(BackgroundJob, job_id)
+            execute_job(job)
+            # After 1 attempt with max_retries=1, should be DEAD
+            assert job.status == JobStatus.DEAD.value
+            assert job.attempts == 1
+            assert "Always fails" in job.last_error
+    finally:
+        # Restore the real handler
+        from app.services.job_queue import _handle_data_sync
+
+        register_handler("DATA_SYNC", _handle_data_sync)
+
+
+def test_create_job_with_scheduled_at(client, auth_header):
+    r = client.post(
+        "/jobs",
+        json={
+            "name": "Future job",
+            "job_type": "REPORT_GENERATION",
+            "scheduled_at": "2099-01-01T00:00:00",
+        },
+        headers=auth_header,
+    )
+    assert r.status_code == 201
+    data = r.get_json()
+    assert data["scheduled_at"] is not None
+    assert "2099" in data["scheduled_at"]
+
+
+def test_create_job_custom_max_retries(client, auth_header):
+    r = client.post(
+        "/jobs",
+        json={
+            "name": "Custom retries",
+            "job_type": "EMAIL_NOTIFICATION",
+            "max_retries": 10,
+        },
+        headers=auth_header,
+    )
+    assert r.status_code == 201
+    assert r.get_json()["max_retries"] == 10
+
+
+def test_pagination(client, auth_header):
+    for i in range(5):
+        client.post(
+            "/jobs",
+            json={"name": f"Page test {i}", "job_type": "DATA_SYNC"},
+            headers=auth_header,
+        )
+
+    r = client.get("/jobs?page=1&per_page=2", headers=auth_header)
+    assert r.status_code == 200
+    data = r.get_json()
+    assert len(data["jobs"]) == 2
+    assert data["per_page"] == 2
+    assert data["page"] == 1
+    assert data["total"] >= 5
+
+
+def test_unauthenticated_access(client):
+    r = client.get("/jobs")
+    assert r.status_code == 401
+
+    r = client.post("/jobs", json={"name": "test", "job_type": "DATA_SYNC"})
+    assert r.status_code == 401