Skip to content

Commit dd56fdf

Browse files
ersinkocclaude
andcommitted
✨ feat: approval recovery, webhook triggers, fleet tests, agent safety
Workflow approval recovery: - Add resumeFromApproval() to WorkflowService — resumes paused workflows when approval is approved, skips already-completed nodes, handles rejection - Approve endpoint now fires workflow resumption automatically Webhook trigger integration: - Add POST /webhooks/workflow/:path endpoint for webhook-triggered workflows - HMAC-SHA256 signature validation when webhookSecret is configured - Webhook body injected as workflow input under __webhook namespace - Add getByWebhookPath() query to WorkflowsRepository Agent safety fixes: - getCommunicationBus() now throws descriptive error instead of null deref - spawnCounts Map cleaned up when conversations have no active sessions Fleet Command tests (68 tests): - Lifecycle, scheduling, task execution, error handling, budget enforcement - Concurrency guards, dependency cascades, orphan recovery, shared context - Boot/shutdown, task communication, event emissions, singleton management Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 4997e71 commit dd56fdf

8 files changed

Lines changed: 1764 additions & 2 deletions

File tree

packages/gateway/src/db/repositories/workflows.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -837,6 +837,28 @@ export class WorkflowsRepository extends BaseRepository {
837837
);
838838
return parseInt(row?.count ?? '0', 10);
839839
}
840+
841+
/**
842+
* Find a workflow by its trigger node's webhookPath (global lookup, cross-user).
843+
* Used by the webhook endpoint to match incoming requests to workflows.
844+
*/
845+
async getByWebhookPath(webhookPath: string): Promise<Workflow | null> {
846+
// Query all active workflows and find one with a matching trigger node webhookPath.
847+
// Uses PostgreSQL JSONB operators for efficient filtering.
848+
const row = await this.queryOne<WorkflowRow>(
849+
`SELECT w.* FROM workflows w,
850+
LATERAL jsonb_array_elements(
851+
CASE WHEN jsonb_typeof(w.nodes) = 'array' THEN w.nodes ELSE '[]'::jsonb END
852+
) AS node
853+
WHERE w.status = 'active'
854+
AND node->>'type' = 'triggerNode'
855+
AND node->'data'->>'triggerType' = 'webhook'
856+
AND node->'data'->>'webhookPath' = $1
857+
LIMIT 1`,
858+
[webhookPath]
859+
);
860+
return row ? mapWorkflow(row) : null;
861+
}
840862
}
841863

842864
// ============================================================================

packages/gateway/src/routes/webhooks.ts

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,10 @@ import { getServiceRegistry, Services } from '@ownpilot/core';
1212
import { getLog } from '../services/log.js';
1313
import { safeKeyCompare, apiError, apiResponse, ERROR_CODES, getErrorMessage } from './helpers.js';
1414
import { TriggersRepository, type WebhookConfig } from '../db/repositories/triggers.js';
15+
import {
16+
WorkflowsRepository,
17+
type TriggerNodeData,
18+
} from '../db/repositories/workflows.js';
1519

1620
const log = getLog('Webhooks');
1721

