diff --git a/README.md b/README.md index 6f8a26a..0348adc 100644 --- a/README.md +++ b/README.md @@ -535,6 +535,107 @@ class CancellableExecutor implements AgentExecutor { } ``` +## A2A Push Notifications + +For very long-running tasks (e.g., lasting minutes, hours, or even days) or when clients cannot or prefer not to maintain persistent connections (like mobile clients or serverless functions), A2A supports asynchronous updates via push notifications. This mechanism allows the A2A Server to actively notify a client-provided webhook when a significant task update occurs. + +### Server-Side Configuration + +To enable push notifications, your agent card must declare support: + +```typescript +const movieAgentCard: AgentCard = { + // ... other properties + capabilities: { + streaming: true, + pushNotifications: true, // Enable push notifications + stateTransitionHistory: true, + }, + // ... rest of agent card +}; +``` + +When creating the `DefaultRequestHandler`, you can optionally provide custom push notification components: + +```typescript +import { + DefaultRequestHandler, + InMemoryPushNotificationStore, + DefaultPushNotificationSender +} from "@a2a-js/sdk/server"; + +// Optional: Custom push notification store and sender +const pushNotificationStore = new InMemoryPushNotificationStore(); +const pushNotificationSender = new DefaultPushNotificationSender( + pushNotificationStore, + { + timeout: 5000, // 5 second timeout + tokenHeaderName: 'X-A2A-Notification-Token' // Custom header name + } +); + +const requestHandler = new DefaultRequestHandler( + movieAgentCard, + taskStore, + agentExecutor, + undefined, // eventBusManager (optional) + pushNotificationStore, // custom store + pushNotificationSender, // custom sender + undefined // extendedAgentCard (optional) +); +``` + +### Client-Side Usage + +Configure push notifications when sending messages: + +```typescript +// Configure push notification for a message +const pushConfig: PushNotificationConfig = { + id: "my-notification-config", // Optional, defaults to task ID + url: "https://my-app.com/webhook/task-updates", + token: "your-auth-token" // Optional authentication token +}; + +const sendParams: MessageSendParams = { + message: { + messageId: uuidv4(), + role: "user", + parts: [{ kind: "text", text: "Hello, agent!" }], + kind: "message", + }, + configuration: { + blocking: true, + acceptedOutputModes: ["text/plain"], + pushNotificationConfig: pushConfig // Add push notification config + }, +}; +``` + +### Webhook Endpoint Implementation + +Your webhook endpoint should expect POST requests with the task data: + +```typescript +// Example Express.js webhook endpoint +app.post('/webhook/task-updates', (req, res) => { + const task = req.body; // The complete task object + + // Verify the token if provided + const token = req.headers['x-a2a-notification-token']; + if (token !== 'your-auth-token') { + return res.status(401).json({ error: 'Unauthorized' }); + } + + console.log(`Task ${task.id} status: ${task.status.state}`); + + // Process the task update + // ... + + res.status(200).json({ received: true }); +}); +``` + ## License This project is licensed under the terms of the [Apache 2.0 License](https://raw.githubusercontent.com/google-a2a/a2a-python/refs/heads/main/LICENSE). diff --git a/src/server/index.ts b/src/server/index.ts index 682d196..4d97dd2 100644 --- a/src/server/index.ts +++ b/src/server/index.ts @@ -20,3 +20,9 @@ export { InMemoryTaskStore } from "./store.js"; export { JsonRpcTransportHandler } from "./transports/jsonrpc_transport_handler.js"; export { A2AError } from "./error.js"; + +export type { PushNotificationSender } from "./push_notification/push_notification_sender.js"; +export { DefaultPushNotificationSender } from "./push_notification/default_push_notification_sender.js"; +export type { DefaultPushNotificationSenderOptions } from "./push_notification/default_push_notification_sender.js"; +export type { PushNotificationStore } from "./push_notification/push_notification_store.js"; +export { InMemoryPushNotificationStore } from "./push_notification/push_notification_store.js"; diff --git a/src/server/push_notification/default_push_notification_sender.ts b/src/server/push_notification/default_push_notification_sender.ts new file mode 100644 index 0000000..f1da902 --- /dev/null +++ b/src/server/push_notification/default_push_notification_sender.ts @@ -0,0 +1,81 @@ +import { Task, PushNotificationConfig } from "../../types.js"; +import { PushNotificationSender } from "./push_notification_sender.js"; +import { PushNotificationStore } from "./push_notification_store.js"; + +export interface DefaultPushNotificationSenderOptions { + /** + * Timeout in milliseconds for the abort controller. Defaults to 5000ms. + */ + timeout?: number; + /** + * Custom header name for the token. Defaults to 'X-A2A-Notification-Token'. + */ + tokenHeaderName?: string; +} + +export class DefaultPushNotificationSender implements PushNotificationSender { + + private readonly pushNotificationStore: PushNotificationStore; + private readonly options: Required; + + constructor(pushNotificationStore: PushNotificationStore, options: DefaultPushNotificationSenderOptions = {}) { + this.pushNotificationStore = pushNotificationStore; + this.options = { + timeout: 5000, + tokenHeaderName: 'X-A2A-Notification-Token', + ...options + }; + } + + async send(task: Task): Promise { + const pushConfigs = await this.pushNotificationStore.load(task.id); + if (!pushConfigs || pushConfigs.length === 0) { + return; + } + + pushConfigs.forEach(pushConfig => { + this._dispatchNotification(task, pushConfig) + .catch(error => { + console.error(`Error sending push notification for task_id=${task.id} to URL: ${pushConfig.url}. Error:`, error); + }); + }); + } + + private async _dispatchNotification( + task: Task, + pushConfig: PushNotificationConfig + ): Promise { + const url = pushConfig.url; + const controller = new AbortController(); + // Abort the request if it takes longer than the configured timeout. + const timeoutId = setTimeout(() => controller.abort(), this.options.timeout); + + try { + const headers: Record = { + 'Content-Type': 'application/json' + }; + + if (pushConfig.token) { + headers[this.options.tokenHeaderName] = pushConfig.token; + } + + const response = await fetch(url, { + method: 'POST', + headers, + body: JSON.stringify(task), + signal: controller.signal + }); + + if (!response.ok) { + throw new Error(`HTTP ${response.status}: ${response.statusText}`); + } + + console.info(`Push notification sent for task_id=${task.id} to URL: ${url}`); + } catch (error) { + // Ignore errors + console.error(`Error sending push notification for task_id=${task.id} to URL: ${url}. Error:`, error); + } finally { + clearTimeout(timeoutId); + } + } +} \ No newline at end of file diff --git a/src/server/push_notification/push_notification_sender.ts b/src/server/push_notification/push_notification_sender.ts new file mode 100644 index 0000000..291a94a --- /dev/null +++ b/src/server/push_notification/push_notification_sender.ts @@ -0,0 +1,5 @@ +import { Task } from "../../types.js"; + +export interface PushNotificationSender { + send(task: Task): Promise; +} diff --git a/src/server/push_notification/push_notification_store.ts b/src/server/push_notification/push_notification_store.ts new file mode 100644 index 0000000..eb7fc43 --- /dev/null +++ b/src/server/push_notification/push_notification_store.ts @@ -0,0 +1,58 @@ +import { PushNotificationConfig } from "../../types.js"; + +export interface PushNotificationStore { + save(taskId: string, pushNotificationConfig: PushNotificationConfig): Promise; + load(taskId: string): Promise; + delete(taskId: string, configId?: string): Promise; +} + +export class InMemoryPushNotificationStore implements PushNotificationStore { + private store: Map = new Map(); + + async save(taskId: string, pushNotificationConfig: PushNotificationConfig): Promise { + const configs = this.store.get(taskId) || []; + + // Set ID if it's not already set + if (!pushNotificationConfig.id) { + pushNotificationConfig.id = taskId; + } + + // Remove existing config with the same ID if it exists + const existingIndex = configs.findIndex(config => config.id === pushNotificationConfig.id); + if (existingIndex !== -1) { + configs.splice(existingIndex, 1); + } + + // Add the new/updated config + configs.push(pushNotificationConfig); + this.store.set(taskId, configs); + } + + async load(taskId: string): Promise { + const configs = this.store.get(taskId); + return configs || []; + } + + async delete(taskId: string, configId?: string): Promise { + // If no configId is provided, use taskId as the configId (backward compatibility) + if (configId === undefined) { + configId = taskId; + } + + const configs = this.store.get(taskId); + if (!configs) { + return; + } + + const configIndex = configs.findIndex(config => config.id === configId); + if (configIndex !== -1) { + configs.splice(configIndex, 1); + } + + if (configs.length === 0) { + this.store.delete(taskId); + } else { + this.store.set(taskId, configs); + } + } +} \ No newline at end of file diff --git a/src/server/request_handler/default_request_handler.ts b/src/server/request_handler/default_request_handler.ts index 71cf187..f90a823 100644 --- a/src/server/request_handler/default_request_handler.ts +++ b/src/server/request_handler/default_request_handler.ts @@ -5,11 +5,14 @@ import { AgentExecutor } from "../agent_execution/agent_executor.js"; import { RequestContext } from "../agent_execution/request_context.js"; import { A2AError } from "../error.js"; import { ExecutionEventBusManager, DefaultExecutionEventBusManager } from "../events/execution_event_bus_manager.js"; -import { ExecutionEventBus } from "../events/execution_event_bus.js"; +import { AgentExecutionEvent, ExecutionEventBus } from "../events/execution_event_bus.js"; import { ExecutionEventQueue } from "../events/execution_event_queue.js"; import { ResultManager } from "../result_manager.js"; import { TaskStore } from "../store.js"; import { A2ARequestHandler } from "./a2a_request_handler.js"; +import { InMemoryPushNotificationStore, PushNotificationStore } from '../push_notification/push_notification_store.js'; +import { PushNotificationSender } from '../push_notification/push_notification_sender.js'; +import { DefaultPushNotificationSender } from '../push_notification/default_push_notification_sender.js'; const terminalStates: TaskState[] = ["completed", "failed", "canceled", "rejected"]; @@ -19,8 +22,8 @@ export class DefaultRequestHandler implements A2ARequestHandler { private readonly taskStore: TaskStore; private readonly agentExecutor: AgentExecutor; private readonly eventBusManager: ExecutionEventBusManager; - // Store for push notification configurations (could be part of TaskStore or separate) - private readonly pushNotificationConfigs: Map = new Map(); + private readonly pushNotificationStore ?: PushNotificationStore; + private readonly pushNotificationSender ?: PushNotificationSender; constructor( @@ -28,6 +31,8 @@ export class DefaultRequestHandler implements A2ARequestHandler { taskStore: TaskStore, agentExecutor: AgentExecutor, eventBusManager: ExecutionEventBusManager = new DefaultExecutionEventBusManager(), + pushNotificationStore?: PushNotificationStore, + pushNotificationSender?: PushNotificationSender, extendedAgentCard?: AgentCard, ) { this.agentCard = agentCard; @@ -35,6 +40,13 @@ export class DefaultRequestHandler implements A2ARequestHandler { this.agentExecutor = agentExecutor; this.eventBusManager = eventBusManager; this.extendedAgentCard = extendedAgentCard; + + // If push notifications are supported, use the provided store and sender. + // Otherwise, use the default in-memory store and sender. + if (agentCard.capabilities.pushNotifications) { + this.pushNotificationStore = pushNotificationStore || new InMemoryPushNotificationStore(); + this.pushNotificationSender = pushNotificationSender || new DefaultPushNotificationSender(this.pushNotificationStore); + } } async getAgentCard(): Promise { @@ -114,6 +126,8 @@ export class DefaultRequestHandler implements A2ARequestHandler { for await (const event of eventQueue.events()) { await resultManager.processEvent(event); + await this._sendPushNotificationIfNeeded(event); + if (options?.firstResultResolver && !firstResultSent) { if (event.kind === 'message' || event.kind === 'task') { options.firstResultResolver(event as Message | Task); @@ -156,6 +170,11 @@ export class DefaultRequestHandler implements A2ARequestHandler { // Use the (potentially updated) contextId from requestContext const finalMessageForAgent = requestContext.userMessage; + // If push notification config is provided, save it to the store. + if (params.configuration?.pushNotificationConfig && this.agentCard.capabilities.pushNotifications) { + await this.pushNotificationStore?.save(taskId, params.configuration.pushNotificationConfig); + } + const eventBus = this.eventBusManager.createOrGetByTaskId(taskId); // EventQueue should be attached to the bus, before the agent execution begins. @@ -250,6 +269,11 @@ export class DefaultRequestHandler implements A2ARequestHandler { const eventBus = this.eventBusManager.createOrGetByTaskId(taskId); const eventQueue = new ExecutionEventQueue(eventBus); + // If push notification config is provided, save it to the store. + if (params.configuration?.pushNotificationConfig && this.agentCard.capabilities.pushNotifications) { + await this.pushNotificationStore?.save(taskId, params.configuration.pushNotificationConfig); + } + // Start agent execution (non-blocking) this.agentExecutor.execute(requestContext, eventBus).catch(err => { @@ -279,6 +303,7 @@ export class DefaultRequestHandler implements A2ARequestHandler { try { for await (const event of eventQueue.events()) { await resultManager.processEvent(event); // Update store in background + await this._sendPushNotificationIfNeeded(event); yield event; // Stream the event to the client } } finally { @@ -362,14 +387,7 @@ export class DefaultRequestHandler implements A2ARequestHandler { pushNotificationConfig.id = taskId; } - const configs = this.pushNotificationConfigs.get(taskId) || []; - - // Remove existing config with the same ID to replace it - const updatedConfigs = configs.filter(c => c.id !== pushNotificationConfig.id); - - updatedConfigs.push(pushNotificationConfig); - - this.pushNotificationConfigs.set(taskId, updatedConfigs); + await this.pushNotificationStore?.save(taskId, pushNotificationConfig); return params; } @@ -385,7 +403,7 @@ export class DefaultRequestHandler implements A2ARequestHandler { throw A2AError.taskNotFound(params.id); } - const configs = this.pushNotificationConfigs.get(params.id) || []; + const configs = await this.pushNotificationStore?.load(params.id) || []; if (configs.length === 0) { throw A2AError.internalError(`Push notification config not found for task ${params.id}.`); } @@ -417,7 +435,7 @@ export class DefaultRequestHandler implements A2ARequestHandler { throw A2AError.taskNotFound(params.id); } - const configs = this.pushNotificationConfigs.get(params.id) || []; + const configs = await this.pushNotificationStore?.load(params.id) || []; return configs.map(config => ({ taskId: params.id, @@ -438,18 +456,7 @@ export class DefaultRequestHandler implements A2ARequestHandler { const { id: taskId, pushNotificationConfigId } = params; - const configs = this.pushNotificationConfigs.get(taskId); - if (!configs) { - return; - } - - const updatedConfigs = configs.filter(c => c.id !== pushNotificationConfigId); - - if (updatedConfigs.length === 0) { - this.pushNotificationConfigs.delete(taskId); - } else if (updatedConfigs.length < configs.length) { - this.pushNotificationConfigs.set(taskId, updatedConfigs); - } + await this.pushNotificationStore?.delete(taskId, pushNotificationConfigId); } async *resubscribe( @@ -512,4 +519,32 @@ export class DefaultRequestHandler implements A2ARequestHandler { eventQueue.stop(); } } + + private async _sendPushNotificationIfNeeded(event: AgentExecutionEvent): Promise { + if (!this.agentCard.capabilities.pushNotifications) { + return; + } + + let taskId: string = ""; + if (event.kind == "task") { + const task = event as Task; + taskId = task.id; + } else { + taskId = event.taskId; + } + + if (!taskId) { + console.error(`Task ID not found for event ${event.kind}.`); + return; + } + + const task = await this.taskStore.load(taskId); + if (!task) { + console.error(`Task ${taskId} not found.`); + return; + } + + // Send push notification in the background. + this.pushNotificationSender?.send(task); + } } diff --git a/test/server/default_request_handler.spec.ts b/test/server/default_request_handler.spec.ts index 03a8f4b..a2f4450 100644 --- a/test/server/default_request_handler.spec.ts +++ b/test/server/default_request_handler.spec.ts @@ -3,64 +3,21 @@ import { assert, expect } from 'chai'; import sinon, { SinonStub, SinonFakeTimers } from 'sinon'; import { AgentExecutor } from '../../src/server/agent_execution/agent_executor.js'; -import { RequestContext, ExecutionEventBus, TaskStore, InMemoryTaskStore, DefaultRequestHandler, ExecutionEventQueue, A2AError } from '../../src/server/index.js'; +import { RequestContext, ExecutionEventBus, TaskStore, InMemoryTaskStore, DefaultRequestHandler, ExecutionEventQueue, A2AError, InMemoryPushNotificationStore, PushNotificationStore, PushNotificationSender } from '../../src/server/index.js'; import { AgentCard, Artifact, DeleteTaskPushNotificationConfigParams, GetTaskPushNotificationConfigParams, ListTaskPushNotificationConfigParams, Message, MessageSendParams, PushNotificationConfig, Task, TaskIdParams, TaskPushNotificationConfig, TaskState, TaskStatusUpdateEvent } from '../../src/index.js'; import { DefaultExecutionEventBusManager, ExecutionEventBusManager } from '../../src/server/events/execution_event_bus_manager.js'; import { A2ARequestHandler } from '../../src/server/request_handler/a2a_request_handler.js'; +import { MockAgentExecutor, CancellableMockAgentExecutor, fakeTaskExecute } from './mocks/agent-executor.mock.js'; +import { MockPushNotificationSender } from './mocks/push_notification_sender.mock.js'; -/** - * A realistic mock of AgentExecutor for cancellation tests. - */ -class CancellableMockAgentExecutor implements AgentExecutor { - private cancelledTasks = new Set(); - private clock: SinonFakeTimers; - - constructor(clock: SinonFakeTimers) { - this.clock = clock; - } - - public execute = async ( - requestContext: RequestContext, - eventBus: ExecutionEventBus, - ): Promise => { - const taskId = requestContext.taskId; - const contextId = requestContext.contextId; - - eventBus.publish({ id: taskId, contextId, status: { state: "submitted" }, kind: 'task' }); - eventBus.publish({ taskId, contextId, kind: 'status-update', status: { state: "working" }, final: false }); - - // Simulate a long-running process - for (let i = 0; i < 5; i++) { - if (this.cancelledTasks.has(taskId)) { - eventBus.publish({ taskId, contextId, kind: 'status-update', status: { state: "canceled" }, final: true }); - eventBus.finished(); - return; - } - // Use fake timers to simulate work - await this.clock.tickAsync(100); - } - - eventBus.publish({ taskId, contextId, kind: 'status-update', status: { state: "completed" }, final: true }); - eventBus.finished(); - }; - - public cancelTask = async ( - taskId: string, - eventBus: ExecutionEventBus, - ): Promise => { - this.cancelledTasks.add(taskId); - // The execute loop is responsible for publishing the final state - }; - - // Stub for spying on cancelTask calls - public cancelTaskSpy = sinon.spy(this, 'cancelTask'); -} describe('DefaultRequestHandler as A2ARequestHandler', () => { let handler: A2ARequestHandler; let mockTaskStore: TaskStore; let mockAgentExecutor: AgentExecutor; let executionEventBusManager: ExecutionEventBusManager; + let mockPushNotificationStore: PushNotificationStore; + let mockPushNotificationSender: PushNotificationSender; let clock: SinonFakeTimers; const testAgentCard: AgentCard = { @@ -68,6 +25,7 @@ describe('DefaultRequestHandler as A2ARequestHandler', () => { description: 'An agent for testing purposes', url: 'http://localhost:8080', version: '1.0.0', + protocolVersion: '0.3.0', capabilities: { streaming: true, pushNotifications: true, @@ -113,19 +71,6 @@ describe('DefaultRequestHandler as A2ARequestHandler', () => { parts: [{ kind: 'text', text }], kind: 'message', }); - - /** - * A mock implementation of AgentExecutor to control agent behavior during tests. - */ - class MockAgentExecutor implements AgentExecutor { - // Stubs to control and inspect calls to execute and cancelTask - public execute: SinonStub< - [RequestContext, ExecutionEventBus], - Promise - > = sinon.stub(); - public cancelTask: SinonStub<[string, ExecutionEventBus], Promise> = - sinon.stub(); - } it('sendMessage: should return a simple message response', async () => { const params: MessageSendParams = { @@ -571,6 +516,159 @@ describe('DefaultRequestHandler as A2ARequestHandler', () => { expect(configs).to.be.an('array').with.lengthOf(0); }); + it('should send push notification when task update is received', async () => { + const mockPushNotificationStore = new InMemoryPushNotificationStore(); + const mockPushNotificationSender = new MockPushNotificationSender(); + + const handler = new DefaultRequestHandler( + testAgentCard, + mockTaskStore, + mockAgentExecutor, + executionEventBusManager, + mockPushNotificationStore, + mockPushNotificationSender, + ); + const pushNotificationConfig: PushNotificationConfig = { + url: 'https://push-1.com' + }; + const contextId = 'ctx-push-1'; + + const params: MessageSendParams = { + message: { + ...createTestMessage('msg-push-1', 'Work on task with push notification'), + contextId: contextId, + }, + configuration: { + pushNotificationConfig: pushNotificationConfig + } + }; + + let taskId: string; + (mockAgentExecutor as MockAgentExecutor).execute.callsFake(async (ctx, bus) => { + taskId = ctx.taskId; + fakeTaskExecute(ctx, bus); + }); + + await handler.sendMessage(params); + + const expectedTask: Task = { + id: taskId, + contextId, + status: { state: 'completed' }, + kind: 'task', + history: [params.message as Message] + }; + + // Verify push notifications were sent with complete task objects + assert.isTrue((mockPushNotificationSender as MockPushNotificationSender).send.calledThrice); + + // Verify first call (submitted state) + const firstCallTask = (mockPushNotificationSender as MockPushNotificationSender).send.firstCall.args[0] as Task; + const expectedFirstTask: Task = { + ...expectedTask, + status: { state: 'submitted' } + }; + assert.deepEqual(firstCallTask, expectedFirstTask); + + // // Verify second call (working state) + const secondCallTask = (mockPushNotificationSender as MockPushNotificationSender).send.secondCall.args[0] as Task; + const expectedSecondTask: Task = { + ...expectedTask, + status: { state: 'working' } + }; + assert.deepEqual(secondCallTask, expectedSecondTask); + + // // Verify third call (completed state) + const thirdCallTask = (mockPushNotificationSender as MockPushNotificationSender).send.thirdCall.args[0] as Task; + const expectedThirdTask: Task = { + ...expectedTask, + status: { state: 'completed' } + }; + assert.deepEqual(thirdCallTask, expectedThirdTask); + }); + + it('sendMessageStream: should send push notification when task update is received', async () => { + const mockPushNotificationStore = new InMemoryPushNotificationStore(); + const mockPushNotificationSender = new MockPushNotificationSender(); + + const handler = new DefaultRequestHandler( + testAgentCard, + mockTaskStore, + mockAgentExecutor, + executionEventBusManager, + mockPushNotificationStore, + mockPushNotificationSender, + ); + const pushNotificationConfig: PushNotificationConfig = { + url: 'https://push-stream-1.com' + }; + + const contextId = 'ctx-push-stream-1'; + + const params: MessageSendParams = { + message: { + ...createTestMessage('msg-push-stream-1', 'Work on task with push notification via stream'), + contextId: contextId, + }, + configuration: { + pushNotificationConfig: pushNotificationConfig + } + }; + + let taskId: string; + (mockAgentExecutor as MockAgentExecutor).execute.callsFake(async (ctx, bus) => { + taskId = ctx.taskId; + fakeTaskExecute(ctx, bus); + }); + + const eventGenerator = handler.sendMessageStream(params); + const events = []; + for await (const event of eventGenerator) { + events.push(event); + } + + // Verify stream events + assert.lengthOf(events, 3, "Stream should yield 3 events"); + assert.equal((events[0] as Task).status.state, "submitted"); + assert.equal((events[1] as TaskStatusUpdateEvent).status.state, "working"); + assert.equal((events[2] as TaskStatusUpdateEvent).status.state, "completed"); + assert.isTrue((events[2] as TaskStatusUpdateEvent).final); + + // Verify push notifications were sent with complete task objects + assert.isTrue((mockPushNotificationSender as MockPushNotificationSender).send.calledThrice); + + const expectedTask: Task = { + id: taskId, + contextId, + status: { state: 'completed' }, + kind: 'task', + history: [params.message as Message] + }; + // Verify first call (submitted state) + const firstCallTask = (mockPushNotificationSender as MockPushNotificationSender).send.firstCall.args[0] as Task; + const expectedFirstTask: Task = { + ...expectedTask, + status: { state: 'submitted' } + }; + assert.deepEqual(firstCallTask, expectedFirstTask); + + // Verify second call (working state) + const secondCallTask = (mockPushNotificationSender as MockPushNotificationSender).send.secondCall.args[0] as Task; + const expectedSecondTask: Task = { + ...expectedTask, + status: { state: 'working' } + }; + assert.deepEqual(secondCallTask, expectedSecondTask); + + // Verify third call (completed state) + const thirdCallTask = (mockPushNotificationSender as MockPushNotificationSender).send.thirdCall.args[0] as Task; + const expectedThirdTask: Task = { + ...expectedTask, + status: { state: 'completed' } + }; + assert.deepEqual(thirdCallTask, expectedThirdTask); + }); + it('Push Notification methods should throw error if task does not exist', async () => { const nonExistentTaskId = 'task-non-existent'; const config: PushNotificationConfig = { id: 'cfg-x', url: 'https://x.com' }; diff --git a/test/server/mocks/agent-executor.mock.ts b/test/server/mocks/agent-executor.mock.ts new file mode 100644 index 0000000..b5fe98a --- /dev/null +++ b/test/server/mocks/agent-executor.mock.ts @@ -0,0 +1,101 @@ +import sinon, { SinonStub, SinonFakeTimers } from 'sinon'; +import { AgentExecutor } from '../../../src/server/agent_execution/agent_executor.js'; +import { RequestContext, ExecutionEventBus } from '../../../src/server/index.js'; + +/** + * A mock implementation of AgentExecutor to control agent behavior during tests. + */ +export class MockAgentExecutor implements AgentExecutor { + // Stubs to control and inspect calls to execute and cancelTask + public execute: SinonStub< + [RequestContext, ExecutionEventBus], + Promise + > = sinon.stub(); + + public cancelTask: SinonStub<[string, ExecutionEventBus], Promise> = + sinon.stub(); +} + +/** + * Fake implementation of the task execution events. + */ +export const fakeTaskExecute = async (ctx: RequestContext, bus: ExecutionEventBus) => { + const taskId = ctx.taskId; + const contextId = ctx.contextId; + + // Publish task creation + bus.publish({ + id: taskId, + contextId, + status: { state: "submitted" }, + kind: 'task' + }); + + // Publish working status + bus.publish({ + taskId, + contextId, + kind: 'status-update', + status: { state: "working" }, + final: false + }); + + // Publish completion + bus.publish({ + taskId, + contextId, + kind: 'status-update', + status: { state: "completed" }, + final: true + }); + + bus.finished(); +} + +/** + * A realistic mock of AgentExecutor for cancellation tests. + */ +export class CancellableMockAgentExecutor implements AgentExecutor { + private cancelledTasks = new Set(); + private clock: SinonFakeTimers; + + constructor(clock: SinonFakeTimers) { + this.clock = clock; + } + + public execute = async ( + requestContext: RequestContext, + eventBus: ExecutionEventBus, + ): Promise => { + const taskId = requestContext.taskId; + const contextId = requestContext.contextId; + + eventBus.publish({ id: taskId, contextId, status: { state: "submitted" }, kind: 'task' }); + eventBus.publish({ taskId, contextId, kind: 'status-update', status: { state: "working" }, final: false }); + + // Simulate a long-running process + for (let i = 0; i < 5; i++) { + if (this.cancelledTasks.has(taskId)) { + eventBus.publish({ taskId, contextId, kind: 'status-update', status: { state: "canceled" }, final: true }); + eventBus.finished(); + return; + } + // Use fake timers to simulate work + await this.clock.tickAsync(100); + } + + eventBus.publish({ taskId, contextId, kind: 'status-update', status: { state: "completed" }, final: true }); + eventBus.finished(); + }; + + public cancelTask = async ( + taskId: string, + eventBus: ExecutionEventBus, + ): Promise => { + this.cancelledTasks.add(taskId); + // The execute loop is responsible for publishing the final state + }; + + // Stub for spying on cancelTask calls + public cancelTaskSpy = sinon.spy(this, 'cancelTask'); +} diff --git a/test/server/mocks/push_notification_sender.mock.ts b/test/server/mocks/push_notification_sender.mock.ts new file mode 100644 index 0000000..cede58e --- /dev/null +++ b/test/server/mocks/push_notification_sender.mock.ts @@ -0,0 +1,7 @@ +import sinon, { SinonStub } from 'sinon'; +import { Task } from '../../../src/index.js'; +import { PushNotificationSender } from '../../../src/server/push_notification/push_notification_sender.js'; + +export class MockPushNotificationSender implements PushNotificationSender { + public send: SinonStub<[Task], Promise> = sinon.stub(); +} \ No newline at end of file diff --git a/test/server/push_notification_integration.spec.ts b/test/server/push_notification_integration.spec.ts new file mode 100644 index 0000000..8f08283 --- /dev/null +++ b/test/server/push_notification_integration.spec.ts @@ -0,0 +1,623 @@ +import 'mocha'; +import { assert } from 'chai'; +import sinon from 'sinon'; +import express, { Request, Response } from 'express'; +import { Server } from 'http'; +import { AddressInfo } from 'net'; + +import { DefaultRequestHandler } from '../../src/server/request_handler/default_request_handler.js'; +import { InMemoryTaskStore } from '../../src/server/store.js'; +import { InMemoryPushNotificationStore } from '../../src/server/push_notification/push_notification_store.js'; +import { DefaultPushNotificationSender } from '../../src/server/push_notification/default_push_notification_sender.js'; +import { DefaultExecutionEventBusManager } from '../../src/server/events/execution_event_bus_manager.js'; +import { AgentCard, Message, MessageSendParams, PushNotificationConfig, Task } from '../../src/index.js'; +import { fakeTaskExecute, MockAgentExecutor } from './mocks/agent-executor.mock.js'; + +describe('Push Notification Integration Tests', () => { + let testServer: Server; + let testServerPort: number; + let testServerUrl: string; + let receivedNotifications: Array<{ body: any; headers: any; url: string; method: string }> = []; + + let taskStore: InMemoryTaskStore; + let handler: DefaultRequestHandler; + let mockAgentExecutor: MockAgentExecutor; + let pushNotificationStore: InMemoryPushNotificationStore; + let pushNotificationSender: DefaultPushNotificationSender; + + const testAgentCard: AgentCard = { + name: 'Test Agent', + description: 'An agent for testing push notifications', + url: 'http://localhost:8080', + version: '1.0.0', + protocolVersion: '0.3.0', + capabilities: { + streaming: true, + pushNotifications: true, + }, + defaultInputModes: ['text/plain'], + defaultOutputModes: ['text/plain'], + skills: [], + }; + + // Create test Express server to receive push notifications + const createTestServer = (): Promise<{ server: Server; port: number; url: string }> => { + return new Promise((resolve) => { + const app = express(); + app.use(express.json()); + + // Endpoint to receive push notifications + app.post('/notify', (req: Request, res: Response) => { + receivedNotifications.push({ + body: req.body, + headers: req.headers, + url: req.url, + method: req.method + }); + res.status(200).json({ received: true, timestamp: new Date().toISOString() }); + }); + + // Endpoint to simulate different response scenarios + app.post('/notify/:scenario', (req: Request, res: Response) => { + const scenario = req.params.scenario; + + receivedNotifications.push({ + body: req.body, + headers: req.headers, + url: req.url, + method: req.method + }); + + switch (scenario) { + case 'error': + res.status(500).json({ error: 'Internal Server Error' }); + break; + default: + res.status(200).json({ received: true }); + } + }); + + const server = app.listen(0, () => { + const port = (server.address() as AddressInfo).port; + const url = `http://localhost:${port}`; + resolve({ server, port, url }); + }); + }); + }; + + beforeEach(async () => { + // Reset state + receivedNotifications = []; + + // Create and start test server + const serverInfo = await createTestServer(); + testServer = serverInfo.server; + testServerPort = serverInfo.port; + testServerUrl = serverInfo.url; + + // Create fresh instances for each test + taskStore = new InMemoryTaskStore(); + mockAgentExecutor = new MockAgentExecutor(); + const executionEventBusManager = new DefaultExecutionEventBusManager(); + pushNotificationStore = new InMemoryPushNotificationStore(); + pushNotificationSender = new DefaultPushNotificationSender(pushNotificationStore); + + handler = new DefaultRequestHandler( + testAgentCard, + taskStore, + mockAgentExecutor, + executionEventBusManager, + pushNotificationStore, + pushNotificationSender, + ); + }); + + afterEach(async () => { + // Clean up test server + if (testServer) { + await testServer.close(); + } + sinon.restore(); + }); + + const createTestMessage = (text: string, taskId?: string): Message => ({ + messageId: `msg-${Date.now()}`, + role: 'user', + parts: [{ kind: 'text', text }], + kind: 'message', + ...(taskId && { taskId }) + }); + + describe('End-to-End Push Notification Flow', () => { + it('should send push notifications for task status updates', async () => { + const pushConfig: PushNotificationConfig = { + id: 'test-push-config', + url: `${testServerUrl}/notify`, + token: 'test-auth-token' + }; + + let contextId = 'test-push-context'; + const params: MessageSendParams = { + message: { + ...createTestMessage('Test task with push notifications'), + contextId: contextId, + }, + configuration: { + pushNotificationConfig: pushConfig + } + }; + + let taskId: string; + // Mock the agent executor to publish all three states for this test only + mockAgentExecutor.execute.callsFake(async (ctx, bus) => { + taskId = ctx.taskId; + fakeTaskExecute(ctx, bus); + }); + + // Send message and wait for completion + await handler.sendMessage(params); + + // Wait for async push notifications to be sent + await new Promise(resolve => setTimeout(resolve, 200)); + + // Load the task from the store + const expectedTaskResult: Task = { + id: taskId, + contextId, + history: [params.message as Message], + status: { state: 'completed' }, + kind: 'task' + }; + + // Verify push notifications were sent + assert.lengthOf(receivedNotifications, 3, 'Should send notifications for submitted, working, and completed states'); + + // Verify all three states are present (order may vary) + const states = receivedNotifications.map(n => n.body.status.state); + assert.include(states, 'submitted', 'Should include submitted state'); + assert.include(states, 'working', 'Should include working state'); + assert.include(states, 'completed', 'Should include completed state'); + + // Verify first notification has correct format + const firstNotification = receivedNotifications[0]; + assert.equal(firstNotification.method, 'POST'); + assert.equal(firstNotification.url, '/notify'); + assert.equal(firstNotification.headers['content-type'], 'application/json'); + assert.equal(firstNotification.headers['x-a2a-notification-token'], 'test-auth-token'); + assert.deepEqual(firstNotification.body, { + ...expectedTaskResult, + status: { state: 'submitted' } + }); + + const secondNotification = receivedNotifications[1]; + assert.deepEqual(secondNotification.body, { + ...expectedTaskResult, + status: { state: 'working' } + }); + + const thirdNotification = receivedNotifications[2]; + assert.deepEqual(thirdNotification.body, { + ...expectedTaskResult, + status: { state: 'completed' } + }); + }); + + it('should handle multiple push notification endpoints for the same task', async () => { + const pushConfig1: PushNotificationConfig = { + id: 'config-1', + url: `${testServerUrl}/notify`, + token: 'token-1' + }; + + const pushConfig2: PushNotificationConfig = { + id: 'config-2', + url: `${testServerUrl}/notify/second`, + token: 'token-2' + }; + + const params: MessageSendParams = { + message: { + ...createTestMessage('Test task with multiple push endpoints'), + taskId: 'test-multi-endpoints', + contextId: 'test-context' + } + }; + + // Assume the task is created by a previous message + const task: Task = { + id: 'test-multi-endpoints', + contextId: 'test-context', + status: { state: 'submitted' }, + kind: 'task' + }; + await taskStore.save(task); + + // Set multiple push notification configs for this message + await handler.setTaskPushNotificationConfig({ + taskId: task.id, + pushNotificationConfig: pushConfig1 + }); + + await handler.setTaskPushNotificationConfig({ + taskId: task.id, + pushNotificationConfig: pushConfig2 + }); + + // Mock the agent executor to publish only completed state + mockAgentExecutor.execute.callsFake(async (ctx, bus) => { + const taskId = ctx.taskId; + const contextId = ctx.contextId; + + // Publish working status + bus.publish({ + id: taskId, + contextId, + status: { state: "working" }, + kind: 'task' + }); + + // Publish completion directly + bus.publish({ + taskId, + contextId, + kind: 'status-update', + status: { state: "completed" }, + final: true + }); + + bus.finished(); + }); + + // Send a message to trigger notifications + await handler.sendMessage(params); + + // Wait for async push notifications to be sent + await new Promise(resolve => setTimeout(resolve, 300)); + + // Should now have notifications from both endpoints + const notificationsByEndpoint = receivedNotifications.reduce((acc, n) => { + acc[n.url] = acc[n.url] || 0; + acc[n.url]++; + return acc; + }, {} as Record); + + // Verify push notification was attempted (even though it failed) + assert.lengthOf(receivedNotifications, 4, 'Should have 4 notifications 2 for each endpoint'); + assert.equal(notificationsByEndpoint['/notify'], 2, 'Should have 2 notifications for primary endpoint'); + assert.equal(notificationsByEndpoint['/notify/second'], 2, 'Should have 2 notifications for second endpoint'); + }); + + it('should complete task successfully even when push notification endpoint returns an error', async () => { + const pushConfig: PushNotificationConfig = { + id: 'error-endpoint-config', + url: `${testServerUrl}/notify/error`, + token: 'test-auth-token' + }; + + let contextId = 'test-error-context'; + const params: MessageSendParams = { + message: { + ...createTestMessage('Test task with error endpoint'), + contextId: contextId, + }, + configuration: { + pushNotificationConfig: pushConfig + } + }; + + let taskId: string; + // Mock the agent executor to publish task states + mockAgentExecutor.execute.callsFake(async (ctx, bus) => { + taskId = ctx.taskId; + fakeTaskExecute(ctx, bus); + }); + + // Send message and wait for completion - this should not throw an error + const result = await handler.sendMessage(params); + const task = result as Task; + + // Wait for async push notifications to be sent + await new Promise(resolve => setTimeout(resolve, 200)); + + // Load the task from the store + const expectedTaskResult: Task = { + id: taskId, + contextId, + history: [params.message as Message], + status: { state: 'completed' }, + kind: 'task' + }; + + // Verify the task payload + assert.deepEqual(task, expectedTaskResult); + + // Verify the error endpoint was hit + const errorNotifications = receivedNotifications.filter(n => n.url === '/notify/error'); + assert.lengthOf(errorNotifications, 3, 'Should have attempted to send notifications to error endpoint'); + }); + }); + + describe('Push Notification Header Configuration Tests', () => { + it('should use default header name when tokenHeaderName is not specified', async () => { + const pushConfig: PushNotificationConfig = { + id: 'default-header-test', + url: `${testServerUrl}/notify`, + token: 'default-token' + }; + + const params: MessageSendParams = { + message: createTestMessage('Test with default header name'), + configuration: { + pushNotificationConfig: pushConfig + } + }; + + // Mock the agent executor to publish completion + mockAgentExecutor.execute.callsFake(async (ctx, bus) => { + const taskId = ctx.taskId; + const contextId = ctx.contextId; + + bus.publish({ + id: taskId, + contextId, + status: { state: "submitted" }, + kind: 'task' + }); + + bus.publish({ + taskId, + contextId, + kind: 'status-update', + status: { state: "completed" }, + final: true + }); + + bus.finished(); + }); + + await handler.sendMessage(params); + + // Wait for async push notifications to be sent + await new Promise(resolve => setTimeout(resolve, 200)); + + // Verify default header name is used + assert.lengthOf(receivedNotifications, 2, 'Should send notifications for submitted and completed states'); + + receivedNotifications.forEach(notification => { + assert.equal(notification.headers['x-a2a-notification-token'], 'default-token', + 'Should use default header name X-A2A-Notification-Token'); + assert.equal(notification.headers['content-type'], 'application/json', + 'Should include content-type header'); + }); + }); + + it('should use custom header name when tokenHeaderName is specified', async () => { + // Create a new handler with custom header name + const customPushNotificationSender = new DefaultPushNotificationSender( + pushNotificationStore, + { tokenHeaderName: 'X-Custom-Auth-Token' } + ); + + const customHandler = new DefaultRequestHandler( + testAgentCard, + taskStore, + mockAgentExecutor, + new DefaultExecutionEventBusManager(), + pushNotificationStore, + customPushNotificationSender, + ); + + const pushConfig: PushNotificationConfig = { + id: 'custom-header-test', + url: `${testServerUrl}/notify`, + token: 'custom-token' + }; + + const params: MessageSendParams = { + message: createTestMessage('Test with custom header name'), + configuration: { + pushNotificationConfig: pushConfig + } + }; + + // Mock the agent executor to publish completion + mockAgentExecutor.execute.callsFake(async (ctx, bus) => { + const taskId = ctx.taskId; + const contextId = ctx.contextId; + + bus.publish({ + id: taskId, + contextId, + status: { state: "submitted" }, + kind: 'task' + }); + + bus.publish({ + taskId, + contextId, + kind: 'status-update', + status: { state: "completed" }, + final: true + }); + + bus.finished(); + }); + + await customHandler.sendMessage(params); + + // Wait for async push notifications to be sent + await new Promise(resolve => setTimeout(resolve, 200)); + + // Verify custom header name is used + assert.lengthOf(receivedNotifications, 2, 'Should send notifications for submitted and completed states'); + + receivedNotifications.forEach(notification => { + assert.equal(notification.headers['x-custom-auth-token'], 'custom-token', + 'Should use custom header name X-Custom-Auth-Token'); + assert.isUndefined(notification.headers['x-a2a-notification-token'], + 'Should not use default header name'); + assert.equal(notification.headers['content-type'], 'application/json', + 'Should include content-type header'); + }); + }); + + it('should not send token header when token is not provided', async () => { + const pushConfig: PushNotificationConfig = { + id: 'no-token-test', + url: `${testServerUrl}/notify` + // No token provided + }; + + const params: MessageSendParams = { + message: createTestMessage('Test without token'), + configuration: { + pushNotificationConfig: pushConfig + } + }; + + // Mock the agent executor to publish completion + mockAgentExecutor.execute.callsFake(async (ctx, bus) => { + const taskId = ctx.taskId; + const contextId = ctx.contextId; + + bus.publish({ + id: taskId, + contextId, + status: { state: "submitted" }, + kind: 'task' + }); + + bus.publish({ + taskId, + contextId, + kind: 'status-update', + status: { state: "completed" }, + final: true + }); + + bus.finished(); + }); + + await handler.sendMessage(params); + + // Wait for async push notifications to be sent + await new Promise(resolve => setTimeout(resolve, 200)); + + // Verify no token header is sent + assert.lengthOf(receivedNotifications, 2, 'Should send notifications for submitted and completed states'); + + receivedNotifications.forEach(notification => { + assert.isUndefined(notification.headers['x-a2a-notification-token'], + 'Should not include token header when token is not provided'); + assert.equal(notification.headers['content-type'], 'application/json', + 'Should include content-type header'); + }); + }); + + it('should handle multiple push configs with different header configurations', async () => { + // Create a handler with custom header name + const customPushNotificationSender = new DefaultPushNotificationSender( + pushNotificationStore, + { tokenHeaderName: 'X-Custom-Token' } + ); + + const customHandler = new DefaultRequestHandler( + testAgentCard, + taskStore, + mockAgentExecutor, + new DefaultExecutionEventBusManager(), + pushNotificationStore, + customPushNotificationSender, + ); + + const pushConfig1: PushNotificationConfig = { + id: 'config-with-token', + url: `${testServerUrl}/notify`, + token: 'token-1' + }; + + const pushConfig2: PushNotificationConfig = { + id: 'config-without-token', + url: `${testServerUrl}/notify/second` + // No token + }; + + const params: MessageSendParams = { + message: { + ...createTestMessage('Test with multiple configs'), + taskId: 'multi-config-test', + contextId: 'test-context' + } + }; + + // Create task and set multiple push configs + const task: Task = { + id: 'multi-config-test', + contextId: 'test-context', + status: { state: 'submitted' }, + kind: 'task' + }; + await taskStore.save(task); + + await customHandler.setTaskPushNotificationConfig({ + taskId: task.id, + pushNotificationConfig: pushConfig1 + }); + + await customHandler.setTaskPushNotificationConfig({ + taskId: task.id, + pushNotificationConfig: pushConfig2 + }); + + // Mock the agent executor to publish completion + mockAgentExecutor.execute.callsFake(async (ctx, bus) => { + const taskId = ctx.taskId; + const contextId = ctx.contextId; + + bus.publish({ + taskId, + contextId, + kind: 'status-update', + status: { state: "completed" }, + final: true + }); + + bus.finished(); + }); + + await customHandler.sendMessage(params); + + // Wait for async push notifications to be sent + await new Promise(resolve => setTimeout(resolve, 300)); + + // Verify both endpoints received notifications with correct headers + const config1Notifications = receivedNotifications.filter(n => n.url === '/notify'); + const config2Notifications = receivedNotifications.filter(n => n.url === '/notify/second'); + + assert.lengthOf(config1Notifications, 1, 'Should send notification to first endpoint'); + assert.lengthOf(config2Notifications, 1, 'Should send notification to second endpoint'); + + // Check headers for config with token + config1Notifications.forEach(notification => { + assert.equal(notification.headers['x-custom-token'], 'token-1', + 'Should use custom header name for config with token'); + assert.isUndefined(notification.headers['x-a2a-notification-token'], + 'Should not use default header name'); + }); + + // Check headers for config without token + config2Notifications.forEach(notification => { + assert.isUndefined(notification.headers['x-custom-token'], + 'Should not include token header for config without token'); + assert.isUndefined(notification.headers['x-a2a-notification-token'], + 'Should not include default token header'); + }); + + // Both should have content-type + receivedNotifications.forEach(notification => { + assert.equal(notification.headers['content-type'], 'application/json', + 'Should include content-type header'); + }); + }); + }); +});