Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
37b6cc8
feat: push notifications support added
botraunak Aug 12, 2025
329a8d0
feat: save push notification config is available in message config
botraunak Aug 12, 2025
c0c4f96
fix: added abort controller for push notification config
botraunak Aug 12, 2025
b3e4d70
fix: Requested changes
botraunak Aug 17, 2025
76f3e66
fix: Test changes
botraunak Aug 21, 2025
58df8a4
Merge branch 'main' into feat/push_notifications
botraunak Aug 21, 2025
fed8024
fix: use deepequal to check task objects
botraunak Aug 22, 2025
febbb55
refactor: create new mocks folder for resuable server mocks
botraunak Aug 22, 2025
fbbf4d3
fix: remove slow push testcase and add error case and deep check body
botraunak Aug 22, 2025
50edcaa
fix: added options for push notification sender and resolve
botraunak Aug 22, 2025
88a18f0
fix: afterEach hook error unit test, close testServer properly
botraunak Aug 22, 2025
1f1ece8
Merge branch 'main' into feat/push_notifications
botraunak Aug 27, 2025
016bc67
fix: remove promise.resolve unwanted
botraunak Aug 27, 2025
eee1d24
fix: remove unauthorized scenario for push testing
botraunak Aug 27, 2025
2995a1e
fix: use refactored fake task exection
botraunak Aug 27, 2025
ee6831b
fix: remove promise.resolves
botraunak Aug 28, 2025
ca33ada
fix: remove dependency on taskStore load
botraunak Aug 28, 2025
4795f85
feat: added readme section
botraunak Aug 28, 2025
11d01c3
Merge branch 'main' into feat/push_notifications
botraunak Aug 28, 2025
2c93805
fix: merge
botraunak Aug 28, 2025
3f00e8d
fix: reset readme to main
botraunak Aug 28, 2025
04e854e
feat: added readme section
botraunak Aug 28, 2025
0d3dd61
fix: update heading styles
botraunak Aug 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 101 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,107 @@ class CancellableExecutor implements AgentExecutor {
}
```

## A2A Push Notifications

For very long-running tasks (e.g., lasting minutes, hours, or even days) or when clients cannot or prefer not to maintain persistent connections (like mobile clients or serverless functions), A2A supports asynchronous updates via push notifications. This mechanism allows the A2A Server to actively notify a client-provided webhook when a significant task update occurs.

### Server-Side Configuration

To enable push notifications, your agent card must declare support:

```typescript
const movieAgentCard: AgentCard = {
// ... other properties
capabilities: {
streaming: true,
pushNotifications: true, // Enable push notifications
stateTransitionHistory: true,
},
// ... rest of agent card
};
```

When creating the `DefaultRequestHandler`, you can optionally provide custom push notification components:

```typescript
import {
DefaultRequestHandler,
InMemoryPushNotificationStore,
DefaultPushNotificationSender
} from "@a2a-js/sdk/server";

// Optional: Custom push notification store and sender
const pushNotificationStore = new InMemoryPushNotificationStore();
const pushNotificationSender = new DefaultPushNotificationSender(
pushNotificationStore,
{
timeout: 5000, // 5 second timeout
tokenHeaderName: 'X-A2A-Notification-Token' // Custom header name
}
);

const requestHandler = new DefaultRequestHandler(
movieAgentCard,
taskStore,
agentExecutor,
undefined, // eventBusManager (optional)
pushNotificationStore, // custom store
pushNotificationSender, // custom sender
undefined // extendedAgentCard (optional)
);
```

### Client-Side Usage

Configure push notifications when sending messages:

```typescript
// Configure push notification for a message
const pushConfig: PushNotificationConfig = {
id: "my-notification-config", // Optional, defaults to task ID
url: "https://my-app.com/webhook/task-updates",
token: "your-auth-token" // Optional authentication token
};

