Skip to content

SSE and Streaming Support POC #1422

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
May 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 32 additions & 16 deletions packages/mcp-client/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,26 @@ import { stdin, stdout } from "node:process";
import { join } from "node:path";
import { homedir } from "node:os";
import type { StdioServerParameters } from "@modelcontextprotocol/sdk/client/stdio.js";
import type { InferenceProvider } from "@huggingface/inference";
import { ANSI } from "./src/utils";
import type { ServerConfig } from "./src/types";
import type { InferenceProviderOrPolicy } from "@huggingface/inference";
import { ANSI, urlToServerConfig } from "./src/utils";
import { Agent } from "./src";
import { version as packageVersion } from "./package.json";
import { parseArgs } from "node:util";

const MODEL_ID = process.env.MODEL_ID ?? "Qwen/Qwen2.5-72B-Instruct";
const PROVIDER = (process.env.PROVIDER as InferenceProvider) ?? "nebius";
const PROVIDER = (process.env.PROVIDER as InferenceProviderOrPolicy) ?? "nebius";
const ENDPOINT_URL = process.env.ENDPOINT_URL ?? process.env.BASE_URL;
const MCP_EXAMPLER_LOCAL_FOLDER = process.platform === "darwin" ? join(homedir(), "Desktop") : homedir();

