Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
37b6cc8
feat: push notifications support added
botraunak Aug 12, 2025
329a8d0
feat: save push notification config is available in message config
botraunak Aug 12, 2025
c0c4f96
fix: added abort controller for push notification config
botraunak Aug 12, 2025
b3e4d70
fix: Requested changes
botraunak Aug 17, 2025
76f3e66
fix: Test changes
botraunak Aug 21, 2025
58df8a4
Merge branch 'main' into feat/push_notifications
botraunak Aug 21, 2025
fed8024
fix: use deepequal to check task objects
botraunak Aug 22, 2025
febbb55
refactor: create new mocks folder for resuable server mocks
botraunak Aug 22, 2025
fbbf4d3
fix: remove slow push testcase and add error case and deep check body
botraunak Aug 22, 2025
50edcaa
fix: added options for push notification sender and resolve
botraunak Aug 22, 2025
88a18f0
fix: afterEach hook error unit test, close testServer properly
botraunak Aug 22, 2025
1f1ece8
Merge branch 'main' into feat/push_notifications
botraunak Aug 27, 2025
016bc67
fix: remove promise.resolve unwanted
botraunak Aug 27, 2025
eee1d24
fix: remove unauthorized scenario for push testing
botraunak Aug 27, 2025
2995a1e
fix: use refactored fake task exection
botraunak Aug 27, 2025
ee6831b
fix: remove promise.resolves
botraunak Aug 28, 2025
ca33ada
fix: remove dependency on taskStore load
botraunak Aug 28, 2025
4795f85
feat: added readme section
botraunak Aug 28, 2025
11d01c3
Merge branch 'main' into feat/push_notifications
botraunak Aug 28, 2025
2c93805
fix: merge
botraunak Aug 28, 2025
3f00e8d
fix: reset readme to main
botraunak Aug 28, 2025
04e854e
feat: added readme section
botraunak Aug 28, 2025
0d3dd61
fix: update heading styles
botraunak Aug 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/server/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,8 @@ export { InMemoryTaskStore } from "./store.js";

export { JsonRpcTransportHandler } from "./transports/jsonrpc_transport_handler.js";
export { A2AError } from "./error.js";

export type { PushNotificationSender } from "./push_notification/push_notification_sender.js";
export { DefaultPushNotificationSender } from "./push_notification/default_push_notification_sender.js";
export type { PushNotificationStore } from "./push_notification/push_notification_store.js";
export { InMemoryPushNotificationStore } from "./push_notification/push_notification_store.js";
70 changes: 70 additions & 0 deletions src/server/push_notification/default_push_notification_sender.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import { Task, PushNotificationConfig } from "../../types.js";
import { PushNotificationSender } from "./push_notification_sender.js";
import { PushNotificationStore } from "./push_notification_store.js";

export class DefaultPushNotificationSender implements PushNotificationSender {

private readonly pushNotificationStore: PushNotificationStore;

constructor(pushNotificationStore: PushNotificationStore) {
this.pushNotificationStore = pushNotificationStore;
}

async send(task: Task): Promise<void> {
const pushConfigs = await this.pushNotificationStore.load(task.id);
if (!pushConfigs || pushConfigs.length === 0) {
return;
}

pushConfigs.forEach(pushConfig => {
this._dispatchNotification(task, pushConfig)
.then(success => {
if (!success) {
console.warn(`Push notification failed to send for task_id=${task.id} to URL: ${pushConfig.url}`);
}
})
.catch(error => {
console.error(`Error sending push notification for task_id=${task.id} to URL: ${pushConfig.url}. Error:`, error);
});
});
}

private async _dispatchNotification(
task: Task,
pushConfig: PushNotificationConfig
): Promise<boolean> {
const url = pushConfig.url;
const controller = new AbortController();
// Abort the request if it takes longer than 5 seconds.
const timeoutId = setTimeout(() => controller.abort(), 5000);

try {
const headers: Record<string, string> = {
'Content-Type': 'application/json'
};

if (pushConfig.token) {
headers['X-A2A-Notification-Token'] = pushConfig.token;
}

const response = await fetch(url, {
method: 'POST',
headers,
body: JSON.stringify(task),
signal: controller.signal
});

if (!response.ok) {
throw new Error(`HTTP ${response.status}: ${response.statusText}`);
}

console.info(`Push notification sent for task_id=${task.id} to URL: ${url}`);
return true;
} catch (error) {
console.error(`Error sending push notification for task_id=${task.id} to URL: ${url}. Error:`, error);
return false;
} finally {
clearTimeout(timeoutId);
}
}
}
5 changes: 5 additions & 0 deletions src/server/push_notification/push_notification_sender.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import { Task } from "../../types.js";