@@ -245,3 +249,94 @@ webhookRoutes.post('/trigger/:triggerId', async (c) => {
245249

246250
return apiResponse(c, { message: 'Webhook received', triggerId });
247251
});
252+
253+
/**
254+
* POST /webhooks/workflow/:path
255+
*
256+
* Receives external webhook calls that trigger workflows directly.
257+
* Matches the incoming path against workflow trigger nodes with
258+
* triggerType='webhook' and a matching webhookPath.
259+
*
260+
* Validates HMAC-SHA256 signature via X-Webhook-Signature header
261+
* if webhookSecret is configured on the trigger node.
262+
*
263+
* The webhook payload is injected as workflow input variables
264+
* under the __webhook namespace.
265+
*/
266+
webhookRoutes.post('/workflow/:path', async (c) => {
267+
const webhookPath = c.req.param('path');
268+
269+
// Look up active workflow with matching webhookPath in its trigger node
270+
const repo = new WorkflowsRepository();
271+
const workflow = await repo.getByWebhookPath(`/hooks/${webhookPath}`);
272+
273+
if (!workflow) {
274+
return apiError(
275+
c,
276+
{ code: ERROR_CODES.NOT_FOUND, message: 'No workflow matches this webhook path' },
277+
404
278+
);
279+
}
280+
281+
// Find the trigger node to check for webhook secret
282+
const triggerNode = workflow.nodes.find((n) => n.type === 'triggerNode');
283+
const triggerData = triggerNode?.data as TriggerNodeData | undefined;
284+
285+
// HMAC-SHA256 signature validation if webhookSecret is configured
286+
if (triggerData?.webhookSecret) {
287+
const signature = c.req.header('x-webhook-signature');
288+
if (!signature) {
289+
return apiError(
290+
c,
291+
{ code: ERROR_CODES.ACCESS_DENIED, message: 'Missing X-Webhook-Signature header' },
292+
403
293+
);
294+
}
295+
296+
const rawBody = await c.req.text();
297+
const expected = createHmac('sha256', triggerData.webhookSecret).update(rawBody).digest('hex');
298+
299+
if (!safeKeyCompare(signature, expected)) {
300+
return apiError(
301+
c,
302+
{ code: ERROR_CODES.ACCESS_DENIED, message: 'Invalid webhook signature' },
303+
403
304+
);
305+
}
306+
}
307+
308+
// Parse the webhook body
309+
let body: Record<string, unknown> = {};
310+
try {
311+
body = (await c.req.json()) as Record<string, unknown>;
312+
} catch {
313+
/* empty body is fine */
314+
}
315+
316+
// Execute the workflow with webhook data as input
317+
try {
318+
const service = getServiceRegistry().get(Services.Workflow);
319+
const userId = workflow.userId ?? 'default';
320+
321+
// Fire-and-forget: execute in background, don't block the webhook response
322+
service
323+
.executeWorkflow(workflow.id, userId, undefined, {
324+
inputs: { __webhook: { path: webhookPath, body, receivedAt: new Date().toISOString() } },
325+
})
326+
.catch((err: Error) =>
327+
log.error('Webhook workflow execution failed', { workflowId: workflow.id, error: err.message })
328+
);
329+
330+
return apiResponse(c, {
331+
message: 'Webhook received, workflow triggered',
332+
workflowId: workflow.id,
333+
});
334+
} catch (error) {
335+
log.error('Webhook workflow trigger failed', { error: getErrorMessage(error) });
336+
return apiError(
337+
c,
338+
{ code: ERROR_CODES.INTERNAL_ERROR, message: 'Failed to trigger workflow' },
339+
500
340+
);
341+
}
342+
});

packages/gateway/src/routes/workflows.test.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,13 @@ vi.mock('../db/repositories/workflow-approvals.js', () => ({
5757
createWorkflowApprovalsRepository: () => mockApprovalsRepo,
5858
}));
5959

60+
const mockWorkflowService = {
61+
resumeFromApproval: vi.fn().mockResolvedValue(undefined),
62+
};
63+
6064
vi.mock('../services/workflow-service.js', () => ({
6165
topologicalSort: vi.fn(), // default: no throw = valid DAG
66+
getWorkflowService: () => mockWorkflowService,
6267
}));
6368

6469
vi.mock('../services/workflow/dag-utils.js', () => ({

packages/gateway/src/routes/workflows.ts

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import {
2121
import { ERROR_CODES } from './error-codes.js';
2222
import { createWorkflowsRepository } from '../db/repositories/workflows.js';
2323
import { createWorkflowApprovalsRepository } from '../db/repositories/workflow-approvals.js';
24-
import { topologicalSort } from '../services/workflow-service.js';
24+
import { topologicalSort, getWorkflowService } from '../services/workflow-service.js';
2525
import {
2626
detectCycle,
2727
type ValidationNode,
@@ -768,6 +768,29 @@ workflowRoutes.post('/approvals/:id/approve', async (c) => {
768768
}
769769

770770
wsGateway.broadcast('approval:decided', { approvalId: id, status: 'approved' });
771+
772+
// Resume the workflow execution from the approval point
773+
if (approval.workflowId && approval.workflowLogId) {
774+
const service = getWorkflowService();
775+
service
776+
.resumeFromApproval(
777+
approval.workflowId,
778+
userId,
779+
approval.nodeId,
780+
'approved',
781+
approval.workflowLogId
782+
)
783+
.catch((err: unknown) => {
784+
// Log but don't fail the approval response — workflow resume is best-effort
785+
const wfRepo = createWorkflowsRepository(userId);
786+
wfRepo.updateLog(approval.workflowLogId, {
787+
status: 'failed',
788+
error: `Resume after approval failed: ${getErrorMessage(err)}`,
789+
completedAt: new Date().toISOString(),
790+
}).catch(() => { /* ignore log update failure */ });
791+
});
792+
}
793+
771794
return apiResponse(c, approval);
772795
});
773796

0 commit comments

Comments
 (0)