const SERVERS: StdioServerParameters[] = [
const SERVERS: (ServerConfig | StdioServerParameters)[] = [
{
// Filesystem "official" mcp-server with access to your Desktop
command: "npx",
args: ["-y", "@modelcontextprotocol/server-filesystem", MCP_EXAMPLER_LOCAL_FOLDER],
args: [
"-y",
"@modelcontextprotocol/server-filesystem",
process.platform === "darwin" ? join(homedir(), "Desktop") : homedir(),
],
},
{
// Playwright MCP
Expand All @@ -27,17 +32,28 @@ const SERVERS: StdioServerParameters[] = [
},
];

if (process.env.EXPERIMENTAL_HF_MCP_SERVER) {
SERVERS.push({
// Early version of a HF-MCP server
// you can download it from gist.github.com/julien-c/0500ba922e1b38f2dc30447fb81f7dc6
// and replace the local path below
command: "node",
args: ["--disable-warning=ExperimentalWarning", join(homedir(), "Desktop/hf-mcp/index.ts")],
env: {
HF_TOKEN: process.env.HF_TOKEN ?? "",
// Handle --url parameters from command line: each URL will be parsed into a ServerConfig object
const {
values: { url: urls },
} = parseArgs({
options: {
url: {
type: "string",
multiple: true,
},
});
},
});
if (urls?.length) {
while (SERVERS.length) {
SERVERS.pop();
}
for (const url of urls) {
try {
SERVERS.push(urlToServerConfig(url));
} catch (error) {
console.error(`Error adding server from URL "${url}": ${error.message}`);
}
}
}

async function main() {
Expand Down
2 changes: 1 addition & 1 deletion packages/mcp-client/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,6 @@
"dependencies": {
"@huggingface/inference": "workspace:^",
"@huggingface/tasks": "workspace:^",
"@modelcontextprotocol/sdk": "^1.9.0"
"@modelcontextprotocol/sdk": "^1.11.2"
}
}
10 changes: 5 additions & 5 deletions packages/mcp-client/pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 5 additions & 4 deletions packages/mcp-client/src/Agent.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import type { InferenceProvider } from "@huggingface/inference";
import type { InferenceProviderOrPolicy } from "@huggingface/inference";
import type { ChatCompletionInputMessageTool } from "./McpClient";
import { McpClient } from "./McpClient";
import type { ChatCompletionInputMessage, ChatCompletionStreamOutput } from "@huggingface/tasks";
import type { ChatCompletionInputTool } from "@huggingface/tasks/src/tasks/chat-completion/inference";
import type { StdioServerParameters } from "@modelcontextprotocol/sdk/client/stdio";
import { debug } from "./utils";
import type { ServerConfig } from "./types";

const DEFAULT_SYSTEM_PROMPT = `
You are an agent - please keep going until the user’s query is completely resolved, before ending your turn and yielding back to the user. Only terminate your turn when you are sure that the problem is solved, or if you need more info from the user to solve the problem.
Expand Down Expand Up @@ -44,7 +45,7 @@ const askQuestionTool: ChatCompletionInputTool = {
const exitLoopTools = [taskCompletionTool, askQuestionTool];

export class Agent extends McpClient {
private readonly servers: StdioServerParameters[];
private readonly servers: (ServerConfig | StdioServerParameters)[];
protected messages: ChatCompletionInputMessage[];

constructor({
Expand All @@ -56,7 +57,7 @@ export class Agent extends McpClient {
prompt,
}: (
| {
provider: InferenceProvider;
provider: InferenceProviderOrPolicy;
endpointUrl?: undefined;
}
| {
Expand All @@ -66,7 +67,7 @@ export class Agent extends McpClient {
) & {
model: string;
apiKey: string;
servers: StdioServerParameters[];
servers: (ServerConfig | StdioServerParameters)[];
prompt?: string;
}) {
super(provider ? { provider, endpointUrl, model, apiKey } : { provider, endpointUrl, model, apiKey });
Expand Down
41 changes: 34 additions & 7 deletions packages/mcp-client/src/McpClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ import type {
} from "@huggingface/tasks/src/tasks/chat-completion/inference";
import { version as packageVersion } from "../package.json";
import { debug } from "./utils";
import type { ServerConfig } from "./types";
import type { Transport } from "@modelcontextprotocol/sdk/shared/transport";
import { SSEClientTransport } from "@modelcontextprotocol/sdk/client/sse.js";
import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/streamableHttp.js";
import { ResultFormatter } from "./ResultFormatter.js";

type ToolName = string;

Expand Down Expand Up @@ -52,15 +57,37 @@ export class McpClient {
this.model = model;
}

async addMcpServers(servers: StdioServerParameters[]): Promise<void> {
async addMcpServers(servers: (ServerConfig | StdioServerParameters)[]): Promise<void> {
await Promise.all(servers.map((s) => this.addMcpServer(s)));
}

async addMcpServer(server: StdioServerParameters): Promise<void> {
const transport = new StdioClientTransport({
...server,
env: { ...server.env, PATH: process.env.PATH ?? "" },
});
async addMcpServer(server: ServerConfig | StdioServerParameters): Promise<void> {
let transport: Transport;
const asUrl = (url: string | URL): URL => {
return typeof url === "string" ? new URL(url) : url;
};

if (!("type" in server)) {
transport = new StdioClientTransport({
...server,
env: { ...server.env, PATH: process.env.PATH ?? "" },
});
} else {
switch (server.type) {
case "stdio":
transport = new StdioClientTransport({
...server.config,
env: { ...server.config.env, PATH: process.env.PATH ?? "" },
});
break;
case "sse":
transport = new SSEClientTransport(asUrl(server.config.url), server.config.options);
break;
case "http":
transport = new StreamableHTTPClientTransport(asUrl(server.config.url), server.config.options);
break;
}
}
const mcp = new Client({ name: "@huggingface/mcp-client", version: packageVersion });
await mcp.connect(transport);

Expand Down Expand Up @@ -170,7 +197,7 @@ export class McpClient {
const client = this.clients.get(toolName);
if (client) {
const result = await client.callTool({ name: toolName, arguments: toolArgs, signal: opts.abortSignal });
toolMessage.content = (result.content as Array<{ text: string }>)[0].text;
toolMessage.content = ResultFormatter.format(result);
} else {
toolMessage.content = `Error: No session found for tool: ${toolName}`;
}
Expand Down
86 changes: 86 additions & 0 deletions packages/mcp-client/src/ResultFormatter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import type {
TextResourceContents,
BlobResourceContents,
CompatibilityCallToolResult,
} from "@modelcontextprotocol/sdk/types";

/**
* A utility class for formatting CallToolResult contents into human-readable text.
* Processes different content types, extracts text, and summarizes binary data.
*/
export class ResultFormatter {
/**
* Formats a CallToolResult's contents into a single string.
* - Text content is included directly
* - Binary content (images, audio, blobs) is summarized
*
* @param result The CallToolResult to format
* @returns A human-readable string representation of the result contents
*/
static format(result: CompatibilityCallToolResult): string {
if (!result.content || !Array.isArray(result.content) || result.content.length === 0) {
return "[No content]";
}

const formattedParts: string[] = [];

for (const item of result.content) {
switch (item.type) {
case "text":
// Extract text content directly
formattedParts.push(item.text);
break;

case "image": {
// Summarize image content
const imageSize = this.getBase64Size(item.data);
formattedParts.push(
`[Binary Content: Image ${item.mimeType}, ${imageSize} bytes]\nThe task is complete and the content accessible to the User`
);
break;
}

case "audio": {
// Summarize audio content
const audioSize = this.getBase64Size(item.data);
formattedParts.push(
`[Binary Content: Audio ${item.mimeType}, ${audioSize} bytes]\nThe task is complete and the content accessible to the User`
);
break;
}

case "resource":
// Handle embedded resources - explicitly type the resource
if ("text" in item.resource) {
// It's a text resource with a text property
const textResource = item.resource as TextResourceContents;
formattedParts.push(textResource.text);
} else if ("blob" in item.resource) {
// It's a binary resource with a blob property
const blobResource = item.resource as BlobResourceContents;
const blobSize = this.getBase64Size(blobResource.blob);
const uri = blobResource.uri ? ` (${blobResource.uri})` : "";
const mimeType = blobResource.mimeType ? blobResource.mimeType : "unknown type";
formattedParts.push(
`[Binary Content${uri}: ${mimeType}, ${blobSize} bytes]\nThe task is complete and the content accessible to the User`
);
}
break;
}
}

return formattedParts.join("\n");
}

/**
* Calculates the approximate size in bytes of base64-encoded data
*/
private static getBase64Size(base64: string): number {
// Remove base64 header if present (e.g., data:image/png;base64,)
const cleanBase64 = base64.includes(",") ? base64.split(",")[1] : base64;

// Calculate size: Base64 encodes 3 bytes into 4 characters
const padding = cleanBase64.endsWith("==") ? 2 : cleanBase64.endsWith("=") ? 1 : 0;
return Math.floor((cleanBase64.length * 3) / 4 - padding);
}
}
30 changes: 30 additions & 0 deletions packages/mcp-client/src/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// src/types.ts
import type { StdioServerParameters } from "@modelcontextprotocol/sdk/client/stdio.js";
import type { SSEClientTransportOptions } from "@modelcontextprotocol/sdk/client/sse.js";
import type { StreamableHTTPClientTransportOptions } from "@modelcontextprotocol/sdk/client/streamableHttp.js";

/** StdioServerParameters is usable as-is */

/**
* Configuration for an SSE MCP server
*/
export interface SSEServerConfig {
url: string | URL;
options?: SSEClientTransportOptions;
}

/**
* Configuration for a StreamableHTTP MCP server
*/
export interface StreamableHTTPServerConfig {
url: string | URL;
options?: StreamableHTTPClientTransportOptions;
}

/**
* Discriminated union type for different MCP server types
*/
export type ServerConfig =
| { type: "stdio"; config: StdioServerParameters }
| { type: "sse"; config: SSEServerConfig }
| { type: "http"; config: StreamableHTTPServerConfig };
Loading