export interface PushNotificationSender {
send(task: Task): Promise<void>;
}
58 changes: 58 additions & 0 deletions src/server/push_notification/push_notification_store.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import { PushNotificationConfig } from "../../types.js";

export interface PushNotificationStore {
save(taskId: string, pushNotificationConfig: PushNotificationConfig): Promise<void>;
load(taskId: string): Promise<PushNotificationConfig[]>;
delete(taskId: string, configId?: string): Promise<void>;
}

export class InMemoryPushNotificationStore implements PushNotificationStore {
private store: Map<string, PushNotificationConfig[]> = new Map();

async save(taskId: string, pushNotificationConfig: PushNotificationConfig): Promise<void> {
const configs = this.store.get(taskId) || [];

// Set ID if it's not already set
if (!pushNotificationConfig.id) {
pushNotificationConfig.id = taskId;
}

// Remove existing config with the same ID if it exists
const existingIndex = configs.findIndex(config => config.id === pushNotificationConfig.id);
if (existingIndex !== -1) {
configs.splice(existingIndex, 1);
}

// Add the new/updated config
configs.push(pushNotificationConfig);
this.store.set(taskId, configs);
}

async load(taskId: string): Promise<PushNotificationConfig[]> {
const configs = this.store.get(taskId);
return configs || [];
}

async delete(taskId: string, configId?: string): Promise<void> {
// If no configId is provided, use taskId as the configId (backward compatibility)
if (configId === undefined) {
configId = taskId;
}

const configs = this.store.get(taskId);
if (!configs) {
return;
}

const configIndex = configs.findIndex(config => config.id === configId);
if (configIndex !== -1) {
configs.splice(configIndex, 1);
}

if (configs.length === 0) {
this.store.delete(taskId);
} else {
this.store.set(taskId, configs);
}
}
}
85 changes: 60 additions & 25 deletions src/server/request_handler/default_request_handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@ import { AgentExecutor } from "../agent_execution/agent_executor.js";
import { RequestContext } from "../agent_execution/request_context.js";
import { A2AError } from "../error.js";
import { ExecutionEventBusManager, DefaultExecutionEventBusManager } from "../events/execution_event_bus_manager.js";
import { ExecutionEventBus } from "../events/execution_event_bus.js";
import { AgentExecutionEvent, ExecutionEventBus } from "../events/execution_event_bus.js";
import { ExecutionEventQueue } from "../events/execution_event_queue.js";
import { ResultManager } from "../result_manager.js";
import { TaskStore } from "../store.js";
import { A2ARequestHandler } from "./a2a_request_handler.js";
import { InMemoryPushNotificationStore, PushNotificationStore } from '../push_notification/push_notification_store.js';
import { PushNotificationSender } from '../push_notification/push_notification_sender.js';
import { DefaultPushNotificationSender } from '../push_notification/default_push_notification_sender.js';

const terminalStates: TaskState[] = ["completed", "failed", "canceled", "rejected"];

