diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..0ebd33d --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "slim"] + path = slim + url = https://github.com/agntcy/slim.git diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..c7c7495 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,22 @@ +version: '3.8' + +services: + slim-control-plane: + build: + context: https://github.com/agntcy/slim.git#main + dockerfile: control-plane/Dockerfile + ports: + - "8080:8080" + restart: unless-stopped + + slim-data-plane: + build: + context: https://github.com/agntcy/slim.git#main + dockerfile: data-plane/Dockerfile + ports: + - "50051:50051" + environment: + - CONTROL_PLANE_URL=http://slim-control-plane:8080 + restart: unless-stopped + depends_on: + - slim-control-plane \ No newline at end of file diff --git a/examples/nanda_agent.py b/examples/nanda_agent.py index 847799f..dfa30c0 100644 --- a/examples/nanda_agent.py +++ b/examples/nanda_agent.py @@ -1,72 +1,57 @@ +# examples/nanda_agent.py #!/usr/bin/env python3 """ -LLM-Powered Modular NANDA Agent +LLM-Powered NANDA Agent -This agent uses Anthropic Claude for intelligent responses based on configurable personality and expertise. -Simply update the AGENT_CONFIG section to create different agent personalities. +Configurable agent using Anthropic Claude for intelligent responses. +Customize via environment variables or the config dict. """ import os import sys -import time +import asyncio import uuid from datetime import datetime -from typing import Dict, List, Any -# Add the parent directory to the path to allow importing streamlined_adapter sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) from nanda_core.core.adapter import NANDA -# Try to import Anthropic - will fail gracefully if not available +from dotenv import load_dotenv +load_dotenv() + try: from anthropic import Anthropic ANTHROPIC_AVAILABLE = True except ImportError: ANTHROPIC_AVAILABLE = False - print("⚠️ Warning: anthropic library not available. Install with: pip install anthropic") + print("⚠️ anthropic library not available. Install with: pip install anthropic") # ============================================================================= -# AGENT CONFIGURATION - Customize this section for different agents +# CONFIGURATION # ============================================================================= -# Get configuration from environment variables or use defaults -def get_agent_config(): - """Load agent configuration from environment variables or use defaults""" +def get_config(): + """Load configuration from environment variables""" + base_id = os.getenv("AGENT_ID", "helpful-agent") + agent_id = f"{base_id}-{uuid.uuid4().hex[:6]}" if '-' not in base_id else base_id - # Generate agent_id with hex suffix for uniqueness - base_agent_id = os.getenv("AGENT_ID", "helpful-ubuntu-agent") - if not base_agent_id.endswith('-') and '-' not in base_agent_id.split('-')[-1]: - # Add 6-character hex suffix if not already present - hex_suffix = uuid.uuid4().hex[:6] - agent_id = f"{base_agent_id}-{hex_suffix}" - else: - agent_id = base_agent_id + capabilities = os.getenv("AGENT_CAPABILITIES", "general assistance,conversation") + capabilities_list = [cap.strip() for cap in capabilities.split(",")] - print(f"Generated agent_id: {agent_id}") - agent_name = os.getenv("AGENT_NAME", "Ubuntu Helper") + agent_name = os.getenv("AGENT_NAME", "Helper Agent") domain = os.getenv("AGENT_DOMAIN", "general assistance") - specialization = os.getenv("AGENT_SPECIALIZATION", "helpful and friendly AI assistant") - description = os.getenv("AGENT_DESCRIPTION", "I am a helpful AI assistant specializing in general tasks and Ubuntu system administration.") - capabilities = os.getenv("AGENT_CAPABILITIES", "general assistance,Ubuntu system administration,Python development,cloud deployment,agent-to-agent communication") - registry_url = os.getenv("REGISTRY_URL", None) - public_url = os.getenv("PUBLIC_URL", None) - - # Parse capabilities into a list - expertise_list = [cap.strip() for cap in capabilities.split(",")] + specialization = os.getenv("AGENT_SPECIALIZATION", "helpful AI assistant") + description = os.getenv("AGENT_DESCRIPTION", "I'm a helpful AI assistant.") - # Create dynamic system prompt based on configuration - system_prompt = f"""You are {agent_name}, a {specialization} working in the domain of {domain}. + system_prompt = f"""You are {agent_name}, a {specialization} in {domain}. {description} -You are part of the NANDA (Network of Autonomous Distributed Agents) system. You can communicate with other agents and help users with various tasks. +You are part of the NANDA agent network and can communicate with other agents using @agent-id syntax. -Your capabilities include: -{chr(10).join([f"- {cap}" for cap in expertise_list])} +Your capabilities: {', '.join(capabilities_list)} -Always be helpful, accurate, and concise in your responses. If you're unsure about something, say so honestly. You can also help with basic calculations, provide time information, and engage in casual conversation. - -When someone asks about yourself, mention that you're part of the NANDA agent network and can communicate with other agents using the @agent_name syntax.""" +Be helpful, accurate, and concise.""" return { "agent_id": agent_id, @@ -74,200 +59,105 @@ def get_agent_config(): "domain": domain, "specialization": specialization, "description": description, - "expertise": expertise_list, - "registry_url": registry_url, - "public_url": public_url, - "system_prompt": system_prompt, + "capabilities": capabilities_list, + "port": int(os.getenv("PORT", "6000")), + "registry_url": os.getenv("REGISTRY_URL"), + "public_url": os.getenv("PUBLIC_URL") or f"http://localhost:{os.getenv('PORT', '6000')}", "anthropic_api_key": os.getenv("ANTHROPIC_API_KEY"), - "model": "claude-3-haiku-20240307" # Fast and cost-effective model + "model": os.getenv("ANTHROPIC_MODEL", "claude-3-haiku-20240307"), + "system_prompt": system_prompt, + "protocols": { + "a2a": {"enabled": True}, + "slim": { + "enabled": os.getenv("SLIM_ENABLED", "false").lower() == "true", + "node_url": os.getenv("SLIM_NODE_URL", "grpc://localhost:50051") + } + } } -# Load configuration -AGENT_CONFIG = get_agent_config() - -# Port configuration - use environment variable or default to 6000 -PORT = int(os.getenv("PORT", "6000")) - # ============================================================================= -# LLM-POWERED AGENT LOGIC - Uses Anthropic Claude for intelligent responses +# AGENT LOGIC # ============================================================================= -def create_llm_agent_logic(config: Dict[str, Any]): - """ - Creates an LLM-powered agent logic function based on the provided configuration. - Uses Anthropic Claude for intelligent, context-aware responses. - """ +def create_agent_logic(config): + """Create agent logic function with LLM or fallback""" # Initialize Anthropic client - anthropic_client = None + client = None if ANTHROPIC_AVAILABLE and config.get("anthropic_api_key"): try: - anthropic_client = Anthropic(api_key=config["anthropic_api_key"]) - print(f"✅ Anthropic Claude initialized for {config['agent_name']}") + client = Anthropic(api_key=config["anthropic_api_key"]) + print(f"✅ Claude initialized ({config['model']})") except Exception as e: - print(f"❌ Failed to initialize Anthropic: {e}") - anthropic_client = None - - # Prepare system prompt (already formatted in get_agent_config) - system_prompt = config["system_prompt"] + print(f"❌ Claude initialization failed: {e}") - def llm_agent_logic(message: str, conversation_id: str) -> str: - """LLM-powered agent logic with fallback to basic responses""" + def agent_logic(message: str, conversation_id: str) -> str: + """Process message with Claude or fallback""" - # If LLM is available, use it for intelligent responses - if anthropic_client: + if client: try: - # Add current time context if time-related query - context_info = "" - if any(time_word in message.lower() for time_word in ['time', 'date', 'when']): - context_info = f"\n\nCurrent time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" + # Add time context if relevant + context = "" + if any(word in message.lower() for word in ['time', 'date', 'when']): + context = f"\n\nCurrent time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" - response = anthropic_client.messages.create( + response = client.messages.create( model=config["model"], max_tokens=500, - system=system_prompt + context_info, - messages=[ - { - "role": "user", - "content": message - } - ] + system=config["system_prompt"] + context, + messages=[{"role": "user", "content": message}] ) return response.content[0].text.strip() except Exception as e: - print(f"❌ LLM Error: {e}") - # Fall back to basic response - return f"Sorry, I'm having trouble processing that right now. Error: {str(e)}" + return f"Error: {str(e)}" - # Fallback to basic responses if LLM not available + # Fallback responses + msg = message.lower().strip() + if any(greeting in msg for greeting in ['hello', 'hi', 'hey']): + return f"Hello! I'm {config['agent_name']}. Set ANTHROPIC_API_KEY for full capabilities." + elif 'time' in msg: + return f"Current time: {datetime.now().strftime('%H:%M:%S')}" else: - return _basic_fallback_response(message, config) + return f"I'm {config['agent_name']}. Set ANTHROPIC_API_KEY to enable LLM responses." - return llm_agent_logic - -def _basic_fallback_response(message: str, config: Dict[str, Any]) -> str: - """Basic fallback responses when LLM is not available""" - msg = message.lower().strip() - - # Handle greetings - if any(greeting in msg for greeting in ['hello', 'hi', 'hey']): - return f"Hello! I'm {config['agent_name']}, but I need an Anthropic API key to provide intelligent responses. Please set ANTHROPIC_API_KEY environment variable." - - # Handle time requests - elif 'time' in msg: - current_time = datetime.now().strftime("%H:%M:%S") - return f"The current time is {current_time}." - - # Handle basic calculations - elif any(op in message for op in ['+', '-', '*', '/', '=']): - try: - calculation = message.replace('x', '*').replace('X', '*').replace('=', '').strip() - result = eval(calculation) - return f"Calculation result: {calculation} = {result}" - except: - return "Sorry, I couldn't calculate that. Please check your expression." - - # Default fallback - else: - return f"I'm {config['agent_name']}, but I need an Anthropic API key to provide intelligent responses. Please set ANTHROPIC_API_KEY environment variable and restart me." + return agent_logic # ============================================================================= -# MAIN EXECUTION +# MAIN # ============================================================================= -def main(): - """Main function to start the LLM-powered modular agent""" - print(f"🤖 Starting {AGENT_CONFIG['agent_name']}") - print(f"📝 Specialization: {AGENT_CONFIG['specialization']}") - print(f"🎯 Domain: {AGENT_CONFIG['domain']}") - print(f"🛠️ Capabilities: {', '.join(AGENT_CONFIG['expertise'])}") - if AGENT_CONFIG['registry_url']: - print(f"🌐 Registry: {AGENT_CONFIG['registry_url']}") +async def main(): + config = get_config() - # Check for Anthropic API key - if not AGENT_CONFIG.get("anthropic_api_key"): - print("⚠️ Warning: ANTHROPIC_API_KEY not found in environment variables") - print(" The agent will use basic fallback responses only") - print(" Set ANTHROPIC_API_KEY to enable LLM capabilities") - else: - print(f"🧠 LLM Model: {AGENT_CONFIG['model']}") + print(f"🤖 {config['agent_name']} ({config['agent_id']})") + print(f"🎯 {config['domain']} - {config['specialization']}") + print(f"🔗 {config['public_url']}/a2a") + if config['registry_url']: + print(f"🌐 Registry: {config['registry_url']}") - # Create the LLM-powered agent logic based on configuration - agent_logic = create_llm_agent_logic(AGENT_CONFIG) + agent_logic = create_agent_logic(config) - # Create and start the NANDA agent nanda = NANDA( - agent_id=AGENT_CONFIG["agent_id"], + agent_id=config["agent_id"], agent_logic=agent_logic, - port=PORT, - registry_url=AGENT_CONFIG["registry_url"], - public_url=AGENT_CONFIG["public_url"], - enable_telemetry=True + agent_name=config["agent_name"], + domain=config["domain"], + specialization=config["specialization"], + description=config["description"], + capabilities=config["capabilities"], + port=config["port"], + registry_url=config["registry_url"], + public_url=config["public_url"], + enable_telemetry=True, + protocols=config["protocols"] ) - print(f"🚀 Agent URL: http://localhost:{PORT}/a2a") - print("💡 Try these messages:") - print(" - 'Hello there'") - print(" - 'Tell me about yourself'") - print(" - 'What time is it?'") - print(" - 'How can you help with Ubuntu?'") - print(" - 'Explain Python virtual environments'") - print(" - '5 + 3'") - print("\n🛑 Press Ctrl+C to stop") - - # Start the agent - nanda.start() - -def create_custom_agent(agent_name, specialization, domain, expertise_list, port=6000, anthropic_api_key=None, registry_url=None): - """ - Helper function to quickly create a custom LLM-powered agent with different config - - Example usage: - create_custom_agent( - agent_name="Data Scientist", - specialization="analytical and precise AI assistant", - domain="data science", - expertise_list=["data analysis", "statistics", "machine learning", "Python"], - port=6001, - anthropic_api_key="sk-ant-xxxxx" - ) - """ - custom_config = AGENT_CONFIG.copy() - custom_config.update({ - "agent_id": agent_name.lower().replace(" ", "-"), - "agent_name": agent_name, - "specialization": specialization, - "domain": domain, - "expertise": expertise_list, - "registry_url": registry_url, - "anthropic_api_key": anthropic_api_key or os.getenv("ANTHROPIC_API_KEY"), - "system_prompt": f"""You are {agent_name}, a {specialization} working in the domain of {domain}. - -You are part of the NANDA (Network of Autonomous Distributed Agents) system. You can communicate with other agents and help users with various tasks. - -Your capabilities include: -{chr(10).join([f"- {expertise}" for expertise in expertise_list])} - -Always be helpful, accurate, and concise in your responses. If you're unsure about something, say so honestly. - -When someone asks about yourself, mention that you're part of the NANDA agent network and can communicate with other agents using the @agent_name syntax.""" - }) - - agent_logic = create_llm_agent_logic(custom_config) - - nanda = NANDA( - agent_id=custom_config["agent_id"], - agent_logic=agent_logic, - port=port, - registry_url=custom_config["registry_url"], - enable_telemetry=True - ) + print("\n💬 Try: 'Hello', 'What time is it?', '@other-agent Hello!'") + print("🛑 Press Ctrl+C to stop\n") - print(f"🤖 Starting custom LLM agent: {agent_name}") - print(f"🚀 Agent URL: http://localhost:{port}/a2a") - nanda.start() + await nanda.start() if __name__ == "__main__": - main() + asyncio.run(main()) \ No newline at end of file diff --git a/nanda_core/__init__.py b/nanda_core/__init__.py index b6561ca..b287102 100644 --- a/nanda_core/__init__.py +++ b/nanda_core/__init__.py @@ -4,9 +4,9 @@ """ from .core.adapter import NANDA -from .core.agent_bridge import SimpleAgentBridge +from .core.agent_bridge import AgentBridge __all__ = [ "NANDA", - "SimpleAgentBridge" + "AgentBridge" ] \ No newline at end of file diff --git a/nanda_core/core/__init__.py b/nanda_core/core/__init__.py index f0d8570..cf59933 100644 --- a/nanda_core/core/__init__.py +++ b/nanda_core/core/__init__.py @@ -4,9 +4,9 @@ """ from .adapter import NANDA -from .agent_bridge import SimpleAgentBridge +from .agent_bridge import AgentBridge __all__ = [ "NANDA", - "SimpleAgentBridge" + "AgentBridge" ] \ No newline at end of file diff --git a/nanda_core/core/adapter.py b/nanda_core/core/adapter.py index b98892c..151f5f2 100644 --- a/nanda_core/core/adapter.py +++ b/nanda_core/core/adapter.py @@ -1,4 +1,3 @@ -#!/usr/bin/env python3 """ Simple NANDA Adapter - Clean Agent-to-Agent Communication @@ -7,10 +6,13 @@ """ import os +import asyncio import requests -from typing import Optional, Callable -from python_a2a import run_server -from .agent_bridge import SimpleAgentBridge +from typing import Optional, Callable, Dict +from .agent_bridge import AgentBridge +from .registry_client import RegistryClient +from ..protocols.router import ProtocolRouter +from ..protocols.a2a.protocol import A2AProtocol class NANDA: @@ -19,30 +21,48 @@ class NANDA: def __init__(self, agent_id: str, agent_logic: Callable[[str, str], str], + agent_name: Optional[str] = None, + domain: Optional[str] = None, + specialization: Optional[str] = None, + description: Optional[str] = None, + capabilities: Optional[list] = None, port: int = 6000, registry_url: Optional[str] = None, public_url: Optional[str] = None, host: str = "0.0.0.0", - enable_telemetry: bool = True): + enable_telemetry: bool = True, + protocols: Optional[dict] = None): """ Create a simple NANDA agent Args: agent_id: Unique agent identifier agent_logic: Function that takes (message: str, conversation_id: str) -> response: str + agent_name: Display name (defaults to agent_id) + domain: Agent's domain of expertise + specialization: Agent's specialization + description: Agent description + capabilities: List of capabilities (default: ["text"]) port: Port to run on registry_url: Optional registry URL for agent discovery public_url: Public URL for agent registration (e.g., https://yourdomain.com:6000) host: Host to bind to - enable_telemetry: Enable telemetry logging (optional) + enable_telemetry: Enable telemetry logging + protocols: Dict of protocol configs (e.g., {"a2a": {"enabled": True}}) """ self.agent_id = agent_id self.agent_logic = agent_logic + self.agent_name = agent_name or agent_id + self.domain = domain + self.specialization = specialization + self.description = description or f"AI agent {agent_id}" + self.capabilities = capabilities or ["text"] self.port = port self.registry_url = registry_url - self.public_url = public_url + self.public_url = public_url or f"http://localhost:{port}" self.host = host self.enable_telemetry = enable_telemetry + self.protocols_config = protocols or {"a2a": {"enabled": True}} # Initialize telemetry if enabled self.telemetry = None @@ -54,46 +74,105 @@ def __init__(self, except ImportError: print(f"⚠️ Telemetry requested but module not available") - # Create the bridge with optional features - self.bridge = SimpleAgentBridge( + # Initialize protocol router + self.router = ProtocolRouter() + + # Initialize and register protocols + self._initialize_protocols() + + # Initialize registry client + self.registry = RegistryClient(registry_url) if registry_url else RegistryClient(None) + + # Create the bridge + self.bridge = AgentBridge( + protocol_router=self.router, + registry_client=self.registry, agent_id=agent_id, agent_logic=agent_logic, - registry_url=registry_url, telemetry=self.telemetry ) print(f"🤖 NANDA Agent '{agent_id}' created") if registry_url: print(f"🌐 Registry: {registry_url}") - if public_url: - print(f"🔗 Public URL: {public_url}") + print(f"🔗 Public URL: {self.public_url}") + print(f"🔌 Protocols: {self.router.get_all_protocols()}") + + def _initialize_protocols(self): + """Initialize and register protocol adapters based on config""" + + # Initialize A2A protocol if enabled + if self.protocols_config.get("a2a", {}).get("enabled", True): + a2a_protocol = A2AProtocol( + agent_id=self.agent_id, + agent_name=self.agent_name, + public_url=self.public_url, + domain=self.domain, + specialization=self.specialization, + description=self.description, + capabilities=self.capabilities + ) + self.router.register(a2a_protocol) + + slim_config = self.protocols_config.get("slim", {}) + if slim_config.get("enabled", False): + from ..protocols.slim.adapter import SLIMProtocol + + slim_node_url = slim_config.get("node_url", "grpc://localhost:50051") + + slim_protocol = SLIMProtocol( + agent_id=self.agent_id, + slim_node_url=slim_node_url, + agent_name=self.agent_name + ) + self.router.register(slim_protocol) + print(f"🔌 SLIM protocol enabled (node: {slim_node_url})") - def start(self, register: bool = True): + async def start(self, register: bool = True): """Start the agent server""" # Register with registry if provided - if register and self.registry_url and self.public_url: - self._register() + if register and self.registry_url: + await self._register() print(f"🚀 Starting agent '{self.agent_id}' on {self.host}:{self.port}") - # Start the A2A server - run_server(self.bridge, host=self.host, port=self.port) + # Start the bridge (which starts all protocol servers) + await self.bridge.run_server(self.host, self.port) - def _register(self): - """Register agent with registry""" + async def _register(self): + """Register agent with NANDA Index""" try: - data = { + agent_facts = { "agent_id": self.agent_id, - "agent_url": self.public_url + "name": self.agent_name, + "domain": self.domain, + "specialization": self.specialization, + "description": self.description, + "capabilities": self.capabilities, + "url": self.public_url, + "agent_url": self.public_url, # For backward compatibility + "supported_protocols": self.router.get_all_protocols(), + "endpoints": self._get_endpoints() } - response = requests.post(f"{self.registry_url}/register", json=data, timeout=10) - if response.status_code == 200: - print(f"✅ Agent '{self.agent_id}' registered successfully") - else: - print(f"⚠️ Failed to register agent: HTTP {response.status_code}") + + await self.registry.register(agent_facts) + print(f"✅ Agent '{self.agent_id}' registered successfully") + except Exception as e: print(f"⚠️ Registration error: {e}") + def _get_endpoints(self) -> Dict[str, str]: + """Get endpoints for all registered protocols""" + endpoints = {} + for protocol_name in self.router.get_all_protocols(): + if protocol_name == "a2a": + endpoints["a2a"] = f"{self.public_url}/a2a" + elif protocol_name == "slim": + # SLIM uses agent inbox channel, not HTTP endpoint + endpoints["slim"] = f"slim://{self.agent_id}" + + return endpoints + def stop(self): """Stop the agent and cleanup telemetry""" if self.telemetry: diff --git a/nanda_core/core/adapter_copy.py b/nanda_core/core/adapter_copy.py new file mode 100644 index 0000000..b98892c --- /dev/null +++ b/nanda_core/core/adapter_copy.py @@ -0,0 +1,101 @@ +#!/usr/bin/env python3 +""" +Simple NANDA Adapter - Clean Agent-to-Agent Communication + +Simple, clean adapter focused on A2A communication without complexity. +8-10 lines to deploy an agent. +""" + +import os +import requests +from typing import Optional, Callable +from python_a2a import run_server +from .agent_bridge import SimpleAgentBridge + + +class NANDA: + """Simple NANDA class for clean agent deployment""" + + def __init__(self, + agent_id: str, + agent_logic: Callable[[str, str], str], + port: int = 6000, + registry_url: Optional[str] = None, + public_url: Optional[str] = None, + host: str = "0.0.0.0", + enable_telemetry: bool = True): + """ + Create a simple NANDA agent + + Args: + agent_id: Unique agent identifier + agent_logic: Function that takes (message: str, conversation_id: str) -> response: str + port: Port to run on + registry_url: Optional registry URL for agent discovery + public_url: Public URL for agent registration (e.g., https://yourdomain.com:6000) + host: Host to bind to + enable_telemetry: Enable telemetry logging (optional) + """ + self.agent_id = agent_id + self.agent_logic = agent_logic + self.port = port + self.registry_url = registry_url + self.public_url = public_url + self.host = host + self.enable_telemetry = enable_telemetry + + # Initialize telemetry if enabled + self.telemetry = None + if enable_telemetry: + try: + from ..telemetry.telemetry_system import TelemetrySystem + self.telemetry = TelemetrySystem(agent_id) + print(f"📊 Telemetry enabled for {agent_id}") + except ImportError: + print(f"⚠️ Telemetry requested but module not available") + + # Create the bridge with optional features + self.bridge = SimpleAgentBridge( + agent_id=agent_id, + agent_logic=agent_logic, + registry_url=registry_url, + telemetry=self.telemetry + ) + + print(f"🤖 NANDA Agent '{agent_id}' created") + if registry_url: + print(f"🌐 Registry: {registry_url}") + if public_url: + print(f"🔗 Public URL: {public_url}") + + def start(self, register: bool = True): + """Start the agent server""" + # Register with registry if provided + if register and self.registry_url and self.public_url: + self._register() + + print(f"🚀 Starting agent '{self.agent_id}' on {self.host}:{self.port}") + + # Start the A2A server + run_server(self.bridge, host=self.host, port=self.port) + + def _register(self): + """Register agent with registry""" + try: + data = { + "agent_id": self.agent_id, + "agent_url": self.public_url + } + response = requests.post(f"{self.registry_url}/register", json=data, timeout=10) + if response.status_code == 200: + print(f"✅ Agent '{self.agent_id}' registered successfully") + else: + print(f"⚠️ Failed to register agent: HTTP {response.status_code}") + except Exception as e: + print(f"⚠️ Registration error: {e}") + + def stop(self): + """Stop the agent and cleanup telemetry""" + if self.telemetry: + self.telemetry.stop() + print(f"🛑 Stopping agent '{self.agent_id}'") \ No newline at end of file diff --git a/nanda_core/core/agent_bridge.py b/nanda_core/core/agent_bridge.py index fc1a668..a234408 100644 --- a/nanda_core/core/agent_bridge.py +++ b/nanda_core/core/agent_bridge.py @@ -1,145 +1,154 @@ -#!/usr/bin/env python3 """ -Simple Agent Bridge for A2A Communication +Agent Bridge for Protocol-Agnostic Communication -Clean, simple bridge focused on agent-to-agent communication. +Handles message routing between agents using any registered protocol. """ -import os +import re import uuid import logging -import requests from typing import Callable, Optional, Dict, Any -from python_a2a import A2AServer, A2AClient, Message, TextContent, MessageRole, Metadata +from ..protocols.router import ProtocolRouter +from .registry_client import RegistryClient -# Configure logger to capture conversation logs logger = logging.getLogger(__name__) -class SimpleAgentBridge(A2AServer): - """Simple Agent Bridge for A2A communication only""" +class AgentBridge: + """Protocol-agnostic agent message router and coordinator""" def __init__(self, - agent_id: str, + protocol_router: ProtocolRouter, + registry_client: RegistryClient, + agent_id: str, agent_logic: Callable[[str, str], str], - registry_url: Optional[str] = None, - telemetry = None): - super().__init__() + telemetry=None): + """Initialize agent bridge + + Args: + protocol_router: Router managing protocol adapters + registry_client: Client for NANDA Index + agent_id: This agent's unique identifier + agent_logic: Agent's business logic function(message: str, conversation_id: str) -> str + telemetry: Optional telemetry system + """ + self.router = protocol_router + self.registry = registry_client self.agent_id = agent_id self.agent_logic = agent_logic - self.registry_url = registry_url self.telemetry = telemetry - def handle_message(self, msg: Message) -> Message: - """Handle incoming messages""" - conversation_id = msg.conversation_id or str(uuid.uuid4()) + # Register this bridge as incoming handler for all protocols + for protocol_name in self.router.get_all_protocols(): + protocol = self.router.get_protocol(protocol_name) + protocol.set_incoming_handler(self.handle_message) - # Only handle text content - if not isinstance(msg.content, TextContent): - return self._create_response( - msg, conversation_id, - "Only text messages supported" - ) + logger.info(f"🌉 Bridge initialized for {agent_id} with protocols: {self.router.get_all_protocols()}") + + def extract_agent_id(self, content: str) -> Optional[str]: + """Extract @agent-id from message content""" + match = re.search(r'@([\w-]+)', content) + return match.group(1) if match else None + + def parse_incoming_agent_message(self, text: str) -> Optional[Dict[str, str]]: + """Parse incoming agent-to-agent message in format: + FROM: sender\nTO: receiver\nMESSAGE: content + """ + if not (text.startswith("FROM:") and "TO:" in text and "MESSAGE:" in text): + return None + + try: + lines = text.strip().split('\n') + result = {} + + for line in lines: + if line.startswith("FROM:"): + result['from_agent'] = line[5:].strip() + elif line.startswith("TO:"): + result['to_agent'] = line[3:].strip() + elif line.startswith("MESSAGE:"): + result['message'] = line[8:].strip() + + return result if all(k in result for k in ['from_agent', 'to_agent', 'message']) else None + except Exception as e: + logger.error(f"Error parsing agent message: {e}") + return None + + async def handle_message(self, message: Dict[str, Any]) -> Dict[str, Any]: + """Main message handling entry point - user_text = msg.content.text + Called by protocol adapters when messages arrive. + """ + content = message.get("content", {}).get("text", "") + conversation_id = message.get("conversation_id", "") or str(uuid.uuid4()) - # Check if this is an agent-to-agent message in our simple format - if user_text.startswith("FROM:") and "TO:" in user_text and "MESSAGE:" in user_text: - return self._handle_incoming_agent_message(user_text, msg, conversation_id) + # Check if this is an incoming agent-to-agent message + parsed = self.parse_incoming_agent_message(content) + if parsed: + return await self._handle_incoming_agent_message(parsed, conversation_id) - logger.info(f"📨 [{self.agent_id}] Received: {user_text}") + logger.info(f"📨 [{self.agent_id}] Received: {content}") - # Handle different message types try: - if user_text.startswith("@"): - # Agent-to-agent message (outgoing) - return self._handle_agent_message(user_text, msg, conversation_id) - elif user_text.startswith("/"): + # Check message type + if content.startswith("@"): + # Outgoing agent-to-agent message + return await self._handle_outgoing_agent_message(content, conversation_id) + elif content.startswith("/"): # System command - return self._handle_command(user_text, msg, conversation_id) + return await self._handle_command(content, conversation_id) else: # Regular message - use agent logic if self.telemetry: self.telemetry.log_message_received(self.agent_id, conversation_id) - response = self.agent_logic(user_text, conversation_id) - return self._create_response(msg, conversation_id, response) + response = self.agent_logic(content, conversation_id) + return self._create_response(response) except Exception as e: - return self._create_response( - msg, conversation_id, - f"Error: {str(e)}" - ) + logger.error(f"❌ [{self.agent_id}] Error handling message: {e}") + return self._create_response(f"Error: {str(e)}") - def _handle_incoming_agent_message(self, user_text: str, msg: Message, conversation_id: str) -> Message: + async def _handle_incoming_agent_message(self, parsed: Dict[str, str], + conversation_id: str) -> Dict[str, Any]: """Handle incoming messages from other agents""" - try: - lines = user_text.strip().split('\n') - from_agent = "" - to_agent = "" - message_content = "" - - for line in lines: - if line.startswith("FROM:"): - from_agent = line[5:].strip() - elif line.startswith("TO:"): - to_agent = line[3:].strip() - elif line.startswith("MESSAGE:"): - message_content = line[8:].strip() - - logger.info(f"📨 [{self.agent_id}] ← [{from_agent}]: {message_content}") - - # Check if this is a reply (don't respond to replies to avoid infinite loops) - if message_content.startswith("Response to "): - logger.info(f"🔄 [{self.agent_id}] Received reply from {from_agent}, displaying to user") - # Display the reply to user but don't respond back to avoid loops - return self._create_response( - msg, conversation_id, - f"[{from_agent}] {message_content[len('Response to ' + self.agent_id + ': '):]}" - ) - - # Process the message through our agent logic - if self.telemetry: - self.telemetry.log_message_received(self.agent_id, conversation_id) - - response = self.agent_logic(message_content, conversation_id) - - # Send response back - return self._create_response( - msg, conversation_id, - f"Response to {from_agent}: {response}" - ) - - except Exception as e: - logger.error(f"❌ [{self.agent_id}] Error processing incoming agent message: {e}") - return self._create_response( - msg, conversation_id, - f"Error processing message from agent: {str(e)}" - ) - - def _handle_agent_message(self, user_text: str, msg: Message, conversation_id: str) -> Message: + from_agent = parsed['from_agent'] + message_content = parsed['message'] + + logger.info(f"📨 [{self.agent_id}] ← [{from_agent}]: {message_content}") + + # Check if this is a reply (avoid infinite loops) + if message_content.startswith("Response to "): + logger.info(f"🔄 [{self.agent_id}] Received reply from {from_agent}") + return self._create_response(f"[{from_agent}] {message_content[len(f'Response to {self.agent_id}: '):]}") + + # Process through agent logic + if self.telemetry: + self.telemetry.log_message_received(self.agent_id, conversation_id) + + response = self.agent_logic(message_content, conversation_id) + return self._create_response(f"Response to {from_agent}: {response}") + + async def _handle_outgoing_agent_message(self, content: str, + conversation_id: str) -> Dict[str, Any]: """Handle messages to other agents (@agent_id message)""" - parts = user_text.split(" ", 1) + parts = content.split(" ", 1) if len(parts) <= 1: - return self._create_response( - msg, conversation_id, - "Invalid format. Use '@agent_id message'" - ) + return self._create_response("Invalid format. Use '@agent_id message'") - target_agent = parts[0][1:] # Remove @ + target_agent_id = parts[0][1:] # Remove @ message_text = parts[1] - logger.info(f"🔄 [{self.agent_id}] Sending to {target_agent}: {message_text}") + logger.info(f"🔄 [{self.agent_id}] Sending to {target_agent_id}: {message_text}") - # Look up target agent and send message - result = self._send_to_agent(target_agent, message_text, conversation_id) - return self._create_response(msg, conversation_id, result) + # Route to target agent + result = await self.route_to_agent(target_agent_id, message_text, conversation_id) + return result - def _handle_command(self, user_text: str, msg: Message, conversation_id: str) -> Message: + async def _handle_command(self, content: str, conversation_id: str) -> Dict[str, Any]: """Handle system commands""" - parts = user_text.split(" ", 1) + parts = content.split(" ", 1) command = parts[0][1:] if len(parts) > 0 else "" - args = parts[1] if len(parts) > 1 else "" if command == "help": help_text = """Available commands: @@ -147,106 +156,120 @@ def _handle_command(self, user_text: str, msg: Message, conversation_id: str) -> /ping - Test agent responsiveness /status - Show agent status @agent_id message - Send message to another agent""" - return self._create_response(msg, conversation_id, help_text) + return self._create_response(help_text) elif command == "ping": - return self._create_response(msg, conversation_id, "Pong!") + return self._create_response("Pong!") elif command == "status": - status = f"Agent: {self.agent_id}, Status: Running" - if self.registry_url: - status += f", Registry: {self.registry_url}" - return self._create_response(msg, conversation_id, status) + protocols = self.router.get_all_protocols() + status = f"Agent: {self.agent_id}, Status: Running, Protocols: {', '.join(protocols)}" + if hasattr(self.registry, 'registry_url') and self.registry.registry_url: + status += f", Registry: {self.registry.registry_url}" + return self._create_response(status) else: return self._create_response( - msg, conversation_id, f"Unknown command: {command}. Use /help for available commands" ) - def _send_to_agent(self, target_agent_id: str, message_text: str, conversation_id: str) -> str: - """Send message to another agent""" + async def route_to_agent(self, target_agent_id: str, message_text: str, + conversation_id: str) -> Dict[str, Any]: + """Route message to target agent via appropriate protocol""" try: - # Look up agent URL - agent_url = self._lookup_agent(target_agent_id) - if not agent_url: - return f"Agent {target_agent_id} not found" + # Resolve agent via NANDA Index + agent_info = await self.registry.resolve(target_agent_id) - # Ensure URL has /a2a endpoint - if not agent_url.endswith('/a2a'): - agent_url = f"{agent_url}/a2a" + if not agent_info: + logger.warning(f"🔍 Agent {target_agent_id} not found in registry") + return self._create_response(f"Agent {target_agent_id} not found") + protocol_name = "slim" # <-- Force SLIM + target_url = f"slim://{target_agent_id}" # <-- Force SLIM URL - logger.info(f"📤 [{self.agent_id}] → [{target_agent_id}]: {message_text}") + logger.info(f"🧪 TEST MODE: Forcing SLIM protocol") + logger.info(f"📤 [{self.agent_id}] → [{target_agent_id}] via {protocol_name}") + logger.info(f"🔗 Target URL: {target_url}") + + # Debug: print what we got from registry + logger.info(f"📋 Agent info from registry: {agent_info}") - # Create simple message with metadata + # Select protocol + supported_protocols = agent_info.get("supported_protocols") or self.router.get_all_protocols() + protocol_name = self.router.select_protocol(supported_protocols) + + # Get target URL - try multiple fields + endpoints = agent_info.get("endpoints", {}) + target_url = endpoints.get(protocol_name) + + if not target_url: + # Fallback logic + if protocol_name == "slim": + # For SLIM, construct the identifier + target_url = f"slim://{target_agent_id}" + else: + # For A2A, use URL fields + target_url = ( + agent_info.get("url") or + agent_info.get("agent_url") or + agent_info.get("public_url") + ) + + # Ensure target_url is valid + if not target_url: + logger.error(f"❌ No URL found for {target_agent_id}. Agent info: {agent_info}") + return self._create_response(f"No endpoint found for {target_agent_id}") + + # Add /a2a suffix if needed and not already present + if protocol_name == "a2a" and not target_url.endswith('/a2a'): + target_url = f"{target_url}/a2a" + + logger.info(f"📤 [{self.agent_id}] → [{target_agent_id}] via {protocol_name}: {message_text}") + logger.info(f"🔗 Target URL: {target_url}") + + # Create message in simple format simple_message = f"FROM: {self.agent_id}\nTO: {target_agent_id}\nMESSAGE: {message_text}" - # Send message using A2A client - client = A2AClient(agent_url, timeout=30) - response = client.send_message( - Message( - role=MessageRole.USER, - content=TextContent(text=simple_message), - conversation_id=conversation_id, - metadata=Metadata(custom_fields={ - 'from_agent_id': self.agent_id, - 'to_agent_id': target_agent_id, - 'message_type': 'agent_to_agent' - }) - ) - ) + # Send via protocol + message = { + "content": { + "text": simple_message, + "type": "text" + }, + "conversation_id": conversation_id, + "metadata": { + "from_agent_id": self.agent_id, + "to_agent_id": target_agent_id, + "message_type": "agent_to_agent" + } + } + + response = await self.router.send(protocol_name, target_url, message) if self.telemetry: self.telemetry.log_message_sent(target_agent_id, conversation_id) - # Extract the actual response content from the target agent - logger.info(f"🔍 [{self.agent_id}] Response type: {type(response)}, has parts: {hasattr(response, 'parts') if response else 'None'}") - if response: - if hasattr(response, 'parts') and response.parts: - response_text = response.parts[0].text - logger.info(f"✅ [{self.agent_id}] Received response from {target_agent_id}: {response_text[:100]}...") - return f"[{target_agent_id}] {response_text}" - else: - logger.info(f"✅ [{self.agent_id}] Response has no parts, full response: {str(response)[:200]}...") - return f"[{target_agent_id}] {str(response)}" - else: - logger.info(f"✅ [{self.agent_id}] Message delivered to {target_agent_id}, no response") - return f"Message sent to {target_agent_id}: {message_text}" + # Extract response + response_text = response.get("content", {}).get("text", str(response)) + logger.info(f"✅ [{self.agent_id}] Response from {target_agent_id}: {response_text[:100]}...") + + return self._create_response(f"[{target_agent_id}] {response_text}") except Exception as e: - return f"❌ Error sending to {target_agent_id}: {str(e)}" + logger.error(f"❌ Error routing to {target_agent_id}: {e}") + import traceback + traceback.print_exc() + return self._create_response(f"❌ Error sending to {target_agent_id}: {str(e)}") - def _lookup_agent(self, agent_id: str) -> Optional[str]: - """Look up agent URL in registry or use local discovery""" - - # Try registry lookup if available - if self.registry_url: - try: - response = requests.get(f"{self.registry_url}/lookup/{agent_id}", timeout=10) - if response.status_code == 200: - data = response.json() - agent_url = data.get("agent_url") - logger.info(f"🌐 Found {agent_id} in registry: {agent_url}") - return agent_url - except Exception as e: - logger.warning(f"🌐 Registry lookup failed: {e}") - - # Fallback to local discovery (for testing) - local_agents = { - "test_agent": "http://localhost:6000", + def _create_response(self, text: str) -> Dict[str, Any]: + """Create a standardized response dict""" + return { + "content": { + "text": f"[{self.agent_id}] {text}", + "type": "text" + } } - - if agent_id in local_agents: - logger.info(f"🏠 Found {agent_id} locally: {local_agents[agent_id]}") - return local_agents[agent_id] - - return None - def _create_response(self, original_msg: Message, conversation_id: str, text: str) -> Message: - """Create a response message""" - return Message( - role=MessageRole.AGENT, - content=TextContent(text=f"[{self.agent_id}] {text}"), - parent_message_id=original_msg.message_id, - conversation_id=conversation_id - ) \ No newline at end of file + async def run_server(self, host: str = "0.0.0.0", port: int = 8000): + """Start all protocol servers""" + logger.info(f"🚀 Starting agent bridge for {self.agent_id} on {host}:{port}") + await self.router.start_all_servers(host, port) \ No newline at end of file diff --git a/nanda_core/core/agent_bridge_copy.py b/nanda_core/core/agent_bridge_copy.py new file mode 100644 index 0000000..fc1a668 --- /dev/null +++ b/nanda_core/core/agent_bridge_copy.py @@ -0,0 +1,252 @@ +#!/usr/bin/env python3 +""" +Simple Agent Bridge for A2A Communication + +Clean, simple bridge focused on agent-to-agent communication. +""" + +import os +import uuid +import logging +import requests +from typing import Callable, Optional, Dict, Any +from python_a2a import A2AServer, A2AClient, Message, TextContent, MessageRole, Metadata + +# Configure logger to capture conversation logs +logger = logging.getLogger(__name__) + + +class SimpleAgentBridge(A2AServer): + """Simple Agent Bridge for A2A communication only""" + + def __init__(self, + agent_id: str, + agent_logic: Callable[[str, str], str], + registry_url: Optional[str] = None, + telemetry = None): + super().__init__() + self.agent_id = agent_id + self.agent_logic = agent_logic + self.registry_url = registry_url + self.telemetry = telemetry + + def handle_message(self, msg: Message) -> Message: + """Handle incoming messages""" + conversation_id = msg.conversation_id or str(uuid.uuid4()) + + # Only handle text content + if not isinstance(msg.content, TextContent): + return self._create_response( + msg, conversation_id, + "Only text messages supported" + ) + + user_text = msg.content.text + + # Check if this is an agent-to-agent message in our simple format + if user_text.startswith("FROM:") and "TO:" in user_text and "MESSAGE:" in user_text: + return self._handle_incoming_agent_message(user_text, msg, conversation_id) + + logger.info(f"📨 [{self.agent_id}] Received: {user_text}") + + # Handle different message types + try: + if user_text.startswith("@"): + # Agent-to-agent message (outgoing) + return self._handle_agent_message(user_text, msg, conversation_id) + elif user_text.startswith("/"): + # System command + return self._handle_command(user_text, msg, conversation_id) + else: + # Regular message - use agent logic + if self.telemetry: + self.telemetry.log_message_received(self.agent_id, conversation_id) + + response = self.agent_logic(user_text, conversation_id) + return self._create_response(msg, conversation_id, response) + + except Exception as e: + return self._create_response( + msg, conversation_id, + f"Error: {str(e)}" + ) + + def _handle_incoming_agent_message(self, user_text: str, msg: Message, conversation_id: str) -> Message: + """Handle incoming messages from other agents""" + try: + lines = user_text.strip().split('\n') + from_agent = "" + to_agent = "" + message_content = "" + + for line in lines: + if line.startswith("FROM:"): + from_agent = line[5:].strip() + elif line.startswith("TO:"): + to_agent = line[3:].strip() + elif line.startswith("MESSAGE:"): + message_content = line[8:].strip() + + logger.info(f"📨 [{self.agent_id}] ← [{from_agent}]: {message_content}") + + # Check if this is a reply (don't respond to replies to avoid infinite loops) + if message_content.startswith("Response to "): + logger.info(f"🔄 [{self.agent_id}] Received reply from {from_agent}, displaying to user") + # Display the reply to user but don't respond back to avoid loops + return self._create_response( + msg, conversation_id, + f"[{from_agent}] {message_content[len('Response to ' + self.agent_id + ': '):]}" + ) + + # Process the message through our agent logic + if self.telemetry: + self.telemetry.log_message_received(self.agent_id, conversation_id) + + response = self.agent_logic(message_content, conversation_id) + + # Send response back + return self._create_response( + msg, conversation_id, + f"Response to {from_agent}: {response}" + ) + + except Exception as e: + logger.error(f"❌ [{self.agent_id}] Error processing incoming agent message: {e}") + return self._create_response( + msg, conversation_id, + f"Error processing message from agent: {str(e)}" + ) + + def _handle_agent_message(self, user_text: str, msg: Message, conversation_id: str) -> Message: + """Handle messages to other agents (@agent_id message)""" + parts = user_text.split(" ", 1) + if len(parts) <= 1: + return self._create_response( + msg, conversation_id, + "Invalid format. Use '@agent_id message'" + ) + + target_agent = parts[0][1:] # Remove @ + message_text = parts[1] + + logger.info(f"🔄 [{self.agent_id}] Sending to {target_agent}: {message_text}") + + # Look up target agent and send message + result = self._send_to_agent(target_agent, message_text, conversation_id) + return self._create_response(msg, conversation_id, result) + + def _handle_command(self, user_text: str, msg: Message, conversation_id: str) -> Message: + """Handle system commands""" + parts = user_text.split(" ", 1) + command = parts[0][1:] if len(parts) > 0 else "" + args = parts[1] if len(parts) > 1 else "" + + if command == "help": + help_text = """Available commands: +/help - Show this help +/ping - Test agent responsiveness +/status - Show agent status +@agent_id message - Send message to another agent""" + return self._create_response(msg, conversation_id, help_text) + + elif command == "ping": + return self._create_response(msg, conversation_id, "Pong!") + + elif command == "status": + status = f"Agent: {self.agent_id}, Status: Running" + if self.registry_url: + status += f", Registry: {self.registry_url}" + return self._create_response(msg, conversation_id, status) + + else: + return self._create_response( + msg, conversation_id, + f"Unknown command: {command}. Use /help for available commands" + ) + + def _send_to_agent(self, target_agent_id: str, message_text: str, conversation_id: str) -> str: + """Send message to another agent""" + try: + # Look up agent URL + agent_url = self._lookup_agent(target_agent_id) + if not agent_url: + return f"Agent {target_agent_id} not found" + + # Ensure URL has /a2a endpoint + if not agent_url.endswith('/a2a'): + agent_url = f"{agent_url}/a2a" + + logger.info(f"📤 [{self.agent_id}] → [{target_agent_id}]: {message_text}") + + # Create simple message with metadata + simple_message = f"FROM: {self.agent_id}\nTO: {target_agent_id}\nMESSAGE: {message_text}" + + # Send message using A2A client + client = A2AClient(agent_url, timeout=30) + response = client.send_message( + Message( + role=MessageRole.USER, + content=TextContent(text=simple_message), + conversation_id=conversation_id, + metadata=Metadata(custom_fields={ + 'from_agent_id': self.agent_id, + 'to_agent_id': target_agent_id, + 'message_type': 'agent_to_agent' + }) + ) + ) + + if self.telemetry: + self.telemetry.log_message_sent(target_agent_id, conversation_id) + + # Extract the actual response content from the target agent + logger.info(f"🔍 [{self.agent_id}] Response type: {type(response)}, has parts: {hasattr(response, 'parts') if response else 'None'}") + if response: + if hasattr(response, 'parts') and response.parts: + response_text = response.parts[0].text + logger.info(f"✅ [{self.agent_id}] Received response from {target_agent_id}: {response_text[:100]}...") + return f"[{target_agent_id}] {response_text}" + else: + logger.info(f"✅ [{self.agent_id}] Response has no parts, full response: {str(response)[:200]}...") + return f"[{target_agent_id}] {str(response)}" + else: + logger.info(f"✅ [{self.agent_id}] Message delivered to {target_agent_id}, no response") + return f"Message sent to {target_agent_id}: {message_text}" + + except Exception as e: + return f"❌ Error sending to {target_agent_id}: {str(e)}" + + def _lookup_agent(self, agent_id: str) -> Optional[str]: + """Look up agent URL in registry or use local discovery""" + + # Try registry lookup if available + if self.registry_url: + try: + response = requests.get(f"{self.registry_url}/lookup/{agent_id}", timeout=10) + if response.status_code == 200: + data = response.json() + agent_url = data.get("agent_url") + logger.info(f"🌐 Found {agent_id} in registry: {agent_url}") + return agent_url + except Exception as e: + logger.warning(f"🌐 Registry lookup failed: {e}") + + # Fallback to local discovery (for testing) + local_agents = { + "test_agent": "http://localhost:6000", + } + + if agent_id in local_agents: + logger.info(f"🏠 Found {agent_id} locally: {local_agents[agent_id]}") + return local_agents[agent_id] + + return None + + def _create_response(self, original_msg: Message, conversation_id: str, text: str) -> Message: + """Create a response message""" + return Message( + role=MessageRole.AGENT, + content=TextContent(text=f"[{self.agent_id}] {text}"), + parent_message_id=original_msg.message_id, + conversation_id=conversation_id + ) \ No newline at end of file diff --git a/nanda_core/core/registry_client.py b/nanda_core/core/registry_client.py index 84f83c4..0a7db31 100644 --- a/nanda_core/core/registry_client.py +++ b/nanda_core/core/registry_client.py @@ -1,23 +1,31 @@ -#!/usr/bin/env python3 """ -Registry Client for Nanda Index Registry Integration +Registry Client for NANDA Index Registry Integration Handles agent registration, discovery, and management """ -import requests +import httpx import json import os from typing import Optional, Dict, List, Any from datetime import datetime +import logging + +logger = logging.getLogger(__name__) class RegistryClient: - """Client for interacting with the Nanda index registry""" + """Client for interacting with the NANDA Index registry""" def __init__(self, registry_url: Optional[str] = None): + """Initialize registry client + + Args: + registry_url: URL of NANDA Index (e.g., http://registry.chat39.com:6900) + """ self.registry_url = registry_url or self._get_default_registry_url() - self.session = requests.Session() - self.session.verify = False # For development with self-signed certs + # Use async httpx client instead of requests + self.client = httpx.AsyncClient(timeout=30.0, verify=False) # verify=False for dev with self-signed certs + logger.info(f"Registry client initialized with URL: {self.registry_url}") def _get_default_registry_url(self) -> str: """Get default registry URL from configuration""" @@ -29,60 +37,126 @@ def _get_default_registry_url(self) -> str: pass return "https://registry.chat39.com" - def register_agent(self, agent_id: str, agent_url: str, api_url: Optional[str] = None, agent_facts_url: Optional[str] = None) -> bool: - """Register an agent with the registry""" + async def register(self, agent_facts: Dict[str, Any]) -> bool: + """Register an agent with the registry using AgentFacts + + Args: + agent_facts: Agent metadata dict with fields: + - agent_id: str + - name: str + - domain: str (optional) + - specialization: str (optional) + - description: str (optional) + - capabilities: list + - url: str + - agent_url: str (backward compatibility) + - supported_protocols: list + - endpoints: dict + + Returns: + True if registration successful, False otherwise + """ + if not self.registry_url: + logger.warning("No registry URL configured, skipping registration") + return False + try: - data = { - "agent_id": agent_id, - "agent_url": agent_url - } - if api_url: - data["api_url"] = api_url - if agent_facts_url: - data["agent_facts_url"] = agent_facts_url + # Support both new (register) and old (register_agent) endpoints + response = await self.client.post( + f"{self.registry_url}/register", + json=agent_facts + ) + + if response.status_code == 200: + logger.info(f"✅ Agent {agent_facts.get('agent_id')} registered successfully") + return True + else: + logger.error(f"❌ Registration failed: HTTP {response.status_code} - {response.text}") + return False + + except Exception as e: + logger.error(f"❌ Error registering agent: {e}") + return False - response = self.session.post(f"{self.registry_url}/register", json=data) + async def register_agent(self, agent_id: str, agent_url: str, + api_url: Optional[str] = None, + agent_facts_url: Optional[str] = None) -> bool: + """Legacy registration method for backward compatibility""" + data = { + "agent_id": agent_id, + "agent_url": agent_url + } + if api_url: + data["api_url"] = api_url + if agent_facts_url: + data["agent_facts_url"] = agent_facts_url + + try: + response = await self.client.post(f"{self.registry_url}/register", json=data) return response.status_code == 200 except Exception as e: - print(f"Error registering agent: {e}") + logger.error(f"Error registering agent: {e}") return False - def lookup_agent(self, agent_id: str) -> Optional[Dict[str, Any]]: + async def resolve(self, agent_id: str) -> Optional[Dict[str, Any]]: + """Resolve/lookup an agent in the registry + + Args: + agent_id: Agent identifier to look up + + Returns: + Agent info dict or None if not found + """ + return await self.lookup_agent(agent_id) + + async def lookup_agent(self, agent_id: str) -> Optional[Dict[str, Any]]: """Look up an agent in the registry""" + if not self.registry_url: + logger.warning("No registry URL configured") + return None + try: - response = self.session.get(f"{self.registry_url}/lookup/{agent_id}") + response = await self.client.get(f"{self.registry_url}/lookup/{agent_id}") if response.status_code == 200: return response.json() + else: + logger.warning(f"Agent {agent_id} not found: HTTP {response.status_code}") return None except Exception as e: - print(f"Error looking up agent {agent_id}: {e}") + logger.error(f"Error looking up agent {agent_id}: {e}") return None - def list_agents(self) -> List[Dict[str, Any]]: + async def list_agents(self) -> List[Dict[str, Any]]: """List all registered agents""" + if not self.registry_url: + return [] + try: - response = self.session.get(f"{self.registry_url}/list") + response = await self.client.get(f"{self.registry_url}/list") if response.status_code == 200: return response.json() return [] except Exception as e: - print(f"Error listing agents: {e}") + logger.error(f"Error listing agents: {e}") return [] - def list_clients(self) -> List[Dict[str, Any]]: + async def list_clients(self) -> List[Dict[str, Any]]: """List all registered clients""" + if not self.registry_url: + return [] + try: - response = self.session.get(f"{self.registry_url}/clients") + response = await self.client.get(f"{self.registry_url}/clients") if response.status_code == 200: return response.json() - return self.list_agents() # Fallback to list endpoint + return await self.list_agents() # Fallback to list endpoint except Exception as e: - print(f"Error listing clients: {e}") + logger.error(f"Error listing clients: {e}") return [] - def get_agent_metadata(self, agent_id: str) -> Optional[Dict[str, Any]]: + async def get_agent_metadata(self, agent_id: str) -> Optional[Dict[str, Any]]: """Get detailed metadata for an agent""" - agent_info = self.lookup_agent(agent_id) + agent_info = await self.lookup_agent(agent_id) if not agent_info: return None @@ -90,16 +164,26 @@ def get_agent_metadata(self, agent_id: str) -> Optional[Dict[str, Any]]: metadata = { "agent_id": agent_id, "agent_url": agent_info.get("agent_url"), + "url": agent_info.get("url"), "api_url": agent_info.get("api_url"), + "endpoints": agent_info.get("endpoints", {}), + "supported_protocols": agent_info.get("supported_protocols", ["a2a"]), "last_seen": agent_info.get("last_seen"), "capabilities": agent_info.get("capabilities", []), "description": agent_info.get("description", ""), - "tags": agent_info.get("tags", []) + "tags": agent_info.get("tags", []), + "domain": agent_info.get("domain"), + "specialization": agent_info.get("specialization") } + return metadata - def search_agents(self, query: str = "", capabilities: List[str] = None, tags: List[str] = None) -> List[Dict[str, Any]]: + async def search_agents(self, query: str = "", capabilities: List[str] = None, + tags: List[str] = None) -> List[Dict[str, Any]]: """Search for agents based on criteria""" + if not self.registry_url: + return [] + try: params = {} if query: @@ -109,19 +193,20 @@ def search_agents(self, query: str = "", capabilities: List[str] = None, tags: L if tags: params["tags"] = ",".join(tags) - response = self.session.get(f"{self.registry_url}/search", params=params) + response = await self.client.get(f"{self.registry_url}/search", params=params) if response.status_code == 200: return response.json() # Fallback to client-side filtering - return self._filter_agents_locally(query, capabilities, tags) + return await self._filter_agents_locally(query, capabilities, tags) except Exception as e: - print(f"Error searching agents: {e}") - return self._filter_agents_locally(query, capabilities, tags) + logger.error(f"Error searching agents: {e}") + return await self._filter_agents_locally(query, capabilities, tags) - def _filter_agents_locally(self, query: str = "", capabilities: List[str] = None, tags: List[str] = None) -> List[Dict[str, Any]]: + async def _filter_agents_locally(self, query: str = "", capabilities: List[str] = None, + tags: List[str] = None) -> List[Dict[str, Any]]: """Fallback local filtering when server search is not available""" - all_agents = self.list_agents() + all_agents = await self.list_agents() filtered = [] for agent in all_agents: @@ -147,25 +232,32 @@ def _filter_agents_locally(self, query: str = "", capabilities: List[str] = None return filtered - def get_mcp_servers(self, registry_provider: Optional[str] = None) -> List[Dict[str, Any]]: + async def get_mcp_servers(self, registry_provider: Optional[str] = None) -> List[Dict[str, Any]]: """Get list of available MCP servers""" + if not self.registry_url: + return [] + try: params = {} if registry_provider: params["registry_provider"] = registry_provider - response = self.session.get(f"{self.registry_url}/mcp_servers", params=params) + response = await self.client.get(f"{self.registry_url}/mcp_servers", params=params) if response.status_code == 200: return response.json() return [] except Exception as e: - print(f"Error getting MCP servers: {e}") + logger.error(f"Error getting MCP servers: {e}") return [] - def get_mcp_server_config(self, registry_provider: str, qualified_name: str) -> Optional[Dict[str, Any]]: + async def get_mcp_server_config(self, registry_provider: str, + qualified_name: str) -> Optional[Dict[str, Any]]: """Get configuration for a specific MCP server""" + if not self.registry_url: + return None + try: - response = self.session.get(f"{self.registry_url}/get_mcp_registry", params={ + response = await self.client.get(f"{self.registry_url}/get_mcp_registry", params={ 'registry_provider': registry_provider, 'qualified_name': qualified_name }) @@ -182,11 +274,15 @@ def get_mcp_server_config(self, registry_provider: str, qualified_name: str) -> } return None except Exception as e: - print(f"Error getting MCP server config: {e}") + logger.error(f"Error getting MCP server config: {e}") return None - def update_agent_status(self, agent_id: str, status: str, metadata: Optional[Dict[str, Any]] = None) -> bool: + async def update_agent_status(self, agent_id: str, status: str, + metadata: Optional[Dict[str, Any]] = None) -> bool: """Update agent status and metadata""" + if not self.registry_url: + return False + try: data = { "agent_id": agent_id, @@ -196,36 +292,52 @@ def update_agent_status(self, agent_id: str, status: str, metadata: Optional[Dic if metadata: data.update(metadata) - response = self.session.put(f"{self.registry_url}/agents/{agent_id}/status", json=data) + response = await self.client.put( + f"{self.registry_url}/agents/{agent_id}/status", + json=data + ) return response.status_code == 200 except Exception as e: - print(f"Error updating agent status: {e}") + logger.error(f"Error updating agent status: {e}") return False - def unregister_agent(self, agent_id: str) -> bool: + async def unregister_agent(self, agent_id: str) -> bool: """Unregister an agent from the registry""" + if not self.registry_url: + return False + try: - response = self.session.delete(f"{self.registry_url}/agents/{agent_id}") + response = await self.client.delete(f"{self.registry_url}/agents/{agent_id}") return response.status_code == 200 except Exception as e: - print(f"Error unregistering agent: {e}") + logger.error(f"Error unregistering agent: {e}") return False - def health_check(self) -> bool: + async def health_check(self) -> bool: """Check if the registry is healthy""" + if not self.registry_url: + return False + try: - response = self.session.get(f"{self.registry_url}/health", timeout=5) + response = await self.client.get(f"{self.registry_url}/health", timeout=5) return response.status_code == 200 except Exception: return False - def get_registry_stats(self) -> Optional[Dict[str, Any]]: + async def get_registry_stats(self) -> Optional[Dict[str, Any]]: """Get registry statistics""" + if not self.registry_url: + return None + try: - response = self.session.get(f"{self.registry_url}/stats") + response = await self.client.get(f"{self.registry_url}/stats") if response.status_code == 200: return response.json() return None except Exception as e: - print(f"Error getting registry stats: {e}") - return None \ No newline at end of file + logger.error(f"Error getting registry stats: {e}") + return None + + async def close(self): + """Close the HTTP client connection""" + await self.client.aclose() \ No newline at end of file diff --git a/nanda_core/protocols/AgentExecutor.py b/nanda_core/protocols/AgentExecutor.py new file mode 100644 index 0000000..b5d5a53 --- /dev/null +++ b/nanda_core/protocols/AgentExecutor.py @@ -0,0 +1,73 @@ +from typing import Callable +from a2a.types import TextPart +from a2a.utils import new_agent_text_message +from a2a.server.agent_execution import AgentExecutor, RequestContext +from a2a.server.events import EventQueue + +class NANDAAgentExecutor(AgentExecutor): + """Agent executor that delegates to NANDA bridge handler""" + + def __init__(self, message_handler: Callable): + self.message_handler = message_handler + + async def execute(self, context: RequestContext, event_queue: EventQueue): + """Execute agent logic via bridge handler""" + try: + # Extract message text from context + message_text = "" + if context.message and context.message.parts: + for part in context.message.parts: + # Handle different part types + if hasattr(part, 'root'): + part_obj = part.root + else: + part_obj = part + + # Extract text based on part type + if hasattr(part_obj, 'text'): + message_text = part_obj.text + elif hasattr(part_obj, 'kind') and part_obj.kind == 'text': + message_text = part_obj.text if hasattr(part_obj, 'text') else str(part_obj) + else: + message_text = str(part_obj) + + if message_text: + break + + if not message_text: + message_text = "" + + print(f"📨 Received message: {message_text[:100]}") + + # Convert to bridge format + bridge_message = { + "content": { + "text": message_text, + "type": "text" + }, + "conversation_id": context.task_id or context.context_id or "" + } + + # Call bridge handler + response = await self.message_handler(bridge_message) + + # Extract response text + response_text = response.get("content", {}).get("text", "") + + print(f"✅ Sending response: {response_text}") + + # Send response via event queue + response_message = new_agent_text_message(response_text) + await event_queue.enqueue_event(response_message) + + except Exception as e: + print(f"❌ Error in execute: {e}") + import traceback + traceback.print_exc() + # Send error message + error_message = new_agent_text_message(f"Error: {str(e)}") + event_queue.enqueue_event(error_message) + + async def cancel(self, context: RequestContext, event_queue: EventQueue): + """Handle task cancellation""" + event_queue.enqueue_event(new_agent_text_message("Task cancelled")) diff --git a/nanda_core/protocols/a2a/__init__.py b/nanda_core/protocols/a2a/__init__.py new file mode 100644 index 0000000..3f289c1 --- /dev/null +++ b/nanda_core/protocols/a2a/__init__.py @@ -0,0 +1,3 @@ +from .protocol import A2AProtocol + +__all__ = ['A2AProtocol'] \ No newline at end of file diff --git a/nanda_core/protocols/a2a/protocol.py b/nanda_core/protocols/a2a/protocol.py new file mode 100644 index 0000000..ec7aea9 --- /dev/null +++ b/nanda_core/protocols/a2a/protocol.py @@ -0,0 +1,234 @@ +""" +Official A2A SDK Protocol Adapter +""" + +from typing import Callable, Dict, Any +from a2a.client import A2AClient, ClientConfig, ClientFactory +from a2a.server.request_handlers import DefaultRequestHandler +from a2a.server.tasks import InMemoryTaskStore +from a2a.server.apps import A2AStarletteApplication +from a2a.types import ( + AgentCard, AgentSkill, AgentCapabilities, AgentInterface, + Message, TextPart, Part, Role, + SendMessageRequest, MessageSendParams, + TransportProtocol +) +import uuid +import httpx +import uvicorn +from ..base import AgentProtocol +from ..AgentExecutor import NANDAAgentExecutor + +class A2AProtocol(AgentProtocol): + """A2A SDK protocol adapter""" + + def __init__(self, agent_id: str, agent_name: str, public_url: str, + domain: str = None, specialization: str = None, + description: str = "", capabilities: list = None): + """Initialize A2A protocol adapter + + Args: + agent_id: Unique agent identifier + agent_name: Display name + public_url: Public URL where agent is accessible + domain: Agent's domain of expertise + specialization: Agent's specialization + description: Agent description + capabilities: List of capabilities (default: ["text"]) + """ + self.agent_id = agent_id + self.agent_name = agent_name + self.public_url = public_url + self.description = description + self.capabilities_list = capabilities or ["text"] + + # Create AgentCard + self.agent_card = AgentCard( + name=agent_name, + description=description, + version="1.0.0", + url=f"{public_url}/", + protocol_version="0.2.5", + skills=[ + AgentSkill( + id=agent_id, + name=agent_name, + description=description, + tags=[domain] if domain else [], + examples=[] + ) + ], + capabilities=AgentCapabilities( + streaming=True, + push_notifications=False + ), + defaultInputModes=["text"], + defaultOutputModes=["text"] + ) + + # Incoming message handler (set by bridge) + self.incoming_handler: Callable = None + + # Server components (initialized when handler is set) + self.agent_executor = None + self.request_handler = None + self.server_app = None + + # HTTP client for A2A client + self.httpx_client = httpx.AsyncClient() + + def set_incoming_handler(self, handler: Callable): + """Set callback for incoming messages + + Args: + handler: Async function(message: dict) -> dict + """ + self.incoming_handler = handler + + # Initialize server components + self.agent_executor = NANDAAgentExecutor(handler) + self.request_handler = DefaultRequestHandler( + agent_executor=self.agent_executor, + task_store=InMemoryTaskStore() + ) + self.server_app = A2AStarletteApplication( + agent_card=self.agent_card, + http_handler=self.request_handler + ) + + async def send_message(self, target_url: str, message: Dict[str, Any]) -> Dict[str, Any]: + """Send message using A2A client""" + try: + # Validate target_url + if not target_url: + raise ValueError("target_url is None or empty") + + # Extract message content + content_text = message.get("content", {}).get("text", "") + conversation_id = message.get("conversation_id", "") + metadata = message.get("metadata", {}) + + print(f"🔄 Sending to {target_url}: {content_text[:50]}...") + + # Create A2A message with proper structure + from a2a.types import TextPart, Part, Message, Role + + text_part = TextPart( + kind='text', + text=content_text + ) + part = Part(root=text_part) + + a2a_message = Message( + message_id=f"msg-{conversation_id}-{uuid.uuid4().hex[:8]}", + role=Role.user, + parts=[part], + metadata=metadata if metadata else None + ) + + # Get client - strip /a2a suffix if present + base_url = target_url.rstrip('/a2a') if target_url else target_url + + client = A2AClient( + httpx_client=self.httpx_client, + url=base_url # Just pass the base URL + ) + + # Create request + from a2a.types import SendMessageRequest, MessageSendParams + request = SendMessageRequest( + id=f"req-{uuid.uuid4().hex[:8]}", + params=MessageSendParams(message=a2a_message) + ) + + # Send message + response = await client.send_message(request) + print(f"🔍 Response attributes: {dir(response)}") + print(f"🔍 Response dict: {response.model_dump() if hasattr(response, 'model_dump') else response}") + + if hasattr(response, 'root'): + result = response.root + + # Check if it's an error response + if hasattr(result, 'error') and result.error: + error_msg = result.error.message if hasattr(result.error, 'message') else str(result.error) + return { + "content": { + "text": f"Error: {error_msg}", + "type": "text" + } + } + + # Check if it's a success response with result + if hasattr(result, 'result'): + msg = result.result + if hasattr(msg, 'parts') and msg.parts: + response_text = "" + for part in msg.parts: + part_obj = part.root if hasattr(part, 'root') else part + if hasattr(part_obj, 'text'): + response_text = part_obj.text + break + + return { + "content": { + "text": response_text, + "type": "text" + } + } + + return { + "content": { + "text": "No response received", + "type": "text" + } + } + + except Exception as e: + print(f"❌ Error sending message: {e}") + import traceback + traceback.print_exc() + return { + "content": { + "text": f"Error: {str(e)}", + "type": "text" + } + } + + def get_metadata(self) -> Dict[str, Any]: + """Get AgentCard metadata + + Returns: + AgentCard as dict + """ + return { + "agent_id": self.agent_id, + "name": self.agent_name, + "url": self.agent_card.url, + "description": self.description, + "capabilities": self.capabilities_list + } + + async def start_server(self, host: str, port: int): + """Start A2A server + + Args: + host: Host to bind to + port: Port to listen on + """ + if not self.server_app: + raise RuntimeError("Server not initialized. Call set_incoming_handler first.") + + print(f"Starting A2A server on {host}:{port}") + + # Build Starlette app + app = self.server_app.build() + + # Run with uvicorn + config = uvicorn.Config(app, host=host, port=port, log_level="info") + server = uvicorn.Server(config) + await server.serve() + + def get_protocol_name(self) -> str: + """Return protocol identifier""" + return "a2a" \ No newline at end of file diff --git a/nanda_core/protocols/base.py b/nanda_core/protocols/base.py new file mode 100644 index 0000000..07ae232 --- /dev/null +++ b/nanda_core/protocols/base.py @@ -0,0 +1,52 @@ +from abc import ABC, abstractmethod +from typing import Callable, Dict, Any + +class AgentProtocol(ABC): + """Base protocol interface for agent communication""" + + @abstractmethod + async def send_message(self, target_url: str, message: Dict[str, Any]) -> Dict[str, Any]: + """Send message to target agent + + Args: + target_url: Target agent's endpoint URL + message: Message dict with 'content', 'conversation_id', etc. + + Returns: + Response dict from target agent + """ + pass + + @abstractmethod + def set_incoming_handler(self, handler: Callable): + """Set callback for incoming messages + + Args: + handler: Async function that processes incoming messages + Should accept message dict and return response dict + """ + pass + + @abstractmethod + def get_metadata(self) -> Dict[str, Any]: + """Get protocol-specific agent metadata (AgentCard, etc.) + + Returns: + Metadata dict for agent discovery + """ + pass + + @abstractmethod + async def start_server(self, host: str, port: int): + """Start protocol server + + Args: + host: Host to bind to + port: Port to listen on + """ + pass + + @abstractmethod + def get_protocol_name(self) -> str: + """Return protocol identifier (a2a, slim, etc.)""" + pass \ No newline at end of file diff --git a/nanda_core/protocols/router.py b/nanda_core/protocols/router.py new file mode 100644 index 0000000..703155f --- /dev/null +++ b/nanda_core/protocols/router.py @@ -0,0 +1,106 @@ +from typing import Dict, List, Optional +from .base import AgentProtocol +import asyncio + +class ProtocolRouter: + """Manages multiple protocol adapters and routes messages""" + + def __init__(self): + self.protocols: Dict[str, AgentProtocol] = {} + self.default_protocol: Optional[str] = None + + def register(self, protocol: AgentProtocol): + """Register a protocol adapter + + Args: + protocol: Protocol adapter instance + """ + name = protocol.get_protocol_name() + self.protocols[name] = protocol + + # First registered protocol becomes default + if self.default_protocol is None: + self.default_protocol = name + + print(f"Registered protocol: {name}") + + def get_protocol(self, name: str) -> Optional[AgentProtocol]: + """Get protocol by name + + Args: + name: Protocol name (a2a, slim, etc.) + + Returns: + Protocol adapter or None + """ + return self.protocols.get(name) + + def get_all_protocols(self) -> List[str]: + """Get list of all registered protocol names""" + return list(self.protocols.keys()) + + async def send(self, protocol_name: str, target_url: str, message: dict) -> dict: + """Send message using specified protocol + + Args: + protocol_name: Protocol to use (a2a, slim) + target_url: Target agent endpoint + message: Message to send + + Returns: + Response from target agent + + Raises: + ValueError: If protocol not registered + """ + protocol = self.protocols.get(protocol_name) + if not protocol: + raise ValueError(f"Protocol '{protocol_name}' not registered. " + f"Available: {list(self.protocols.keys())}") + + return await protocol.send_message(target_url, message) + + def select_protocol(self, supported_protocols: List[str]) -> str: + """Select best available protocol from supported list + + Args: + supported_protocols: List of protocols target agent supports + + Returns: + Selected protocol name + """ + if "slim" in supported_protocols and "slim" in self.protocols: + return "slim" + # Try to find first match from supported list + for proto in supported_protocols: + if proto in self.protocols: + return proto + + # Fallback to default if no match + if self.default_protocol: + return self.default_protocol + + # Last resort: use first registered protocol + if self.protocols: + return list(self.protocols.keys())[0] + + raise ValueError("No protocols registered") + + async def start_all_servers(self, host: str, port: int): + """Start all registered protocol servers concurrently""" + + async def start_protocol(name, protocol): + try: + print(f"🚀 Starting {name} protocol...") + await protocol.start_server(host, port) + except Exception as e: + print(f"❌ Error starting {name}: {e}") + + # Start all protocols as background tasks + tasks = [ + asyncio.create_task(start_protocol(name, protocol)) + for name, protocol in self.protocols.items() + ] + + # Wait for all to start (they run forever) + await asyncio.gather(*tasks) \ No newline at end of file diff --git a/nanda_core/protocols/slim/__init__.py b/nanda_core/protocols/slim/__init__.py new file mode 100644 index 0000000..00c1b34 --- /dev/null +++ b/nanda_core/protocols/slim/__init__.py @@ -0,0 +1,3 @@ +from .adapter import SLIMProtocol + +__all__ = ['SLIMProtocol'] diff --git a/nanda_core/protocols/slim/adapter.py b/nanda_core/protocols/slim/adapter.py new file mode 100644 index 0000000..2476ca4 --- /dev/null +++ b/nanda_core/protocols/slim/adapter.py @@ -0,0 +1,312 @@ +""" +SLIM Protocol Adapter - Secure Low-latency Interactive Messaging + +Provides gRPC-based messaging with persistent connections. +""" +from __future__ import annotations +import asyncio +import json +import logging +from typing import Callable, Dict, Any, Optional +import grpc + +logger = logging.getLogger(__name__) + +try: + # Import SLIM SDK components + from slim_bindings.slim import Slim, PyName, PyIdentityProvider, PyIdentityVerifier + from slim_bindings.session import PySession, PySessionConfiguration, PyMessageContext + SLIM_AVAILABLE = True +except ImportError: + SLIM_AVAILABLE = False + logger.warning("slim-bindings not available. Install with: pip install slim-bindings") + +from ..base import AgentProtocol + + +class SLIMProtocol(AgentProtocol): + """SLIM Protocol adapter for gRPC-based agent communication""" + + def __init__(self, agent_id: str, slim_node_url: str, agent_name: str = None): + """Initialize SLIM protocol adapter + + Args: + agent_id: Unique agent identifier + slim_node_url: SLIM node gRPC endpoint (e.g., grpc://localhost:50051) + agent_name: Optional agent display name + """ + if not SLIM_AVAILABLE: + raise ImportError("slim-bindings not installed. Run: pip install slim-bindings") + + self.agent_id = agent_id + # Ensure URL has http:// prefix (required by SLIM SDK) + if not slim_node_url.startswith('http://') and not slim_node_url.startswith('https://'): + slim_node_url = f'http://{slim_node_url}' + self.slim_node_url = slim_node_url.replace('grpc://', 'http://') + self.agent_name = agent_name or agent_id + + # Message handler (set by bridge) + self.incoming_handler: Optional[Callable] = None + + # SLIM client + self.slim: Optional[Slim] = None + self.session: Optional[PySession] = None + self.stream_task: Optional[asyncio.Task] = None + self.running = False + + # Personal inbox channel name + self.inbox_channel = f"agent-{agent_id}-inbox" + + print(f"SLIM protocol initialized for {agent_id} at {self.slim_node_url}") + + def set_incoming_handler(self, handler: Callable): + """Set callback for incoming messages""" + self.incoming_handler = handler + print(f"Incoming handler set for SLIM protocol") + + async def connect(self): + """Connect to SLIM node and subscribe to inbox""" + try: + # Connect to SLIM node + print(f"Connecting to SLIM node at {self.slim_node_url}...") + + # Create PyName + name = PyName("agntcy", "nanda", self.agent_id) + + # Create identity provider and verifier + provider = PyIdentityProvider.SharedSecret( + identity=self.agent_id, + shared_secret="nanda-dev-secret" # TODO: Use env var + ) + verifier = PyIdentityVerifier.SharedSecret( + identity=self.agent_id, + shared_secret="nanda-dev-secret" # TODO: Use env var + ) + + # Initialize Slim + self.slim = await Slim.new(name, provider, verifier) + print(f"✅ Slim instance created with ID: {self.slim.id}") + + # Connect to SLIM node + conn_id = await self.slim.connect({"endpoint": self.slim_node_url, "tls": {"insecure": True}}) + print(f"✅ Connected to SLIM node (connection ID: {conn_id})") + + # Start listening for incoming sessions + self.running = True + self.stream_task = asyncio.create_task(self._listen_for_messages()) + + except Exception as e: + logger.error(f"❌ Failed to connect to SLIM node: {e}") + import traceback + traceback.print_exc() + raise + + async def _listen_for_messages(self): + """Listen for incoming sessions""" + print(f"👂 Listening for incoming SLIM sessions") + + try: + while self.running: + try: + print(f"⏳ Waiting for incoming session...") + + # Wait for incoming session + session = await self.slim.listen_for_session() + print(f"📬 Got incoming session: {session.id}") + + # Handle session in background task + asyncio.create_task(self._handle_session(session)) + + except Exception as e: + logger.error(f"Error in session listener: {e}") + import traceback + traceback.print_exc() + await asyncio.sleep(1) + + except asyncio.CancelledError: + print("SLIM message listener cancelled") + + async def _handle_session(self, session): + """Handle a single session (can receive multiple messages)""" + try: + while True: + # Receive message + msg_ctx, payload_bytes = await session.get_message() + print(f"📥 SLIM: Received {len(payload_bytes)} bytes") + + # Process message + await self._process_incoming_message(session, msg_ctx, payload_bytes) + + except Exception as e: + print(f"Session {session.id} ended: {e}") + + async def _process_incoming_message(self, session, msg_ctx, payload_bytes: bytes): + """Process incoming SLIM message and route to handler""" + try: + # Decode payload + try: + payload_str = payload_bytes.decode('utf-8') + payload_data = json.loads(payload_str) + except (UnicodeDecodeError, json.JSONDecodeError): + payload_str = payload_bytes.decode('utf-8', errors='replace') + payload_data = {"text": payload_str} + + # Convert to standard NANDA message format + message_dict = { + "content": { + "text": payload_data.get("text", payload_str), + "type": "text" + }, + "conversation_id": "slim-conversation", + "metadata": { + "protocol": "slim", + "session_id": session.id + } + } + + print(f"📥 SLIM message: {message_dict['content']['text'][:100]}") + + # Call handler + if self.incoming_handler: + response = await self.incoming_handler(message_dict) + print(f"🤖 Agent response: {response.get('content', {}).get('text', '')[:100]}...") # <-- ADD THIS + + # Send response back + if response: + await self._send_response(session, msg_ctx, response) + + except Exception as e: + logger.error(f"❌ Error processing SLIM message: {e}") + import traceback + traceback.print_exc() + + async def _send_response(self, session, msg_ctx, response: Dict[str, Any]): + """Send response back using publish_to""" + try: + response_text = response.get("content", {}).get("text", "") + + response_payload = { + "text": response_text, + "type": "response" + } + + await session.publish_to(msg_ctx, json.dumps(response_payload).encode('utf-8')) + print(f"📤 SLIM: Sent response") + + except Exception as e: + logger.error(f"Error sending SLIM response: {e}") + + async def send_message(self, target_url: str, message: Dict[str, Any]) -> Dict[str, Any]: + """Send message via SLIM protocol""" + try: + if not self.slim: + raise RuntimeError("SLIM not connected") + + # Extract target agent ID + target_agent_id = target_url.replace('slim://', '').split('/')[0] + content_text = message.get("content", {}).get("text", "") + + print(f"📤 SLIM: Sending to {target_agent_id}: {content_text[:50]}...") + + # Create message payload + payload = {"text": content_text, "type": "message"} + + # Set route to target + target_name = PyName("agntcy", "nanda", target_agent_id) + await self.slim.set_route(target_name) + + # Create PointToPoint session + from datetime import timedelta + config = PySessionConfiguration.PointToPoint( + peer_name=target_name, + timeout=timedelta(seconds=5), + max_retries=5, + mls_enabled=False + ) + + session = await self.slim.create_session(config) + + # Send message + await session.publish(json.dumps(payload).encode('utf-8')) + print(f"✅ SLIM: Message sent") + + # Wait for reply + msg_ctx, reply_bytes = await session.get_message() + reply_data = json.loads(reply_bytes.decode('utf-8')) + + # Clean up + await session.delete() + + return { + "content": { + "text": reply_data.get("text", reply_bytes.decode('utf-8')), + "type": "text" + } + } + + except Exception as e: + logger.error(f"❌ Error sending SLIM message: {e}") + import traceback + traceback.print_exc() + return { + "content": { + "text": f"SLIM Error: {str(e)}", + "type": "text" + } + } + + async def handle_request(self, request): + """Handle incoming request (not used for SLIM - uses streaming)""" + pass + + def get_metadata(self) -> Dict[str, Any]: + """Get SLIM protocol metadata""" + return { + "agent_id": self.agent_id, + "name": self.agent_name, + "protocol": "slim", + "slim_node": self.slim_node_url, + "inbox_channel": self.inbox_channel, + "capabilities": ["unicast", "async_messaging"] + } + + async def start_server(self, host: str, port: int): + """Start SLIM protocol (connect and listen)""" + print(f"🚀 Starting SLIM protocol for {self.agent_id}") + + # Connect to SLIM node + await self.connect() + + print(f"✅ SLIM ready - listening on {self.inbox_channel}") + + # Keep running (stream_task handles incoming messages) + try: + while self.running: + await asyncio.sleep(1) + except asyncio.CancelledError: + print("SLIM protocol stopped") + finally: + await self.close() + + async def close(self): + """Close SLIM connection and cleanup""" + print(f"🛑 Closing SLIM connection for {self.agent_id}") + + self.running = False + + if self.stream_task: + self.stream_task.cancel() + try: + await self.stream_task + except asyncio.CancelledError: + pass + + if self.slim: + try: + await self.slim.disconnect(self.slim_node_url) + except Exception as e: + logger.error(f"Error disconnecting SLIM: {e}") + + def get_protocol_name(self) -> str: + """Return protocol identifier""" + return "slim" \ No newline at end of file diff --git a/nanda_core/telemetry/health_monitor.py b/nanda_core/telemetry/health_monitor.py index 47d561c..eac8630 100644 --- a/nanda_core/telemetry/health_monitor.py +++ b/nanda_core/telemetry/health_monitor.py @@ -345,7 +345,7 @@ def _get_registry_url(self) -> str: return f.read().strip() except: pass - return "https://chat.nanda-registry.com:6900" + return "https://registry.chat39.com:6900" def get_health_history(self, check_name: str, hours: int = 24) -> List[HealthCheck]: """Get health check history""" diff --git a/setup.py b/setup.py index c2afdd5..103aebd 100644 --- a/setup.py +++ b/setup.py @@ -10,14 +10,17 @@ def read_requirements(): """Read requirements from file""" requirements = [ - "flask", - "anthropic", - "requests", - "python-a2a==0.5.6", - "mcp", - "python-dotenv", - "flask-cors", - "psutil" # For system monitoring + "anthropic>=0.18.0", + "a2a-sdk>=0.2.0", + "httpx>=0.27.0", + "uvicorn>=0.30.0", + "starlette>=0.37.0", + "python-dotenv>=1.0.0", + "psutil>=5.9.0", + # SLIM support + "slim-bindings>=0.1.0", # Add this + "grpcio>=1.60.0", # Add this + "protobuf>=4.25.0", # Add this ] return requirements diff --git a/slim b/slim new file mode 160000 index 0000000..6301ced --- /dev/null +++ b/slim @@ -0,0 +1 @@ +Subproject commit 6301ced85073c028898d60eb620dacb0cff6afbf diff --git a/test_receiver.py b/test_receiver.py new file mode 100644 index 0000000..acf1551 --- /dev/null +++ b/test_receiver.py @@ -0,0 +1,30 @@ +import asyncio +from slim_bindings.slim import Slim, PyName, PyIdentityProvider, PyIdentityVerifier + +async def main(): + print("=== RECEIVER ===") + name = PyName("agntcy", "nanda", "receiver-agent") + provider = PyIdentityProvider.SharedSecret(identity="receiver", shared_secret="test-secret") + verifier = PyIdentityVerifier.SharedSecret(identity="receiver", shared_secret="test-secret") + + slim = await Slim.new(name, provider, verifier) + conn_id = await slim.connect({"endpoint": "http://localhost:46357", "tls": {"insecure": True}}) + print(f"✅ Connected: {conn_id}, ID: {slim.id}") + + print("⏳ Waiting for incoming session...") + session = await slim.listen_for_session() + print(f"✅ Got session: {session.id}") + + # Use get_message() instead of recv() + msg_ctx, payload = await session.get_message() + print(f"📥 Received: {payload.decode('utf-8')}") + + # Reply back + reply = f"Echo: {payload.decode()} from receiver" + await session.publish_to(msg_ctx, reply.encode('utf-8')) + print(f"📤 Sent reply: {reply}") + + await slim.disconnect("http://localhost:46357") + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/test_sender.py b/test_sender.py new file mode 100644 index 0000000..93539b5 --- /dev/null +++ b/test_sender.py @@ -0,0 +1,45 @@ +import asyncio +from datetime import timedelta +from slim_bindings.slim import Slim, PyName, PyIdentityProvider, PyIdentityVerifier +from slim_bindings.session import PySessionConfiguration + +async def main(): + print("=== SENDER ===") + name = PyName("agntcy", "nanda", "sender-agent") + provider = PyIdentityProvider.SharedSecret(identity="sender", shared_secret="test-secret") + verifier = PyIdentityVerifier.SharedSecret(identity="sender", shared_secret="test-secret") + + slim = await Slim.new(name, provider, verifier) + conn_id = await slim.connect({"endpoint": "http://localhost:46357", "tls": {"insecure": True}}) + print(f"✅ Connected: {conn_id}, ID: {slim.id}") + + # CRITICAL: Set route before creating session + peer_name = PyName("agntcy", "nanda", "receiver-agent") + await slim.set_route(peer_name) + print(f"✅ Route set to receiver-agent") + + # Create PointToPoint session + config = PySessionConfiguration.PointToPoint( + peer_name=peer_name, + timeout=timedelta(seconds=5), + max_retries=5, + mls_enabled=False + ) + + session = await slim.create_session(config) + print(f"✅ Session created: {session.id}") + + # Send message + message = "Hello from sender!" + await session.publish(message.encode('utf-8')) + print(f"📤 Sent: {message}") + + # Wait for reply + msg_ctx, reply = await session.get_message() + print(f"📥 Received reply: {reply.decode('utf-8')}") + + await session.delete() + await slim.disconnect("http://localhost:46357") + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file