const sendParams: MessageSendParams = {
message: {
messageId: uuidv4(),
role: "user",
parts: [{ kind: "text", text: "Hello, agent!" }],
kind: "message",
},
configuration: {
blocking: true,
acceptedOutputModes: ["text/plain"],
pushNotificationConfig: pushConfig // Add push notification config
},
};
```

### Webhook Endpoint Implementation

Your webhook endpoint should expect POST requests with the task data:

```typescript
// Example Express.js webhook endpoint
app.post('/webhook/task-updates', (req, res) => {
const task = req.body; // The complete task object

// Verify the token if provided
const token = req.headers['x-a2a-notification-token'];
if (token !== 'your-auth-token') {
return res.status(401).json({ error: 'Unauthorized' });
}

console.log(`Task ${task.id} status: ${task.status.state}`);

// Process the task update
// ...

res.status(200).json({ received: true });
});
```

## License

This project is licensed under the terms of the [Apache 2.0 License](https://raw.githubusercontent.com/google-a2a/a2a-python/refs/heads/main/LICENSE).
Expand Down
6 changes: 6 additions & 0 deletions src/server/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,9 @@ export { InMemoryTaskStore } from "./store.js";

export { JsonRpcTransportHandler } from "./transports/jsonrpc_transport_handler.js";
export { A2AError } from "./error.js";

export type { PushNotificationSender } from "./push_notification/push_notification_sender.js";
export { DefaultPushNotificationSender } from "./push_notification/default_push_notification_sender.js";
export type { DefaultPushNotificationSenderOptions } from "./push_notification/default_push_notification_sender.js";
export type { PushNotificationStore } from "./push_notification/push_notification_store.js";
export { InMemoryPushNotificationStore } from "./push_notification/push_notification_store.js";
81 changes: 81 additions & 0 deletions src/server/push_notification/default_push_notification_sender.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import { Task, PushNotificationConfig } from "../../types.js";
import { PushNotificationSender } from "./push_notification_sender.js";
import { PushNotificationStore } from "./push_notification_store.js";

export interface DefaultPushNotificationSenderOptions {
/**
* Timeout in milliseconds for the abort controller. Defaults to 5000ms.
*/
timeout?: number;
/**
* Custom header name for the token. Defaults to 'X-A2A-Notification-Token'.
*/
tokenHeaderName?: string;
}

export class DefaultPushNotificationSender implements PushNotificationSender {

private readonly pushNotificationStore: PushNotificationStore;
private readonly options: Required<DefaultPushNotificationSenderOptions>;

constructor(pushNotificationStore: PushNotificationStore, options: DefaultPushNotificationSenderOptions = {}) {
this.pushNotificationStore = pushNotificationStore;
this.options = {
timeout: 5000,
tokenHeaderName: 'X-A2A-Notification-Token',
...options
};
}

async send(task: Task): Promise<void> {
const pushConfigs = await this.pushNotificationStore.load(task.id);
if (!pushConfigs || pushConfigs.length === 0) {
return;
}

pushConfigs.forEach(pushConfig => {
this._dispatchNotification(task, pushConfig)
.catch(error => {
console.error(`Error sending push notification for task_id=${task.id} to URL: ${pushConfig.url}. Error:`, error);
});
});
}

private async _dispatchNotification(
task: Task,
pushConfig: PushNotificationConfig
): Promise<void> {
const url = pushConfig.url;
const controller = new AbortController();
// Abort the request if it takes longer than the configured timeout.
const timeoutId = setTimeout(() => controller.abort(), this.options.timeout);

try {
const headers: Record<string, string> = {
'Content-Type': 'application/json'
};

if (pushConfig.token) {
headers[this.options.tokenHeaderName] = pushConfig.token;
}

const response = await fetch(url, {
method: 'POST',
headers,
body: JSON.stringify(task),
signal: controller.signal
});

if (!response.ok) {
throw new Error(`HTTP ${response.status}: ${response.statusText}`);
}

console.info(`Push notification sent for task_id=${task.id} to URL: ${url}`);
} catch (error) {
// Ignore errors
console.error(`Error sending push notification for task_id=${task.id} to URL: ${url}. Error:`, error);
} finally {
clearTimeout(timeoutId);
}
}
}
5 changes: 5 additions & 0 deletions src/server/push_notification/push_notification_sender.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import { Task } from "../../types.js";

export interface PushNotificationSender {
send(task: Task): Promise<void>;
}
58 changes: 58 additions & 0 deletions src/server/push_notification/push_notification_store.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import { PushNotificationConfig } from "../../types.js";

export interface PushNotificationStore {
save(taskId: string, pushNotificationConfig: PushNotificationConfig): Promise<void>;
load(taskId: string): Promise<PushNotificationConfig[]>;
delete(taskId: string, configId?: string): Promise<void>;
}

export class InMemoryPushNotificationStore implements PushNotificationStore {
private store: Map<string, PushNotificationConfig[]> = new Map();

async save(taskId: string, pushNotificationConfig: PushNotificationConfig): Promise<void> {
const configs = this.store.get(taskId) || [];

// Set ID if it's not already set
if (!pushNotificationConfig.id) {
pushNotificationConfig.id = taskId;
}

// Remove existing config with the same ID if it exists
const existingIndex = configs.findIndex(config => config.id === pushNotificationConfig.id);
if (existingIndex !== -1) {
configs.splice(existingIndex, 1);
}

// Add the new/updated config
configs.push(pushNotificationConfig);
this.store.set(taskId, configs);
}

async load(taskId: string): Promise<PushNotificationConfig[]> {
const configs = this.store.get(taskId);
return configs || [];
}

async delete(taskId: string, configId?: string): Promise<void> {
// If no configId is provided, use taskId as the configId (backward compatibility)
if (configId === undefined) {
configId = taskId;
}

const configs = this.store.get(taskId);
if (!configs) {
return;
}

const configIndex = configs.findIndex(config => config.id === configId);
if (configIndex !== -1) {
configs.splice(configIndex, 1);
}

if (configs.length === 0) {
this.store.delete(taskId);
} else {
this.store.set(taskId, configs);
}
}
}
Loading