A TypeScript SDK for implementing Model Context Protocol (MCP) over MQTT, supporting both browser and Node.js environments with full type safety and automatic environment detection.
See this SDK in action! Check out our MCP AI Companion Demo - a real-world implementation using both Python and TypeScript with this SDK to create an AI agent that can:
- Control browser audio/video functionality
- Switch facial expressions and emotions
- Power interactive conversational experiences
- Demonstrate cross-platform MCP communication
This demo showcases how to build sophisticated AI agents using MCP over MQTT for seamless browser control and interaction.
- Universal: Works seamlessly in browser (WebSocket) and Node.js (TCP) environments
- Type Safe: Full TypeScript support with Zod schema validation for MCP protocol
- MQTT Transport: Uses MQTT as the transport layer for reliable MCP communication
- Constructor-Based: Clean object-oriented API with proper TypeScript classes
- Standards Compliant: Follows MCP specification v2024-11-05
- Tool & Resource Support: Complete support for MCP tools and resources
- Auto Discovery: Automatic server discovery over MQTT topics
- Environment Detection: Automatic browser/Node.js detection with appropriate defaults
npm install @emqx-ai/mcp-mqtt-sdk
import { McpMqttServer } from '@emqx-ai/mcp-mqtt-sdk'
// Create server instance
const server = new McpMqttServer({
// MQTT connection
host: 'mqtt://localhost:1883',
// Server identification
serverId: 'unique-server-id-123',
serverName: 'myapp/greeting-server', // Hierarchical naming
// Server information
name: 'My MCP Server',
version: '1.0.0',
// Optional configuration
description: 'A sample MCP server providing greeting tools',
capabilities: {
tools: { listChanged: true },
resources: { listChanged: true, subscribe: false },
},
})
// Add a tool
server.tool(
'greet',
'Greet someone with a personalized message',
{
type: 'object',
properties: {
name: {
type: 'string',
description: 'Name of the person to greet'
},
language: {
type: 'string',
enum: ['en', 'es', 'fr'],
description: 'Language for greeting'
}
},
required: ['name'],
},
async ({ name, language = 'en' }) => {
const greetings = {
en: `Hello, ${name}!`,
es: `¡Hola, ${name}!`,
fr: `Bonjour, ${name}!`
}
return {
content: [{
type: 'text',
text: greetings[language] || greetings.en,
}],
}
}
)
// Add a resource
server.resource(
'config://app-settings',
'Application Settings',
async () => ({
contents: [{
uri: 'config://app-settings',
mimeType: 'application/json',
text: JSON.stringify({
theme: 'dark',
language: 'en',
notifications: true
}, null, 2),
}],
}),
{
description: 'Current application configuration',
mimeType: 'application/json',
}
)
// Event handlers
server.on('ready', () => {
console.log('Server is ready!')
console.log('Topics:', server.getTopics())
})
server.on('error', (error) => {
console.error('Server error:', error)
})
// Start the server
await server.start()
import { McpMqttClient } from '@emqx-ai/mcp-mqtt-sdk'
// Create client instance
const client = new McpMqttClient({
// MQTT connection
host: 'mqtt://localhost:1883',
// Client information
name: 'My MCP Client',
version: '1.0.0',
})
// Set up server discovery handler
client.on('serverDiscovered', async (server) => {
console.log('Discovered server:', server.name, '(ID:', server.serverId, ')')
try {
// Connect to the discovered server using serverId
await client.initializeServer(server.serverId)
console.log('Connected to:', server.name)
// List available tools using serverId
const tools = await client.listTools(server.serverId)
console.log('Available tools:', tools.map(t => t.name))
// Call a tool using serverId
if (tools.some(t => t.name === 'greet')) {
const result = await client.callTool(server.serverId, 'greet', {
name: 'World',
language: 'es'
})
console.log('Tool result:', result.content[0]?.text)
}
// List and read resources using serverId
const resources = await client.listResources(server.serverId)
console.log('Available resources:', resources.map(r => r.uri))
if (resources.some(r => r.uri === 'config://app-settings')) {
const config = await client.readResource(server.serverId, 'config://app-settings')
console.log('Config:', config.contents[0]?.text)
}
} catch (error) {
console.error('Error with server:', server.name, error)
}
})
// Connect and start discovery
await client.connect()
// Graceful shutdown
process.on('SIGINT', async () => {
await client.disconnect()
process.exit(0)
})
new McpMqttServer(config: McpMqttServerConfig)
Configuration:
interface McpMqttServerConfig {
// MQTT connection settings
host: string
username?: string
password?: string
// Server identification (required)
serverId: string // Unique server ID (MQTT client ID)
serverName: string // Hierarchical server name (e.g., "app/feature/server")
// Server information (required)
name: string
version: string
// Optional configuration
description?: string // Server description
capabilities?: {
prompts?: { listChanged?: boolean }
resources?: { subscribe?: boolean; listChanged?: boolean }
tools?: { listChanged?: boolean }
}
rbac?: { // Optional role-based access control
roles: Array<{
name: string
description: string
allowed_methods: string[]
allowed_tools: string[] | "all"
allowed_resources: string[] | "all"
}>
}
}
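The rbac option above only declares roles; how the SDK enforces them is not shown in this document. As a rough illustration of what the allowed_methods and allowed_tools fields could mean, the sketch below checks a tool call against a role. The Role type mirrors the interface above, isToolAllowed is a hypothetical helper rather than an SDK API, and the assumption that allowed_methods holds MCP method names such as tools/call is ours.

```typescript
// Mirrors the role shape declared in McpMqttServerConfig.rbac above.
interface Role {
  name: string
  description: string
  allowed_methods: string[]
  allowed_tools: string[] | 'all'
  allowed_resources: string[] | 'all'
}

// Hypothetical helper (not an SDK API): true when the role may call the tool.
// Assumes allowed_methods contains MCP method names such as 'tools/call'.
function isToolAllowed(role: Role, toolName: string): boolean {
  if (!role.allowed_methods.includes('tools/call')) return false
  return role.allowed_tools === 'all' || role.allowed_tools.includes(toolName)
}

// Example role: may list and call tools, but only the 'greet' tool.
const viewer: Role = {
  name: 'viewer',
  description: 'May call the greeting tool only',
  allowed_methods: ['tools/list', 'tools/call'],
  allowed_tools: ['greet'],
  allowed_resources: 'all',
}
```

With this role, isToolAllowed(viewer, 'greet') passes while any other tool name is rejected.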
Register a tool that clients can call.
server.tool(
'calculate',
'Perform mathematical calculations',
{
type: 'object',
properties: {
expression: {
type: 'string',
description: 'Mathematical expression to evaluate'
},
},
required: ['expression'],
},
async ({ expression }) => {
try {
// Safe evaluation - implement your own parser
const result = evaluateExpression(expression)
return {
content: [{
type: 'text',
text: `${expression} = ${result}`,
}],
}
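} catch (error) {
// Under strict TypeScript `error` is `unknown`, so narrow it before reading `.message`
const message = error instanceof Error ? error.message : String(error)
return {
content: [{
type: 'text',
text: `Error: ${message}`,
}],
isError: true,
}
}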
}
)
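The calculate example calls evaluateExpression but leaves its implementation to the reader. One possible stand-in is a small recursive-descent parser that handles numbers, parentheses, unary minus, and the four basic operators without resorting to eval. This is a minimal sketch under those assumptions, not code from the SDK.

```typescript
// Hypothetical stand-in for the evaluateExpression helper used above.
// Grammar:
//   expr   := term (('+' | '-') term)*
//   term   := factor (('*' | '/') factor)*
//   factor := number | '(' expr ')' | '-' factor
function evaluateExpression(input: string): number {
  // Reject "1 2" style input before whitespace is stripped.
  if (/\d\s+\d/.test(input)) throw new Error('Missing operator between numbers')
  const src = input.replace(/\s+/g, '')
  let pos = 0

  function parseExpr(): number {
    let value = parseTerm()
    while (src[pos] === '+' || src[pos] === '-') {
      const op = src[pos++]
      const rhs = parseTerm()
      value = op === '+' ? value + rhs : value - rhs
    }
    return value
  }

  function parseTerm(): number {
    let value = parseFactor()
    while (src[pos] === '*' || src[pos] === '/') {
      const op = src[pos++]
      const rhs = parseFactor()
      if (op === '/' && rhs === 0) throw new Error('Division by zero')
      value = op === '*' ? value * rhs : value / rhs
    }
    return value
  }

  function parseFactor(): number {
    if (src[pos] === '-') {
      pos++
      return -parseFactor()
    }
    if (src[pos] === '(') {
      pos++
      const value = parseExpr()
      if (src[pos] !== ')') throw new Error(`Expected ')' at position ${pos}`)
      pos++
      return value
    }
    const match = /^\d+(\.\d+)?/.exec(src.slice(pos))
    if (!match) throw new Error(`Unexpected input at position ${pos}`)
    pos += match[0].length
    return Number(match[0])
  }

  const result = parseExpr()
  // Anything left over (for example a stray ')') is a syntax error.
  if (pos !== src.length) throw new Error(`Unexpected input at position ${pos}`)
  return result
}
```

Because every failure path throws, the try/catch in the tool handler above turns malformed input into an isError result instead of crashing the server.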
Register a resource that clients can read.
server.resource(
'file://logs/app.log',
'Application Logs',
async () => {
const logs = await readLogFile()
return {
contents: [{
uri: 'file://logs/app.log',
mimeType: 'text/plain',
text: logs,
}],
}
},
{
description: 'Current application log entries',
mimeType: 'text/plain',
}
)
Control server lifecycle.
await server.start() // Start listening for requests
await server.stop() // Gracefully shutdown
Get MQTT topics used by this server.
const { request, response } = server.getTopics()
server.on('ready', () => console.log('Server ready'))
server.on('error', (error) => console.error('Server error:', error))
server.on('closed', () => console.log('Server closed'))
new McpMqttClient(config: McpMqttClientConfig)
Configuration:
interface McpMqttClientConfig {
// MQTT connection settings
host: string
clientId?: string
username?: string
password?: string
clean?: boolean
keepalive?: number
connectTimeout?: number
reconnectPeriod?: number
// Client information (required)
name: string
version: string
// Optional configuration
capabilities?: {
roots?: { listChanged?: boolean }
sampling?: Record<string, any>
}
// Advanced MQTT settings (optional)
will?: {
topic: string
payload: string | Buffer
qos?: 0 | 1 | 2
retain?: boolean
}
properties?: Record<string, any>
}
await client.connect() // Connect to MQTT broker and start discovery
await client.disconnect() // Disconnect from broker
await client.initializeServer(serverId) // Initialize connection to a specific server
const tools = await client.listTools(serverId)
const result = await client.callTool(serverId, toolName, args)
const resources = await client.listResources(serverId)
const data = await client.readResource(serverId, uri)
const discovered = client.getDiscoveredServers()
const connected = client.getConnectedServers()
client.on('serverDiscovered', (server) => {
console.log('Found server:', server.name, 'ID:', server.serverId)
})
client.on('serverInitialized', (server) => {
console.log('Connected to:', server.name)
})
client.on('serverDisconnected', (serverId) => {
console.log('Server disconnected:', serverId)
})
client.on('connected', () => console.log('Client connected'))
client.on('disconnected', () => console.log('Client disconnected'))
client.on('error', (error) => console.error('Client error:', error))
The SDK supports comprehensive MQTT connection options:
interface MqttConnectionOptions {
host: string // Complete connection URL (e.g., ws://localhost:8083, wss://broker.emqx.io:8084)
clientId?: string // Auto-generated if not provided
username?: string
password?: string
clean?: boolean // Clean session (default: true)
keepalive?: number // Keep-alive interval in seconds
connectTimeout?: number // Connection timeout in milliseconds
reconnectPeriod?: number // Reconnection period in milliseconds
will?: { // Last Will Testament
topic: string
payload: string | Buffer
qos?: 0 | 1 | 2
retain?: boolean
}
}
The host parameter should be a complete connection URL:
// WebSocket connections (browser)
host: 'ws://localhost:8083'
host: 'wss://broker.emqx.io:8084'
// TCP connections (Node.js)
host: 'mqtt://localhost:1883'
host: 'mqtts://broker.emqx.io:8883'
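Code shared between browser and Node.js needs to pick the right URL scheme at runtime. The following sketch shows one way an application could do that itself. isBrowser and brokerUrl are hypothetical helpers, not SDK APIs, and the port numbers are simply the ones used in the examples above; your broker may listen elsewhere.

```typescript
// Hypothetical helper (not an SDK API): detect a browser-like runtime.
function isBrowser(): boolean {
  const g = globalThis as unknown as Record<string, unknown>
  return typeof g['window'] !== 'undefined' && typeof g['document'] !== 'undefined'
}

// Hypothetical helper: build a complete connection URL for the current runtime.
// Ports are assumptions taken from the examples above (8083/8084 for WebSocket,
// 1883/8883 for TCP).
function brokerUrl(hostname: string, secure = false): string {
  if (isBrowser()) {
    // Browsers can reach an MQTT broker only over WebSocket.
    return secure ? `wss://${hostname}:8084` : `ws://${hostname}:8083`
  }
  return secure ? `mqtts://${hostname}:8883` : `mqtt://${hostname}:1883`
}
```

The result can be passed directly as the host option of either the server or the client configuration.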
Both server and client provide a getMqttClient() method to access the underlying MQTT client for custom pub/sub operations:
const mqttClient = server.getMqttClient() // or client.getMqttClient()
if (mqttClient) {
mqttClient.subscribe('custom/topic', { qos: 1 })
mqttClient.publish('custom/topic', 'Hello World')
}
import { z } from 'zod'
// Define schema for tool parameters
const CalculateSchema = z.object({
operation: z.enum(['add', 'subtract', 'multiply', 'divide']),
a: z.number(),
b: z.number(),
})
server.tool(
'calculate',
'Perform arithmetic operations',
{
type: 'object',
properties: {
operation: { type: 'string', enum: ['add', 'subtract', 'multiply', 'divide'] },
a: { type: 'number' },
b: { type: 'number' },
},
required: ['operation', 'a', 'b'],
},
async (params) => {
// Validate with Zod
const { operation, a, b } = CalculateSchema.parse(params)
let result: number
switch (operation) {
case 'add': result = a + b; break
case 'subtract': result = a - b; break
case 'multiply': result = a * b; break
case 'divide':
if (b === 0) throw new Error('Division by zero')
result = a / b
break
}
return {
content: [{
type: 'text',
text: `${a} ${operation} ${b} = ${result}`,
}],
}
}
)
// Server-side error handling
server.tool('risky-operation', 'An operation that might fail', schema, async (params) => {
try {
const result = await performRiskyOperation(params)
return {
content: [{ type: 'text', text: `Success: ${result}` }],
}
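} catch (error) {
// Under strict TypeScript `error` is `unknown`, so narrow it before reading `.message`
const message = error instanceof Error ? error.message : String(error)
return {
content: [{ type: 'text', text: `Operation failed: ${message}` }],
isError: true, // Mark as error response
}
}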
})
// Client-side error handling
try {
const result = await client.callTool(serverId, 'risky-operation', params)
if (result.isError) {
console.error('Tool returned error:', result.content[0]?.text)
} else {
console.log('Success:', result.content[0]?.text)
}
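} catch (error) {
// Thrown values are `unknown` under strict TypeScript, so narrow before reading `.message`
console.error('Tool call failed:', error instanceof Error ? error.message : String(error))
}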
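The server-side pattern above repeats the same catch block in every tool. A small wrapper can centralize it. The sketch below is one possible shape: ToolResult is a local stand-in for the result objects shown in the examples (not a type imported from the SDK), and toErrorResult and withErrorResult are hypothetical helper names.

```typescript
// Local stand-ins for the result shape used in the examples above (assumed).
interface TextContent {
  type: 'text'
  text: string
}
interface ToolResult {
  content: TextContent[]
  isError?: boolean
}

// Hypothetical helper: convert any thrown value into an isError result.
function toErrorResult(error: unknown, prefix = 'Operation failed'): ToolResult {
  const message = error instanceof Error ? error.message : String(error)
  return { content: [{ type: 'text', text: `${prefix}: ${message}` }], isError: true }
}

// Hypothetical wrapper: returns a handler that never rejects and instead
// reports failures through the isError flag.
function withErrorResult<P>(
  handler: (params: P) => Promise<ToolResult>,
): (params: P) => Promise<ToolResult> {
  return async (params: P) => {
    try {
      return await handler(params)
    } catch (error) {
      return toErrorResult(error)
    }
  }
}
```

A handler wrapped this way can be passed as the last argument of server.tool, so clients always see failures via result.isError rather than a rejected call.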
// Server: Streaming resource
server.resource(
'stream://live-data',
'Live Data Stream',
async () => {
const chunks = await getLiveDataChunks()
return {
contents: chunks.map((chunk, index) => ({
uri: `stream://live-data#${index}`,
mimeType: 'application/json',
text: JSON.stringify(chunk),
})),
}
}
)
// Client: Read streaming resource
const stream = await client.readResource(serverId, 'stream://live-data')
for (const content of stream.contents) {
const data = JSON.parse(content.text!)
processStreamChunk(data)
}
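The client loop above uses a non-null assertion on content.text and lets JSON.parse throw on the first bad chunk. If partial results are acceptable, a more defensive reader could look like the following sketch. ResourceContent is an assumed shape based on the examples in this document, and parseJsonContents is a hypothetical helper.

```typescript
// Assumed shape of one entry in `contents`, based on the examples above.
interface ResourceContent {
  uri: string
  mimeType?: string
  text?: string
}

// Hypothetical helper: parse every JSON text entry and report the URIs of
// entries that had no text or contained invalid JSON.
function parseJsonContents(contents: ResourceContent[]): { values: unknown[]; skipped: string[] } {
  const values: unknown[] = []
  const skipped: string[] = []
  for (const content of contents) {
    if (typeof content.text !== 'string') {
      skipped.push(content.uri)
      continue
    }
    try {
      values.push(JSON.parse(content.text))
    } catch {
      skipped.push(content.uri)
    }
  }
  return { values, skipped }
}
```

Calling parseJsonContents(stream.contents) then yields the decoded chunks plus a list of URIs worth logging or retrying.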
# Install dependencies
npm install
# Build
npm run build
# Build in watch mode
npm run build:watch
# Run tests
npm run test
# Type checking
npm run typecheck
# Linting
npm run lint
npm run lint:fix
The SDK follows the official MCP over MQTT specification topic hierarchy:
MCP over MQTT Topic Structure:
Server Topics:
├── $mcp-server/{server-id}/{server-name}              # Control topic (initialization)
├── $mcp-server/capability/{server-id}/{server-name}   # Capability change notifications
└── $mcp-server/presence/{server-id}/{server-name}     # Server presence (online/offline)
Client Topics:
├── $mcp-client/capability/{mcp-client-id}             # Client capability changes
└── $mcp-client/presence/{mcp-client-id}               # Client presence
RPC Communication:
└── $mcp-rpc/{mcp-client-id}/{server-id}/{server-name} # Bidirectional RPC communication
Example Topics:
- Control: $mcp-server/server-123/myapp/greeting-server
- Capability: $mcp-server/capability/server-123/myapp/greeting-server
- Presence: $mcp-server/presence/server-123/myapp/greeting-server
- RPC: $mcp-rpc/client-456/server-123/myapp/greeting-server
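To make the naming scheme concrete, the sketch below assembles these topic strings from a server ID, a hierarchical server name, and a client ID. The SDK builds its topics internally (see server.getTopics()); serverTopics and rpcTopic here are hypothetical helpers written only to illustrate the layout, and the validation rules are our assumptions.

```typescript
// Hypothetical helpers (not SDK APIs) that assemble the topic names listed above.
interface ServerIdentity {
  serverId: string
  serverName: string
}

// '+' and '#' are MQTT wildcards and must not appear in a published topic name.
function assertTopicSafe(value: string, label: string): void {
  if (value.length === 0 || /[+#]/.test(value)) {
    throw new Error(`Invalid ${label}: "${value}"`)
  }
}

// IDs occupy exactly one topic level; the server name may span several.
function assertSingleLevel(value: string, label: string): void {
  assertTopicSafe(value, label)
  if (value.includes('/')) throw new Error(`${label} must not contain '/'`)
}

function serverTopics({ serverId, serverName }: ServerIdentity) {
  assertSingleLevel(serverId, 'serverId')
  assertTopicSafe(serverName, 'serverName')
  return {
    control: `$mcp-server/${serverId}/${serverName}`,
    capability: `$mcp-server/capability/${serverId}/${serverName}`,
    presence: `$mcp-server/presence/${serverId}/${serverName}`,
  }
}

function rpcTopic(clientId: string, { serverId, serverName }: ServerIdentity): string {
  assertSingleLevel(clientId, 'clientId')
  assertSingleLevel(serverId, 'serverId')
  assertTopicSafe(serverName, 'serverName')
  return `$mcp-rpc/${clientId}/${serverId}/${serverName}`
}
```

Passing serverId 'server-123', serverName 'myapp/greeting-server', and clientId 'client-456' reproduces the four example topics above.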
- Service Discovery: Client subscribes to $mcp-server/presence/+/#
- Server Registration: Server publishes presence to $mcp-server/presence/{server-id}/{server-name}
- Initialization: Client sends an initialize request to $mcp-server/{server-id}/{server-name}
- RPC Communication: Bidirectional communication via $mcp-rpc/{client-id}/{server-id}/{server-name}
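Discovery relies on standard MQTT wildcards: + matches exactly one topic level (here the server ID) and # matches all remaining levels (here the hierarchical server name). The sketch below shows how such a filter matches a presence topic and how the two identifiers could be recovered from it. Both functions are illustrative helpers, not SDK APIs; the broker performs the actual matching.

```typescript
// Sketch of MQTT topic-filter matching: '+' matches one level, '#' matches
// every remaining level (including none) and is only valid as the last level.
function matchesFilter(filter: string, topic: string): boolean {
  const f = filter.split('/')
  const t = topic.split('/')
  for (let i = 0; i < f.length; i++) {
    if (f[i] === '#') return i === f.length - 1
    if (i >= t.length) return false
    if (f[i] !== '+' && f[i] !== t[i]) return false
  }
  return f.length === t.length
}

// Hypothetical helper: split a presence topic into server ID and server name.
function parsePresenceTopic(topic: string): { serverId: string; serverName: string } | null {
  const prefix = '$mcp-server/presence/'
  if (!topic.startsWith(prefix)) return null
  const rest = topic.slice(prefix.length)
  const slash = rest.indexOf('/')
  // Require a non-empty ID followed by a non-empty name.
  if (slash <= 0 || slash === rest.length - 1) return null
  return { serverId: rest.slice(0, slash), serverName: rest.slice(slash + 1) }
}
```

Because the server name may itself contain slashes, only the first level after the prefix is treated as the ID; everything after it is the name.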
The SDK follows JSON-RPC 2.0 error codes:
- -32700: Parse error
- -32600: Invalid request
- -32601: Method not found
- -32602: Invalid params
- -32603: Internal error
- -32000 to -32099: Implementation-defined server errors
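For reference, these codes can be captured as constants and used to build a JSON-RPC 2.0 error response. The following is a minimal sketch: the names JsonRpcErrorCode, errorResponse, and isServerDefinedError are ours and are not claimed to be SDK exports, while the envelope fields (jsonrpc, id, error.code, error.message, error.data) come from the JSON-RPC 2.0 specification.

```typescript
// Standard JSON-RPC 2.0 error codes, as listed above.
const JsonRpcErrorCode = {
  ParseError: -32700,
  InvalidRequest: -32600,
  MethodNotFound: -32601,
  InvalidParams: -32602,
  InternalError: -32603,
} as const

interface JsonRpcErrorResponse {
  jsonrpc: '2.0'
  id: string | number | null
  error: { code: number; message: string; data?: unknown }
}

// Build an error response; `data` is included only when provided.
function errorResponse(
  id: string | number | null,
  code: number,
  message: string,
  data?: unknown,
): JsonRpcErrorResponse {
  const error: JsonRpcErrorResponse['error'] = { code, message }
  if (data !== undefined) error.data = data
  return { jsonrpc: '2.0', id, error }
}

// True for codes in the implementation-defined server error range.
function isServerDefinedError(code: number): boolean {
  return code >= -32099 && code <= -32000
}
```

For example, errorResponse(1, JsonRpcErrorCode.MethodNotFound, 'Method not found') produces the envelope a client would receive after calling an unknown method.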
Looking for MCP over MQTT support in other languages?
- Python SDK - MCP over MQTT implementation for Python
- Erlang SDK - MCP over MQTT implementation for Erlang
- Paho MCP over MQTT - MCP over MQTT implementation in C
- ESP MCP over MQTT - MCP over MQTT for ESP32/embedded devices in C
- Fork the repository
- Create a feature branch (git checkout -b feature/amazing-feature)
- Make your changes with tests
- Run the test suite (npm test)
- Submit a pull request
Apache License 2.0. See LICENSE file for details.