Expand All @@ -19,22 +22,31 @@ export class DefaultRequestHandler implements A2ARequestHandler {
private readonly taskStore: TaskStore;
private readonly agentExecutor: AgentExecutor;
private readonly eventBusManager: ExecutionEventBusManager;
// Store for push notification configurations (could be part of TaskStore or separate)
private readonly pushNotificationConfigs: Map<string, PushNotificationConfig[]> = new Map();
private readonly pushNotificationStore ?: PushNotificationStore;
private readonly pushNotificationSender ?: PushNotificationSender;


constructor(
agentCard: AgentCard,
taskStore: TaskStore,
agentExecutor: AgentExecutor,
eventBusManager: ExecutionEventBusManager = new DefaultExecutionEventBusManager(),
pushNotificationStore?: PushNotificationStore,
pushNotificationSender?: PushNotificationSender,
extendedAgentCard?: AgentCard,
) {
this.agentCard = agentCard;
this.taskStore = taskStore;
this.agentExecutor = agentExecutor;
this.eventBusManager = eventBusManager;
this.extendedAgentCard = extendedAgentCard;

// If push notifications are supported, use the provided store and sender.
// Otherwise, use the default in-memory store and sender.
if (agentCard.capabilities.pushNotifications) {
this.pushNotificationStore = pushNotificationStore || new InMemoryPushNotificationStore();
this.pushNotificationSender = pushNotificationSender || new DefaultPushNotificationSender(this.pushNotificationStore);
}
}

async getAgentCard(): Promise<AgentCard> {
Expand Down Expand Up @@ -114,6 +126,8 @@ export class DefaultRequestHandler implements A2ARequestHandler {
for await (const event of eventQueue.events()) {
await resultManager.processEvent(event);

await this._sendPushNotificationIfNeeded(event);

if (options?.firstResultResolver && !firstResultSent) {
if (event.kind === 'message' || event.kind === 'task') {
options.firstResultResolver(event as Message | Task);
Expand Down Expand Up @@ -156,6 +170,11 @@ export class DefaultRequestHandler implements A2ARequestHandler {
// Use the (potentially updated) contextId from requestContext
const finalMessageForAgent = requestContext.userMessage;

// If push notification config is provided, save it to the store.
if (params.configuration?.pushNotificationConfig && this.agentCard.capabilities.pushNotifications) {
await this.pushNotificationStore?.save(taskId, params.configuration.pushNotificationConfig);
}


const eventBus = this.eventBusManager.createOrGetByTaskId(taskId);
// EventQueue should be attached to the bus, before the agent execution begins.
Expand Down Expand Up @@ -250,6 +269,11 @@ export class DefaultRequestHandler implements A2ARequestHandler {
const eventBus = this.eventBusManager.createOrGetByTaskId(taskId);
const eventQueue = new ExecutionEventQueue(eventBus);

// If push notification config is provided, save it to the store.
if (params.configuration?.pushNotificationConfig && this.agentCard.capabilities.pushNotifications) {
await this.pushNotificationStore?.save(taskId, params.configuration.pushNotificationConfig);
}


// Start agent execution (non-blocking)
this.agentExecutor.execute(requestContext, eventBus).catch(err => {
Expand Down Expand Up @@ -279,6 +303,7 @@ export class DefaultRequestHandler implements A2ARequestHandler {
try {
for await (const event of eventQueue.events()) {
await resultManager.processEvent(event); // Update store in background
await this._sendPushNotificationIfNeeded(event);
yield event; // Stream the event to the client
}
} finally {
Expand Down Expand Up @@ -362,14 +387,7 @@ export class DefaultRequestHandler implements A2ARequestHandler {
pushNotificationConfig.id = taskId;
}

const configs = this.pushNotificationConfigs.get(taskId) || [];

// Remove existing config with the same ID to replace it
const updatedConfigs = configs.filter(c => c.id !== pushNotificationConfig.id);

updatedConfigs.push(pushNotificationConfig);

this.pushNotificationConfigs.set(taskId, updatedConfigs);
await this.pushNotificationStore?.save(taskId, pushNotificationConfig);

return params;
}
Expand All @@ -385,7 +403,7 @@ export class DefaultRequestHandler implements A2ARequestHandler {
throw A2AError.taskNotFound(params.id);
}

const configs = this.pushNotificationConfigs.get(params.id) || [];
const configs = await this.pushNotificationStore?.load(params.id) || [];
if (configs.length === 0) {
throw A2AError.internalError(`Push notification config not found for task ${params.id}.`);
}
Expand Down Expand Up @@ -417,7 +435,7 @@ export class DefaultRequestHandler implements A2ARequestHandler {
throw A2AError.taskNotFound(params.id);
}

const configs = this.pushNotificationConfigs.get(params.id) || [];
const configs = await this.pushNotificationStore?.load(params.id) || [];

return configs.map(config => ({
taskId: params.id,
Expand All @@ -438,18 +456,7 @@ export class DefaultRequestHandler implements A2ARequestHandler {

const { id: taskId, pushNotificationConfigId } = params;

const configs = this.pushNotificationConfigs.get(taskId);
if (!configs) {
return;
}

const updatedConfigs = configs.filter(c => c.id !== pushNotificationConfigId);

if (updatedConfigs.length === 0) {
this.pushNotificationConfigs.delete(taskId);
} else if (updatedConfigs.length < configs.length) {
this.pushNotificationConfigs.set(taskId, updatedConfigs);
}
await this.pushNotificationStore?.delete(taskId, pushNotificationConfigId);
}

async *resubscribe(
Expand Down Expand Up @@ -512,4 +519,32 @@ export class DefaultRequestHandler implements A2ARequestHandler {
eventQueue.stop();
}
}

private async _sendPushNotificationIfNeeded(event: AgentExecutionEvent): Promise<void> {
if (!this.agentCard.capabilities.pushNotifications) {
return;
}

let taskId: string = "";
if (event.kind == "task") {
const task = event as Task;
taskId = task.id;
} else {
taskId = event.taskId;
}

if (!taskId) {
console.error(`Task ID not found for event ${event.kind}.`);
return;
}

const task = await this.taskStore.load(taskId);
if (!task) {
console.error(`Task ${taskId} not found.`);
return;
}

// Send push notification in the background.
this.pushNotificationSender?.send(task);
}
}
Loading