diff --git a/examples/nanda_agent.py b/examples/nanda_agent.py index 847799f..e9cd833 100644 --- a/examples/nanda_agent.py +++ b/examples/nanda_agent.py @@ -1,72 +1,57 @@ +# examples/nanda_agent.py #!/usr/bin/env python3 """ -LLM-Powered Modular NANDA Agent +LLM-Powered NANDA Agent -This agent uses Anthropic Claude for intelligent responses based on configurable personality and expertise. -Simply update the AGENT_CONFIG section to create different agent personalities. +Configurable agent using Anthropic Claude for intelligent responses. +Customize via environment variables or the config dict. """ import os import sys -import time +import asyncio import uuid from datetime import datetime -from typing import Dict, List, Any -# Add the parent directory to the path to allow importing streamlined_adapter sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) from nanda_core.core.adapter import NANDA -# Try to import Anthropic - will fail gracefully if not available +from dotenv import load_dotenv +load_dotenv() + try: from anthropic import Anthropic ANTHROPIC_AVAILABLE = True except ImportError: ANTHROPIC_AVAILABLE = False - print("⚠️ Warning: anthropic library not available. Install with: pip install anthropic") + print("⚠️ anthropic library not available. Install with: pip install anthropic") # ============================================================================= -# AGENT CONFIGURATION - Customize this section for different agents +# CONFIGURATION # ============================================================================= -# Get configuration from environment variables or use defaults -def get_agent_config(): - """Load agent configuration from environment variables or use defaults""" +def get_config(): + """Load configuration from environment variables""" + base_id = os.getenv("AGENT_ID", "helpful-agent") + agent_id = f"{base_id}-{uuid.uuid4().hex[:6]}" if '-' not in base_id else base_id - # Generate agent_id with hex suffix for uniqueness - base_agent_id = os.getenv("AGENT_ID", "helpful-ubuntu-agent") - if not base_agent_id.endswith('-') and '-' not in base_agent_id.split('-')[-1]: - # Add 6-character hex suffix if not already present - hex_suffix = uuid.uuid4().hex[:6] - agent_id = f"{base_agent_id}-{hex_suffix}" - else: - agent_id = base_agent_id + capabilities = os.getenv("AGENT_CAPABILITIES", "general assistance,conversation") + capabilities_list = [cap.strip() for cap in capabilities.split(",")] - print(f"Generated agent_id: {agent_id}") - agent_name = os.getenv("AGENT_NAME", "Ubuntu Helper") + agent_name = os.getenv("AGENT_NAME", "Helper Agent") domain = os.getenv("AGENT_DOMAIN", "general assistance") - specialization = os.getenv("AGENT_SPECIALIZATION", "helpful and friendly AI assistant") - description = os.getenv("AGENT_DESCRIPTION", "I am a helpful AI assistant specializing in general tasks and Ubuntu system administration.") - capabilities = os.getenv("AGENT_CAPABILITIES", "general assistance,Ubuntu system administration,Python development,cloud deployment,agent-to-agent communication") - registry_url = os.getenv("REGISTRY_URL", None) - public_url = os.getenv("PUBLIC_URL", None) - - # Parse capabilities into a list - expertise_list = [cap.strip() for cap in capabilities.split(",")] + specialization = os.getenv("AGENT_SPECIALIZATION", "helpful AI assistant") + description = os.getenv("AGENT_DESCRIPTION", "I'm a helpful AI assistant.") - # Create dynamic system prompt based on configuration - system_prompt = f"""You are {agent_name}, a {specialization} working in the domain of {domain}. + system_prompt = f"""You are {agent_name}, a {specialization} in {domain}. {description} -You are part of the NANDA (Network of Autonomous Distributed Agents) system. You can communicate with other agents and help users with various tasks. +You are part of the NANDA agent network and can communicate with other agents using @agent-id syntax. -Your capabilities include: -{chr(10).join([f"- {cap}" for cap in expertise_list])} +Your capabilities: {', '.join(capabilities_list)} -Always be helpful, accurate, and concise in your responses. If you're unsure about something, say so honestly. You can also help with basic calculations, provide time information, and engage in casual conversation. - -When someone asks about yourself, mention that you're part of the NANDA agent network and can communicate with other agents using the @agent_name syntax.""" +Be helpful, accurate, and concise.""" return { "agent_id": agent_id, @@ -74,200 +59,98 @@ def get_agent_config(): "domain": domain, "specialization": specialization, "description": description, - "expertise": expertise_list, - "registry_url": registry_url, - "public_url": public_url, - "system_prompt": system_prompt, + "capabilities": capabilities_list, + "port": int(os.getenv("PORT", "6000")), + "registry_url": os.getenv("REGISTRY_URL"), + "public_url": os.getenv("PUBLIC_URL") or f"http://localhost:{os.getenv('PORT', '6000')}", "anthropic_api_key": os.getenv("ANTHROPIC_API_KEY"), - "model": "claude-3-haiku-20240307" # Fast and cost-effective model + "model": os.getenv("ANTHROPIC_MODEL", "claude-3-haiku-20240307"), + "system_prompt": system_prompt } -# Load configuration -AGENT_CONFIG = get_agent_config() - -# Port configuration - use environment variable or default to 6000 -PORT = int(os.getenv("PORT", "6000")) - # ============================================================================= -# LLM-POWERED AGENT LOGIC - Uses Anthropic Claude for intelligent responses +# AGENT LOGIC # ============================================================================= -def create_llm_agent_logic(config: Dict[str, Any]): - """ - Creates an LLM-powered agent logic function based on the provided configuration. - Uses Anthropic Claude for intelligent, context-aware responses. - """ +def create_agent_logic(config): + """Create agent logic function with LLM or fallback""" # Initialize Anthropic client - anthropic_client = None + client = None if ANTHROPIC_AVAILABLE and config.get("anthropic_api_key"): try: - anthropic_client = Anthropic(api_key=config["anthropic_api_key"]) - print(f"✅ Anthropic Claude initialized for {config['agent_name']}") + client = Anthropic(api_key=config["anthropic_api_key"]) + print(f"✅ Claude initialized ({config['model']})") except Exception as e: - print(f"❌ Failed to initialize Anthropic: {e}") - anthropic_client = None - - # Prepare system prompt (already formatted in get_agent_config) - system_prompt = config["system_prompt"] + print(f"❌ Claude initialization failed: {e}") - def llm_agent_logic(message: str, conversation_id: str) -> str: - """LLM-powered agent logic with fallback to basic responses""" + def agent_logic(message: str, conversation_id: str) -> str: + """Process message with Claude or fallback""" - # If LLM is available, use it for intelligent responses - if anthropic_client: + if client: try: - # Add current time context if time-related query - context_info = "" - if any(time_word in message.lower() for time_word in ['time', 'date', 'when']): - context_info = f"\n\nCurrent time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" + # Add time context if relevant + context = "" + if any(word in message.lower() for word in ['time', 'date', 'when']): + context = f"\n\nCurrent time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" - response = anthropic_client.messages.create( + response = client.messages.create( model=config["model"], max_tokens=500, - system=system_prompt + context_info, - messages=[ - { - "role": "user", - "content": message - } - ] + system=config["system_prompt"] + context, + messages=[{"role": "user", "content": message}] ) return response.content[0].text.strip() except Exception as e: - print(f"❌ LLM Error: {e}") - # Fall back to basic response - return f"Sorry, I'm having trouble processing that right now. Error: {str(e)}" + return f"Error: {str(e)}" - # Fallback to basic responses if LLM not available + # Fallback responses + msg = message.lower().strip() + if any(greeting in msg for greeting in ['hello', 'hi', 'hey']): + return f"Hello! I'm {config['agent_name']}. Set ANTHROPIC_API_KEY for full capabilities." + elif 'time' in msg: + return f"Current time: {datetime.now().strftime('%H:%M:%S')}" else: - return _basic_fallback_response(message, config) - - return llm_agent_logic - -def _basic_fallback_response(message: str, config: Dict[str, Any]) -> str: - """Basic fallback responses when LLM is not available""" - msg = message.lower().strip() - - # Handle greetings - if any(greeting in msg for greeting in ['hello', 'hi', 'hey']): - return f"Hello! I'm {config['agent_name']}, but I need an Anthropic API key to provide intelligent responses. Please set ANTHROPIC_API_KEY environment variable." + return f"I'm {config['agent_name']}. Set ANTHROPIC_API_KEY to enable LLM responses." - # Handle time requests - elif 'time' in msg: - current_time = datetime.now().strftime("%H:%M:%S") - return f"The current time is {current_time}." - - # Handle basic calculations - elif any(op in message for op in ['+', '-', '*', '/', '=']): - try: - calculation = message.replace('x', '*').replace('X', '*').replace('=', '').strip() - result = eval(calculation) - return f"Calculation result: {calculation} = {result}" - except: - return "Sorry, I couldn't calculate that. Please check your expression." - - # Default fallback - else: - return f"I'm {config['agent_name']}, but I need an Anthropic API key to provide intelligent responses. Please set ANTHROPIC_API_KEY environment variable and restart me." + return agent_logic # ============================================================================= -# MAIN EXECUTION +# MAIN # ============================================================================= -def main(): - """Main function to start the LLM-powered modular agent""" - print(f"🤖 Starting {AGENT_CONFIG['agent_name']}") - print(f"📝 Specialization: {AGENT_CONFIG['specialization']}") - print(f"🎯 Domain: {AGENT_CONFIG['domain']}") - print(f"🛠️ Capabilities: {', '.join(AGENT_CONFIG['expertise'])}") - if AGENT_CONFIG['registry_url']: - print(f"🌐 Registry: {AGENT_CONFIG['registry_url']}") +async def main(): + config = get_config() - # Check for Anthropic API key - if not AGENT_CONFIG.get("anthropic_api_key"): - print("⚠️ Warning: ANTHROPIC_API_KEY not found in environment variables") - print(" The agent will use basic fallback responses only") - print(" Set ANTHROPIC_API_KEY to enable LLM capabilities") - else: - print(f"🧠 LLM Model: {AGENT_CONFIG['model']}") + print(f"🤖 {config['agent_name']} ({config['agent_id']})") + print(f"🎯 {config['domain']} - {config['specialization']}") + print(f"🔗 {config['public_url']}/a2a") + if config['registry_url']: + print(f"🌐 Registry: {config['registry_url']}") - # Create the LLM-powered agent logic based on configuration - agent_logic = create_llm_agent_logic(AGENT_CONFIG) + agent_logic = create_agent_logic(config) - # Create and start the NANDA agent nanda = NANDA( - agent_id=AGENT_CONFIG["agent_id"], + agent_id=config["agent_id"], agent_logic=agent_logic, - port=PORT, - registry_url=AGENT_CONFIG["registry_url"], - public_url=AGENT_CONFIG["public_url"], - enable_telemetry=True + agent_name=config["agent_name"], + domain=config["domain"], + specialization=config["specialization"], + description=config["description"], + capabilities=config["capabilities"], + port=config["port"], + registry_url=config["registry_url"], + public_url=config["public_url"], + enable_telemetry=True, + protocols={"a2a": {"enabled": True}} ) - print(f"🚀 Agent URL: http://localhost:{PORT}/a2a") - print("💡 Try these messages:") - print(" - 'Hello there'") - print(" - 'Tell me about yourself'") - print(" - 'What time is it?'") - print(" - 'How can you help with Ubuntu?'") - print(" - 'Explain Python virtual environments'") - print(" - '5 + 3'") - print("\n🛑 Press Ctrl+C to stop") - - # Start the agent - nanda.start() - -def create_custom_agent(agent_name, specialization, domain, expertise_list, port=6000, anthropic_api_key=None, registry_url=None): - """ - Helper function to quickly create a custom LLM-powered agent with different config - - Example usage: - create_custom_agent( - agent_name="Data Scientist", - specialization="analytical and precise AI assistant", - domain="data science", - expertise_list=["data analysis", "statistics", "machine learning", "Python"], - port=6001, - anthropic_api_key="sk-ant-xxxxx" - ) - """ - custom_config = AGENT_CONFIG.copy() - custom_config.update({ - "agent_id": agent_name.lower().replace(" ", "-"), - "agent_name": agent_name, - "specialization": specialization, - "domain": domain, - "expertise": expertise_list, - "registry_url": registry_url, - "anthropic_api_key": anthropic_api_key or os.getenv("ANTHROPIC_API_KEY"), - "system_prompt": f"""You are {agent_name}, a {specialization} working in the domain of {domain}. - -You are part of the NANDA (Network of Autonomous Distributed Agents) system. You can communicate with other agents and help users with various tasks. - -Your capabilities include: -{chr(10).join([f"- {expertise}" for expertise in expertise_list])} - -Always be helpful, accurate, and concise in your responses. If you're unsure about something, say so honestly. - -When someone asks about yourself, mention that you're part of the NANDA agent network and can communicate with other agents using the @agent_name syntax.""" - }) - - agent_logic = create_llm_agent_logic(custom_config) - - nanda = NANDA( - agent_id=custom_config["agent_id"], - agent_logic=agent_logic, - port=port, - registry_url=custom_config["registry_url"], - enable_telemetry=True - ) + print("\n💬 Try: 'Hello', 'What time is it?', '@other-agent Hello!'") + print("🛑 Press Ctrl+C to stop\n") - print(f"🤖 Starting custom LLM agent: {agent_name}") - print(f"🚀 Agent URL: http://localhost:{port}/a2a") - nanda.start() + await nanda.start() if __name__ == "__main__": - main() + asyncio.run(main()) \ No newline at end of file diff --git a/nanda_core/__init__.py b/nanda_core/__init__.py index b6561ca..b287102 100644 --- a/nanda_core/__init__.py +++ b/nanda_core/__init__.py @@ -4,9 +4,9 @@ """ from .core.adapter import NANDA -from .core.agent_bridge import SimpleAgentBridge +from .core.agent_bridge import AgentBridge __all__ = [ "NANDA", - "SimpleAgentBridge" + "AgentBridge" ] \ No newline at end of file diff --git a/nanda_core/core/__init__.py b/nanda_core/core/__init__.py index f0d8570..cf59933 100644 --- a/nanda_core/core/__init__.py +++ b/nanda_core/core/__init__.py @@ -4,9 +4,9 @@ """ from .adapter import NANDA -from .agent_bridge import SimpleAgentBridge +from .agent_bridge import AgentBridge __all__ = [ "NANDA", - "SimpleAgentBridge" + "AgentBridge" ] \ No newline at end of file diff --git a/nanda_core/core/adapter.py b/nanda_core/core/adapter.py index b98892c..77a453a 100644 --- a/nanda_core/core/adapter.py +++ b/nanda_core/core/adapter.py @@ -1,4 +1,3 @@ -#!/usr/bin/env python3 """ Simple NANDA Adapter - Clean Agent-to-Agent Communication @@ -7,10 +6,13 @@ """ import os +import asyncio import requests from typing import Optional, Callable -from python_a2a import run_server -from .agent_bridge import SimpleAgentBridge +from .agent_bridge import AgentBridge +from .registry_client import RegistryClient +from ..protocols.router import ProtocolRouter +from ..protocols.a2a.protocol import A2AProtocol class NANDA: @@ -19,30 +21,48 @@ class NANDA: def __init__(self, agent_id: str, agent_logic: Callable[[str, str], str], + agent_name: Optional[str] = None, + domain: Optional[str] = None, + specialization: Optional[str] = None, + description: Optional[str] = None, + capabilities: Optional[list] = None, port: int = 6000, registry_url: Optional[str] = None, public_url: Optional[str] = None, host: str = "0.0.0.0", - enable_telemetry: bool = True): + enable_telemetry: bool = True, + protocols: Optional[dict] = None): """ Create a simple NANDA agent Args: agent_id: Unique agent identifier agent_logic: Function that takes (message: str, conversation_id: str) -> response: str + agent_name: Display name (defaults to agent_id) + domain: Agent's domain of expertise + specialization: Agent's specialization + description: Agent description + capabilities: List of capabilities (default: ["text"]) port: Port to run on registry_url: Optional registry URL for agent discovery public_url: Public URL for agent registration (e.g., https://yourdomain.com:6000) host: Host to bind to - enable_telemetry: Enable telemetry logging (optional) + enable_telemetry: Enable telemetry logging + protocols: Dict of protocol configs (e.g., {"a2a": {"enabled": True}}) """ self.agent_id = agent_id self.agent_logic = agent_logic + self.agent_name = agent_name or agent_id + self.domain = domain + self.specialization = specialization + self.description = description or f"AI agent {agent_id}" + self.capabilities = capabilities or ["text"] self.port = port self.registry_url = registry_url - self.public_url = public_url + self.public_url = public_url or f"http://localhost:{port}" self.host = host self.enable_telemetry = enable_telemetry + self.protocols_config = protocols or {"a2a": {"enabled": True}} # Initialize telemetry if enabled self.telemetry = None @@ -54,48 +74,116 @@ def __init__(self, except ImportError: print(f"⚠️ Telemetry requested but module not available") - # Create the bridge with optional features - self.bridge = SimpleAgentBridge( + # Initialize protocol router + self.router = ProtocolRouter() + + # Initialize and register protocols + self._initialize_protocols() + + # Initialize registry client + self.registry = RegistryClient(registry_url) if registry_url else RegistryClient(None) + + # Create the bridge + self.bridge = AgentBridge( + protocol_router=self.router, + registry_client=self.registry, agent_id=agent_id, agent_logic=agent_logic, - registry_url=registry_url, telemetry=self.telemetry ) print(f"🤖 NANDA Agent '{agent_id}' created") if registry_url: print(f"🌐 Registry: {registry_url}") - if public_url: - print(f"🔗 Public URL: {public_url}") + print(f"🔗 Public URL: {self.public_url}") + print(f"🔌 Protocols: {self.router.get_all_protocols()}") + + def _initialize_protocols(self): + """Initialize and register protocol adapters based on config""" + + # Initialize A2A protocol if enabled + if self.protocols_config.get("a2a", {}).get("enabled", True): + a2a_protocol = A2AProtocol( + agent_id=self.agent_id, + agent_name=self.agent_name, + public_url=self.public_url, + domain=self.domain, + specialization=self.specialization, + description=self.description, + capabilities=self.capabilities + ) + self.router.register(a2a_protocol) + + # TODO: Add SLIM protocol when implemented + # if self.protocols_config.get("slim", {}).get("enabled"): + # slim_protocol = SLIMProtocol(...) + # self.router.register(slim_protocol) - def start(self, register: bool = True): + async def start(self, register: bool = True): """Start the agent server""" # Register with registry if provided - if register and self.registry_url and self.public_url: - self._register() + if register and self.registry_url: + await self._register() print(f"🚀 Starting agent '{self.agent_id}' on {self.host}:{self.port}") - # Start the A2A server - run_server(self.bridge, host=self.host, port=self.port) + # Start the bridge (which starts all protocol servers) + await self.bridge.run_server(self.host, self.port) - def _register(self): - """Register agent with registry""" + async def _register(self): + """Register agent with NANDA Index""" try: - data = { + agent_facts = { "agent_id": self.agent_id, - "agent_url": self.public_url + "name": self.agent_name, + "domain": self.domain, + "specialization": self.specialization, + "description": self.description, + "capabilities": self.capabilities, + "url": self.public_url, + "agent_url": self.public_url, # For backward compatibility + "supported_protocols": self.router.get_all_protocols(), + "endpoints": self._get_endpoints() } - response = requests.post(f"{self.registry_url}/register", json=data, timeout=10) - if response.status_code == 200: - print(f"✅ Agent '{self.agent_id}' registered successfully") - else: - print(f"⚠️ Failed to register agent: HTTP {response.status_code}") + + await self.registry.register(agent_facts) + print(f"✅ Agent '{self.agent_id}' registered successfully") + except Exception as e: print(f"⚠️ Registration error: {e}") + def _get_endpoints(self) -> dict: + """Get endpoints for all registered protocols + + Returns: + Dict mapping protocol names to endpoint URLs + """ + endpoints = {} + for protocol_name in self.router.get_all_protocols(): + if protocol_name == "a2a": + endpoints["a2a"] = f"{self.public_url}/a2a" + elif protocol_name == "slim": + endpoints["slim"] = f"grpc://{self.public_url}:50051" + + return endpoints + def stop(self): - """Stop the agent and cleanup telemetry""" + """Stop the agent and cleanup resources""" + # Cleanup protocol resources + try: + try: + # Try to get the currently running event loop + loop = asyncio.get_running_loop() + # If we're already in an event loop, schedule the cleanup + asyncio.create_task(self.router.cleanup_all()) + except RuntimeError: + # No event loop running, create one and run cleanup + asyncio.run(self.router.cleanup_all()) + except Exception as e: + print(f"⚠️ Error during protocol cleanup: {e}") + + # Cleanup telemetry if self.telemetry: self.telemetry.stop() + print(f"🛑 Stopping agent '{self.agent_id}'") \ No newline at end of file diff --git a/nanda_core/core/adapter_copy.py b/nanda_core/core/adapter_copy.py new file mode 100644 index 0000000..b98892c --- /dev/null +++ b/nanda_core/core/adapter_copy.py @@ -0,0 +1,101 @@ +#!/usr/bin/env python3 +""" +Simple NANDA Adapter - Clean Agent-to-Agent Communication + +Simple, clean adapter focused on A2A communication without complexity. +8-10 lines to deploy an agent. +""" + +import os +import requests +from typing import Optional, Callable +from python_a2a import run_server +from .agent_bridge import SimpleAgentBridge + + +class NANDA: + """Simple NANDA class for clean agent deployment""" + + def __init__(self, + agent_id: str, + agent_logic: Callable[[str, str], str], + port: int = 6000, + registry_url: Optional[str] = None, + public_url: Optional[str] = None, + host: str = "0.0.0.0", + enable_telemetry: bool = True): + """ + Create a simple NANDA agent + + Args: + agent_id: Unique agent identifier + agent_logic: Function that takes (message: str, conversation_id: str) -> response: str + port: Port to run on + registry_url: Optional registry URL for agent discovery + public_url: Public URL for agent registration (e.g., https://yourdomain.com:6000) + host: Host to bind to + enable_telemetry: Enable telemetry logging (optional) + """ + self.agent_id = agent_id + self.agent_logic = agent_logic + self.port = port + self.registry_url = registry_url + self.public_url = public_url + self.host = host + self.enable_telemetry = enable_telemetry + + # Initialize telemetry if enabled + self.telemetry = None + if enable_telemetry: + try: + from ..telemetry.telemetry_system import TelemetrySystem + self.telemetry = TelemetrySystem(agent_id) + print(f"📊 Telemetry enabled for {agent_id}") + except ImportError: + print(f"⚠️ Telemetry requested but module not available") + + # Create the bridge with optional features + self.bridge = SimpleAgentBridge( + agent_id=agent_id, + agent_logic=agent_logic, + registry_url=registry_url, + telemetry=self.telemetry + ) + + print(f"🤖 NANDA Agent '{agent_id}' created") + if registry_url: + print(f"🌐 Registry: {registry_url}") + if public_url: + print(f"🔗 Public URL: {public_url}") + + def start(self, register: bool = True): + """Start the agent server""" + # Register with registry if provided + if register and self.registry_url and self.public_url: + self._register() + + print(f"🚀 Starting agent '{self.agent_id}' on {self.host}:{self.port}") + + # Start the A2A server + run_server(self.bridge, host=self.host, port=self.port) + + def _register(self): + """Register agent with registry""" + try: + data = { + "agent_id": self.agent_id, + "agent_url": self.public_url + } + response = requests.post(f"{self.registry_url}/register", json=data, timeout=10) + if response.status_code == 200: + print(f"✅ Agent '{self.agent_id}' registered successfully") + else: + print(f"⚠️ Failed to register agent: HTTP {response.status_code}") + except Exception as e: + print(f"⚠️ Registration error: {e}") + + def stop(self): + """Stop the agent and cleanup telemetry""" + if self.telemetry: + self.telemetry.stop() + print(f"🛑 Stopping agent '{self.agent_id}'") \ No newline at end of file diff --git a/nanda_core/core/agent_bridge.py b/nanda_core/core/agent_bridge.py index fc1a668..028d9fa 100644 --- a/nanda_core/core/agent_bridge.py +++ b/nanda_core/core/agent_bridge.py @@ -1,145 +1,154 @@ -#!/usr/bin/env python3 """ -Simple Agent Bridge for A2A Communication +Agent Bridge for Protocol-Agnostic Communication -Clean, simple bridge focused on agent-to-agent communication. +Handles message routing between agents using any registered protocol. """ -import os +import re import uuid import logging -import requests from typing import Callable, Optional, Dict, Any -from python_a2a import A2AServer, A2AClient, Message, TextContent, MessageRole, Metadata +from ..protocols.router import ProtocolRouter +from .registry_client import RegistryClient -# Configure logger to capture conversation logs logger = logging.getLogger(__name__) -class SimpleAgentBridge(A2AServer): - """Simple Agent Bridge for A2A communication only""" +class AgentBridge: + """Protocol-agnostic agent message router and coordinator""" def __init__(self, - agent_id: str, + protocol_router: ProtocolRouter, + registry_client: RegistryClient, + agent_id: str, agent_logic: Callable[[str, str], str], - registry_url: Optional[str] = None, - telemetry = None): - super().__init__() + telemetry=None): + """Initialize agent bridge + + Args: + protocol_router: Router managing protocol adapters + registry_client: Client for NANDA Index + agent_id: This agent's unique identifier + agent_logic: Agent's business logic function(message: str, conversation_id: str) -> str + telemetry: Optional telemetry system + """ + self.router = protocol_router + self.registry = registry_client self.agent_id = agent_id self.agent_logic = agent_logic - self.registry_url = registry_url self.telemetry = telemetry - def handle_message(self, msg: Message) -> Message: - """Handle incoming messages""" - conversation_id = msg.conversation_id or str(uuid.uuid4()) + # Register this bridge as incoming handler for all protocols + for protocol_name in self.router.get_all_protocols(): + protocol = self.router.get_protocol(protocol_name) + protocol.set_incoming_handler(self.handle_message) - # Only handle text content - if not isinstance(msg.content, TextContent): - return self._create_response( - msg, conversation_id, - "Only text messages supported" - ) + logger.info(f"🌉 Bridge initialized for {agent_id} with protocols: {self.router.get_all_protocols()}") + + def extract_agent_id(self, content: str) -> Optional[str]: + """Extract @agent-id from message content""" + match = re.search(r'@([\w-]+)', content) + return match.group(1) if match else None + + def parse_incoming_agent_message(self, text: str) -> Optional[Dict[str, str]]: + """Parse incoming agent-to-agent message in format: + FROM: sender\nTO: receiver\nMESSAGE: content + """ + if not (text.startswith("FROM:") and "TO:" in text and "MESSAGE:" in text): + return None + + try: + lines = text.strip().split('\n') + result = {} + + for line in lines: + if line.startswith("FROM:"): + result['from_agent'] = line[5:].strip() + elif line.startswith("TO:"): + result['to_agent'] = line[3:].strip() + elif line.startswith("MESSAGE:"): + result['message'] = line[8:].strip() + + return result if all(k in result for k in ['from_agent', 'to_agent', 'message']) else None + except Exception as e: + logger.error(f"Error parsing agent message: {e}") + return None + + async def handle_message(self, message: Dict[str, Any]) -> Dict[str, Any]: + """Main message handling entry point - user_text = msg.content.text + Called by protocol adapters when messages arrive. + """ + content = message.get("content", {}).get("text", "") + conversation_id = message.get("conversation_id", "") or str(uuid.uuid4()) - # Check if this is an agent-to-agent message in our simple format - if user_text.startswith("FROM:") and "TO:" in user_text and "MESSAGE:" in user_text: - return self._handle_incoming_agent_message(user_text, msg, conversation_id) + # Check if this is an incoming agent-to-agent message + parsed = self.parse_incoming_agent_message(content) + if parsed: + return await self._handle_incoming_agent_message(parsed, conversation_id) - logger.info(f"📨 [{self.agent_id}] Received: {user_text}") + logger.info(f"📨 [{self.agent_id}] Received: {content}") - # Handle different message types try: - if user_text.startswith("@"): - # Agent-to-agent message (outgoing) - return self._handle_agent_message(user_text, msg, conversation_id) - elif user_text.startswith("/"): + # Check message type + if content.startswith("@"): + # Outgoing agent-to-agent message + return await self._handle_outgoing_agent_message(content, conversation_id) + elif content.startswith("/"): # System command - return self._handle_command(user_text, msg, conversation_id) + return await self._handle_command(content, conversation_id) else: # Regular message - use agent logic if self.telemetry: self.telemetry.log_message_received(self.agent_id, conversation_id) - response = self.agent_logic(user_text, conversation_id) - return self._create_response(msg, conversation_id, response) + response = self.agent_logic(content, conversation_id) + return self._create_response(response) except Exception as e: - return self._create_response( - msg, conversation_id, - f"Error: {str(e)}" - ) + logger.error(f"❌ [{self.agent_id}] Error handling message: {e}") + return self._create_response(f"Error: {str(e)}") - def _handle_incoming_agent_message(self, user_text: str, msg: Message, conversation_id: str) -> Message: + async def _handle_incoming_agent_message(self, parsed: Dict[str, str], + conversation_id: str) -> Dict[str, Any]: """Handle incoming messages from other agents""" - try: - lines = user_text.strip().split('\n') - from_agent = "" - to_agent = "" - message_content = "" - - for line in lines: - if line.startswith("FROM:"): - from_agent = line[5:].strip() - elif line.startswith("TO:"): - to_agent = line[3:].strip() - elif line.startswith("MESSAGE:"): - message_content = line[8:].strip() - - logger.info(f"📨 [{self.agent_id}] ← [{from_agent}]: {message_content}") - - # Check if this is a reply (don't respond to replies to avoid infinite loops) - if message_content.startswith("Response to "): - logger.info(f"🔄 [{self.agent_id}] Received reply from {from_agent}, displaying to user") - # Display the reply to user but don't respond back to avoid loops - return self._create_response( - msg, conversation_id, - f"[{from_agent}] {message_content[len('Response to ' + self.agent_id + ': '):]}" - ) - - # Process the message through our agent logic - if self.telemetry: - self.telemetry.log_message_received(self.agent_id, conversation_id) - - response = self.agent_logic(message_content, conversation_id) - - # Send response back - return self._create_response( - msg, conversation_id, - f"Response to {from_agent}: {response}" - ) - - except Exception as e: - logger.error(f"❌ [{self.agent_id}] Error processing incoming agent message: {e}") - return self._create_response( - msg, conversation_id, - f"Error processing message from agent: {str(e)}" - ) - - def _handle_agent_message(self, user_text: str, msg: Message, conversation_id: str) -> Message: + from_agent = parsed['from_agent'] + message_content = parsed['message'] + + logger.info(f"📨 [{self.agent_id}] ← [{from_agent}]: {message_content}") + + # Check if this is a reply (avoid infinite loops) + if message_content.startswith("Response to "): + logger.info(f"🔄 [{self.agent_id}] Received reply from {from_agent}") + return self._create_response(f"[{from_agent}] {message_content[len(f'Response to {self.agent_id}: '):]}") + + # Process through agent logic + if self.telemetry: + self.telemetry.log_message_received(self.agent_id, conversation_id) + + response = self.agent_logic(message_content, conversation_id) + return self._create_response(f"Response to {from_agent}: {response}") + + async def _handle_outgoing_agent_message(self, content: str, + conversation_id: str) -> Dict[str, Any]: """Handle messages to other agents (@agent_id message)""" - parts = user_text.split(" ", 1) + parts = content.split(" ", 1) if len(parts) <= 1: - return self._create_response( - msg, conversation_id, - "Invalid format. Use '@agent_id message'" - ) + return self._create_response("Invalid format. Use '@agent_id message'") - target_agent = parts[0][1:] # Remove @ + target_agent_id = parts[0][1:] # Remove @ message_text = parts[1] - logger.info(f"🔄 [{self.agent_id}] Sending to {target_agent}: {message_text}") + logger.info(f"🔄 [{self.agent_id}] Sending to {target_agent_id}: {message_text}") - # Look up target agent and send message - result = self._send_to_agent(target_agent, message_text, conversation_id) - return self._create_response(msg, conversation_id, result) + # Route to target agent + result = await self.route_to_agent(target_agent_id, message_text, conversation_id) + return result - def _handle_command(self, user_text: str, msg: Message, conversation_id: str) -> Message: + async def _handle_command(self, content: str, conversation_id: str) -> Dict[str, Any]: """Handle system commands""" - parts = user_text.split(" ", 1) + parts = content.split(" ", 1) command = parts[0][1:] if len(parts) > 0 else "" - args = parts[1] if len(parts) > 1 else "" if command == "help": help_text = """Available commands: @@ -147,106 +156,109 @@ def _handle_command(self, user_text: str, msg: Message, conversation_id: str) -> /ping - Test agent responsiveness /status - Show agent status @agent_id message - Send message to another agent""" - return self._create_response(msg, conversation_id, help_text) + return self._create_response(help_text) elif command == "ping": - return self._create_response(msg, conversation_id, "Pong!") + return self._create_response("Pong!") elif command == "status": - status = f"Agent: {self.agent_id}, Status: Running" - if self.registry_url: - status += f", Registry: {self.registry_url}" - return self._create_response(msg, conversation_id, status) + protocols = self.router.get_all_protocols() + status = f"Agent: {self.agent_id}, Status: Running, Protocols: {', '.join(protocols)}" + if hasattr(self.registry, 'registry_url') and self.registry.registry_url: + status += f", Registry: {self.registry.registry_url}" + return self._create_response(status) else: return self._create_response( - msg, conversation_id, f"Unknown command: {command}. Use /help for available commands" ) - def _send_to_agent(self, target_agent_id: str, message_text: str, conversation_id: str) -> str: - """Send message to another agent""" + async def route_to_agent(self, target_agent_id: str, message_text: str, + conversation_id: str) -> Dict[str, Any]: + """Route message to target agent via appropriate protocol""" try: - # Look up agent URL - agent_url = self._lookup_agent(target_agent_id) - if not agent_url: - return f"Agent {target_agent_id} not found" + # Resolve agent via NANDA Index + agent_info = await self.registry.resolve(target_agent_id) - # Ensure URL has /a2a endpoint - if not agent_url.endswith('/a2a'): - agent_url = f"{agent_url}/a2a" + if not agent_info: + logger.warning(f"🔍 Agent {target_agent_id} not found in registry") + return self._create_response(f"Agent {target_agent_id} not found") - logger.info(f"📤 [{self.agent_id}] → [{target_agent_id}]: {message_text}") + # Debug: print what we got from registry + logger.info(f"📋 Agent info from registry: {agent_info}") - # Create simple message with metadata - simple_message = f"FROM: {self.agent_id}\nTO: {target_agent_id}\nMESSAGE: {message_text}" + # Select protocol + supported_protocols = agent_info.get("supported_protocols", ["a2a"]) + protocol_name = self.router.select_protocol(supported_protocols) + + # Get target URL - try multiple fields + endpoints = agent_info.get("endpoints", {}) + target_url = endpoints.get(protocol_name) - # Send message using A2A client - client = A2AClient(agent_url, timeout=30) - response = client.send_message( - Message( - role=MessageRole.USER, - content=TextContent(text=simple_message), - conversation_id=conversation_id, - metadata=Metadata(custom_fields={ - 'from_agent_id': self.agent_id, - 'to_agent_id': target_agent_id, - 'message_type': 'agent_to_agent' - }) + if not target_url: + # Try different URL fields + target_url = ( + agent_info.get("url") or + agent_info.get("agent_url") or + agent_info.get("public_url") ) - ) + + # Ensure target_url is valid + if not target_url: + logger.error(f"❌ No URL found for {target_agent_id}. Agent info: {agent_info}") + return self._create_response(f"No endpoint found for {target_agent_id}") + + # Add /a2a suffix if needed and not already present + if protocol_name == "a2a" and not target_url.endswith('/a2a'): + target_url = f"{target_url}/a2a" + + logger.info(f"📤 [{self.agent_id}] → [{target_agent_id}] via {protocol_name}: {message_text}") + logger.info(f"🔗 Target URL: {target_url}") + + # Create message in simple format + simple_message = f"FROM: {self.agent_id}\nTO: {target_agent_id}\nMESSAGE: {message_text}" + + # Send via protocol + message = { + "content": { + "text": simple_message, + "type": "text" + }, + "conversation_id": conversation_id, + "metadata": { + "from_agent_id": self.agent_id, + "to_agent_id": target_agent_id, + "message_type": "agent_to_agent" + } + } + + response = await self.router.send(protocol_name, target_url, message) if self.telemetry: self.telemetry.log_message_sent(target_agent_id, conversation_id) - # Extract the actual response content from the target agent - logger.info(f"🔍 [{self.agent_id}] Response type: {type(response)}, has parts: {hasattr(response, 'parts') if response else 'None'}") - if response: - if hasattr(response, 'parts') and response.parts: - response_text = response.parts[0].text - logger.info(f"✅ [{self.agent_id}] Received response from {target_agent_id}: {response_text[:100]}...") - return f"[{target_agent_id}] {response_text}" - else: - logger.info(f"✅ [{self.agent_id}] Response has no parts, full response: {str(response)[:200]}...") - return f"[{target_agent_id}] {str(response)}" - else: - logger.info(f"✅ [{self.agent_id}] Message delivered to {target_agent_id}, no response") - return f"Message sent to {target_agent_id}: {message_text}" + # Extract response + response_text = response.get("content", {}).get("text", str(response)) + logger.info(f"✅ [{self.agent_id}] Response from {target_agent_id}: {response_text[:100]}...") + + return self._create_response(f"[{target_agent_id}] {response_text}") except Exception as e: - return f"❌ Error sending to {target_agent_id}: {str(e)}" + logger.error(f"❌ Error routing to {target_agent_id}: {e}") + import traceback + traceback.print_exc() + return self._create_response(f"❌ Error sending to {target_agent_id}: {str(e)}") - def _lookup_agent(self, agent_id: str) -> Optional[str]: - """Look up agent URL in registry or use local discovery""" - - # Try registry lookup if available - if self.registry_url: - try: - response = requests.get(f"{self.registry_url}/lookup/{agent_id}", timeout=10) - if response.status_code == 200: - data = response.json() - agent_url = data.get("agent_url") - logger.info(f"🌐 Found {agent_id} in registry: {agent_url}") - return agent_url - except Exception as e: - logger.warning(f"🌐 Registry lookup failed: {e}") - - # Fallback to local discovery (for testing) - local_agents = { - "test_agent": "http://localhost:6000", + def _create_response(self, text: str) -> Dict[str, Any]: + """Create a standardized response dict""" + return { + "content": { + "text": f"[{self.agent_id}] {text}", + "type": "text" + } } - - if agent_id in local_agents: - logger.info(f"🏠 Found {agent_id} locally: {local_agents[agent_id]}") - return local_agents[agent_id] - - return None - def _create_response(self, original_msg: Message, conversation_id: str, text: str) -> Message: - """Create a response message""" - return Message( - role=MessageRole.AGENT, - content=TextContent(text=f"[{self.agent_id}] {text}"), - parent_message_id=original_msg.message_id, - conversation_id=conversation_id - ) \ No newline at end of file + async def run_server(self, host: str = "0.0.0.0", port: int = 8000): + """Start all protocol servers""" + logger.info(f"🚀 Starting agent bridge for {self.agent_id} on {host}:{port}") + await self.router.start_all_servers(host, port) \ No newline at end of file diff --git a/nanda_core/core/agent_bridge_copy.py b/nanda_core/core/agent_bridge_copy.py new file mode 100644 index 0000000..fc1a668 --- /dev/null +++ b/nanda_core/core/agent_bridge_copy.py @@ -0,0 +1,252 @@ +#!/usr/bin/env python3 +""" +Simple Agent Bridge for A2A Communication + +Clean, simple bridge focused on agent-to-agent communication. +""" + +import os +import uuid +import logging +import requests +from typing import Callable, Optional, Dict, Any +from python_a2a import A2AServer, A2AClient, Message, TextContent, MessageRole, Metadata + +# Configure logger to capture conversation logs +logger = logging.getLogger(__name__) + + +class SimpleAgentBridge(A2AServer): + """Simple Agent Bridge for A2A communication only""" + + def __init__(self, + agent_id: str, + agent_logic: Callable[[str, str], str], + registry_url: Optional[str] = None, + telemetry = None): + super().__init__() + self.agent_id = agent_id + self.agent_logic = agent_logic + self.registry_url = registry_url + self.telemetry = telemetry + + def handle_message(self, msg: Message) -> Message: + """Handle incoming messages""" + conversation_id = msg.conversation_id or str(uuid.uuid4()) + + # Only handle text content + if not isinstance(msg.content, TextContent): + return self._create_response( + msg, conversation_id, + "Only text messages supported" + ) + + user_text = msg.content.text + + # Check if this is an agent-to-agent message in our simple format + if user_text.startswith("FROM:") and "TO:" in user_text and "MESSAGE:" in user_text: + return self._handle_incoming_agent_message(user_text, msg, conversation_id) + + logger.info(f"📨 [{self.agent_id}] Received: {user_text}") + + # Handle different message types + try: + if user_text.startswith("@"): + # Agent-to-agent message (outgoing) + return self._handle_agent_message(user_text, msg, conversation_id) + elif user_text.startswith("/"): + # System command + return self._handle_command(user_text, msg, conversation_id) + else: + # Regular message - use agent logic + if self.telemetry: + self.telemetry.log_message_received(self.agent_id, conversation_id) + + response = self.agent_logic(user_text, conversation_id) + return self._create_response(msg, conversation_id, response) + + except Exception as e: + return self._create_response( + msg, conversation_id, + f"Error: {str(e)}" + ) + + def _handle_incoming_agent_message(self, user_text: str, msg: Message, conversation_id: str) -> Message: + """Handle incoming messages from other agents""" + try: + lines = user_text.strip().split('\n') + from_agent = "" + to_agent = "" + message_content = "" + + for line in lines: + if line.startswith("FROM:"): + from_agent = line[5:].strip() + elif line.startswith("TO:"): + to_agent = line[3:].strip() + elif line.startswith("MESSAGE:"): + message_content = line[8:].strip() + + logger.info(f"📨 [{self.agent_id}] ← [{from_agent}]: {message_content}") + + # Check if this is a reply (don't respond to replies to avoid infinite loops) + if message_content.startswith("Response to "): + logger.info(f"🔄 [{self.agent_id}] Received reply from {from_agent}, displaying to user") + # Display the reply to user but don't respond back to avoid loops + return self._create_response( + msg, conversation_id, + f"[{from_agent}] {message_content[len('Response to ' + self.agent_id + ': '):]}" + ) + + # Process the message through our agent logic + if self.telemetry: + self.telemetry.log_message_received(self.agent_id, conversation_id) + + response = self.agent_logic(message_content, conversation_id) + + # Send response back + return self._create_response( + msg, conversation_id, + f"Response to {from_agent}: {response}" + ) + + except Exception as e: + logger.error(f"❌ [{self.agent_id}] Error processing incoming agent message: {e}") + return self._create_response( + msg, conversation_id, + f"Error processing message from agent: {str(e)}" + ) + + def _handle_agent_message(self, user_text: str, msg: Message, conversation_id: str) -> Message: + """Handle messages to other agents (@agent_id message)""" + parts = user_text.split(" ", 1) + if len(parts) <= 1: + return self._create_response( + msg, conversation_id, + "Invalid format. Use '@agent_id message'" + ) + + target_agent = parts[0][1:] # Remove @ + message_text = parts[1] + + logger.info(f"🔄 [{self.agent_id}] Sending to {target_agent}: {message_text}") + + # Look up target agent and send message + result = self._send_to_agent(target_agent, message_text, conversation_id) + return self._create_response(msg, conversation_id, result) + + def _handle_command(self, user_text: str, msg: Message, conversation_id: str) -> Message: + """Handle system commands""" + parts = user_text.split(" ", 1) + command = parts[0][1:] if len(parts) > 0 else "" + args = parts[1] if len(parts) > 1 else "" + + if command == "help": + help_text = """Available commands: +/help - Show this help +/ping - Test agent responsiveness +/status - Show agent status +@agent_id message - Send message to another agent""" + return self._create_response(msg, conversation_id, help_text) + + elif command == "ping": + return self._create_response(msg, conversation_id, "Pong!") + + elif command == "status": + status = f"Agent: {self.agent_id}, Status: Running" + if self.registry_url: + status += f", Registry: {self.registry_url}" + return self._create_response(msg, conversation_id, status) + + else: + return self._create_response( + msg, conversation_id, + f"Unknown command: {command}. Use /help for available commands" + ) + + def _send_to_agent(self, target_agent_id: str, message_text: str, conversation_id: str) -> str: + """Send message to another agent""" + try: + # Look up agent URL + agent_url = self._lookup_agent(target_agent_id) + if not agent_url: + return f"Agent {target_agent_id} not found" + + # Ensure URL has /a2a endpoint + if not agent_url.endswith('/a2a'): + agent_url = f"{agent_url}/a2a" + + logger.info(f"📤 [{self.agent_id}] → [{target_agent_id}]: {message_text}") + + # Create simple message with metadata + simple_message = f"FROM: {self.agent_id}\nTO: {target_agent_id}\nMESSAGE: {message_text}" + + # Send message using A2A client + client = A2AClient(agent_url, timeout=30) + response = client.send_message( + Message( + role=MessageRole.USER, + content=TextContent(text=simple_message), + conversation_id=conversation_id, + metadata=Metadata(custom_fields={ + 'from_agent_id': self.agent_id, + 'to_agent_id': target_agent_id, + 'message_type': 'agent_to_agent' + }) + ) + ) + + if self.telemetry: + self.telemetry.log_message_sent(target_agent_id, conversation_id) + + # Extract the actual response content from the target agent + logger.info(f"🔍 [{self.agent_id}] Response type: {type(response)}, has parts: {hasattr(response, 'parts') if response else 'None'}") + if response: + if hasattr(response, 'parts') and response.parts: + response_text = response.parts[0].text + logger.info(f"✅ [{self.agent_id}] Received response from {target_agent_id}: {response_text[:100]}...") + return f"[{target_agent_id}] {response_text}" + else: + logger.info(f"✅ [{self.agent_id}] Response has no parts, full response: {str(response)[:200]}...") + return f"[{target_agent_id}] {str(response)}" + else: + logger.info(f"✅ [{self.agent_id}] Message delivered to {target_agent_id}, no response") + return f"Message sent to {target_agent_id}: {message_text}" + + except Exception as e: + return f"❌ Error sending to {target_agent_id}: {str(e)}" + + def _lookup_agent(self, agent_id: str) -> Optional[str]: + """Look up agent URL in registry or use local discovery""" + + # Try registry lookup if available + if self.registry_url: + try: + response = requests.get(f"{self.registry_url}/lookup/{agent_id}", timeout=10) + if response.status_code == 200: + data = response.json() + agent_url = data.get("agent_url") + logger.info(f"🌐 Found {agent_id} in registry: {agent_url}") + return agent_url + except Exception as e: + logger.warning(f"🌐 Registry lookup failed: {e}") + + # Fallback to local discovery (for testing) + local_agents = { + "test_agent": "http://localhost:6000", + } + + if agent_id in local_agents: + logger.info(f"🏠 Found {agent_id} locally: {local_agents[agent_id]}") + return local_agents[agent_id] + + return None + + def _create_response(self, original_msg: Message, conversation_id: str, text: str) -> Message: + """Create a response message""" + return Message( + role=MessageRole.AGENT, + content=TextContent(text=f"[{self.agent_id}] {text}"), + parent_message_id=original_msg.message_id, + conversation_id=conversation_id + ) \ No newline at end of file diff --git a/nanda_core/core/registry_client.py b/nanda_core/core/registry_client.py index 84f83c4..ac12c79 100644 --- a/nanda_core/core/registry_client.py +++ b/nanda_core/core/registry_client.py @@ -1,23 +1,31 @@ -#!/usr/bin/env python3 """ -Registry Client for Nanda Index Registry Integration +Registry Client for NANDA Index Registry Integration Handles agent registration, discovery, and management """ -import requests +import httpx import json import os from typing import Optional, Dict, List, Any from datetime import datetime +import logging + +logger = logging.getLogger(__name__) class RegistryClient: - """Client for interacting with the Nanda index registry""" + """Client for interacting with the NANDA Index registry""" def __init__(self, registry_url: Optional[str] = None): + """Initialize registry client + + Args: + registry_url: URL of NANDA Index (e.g., http://registry.chat39.com:6900) + """ self.registry_url = registry_url or self._get_default_registry_url() - self.session = requests.Session() - self.session.verify = False # For development with self-signed certs + # Use async httpx client instead of requests + self.client = httpx.AsyncClient(timeout=30.0, verify=False) # verify=False for dev with self-signed certs + logger.info(f"Registry client initialized with URL: {self.registry_url}") def _get_default_registry_url(self) -> str: """Get default registry URL from configuration""" @@ -29,60 +37,126 @@ def _get_default_registry_url(self) -> str: pass return "https://registry.chat39.com" - def register_agent(self, agent_id: str, agent_url: str, api_url: Optional[str] = None, agent_facts_url: Optional[str] = None) -> bool: - """Register an agent with the registry""" + async def register(self, agent_facts: Dict[str, Any]) -> bool: + """Register an agent with the registry using AgentFacts + + Args: + agent_facts: Agent metadata dict with fields: + - agent_id: str + - name: str + - domain: str (optional) + - specialization: str (optional) + - description: str (optional) + - capabilities: list + - url: str + - agent_url: str (backward compatibility) + - supported_protocols: list + - endpoints: dict + + Returns: + True if registration successful, False otherwise + """ + if not self.registry_url: + logger.warning("No registry URL configured, skipping registration") + return False + try: - data = { - "agent_id": agent_id, - "agent_url": agent_url - } - if api_url: - data["api_url"] = api_url - if agent_facts_url: - data["agent_facts_url"] = agent_facts_url + # Support both new (register) and old (register_agent) endpoints + response = await self.client.post( + f"{self.registry_url}/register", + json=agent_facts + ) + + if response.status_code == 200: + logger.info(f"✅ Agent {agent_facts.get('agent_id')} registered successfully") + return True + else: + logger.error(f"❌ Registration failed: HTTP {response.status_code} - {response.text}") + return False + + except Exception as e: + logger.error(f"❌ Error registering agent: {e}") + return False - response = self.session.post(f"{self.registry_url}/register", json=data) + async def register_agent(self, agent_id: str, agent_url: str, + api_url: Optional[str] = None, + agent_facts_url: Optional[str] = None) -> bool: + """Legacy registration method for backward compatibility""" + data = { + "agent_id": agent_id, + "agent_url": agent_url + } + if api_url: + data["api_url"] = api_url + if agent_facts_url: + data["agent_facts_url"] = agent_facts_url + + try: + response = await self.client.post(f"{self.registry_url}/register", json=data) return response.status_code == 200 except Exception as e: - print(f"Error registering agent: {e}") + logger.error(f"Error registering agent: {e}") return False - def lookup_agent(self, agent_id: str) -> Optional[Dict[str, Any]]: + async def resolve(self, agent_id: str) -> Optional[Dict[str, Any]]: + """Resolve/lookup an agent in the registry + + Args: + agent_id: Agent identifier to look up + + Returns: + Agent info dict or None if not found + """ + return await self.lookup_agent(agent_id) + + async def lookup_agent(self, agent_id: str) -> Optional[Dict[str, Any]]: """Look up an agent in the registry""" + if not self.registry_url: + logger.warning("No registry URL configured") + return None + try: - response = self.session.get(f"{self.registry_url}/lookup/{agent_id}") + response = await self.client.get(f"{self.registry_url}/lookup/{agent_id}") if response.status_code == 200: return response.json() + else: + logger.warning(f"Agent {agent_id} not found: HTTP {response.status_code}") return None except Exception as e: - print(f"Error looking up agent {agent_id}: {e}") + logger.error(f"Error looking up agent {agent_id}: {e}") return None - def list_agents(self) -> List[Dict[str, Any]]: + async def list_agents(self) -> List[Dict[str, Any]]: """List all registered agents""" + if not self.registry_url: + return [] + try: - response = self.session.get(f"{self.registry_url}/list") + response = await self.client.get(f"{self.registry_url}/list") if response.status_code == 200: return response.json() return [] except Exception as e: - print(f"Error listing agents: {e}") + logger.error(f"Error listing agents: {e}") return [] - def list_clients(self) -> List[Dict[str, Any]]: + async def list_clients(self) -> List[Dict[str, Any]]: """List all registered clients""" + if not self.registry_url: + return [] + try: - response = self.session.get(f"{self.registry_url}/clients") + response = await self.client.get(f"{self.registry_url}/clients") if response.status_code == 200: return response.json() - return self.list_agents() # Fallback to list endpoint + return await self.list_agents() # Fallback to list endpoint except Exception as e: - print(f"Error listing clients: {e}") + logger.error(f"Error listing clients: {e}") return [] - def get_agent_metadata(self, agent_id: str) -> Optional[Dict[str, Any]]: + async def get_agent_metadata(self, agent_id: str) -> Optional[Dict[str, Any]]: """Get detailed metadata for an agent""" - agent_info = self.lookup_agent(agent_id) + agent_info = await self.lookup_agent(agent_id) if not agent_info: return None @@ -90,16 +164,25 @@ def get_agent_metadata(self, agent_id: str) -> Optional[Dict[str, Any]]: metadata = { "agent_id": agent_id, "agent_url": agent_info.get("agent_url"), + "url": agent_info.get("url"), # New field "api_url": agent_info.get("api_url"), + "endpoints": agent_info.get("endpoints", {}), # New field + "supported_protocols": agent_info.get("supported_protocols", ["a2a"]), # New field "last_seen": agent_info.get("last_seen"), "capabilities": agent_info.get("capabilities", []), "description": agent_info.get("description", ""), - "tags": agent_info.get("tags", []) + "tags": agent_info.get("tags", []), + "domain": agent_info.get("domain"), # New field + "specialization": agent_info.get("specialization") # New field } return metadata - def search_agents(self, query: str = "", capabilities: List[str] = None, tags: List[str] = None) -> List[Dict[str, Any]]: + async def search_agents(self, query: str = "", capabilities: List[str] = None, + tags: List[str] = None) -> List[Dict[str, Any]]: """Search for agents based on criteria""" + if not self.registry_url: + return [] + try: params = {} if query: @@ -109,19 +192,20 @@ def search_agents(self, query: str = "", capabilities: List[str] = None, tags: L if tags: params["tags"] = ",".join(tags) - response = self.session.get(f"{self.registry_url}/search", params=params) + response = await self.client.get(f"{self.registry_url}/search", params=params) if response.status_code == 200: return response.json() # Fallback to client-side filtering - return self._filter_agents_locally(query, capabilities, tags) + return await self._filter_agents_locally(query, capabilities, tags) except Exception as e: - print(f"Error searching agents: {e}") - return self._filter_agents_locally(query, capabilities, tags) + logger.error(f"Error searching agents: {e}") + return await self._filter_agents_locally(query, capabilities, tags) - def _filter_agents_locally(self, query: str = "", capabilities: List[str] = None, tags: List[str] = None) -> List[Dict[str, Any]]: + async def _filter_agents_locally(self, query: str = "", capabilities: List[str] = None, + tags: List[str] = None) -> List[Dict[str, Any]]: """Fallback local filtering when server search is not available""" - all_agents = self.list_agents() + all_agents = await self.list_agents() filtered = [] for agent in all_agents: @@ -147,25 +231,32 @@ def _filter_agents_locally(self, query: str = "", capabilities: List[str] = None return filtered - def get_mcp_servers(self, registry_provider: Optional[str] = None) -> List[Dict[str, Any]]: + async def get_mcp_servers(self, registry_provider: Optional[str] = None) -> List[Dict[str, Any]]: """Get list of available MCP servers""" + if not self.registry_url: + return [] + try: params = {} if registry_provider: params["registry_provider"] = registry_provider - response = self.session.get(f"{self.registry_url}/mcp_servers", params=params) + response = await self.client.get(f"{self.registry_url}/mcp_servers", params=params) if response.status_code == 200: return response.json() return [] except Exception as e: - print(f"Error getting MCP servers: {e}") + logger.error(f"Error getting MCP servers: {e}") return [] - def get_mcp_server_config(self, registry_provider: str, qualified_name: str) -> Optional[Dict[str, Any]]: + async def get_mcp_server_config(self, registry_provider: str, + qualified_name: str) -> Optional[Dict[str, Any]]: """Get configuration for a specific MCP server""" + if not self.registry_url: + return None + try: - response = self.session.get(f"{self.registry_url}/get_mcp_registry", params={ + response = await self.client.get(f"{self.registry_url}/get_mcp_registry", params={ 'registry_provider': registry_provider, 'qualified_name': qualified_name }) @@ -182,11 +273,15 @@ def get_mcp_server_config(self, registry_provider: str, qualified_name: str) -> } return None except Exception as e: - print(f"Error getting MCP server config: {e}") + logger.error(f"Error getting MCP server config: {e}") return None - def update_agent_status(self, agent_id: str, status: str, metadata: Optional[Dict[str, Any]] = None) -> bool: + async def update_agent_status(self, agent_id: str, status: str, + metadata: Optional[Dict[str, Any]] = None) -> bool: """Update agent status and metadata""" + if not self.registry_url: + return False + try: data = { "agent_id": agent_id, @@ -196,36 +291,52 @@ def update_agent_status(self, agent_id: str, status: str, metadata: Optional[Dic if metadata: data.update(metadata) - response = self.session.put(f"{self.registry_url}/agents/{agent_id}/status", json=data) + response = await self.client.put( + f"{self.registry_url}/agents/{agent_id}/status", + json=data + ) return response.status_code == 200 except Exception as e: - print(f"Error updating agent status: {e}") + logger.error(f"Error updating agent status: {e}") return False - def unregister_agent(self, agent_id: str) -> bool: + async def unregister_agent(self, agent_id: str) -> bool: """Unregister an agent from the registry""" + if not self.registry_url: + return False + try: - response = self.session.delete(f"{self.registry_url}/agents/{agent_id}") + response = await self.client.delete(f"{self.registry_url}/agents/{agent_id}") return response.status_code == 200 except Exception as e: - print(f"Error unregistering agent: {e}") + logger.error(f"Error unregistering agent: {e}") return False - def health_check(self) -> bool: + async def health_check(self) -> bool: """Check if the registry is healthy""" + if not self.registry_url: + return False + try: - response = self.session.get(f"{self.registry_url}/health", timeout=5) + response = await self.client.get(f"{self.registry_url}/health", timeout=5) return response.status_code == 200 except Exception: return False - def get_registry_stats(self) -> Optional[Dict[str, Any]]: + async def get_registry_stats(self) -> Optional[Dict[str, Any]]: """Get registry statistics""" + if not self.registry_url: + return None + try: - response = self.session.get(f"{self.registry_url}/stats") + response = await self.client.get(f"{self.registry_url}/stats") if response.status_code == 200: return response.json() return None except Exception as e: - print(f"Error getting registry stats: {e}") - return None \ No newline at end of file + logger.error(f"Error getting registry stats: {e}") + return None + + async def close(self): + """Close the HTTP client connection""" + await self.client.aclose() \ No newline at end of file diff --git a/nanda_core/protocols/AgentExecutor.py b/nanda_core/protocols/AgentExecutor.py new file mode 100644 index 0000000..b5d5a53 --- /dev/null +++ b/nanda_core/protocols/AgentExecutor.py @@ -0,0 +1,73 @@ +from typing import Callable +from a2a.types import TextPart +from a2a.utils import new_agent_text_message +from a2a.server.agent_execution import AgentExecutor, RequestContext +from a2a.server.events import EventQueue + +class NANDAAgentExecutor(AgentExecutor): + """Agent executor that delegates to NANDA bridge handler""" + + def __init__(self, message_handler: Callable): + self.message_handler = message_handler + + async def execute(self, context: RequestContext, event_queue: EventQueue): + """Execute agent logic via bridge handler""" + try: + # Extract message text from context + message_text = "" + if context.message and context.message.parts: + for part in context.message.parts: + # Handle different part types + if hasattr(part, 'root'): + part_obj = part.root + else: + part_obj = part + + # Extract text based on part type + if hasattr(part_obj, 'text'): + message_text = part_obj.text + elif hasattr(part_obj, 'kind') and part_obj.kind == 'text': + message_text = part_obj.text if hasattr(part_obj, 'text') else str(part_obj) + else: + message_text = str(part_obj) + + if message_text: + break + + if not message_text: + message_text = "" + + print(f"📨 Received message: {message_text[:100]}") + + # Convert to bridge format + bridge_message = { + "content": { + "text": message_text, + "type": "text" + }, + "conversation_id": context.task_id or context.context_id or "" + } + + # Call bridge handler + response = await self.message_handler(bridge_message) + + # Extract response text + response_text = response.get("content", {}).get("text", "") + + print(f"✅ Sending response: {response_text}") + + # Send response via event queue + response_message = new_agent_text_message(response_text) + await event_queue.enqueue_event(response_message) + + except Exception as e: + print(f"❌ Error in execute: {e}") + import traceback + traceback.print_exc() + # Send error message + error_message = new_agent_text_message(f"Error: {str(e)}") + event_queue.enqueue_event(error_message) + + async def cancel(self, context: RequestContext, event_queue: EventQueue): + """Handle task cancellation""" + event_queue.enqueue_event(new_agent_text_message("Task cancelled")) diff --git a/nanda_core/protocols/a2a/__init__.py b/nanda_core/protocols/a2a/__init__.py new file mode 100644 index 0000000..3f289c1 --- /dev/null +++ b/nanda_core/protocols/a2a/__init__.py @@ -0,0 +1,3 @@ +from .protocol import A2AProtocol + +__all__ = ['A2AProtocol'] \ No newline at end of file diff --git a/nanda_core/protocols/a2a/protocol.py b/nanda_core/protocols/a2a/protocol.py new file mode 100644 index 0000000..10374c8 --- /dev/null +++ b/nanda_core/protocols/a2a/protocol.py @@ -0,0 +1,240 @@ +""" +Official A2A SDK Protocol Adapter +""" + +from typing import Callable, Dict, Any +from a2a.client import A2AClient, ClientConfig, ClientFactory +from a2a.server.request_handlers import DefaultRequestHandler +from a2a.server.tasks import InMemoryTaskStore +from a2a.server.apps import A2AStarletteApplication +from a2a.types import ( + AgentCard, AgentSkill, AgentCapabilities, AgentInterface, + Message, TextPart, Part, Role, + SendMessageRequest, MessageSendParams, + TransportProtocol +) +import uuid +import httpx +import uvicorn +from ..base import AgentProtocol +from ..AgentExecutor import NANDAAgentExecutor + +class A2AProtocol(AgentProtocol): + """A2A SDK protocol adapter""" + + def __init__(self, agent_id: str, agent_name: str, public_url: str, + domain: str = None, specialization: str = None, + description: str = "", capabilities: list = None): + """Initialize A2A protocol adapter + + Args: + agent_id: Unique agent identifier + agent_name: Display name + public_url: Public URL where agent is accessible + domain: Agent's domain of expertise + specialization: Agent's specialization + description: Agent description + capabilities: List of capabilities (default: ["text"]) + """ + self.agent_id = agent_id + self.agent_name = agent_name + self.public_url = public_url + self.description = description + self.capabilities_list = capabilities or ["text"] + + # Create AgentCard + self.agent_card = AgentCard( + name=agent_name, + description=description, + version="1.0.0", + url=f"{public_url}/", + protocol_version="0.2.5", + skills=[ + AgentSkill( + id=agent_id, + name=agent_name, + description=description, + tags=[domain] if domain else [], + examples=[] + ) + ], + capabilities=AgentCapabilities( + streaming=True, + push_notifications=False + ), + defaultInputModes=["text"], + defaultOutputModes=["text"] + ) + + # Incoming message handler (set by bridge) + self.incoming_handler: Callable = None + + # Server components (initialized when handler is set) + self.agent_executor = None + self.request_handler = None + self.server_app = None + + # HTTP client for A2A client + self.httpx_client = httpx.AsyncClient() + + def set_incoming_handler(self, handler: Callable): + """Set callback for incoming messages + + Args: + handler: Async function(message: dict) -> dict + """ + self.incoming_handler = handler + + # Initialize server components + self.agent_executor = NANDAAgentExecutor(handler) + self.request_handler = DefaultRequestHandler( + agent_executor=self.agent_executor, + task_store=InMemoryTaskStore() + ) + self.server_app = A2AStarletteApplication( + agent_card=self.agent_card, + http_handler=self.request_handler + ) + + async def send_message(self, target_url: str, message: Dict[str, Any]) -> Dict[str, Any]: + """Send message using A2A client""" + try: + # Validate target_url + if not target_url: + raise ValueError("target_url is None or empty") + + # Extract message content + content_text = message.get("content", {}).get("text", "") + conversation_id = message.get("conversation_id", "") + metadata = message.get("metadata", {}) + + print(f"🔄 Sending to {target_url}: {content_text[:50]}...") + + # Create A2A message with proper structure + from a2a.types import TextPart, Part, Message, Role + + text_part = TextPart( + kind='text', + text=content_text + ) + part = Part(root=text_part) + + a2a_message = Message( + message_id=f"msg-{conversation_id}-{uuid.uuid4().hex[:8]}", + role=Role.user, + parts=[part], + metadata=metadata if metadata else None + ) + + # Get client - strip /a2a suffix if present + base_url = target_url.rstrip('/a2a') if target_url else target_url + + client = A2AClient( + httpx_client=self.httpx_client, + url=base_url # Just pass the base URL + ) + + # Create request + from a2a.types import SendMessageRequest, MessageSendParams + request = SendMessageRequest( + id=f"req-{uuid.uuid4().hex[:8]}", + params=MessageSendParams(message=a2a_message) + ) + + # Send message + response = await client.send_message(request) + print(f"🔍 Response attributes: {dir(response)}") + print(f"🔍 Response dict: {response.model_dump() if hasattr(response, 'model_dump') else response}") + + if hasattr(response, 'root'): + result = response.root + + # Check if it's an error response + if hasattr(result, 'error') and result.error: + error_msg = result.error.message if hasattr(result.error, 'message') else str(result.error) + return { + "content": { + "text": f"Error: {error_msg}", + "type": "text" + } + } + + # Check if it's a success response with result + if hasattr(result, 'result'): + msg = result.result + if hasattr(msg, 'parts') and msg.parts: + response_text = "" + for part in msg.parts: + part_obj = part.root if hasattr(part, 'root') else part + if hasattr(part_obj, 'text'): + response_text = part_obj.text + break + + return { + "content": { + "text": response_text, + "type": "text" + } + } + + return { + "content": { + "text": "No response received", + "type": "text" + } + } + + except Exception as e: + print(f"❌ Error sending message: {e}") + import traceback + traceback.print_exc() + return { + "content": { + "text": f"Error: {str(e)}", + "type": "text" + } + } + + def get_metadata(self) -> Dict[str, Any]: + """Get AgentCard metadata + + Returns: + AgentCard as dict + """ + return { + "agent_id": self.agent_id, + "name": self.agent_name, + "url": self.agent_card.url, + "description": self.description, + "capabilities": self.capabilities_list + } + + async def start_server(self, host: str, port: int): + """Start A2A server + + Args: + host: Host to bind to + port: Port to listen on + """ + if not self.server_app: + raise RuntimeError("Server not initialized. Call set_incoming_handler first.") + + print(f"Starting A2A server on {host}:{port}") + + # Build Starlette app + app = self.server_app.build() + + # Run with uvicorn + config = uvicorn.Config(app, host=host, port=port, log_level="info") + server = uvicorn.Server(config) + await server.serve() + + def get_protocol_name(self) -> str: + """Return protocol identifier""" + return "a2a" + + async def cleanup(self): + """Clean up A2A protocol resources""" + if self.httpx_client: + await self.httpx_client.aclose() + print(f"🧹 Closed HTTP client for A2A protocol") \ No newline at end of file diff --git a/nanda_core/protocols/base.py b/nanda_core/protocols/base.py new file mode 100644 index 0000000..f4bc133 --- /dev/null +++ b/nanda_core/protocols/base.py @@ -0,0 +1,60 @@ +from abc import ABC, abstractmethod +from typing import Callable, Dict, Any + +class AgentProtocol(ABC): + """Base protocol interface for agent communication""" + + @abstractmethod + async def send_message(self, target_url: str, message: Dict[str, Any]) -> Dict[str, Any]: + """Send message to target agent + + Args: + target_url: Target agent's endpoint URL + message: Message dict with 'content', 'conversation_id', etc. + + Returns: + Response dict from target agent + """ + pass + + @abstractmethod + def set_incoming_handler(self, handler: Callable): + """Set callback for incoming messages + + Args: + handler: Async function that processes incoming messages + Should accept message dict and return response dict + """ + pass + + @abstractmethod + def get_metadata(self) -> Dict[str, Any]: + """Get protocol-specific agent metadata (AgentCard, etc.) + + Returns: + Metadata dict for agent discovery + """ + pass + + @abstractmethod + async def start_server(self, host: str, port: int): + """Start protocol server + + Args: + host: Host to bind to + port: Port to listen on + """ + pass + + @abstractmethod + def get_protocol_name(self) -> str: + """Return protocol identifier (a2a, slim, etc.)""" + pass + + @abstractmethod + async def cleanup(self): + """Clean up protocol resources (HTTP clients, connections, etc.) + + Called when agent is stopping to ensure proper resource cleanup. + """ + pass \ No newline at end of file diff --git a/nanda_core/protocols/router.py b/nanda_core/protocols/router.py new file mode 100644 index 0000000..c31fba9 --- /dev/null +++ b/nanda_core/protocols/router.py @@ -0,0 +1,111 @@ +from typing import Dict, List, Optional +from .base import AgentProtocol + +class ProtocolRouter: + """Manages multiple protocol adapters and routes messages""" + + def __init__(self): + self.protocols: Dict[str, AgentProtocol] = {} + self.default_protocol: Optional[str] = None + + def register(self, protocol: AgentProtocol): + """Register a protocol adapter + + Args: + protocol: Protocol adapter instance + """ + name = protocol.get_protocol_name() + self.protocols[name] = protocol + + # First registered protocol becomes default + if self.default_protocol is None: + self.default_protocol = name + + print(f"Registered protocol: {name}") + + def get_protocol(self, name: str) -> Optional[AgentProtocol]: + """Get protocol by name + + Args: + name: Protocol name (a2a, slim, etc.) + + Returns: + Protocol adapter or None + """ + return self.protocols.get(name) + + def get_all_protocols(self) -> List[str]: + """Get list of all registered protocol names""" + return list(self.protocols.keys()) + + async def send(self, protocol_name: str, target_url: str, message: dict) -> dict: + """Send message using specified protocol + + Args: + protocol_name: Protocol to use (a2a, slim) + target_url: Target agent endpoint + message: Message to send + + Returns: + Response from target agent + + Raises: + ValueError: If protocol not registered + """ + protocol = self.protocols.get(protocol_name) + if not protocol: + raise ValueError(f"Protocol '{protocol_name}' not registered. " + f"Available: {list(self.protocols.keys())}") + + return await protocol.send_message(target_url, message) + + def select_protocol(self, supported_protocols: List[str]) -> str: + """Select best available protocol from supported list + + Args: + supported_protocols: List of protocols target agent supports + + Returns: + Selected protocol name + """ + # Try to find first match from supported list + for proto in supported_protocols: + if proto in self.protocols: + return proto + + # Fallback to default if no match + if self.default_protocol: + return self.default_protocol + + # Last resort: use first registered protocol + if self.protocols: + return list(self.protocols.keys())[0] + + raise ValueError("No protocols registered") + + async def start_all_servers(self, host: str, port: int): + """Start all registered protocol servers + + Note: For protocols that need different ports, override in protocol adapter + + Args: + host: Host to bind to + port: Base port (protocols may use port+offset) + """ + for name, protocol in self.protocols.items(): + print(f"Starting {name} protocol server...") + # Each protocol handles its own server startup + # They can use different ports internally + await protocol.start_server(host, port) + + async def cleanup_all(self): + """Clean up all registered protocol resources + + Called when agent is stopping to ensure all protocols + properly release their resources (HTTP clients, connections, etc.) + """ + for name, protocol in self.protocols.items(): + try: + await protocol.cleanup() + except Exception as e: + print(f"⚠️ Error cleaning up {name} protocol: {e}") \ No newline at end of file diff --git a/nanda_core/telemetry/health_monitor.py b/nanda_core/telemetry/health_monitor.py index 47d561c..eac8630 100644 --- a/nanda_core/telemetry/health_monitor.py +++ b/nanda_core/telemetry/health_monitor.py @@ -345,7 +345,7 @@ def _get_registry_url(self) -> str: return f.read().strip() except: pass - return "https://chat.nanda-registry.com:6900" + return "https://registry.chat39.com:6900" def get_health_history(self, check_name: str, hours: int = 24) -> List[HealthCheck]: """Get health check history""" diff --git a/setup.py b/setup.py index c2afdd5..5f07af9 100644 --- a/setup.py +++ b/setup.py @@ -10,14 +10,13 @@ def read_requirements(): """Read requirements from file""" requirements = [ - "flask", - "anthropic", - "requests", - "python-a2a==0.5.6", - "mcp", - "python-dotenv", - "flask-cors", - "psutil" # For system monitoring + "anthropic>=0.18.0", + "a2a-sdk>=0.2.0", # Official A2A SDK (replaced python-a2a) + "httpx>=0.27.0", # For async HTTP (replaces requests) + "uvicorn>=0.30.0", # ASGI server for A2A + "starlette>=0.37.0", # For A2A server (included with a2a-sdk) + "python-dotenv>=1.0.0", + "psutil>=5.9.0", # For system monitoring ] return requirements diff --git a/tests/README.md b/tests/README.md new file mode 100644 index 0000000..b6d518d --- /dev/null +++ b/tests/README.md @@ -0,0 +1,98 @@ +# NEST Tests + +Comprehensive test suite for NEST framework. + +## Structure + +``` +tests/ +├── README.md # This file +├── __init__.py # Package marker +└── test_protocols/ # Protocol tests + ├── __init__.py + └── test_a2a_protocol.py # A2A protocol tests (PR #12) +``` + +## Running Tests + +### Install pytest + +```bash +pip install pytest pytest-asyncio +``` + +### Run all tests + +```bash +pytest tests/ -v +``` + +### Run specific test file + +```bash +pytest tests/test_protocols/test_a2a_protocol.py -v +``` + +### Run specific test class + +```bash +pytest tests/test_protocols/test_a2a_protocol.py::TestResourceCleanup -v +``` + +### Run with coverage + +```bash +pip install pytest-cov +pytest tests/ --cov=nanda_core --cov-report=html +``` + +## Test Categories + +### 1. Backward Compatibility Tests (`TestA2AProtocolBackwardCompatibility`) + +Tests that ensure the new implementation doesn't break existing functionality: +- NANDA initialization with default A2A protocol +- Protocol router integration +- Agent bridge integration + +### 2. Official A2A Protocol Tests (`TestOfficialA2AProtocol`) + +Tests for the new official A2A protocol implementation: +- Protocol initialization +- HTTP client creation +- AgentCard generation +- Metadata formatting +- Message handler setup + +### 3. Resource Cleanup Tests (`TestResourceCleanup`) + +Tests for PR #12 resource cleanup fixes: +- A2A protocol cleanup closes HTTP client +- ProtocolRouter cleanup_all() works correctly +- NANDA.stop() triggers cleanup properly +- Error handling during cleanup + +### 4. Integration Tests (`TestIntegration`) + +End-to-end tests for complete workflows: +- Full agent lifecycle (create -> use -> stop) + +## CI/CD Integration + +These tests are designed to run in CI/CD pipelines: + +```yaml +# Example GitHub Actions +- name: Run tests + run: | + pip install pytest pytest-asyncio + pytest tests/ -v +``` + +## Adding New Tests + +1. Create test file in appropriate subdirectory +2. Follow naming convention: `test_*.py` +3. Use descriptive test names: `test_feature_does_something` +4. Add docstrings explaining what's being tested +5. Update this README if adding new test categories diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_protocols/__init__.py b/tests/test_protocols/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_protocols/test_a2a_protocol.py b/tests/test_protocols/test_a2a_protocol.py new file mode 100644 index 0000000..30b611d --- /dev/null +++ b/tests/test_protocols/test_a2a_protocol.py @@ -0,0 +1,565 @@ +#!/usr/bin/env python3 +""" +Comprehensive tests for A2A Protocol Implementation (PR #12) + +Tests three critical areas: +1. Backward compatibility with previous A2A implementation +2. Official A2A protocol support (new implementation) +3. Resource cleanup functionality (PR #12 fix) + +Run with: pytest tests/test_protocols/test_a2a_protocol.py -v + +Error Scenarios Tested: +- Event loop handling in sync/async contexts +- HTTP client lifecycle management +- Protocol router integration +- Resource cleanup on errors +""" + +import pytest +import asyncio +import sys +from nanda_core.core.adapter import NANDA +from nanda_core.protocols.a2a.protocol import A2AProtocol +from nanda_core.protocols.router import ProtocolRouter + + +# Test agent logic +def simple_agent_logic(message: str, conversation_id: str) -> str: + """Simple echo agent for testing""" + return f"Echo: {message}" + + +class TestA2AProtocolBackwardCompatibility: + """ + Test backward compatibility with previous A2A implementation + + Purpose: Ensure existing code doesn't break with the new protocol architecture + + Potential Issues: + - Missing protocol registration + - Router not initialized + - Bridge not connected to router + """ + + def test_nanda_initialization_default(self): + """ + Test that NANDA initializes with A2A protocol by default + + Expected: A2A protocol should be auto-registered + + Failure Reasons: + - Protocol not registered → Check ProtocolRouter.register() + - Wrong protocol name → Check A2AProtocol.get_protocol_name() + """ + try: + agent = NANDA( + agent_id="test-backward-compat", + agent_logic=simple_agent_logic, + port=6000, + host="127.0.0.1", + public_url="http://127.0.0.1:6000", + enable_telemetry=False + ) + + # Should have A2A protocol registered + assert "a2a" in agent.router.get_all_protocols(), \ + "A2A protocol not registered. Check NANDA.__init__() protocol setup" + + protocol = agent.router.get_protocol("a2a") + assert protocol is not None, \ + "A2A protocol registered but not retrievable. Check ProtocolRouter.get_protocol()" + + except Exception as e: + pytest.fail(f"NANDA initialization failed: {e}\n" + f"Possible cause: Missing dependencies or protocol registration error") + + def test_agent_has_router(self): + """ + Test that agent has protocol router + + Expected: Agent should have router attribute of type ProtocolRouter + + Failure Reasons: + - Router not initialized → Check NANDA.__init__() + - Wrong router type → Check import statements + """ + agent = NANDA( + agent_id="test-router", + agent_logic=simple_agent_logic, + port=6000, + host="127.0.0.1", + public_url="http://127.0.0.1:6000", + enable_telemetry=False + ) + + assert hasattr(agent, 'router'), \ + "Agent missing 'router' attribute. Check NANDA.__init__()" + + assert isinstance(agent.router, ProtocolRouter), \ + f"Router is {type(agent.router)}, expected ProtocolRouter" + + def test_agent_bridge_integration(self): + """ + Test that agent bridge integrates with protocol router + + Expected: Bridge should have router reference + + Failure Reasons: + - Bridge not initialized → Check NANDA.__init__() + - Router not passed to bridge → Check AgentBridge.__init__() + """ + agent = NANDA( + agent_id="test-bridge", + agent_logic=simple_agent_logic, + port=6000, + host="127.0.0.1", + public_url="http://127.0.0.1:6000", + enable_telemetry=False + ) + + assert hasattr(agent, 'bridge'), \ + "Agent missing 'bridge' attribute" + + assert hasattr(agent.bridge, 'router'), \ + "Bridge missing 'router' attribute. Check AgentBridge integration" + + +class TestOfficialA2AProtocol: + """ + Test official A2A protocol implementation + + Purpose: Verify the new A2A SDK integration works correctly + + Dependencies: + - a2a-sdk >= 0.2.0 + - httpx >= 0.27.0 + """ + + def test_a2a_protocol_initialization(self): + """ + Test A2A protocol initializes correctly + + Expected: Protocol should store agent metadata + + Failure Reasons: + - Missing required parameters → Check A2AProtocol.__init__() signature + - AgentCard creation fails → Check a2a-sdk installation + """ + try: + protocol = A2AProtocol( + agent_id="test-agent", + agent_name="Test Agent", + public_url="http://localhost:6000", + domain="testing", + specialization="test agent", + description="Test description", + capabilities=["text"] + ) + + assert protocol.agent_id == "test-agent", "Agent ID mismatch" + assert protocol.agent_name == "Test Agent", "Agent name mismatch" + assert protocol.get_protocol_name() == "a2a", "Protocol name should be 'a2a'" + + except ImportError as e: + pytest.fail(f"A2A SDK import failed: {e}\n" + f"Solution: pip install a2a-sdk>=0.2.0") + + def test_a2a_protocol_has_httpx_client(self): + """ + Test that A2A protocol creates HTTP client + + Expected: httpx.AsyncClient should be initialized + + Failure Reasons: + - httpx not installed → pip install httpx>=0.27.0 + - Client not initialized → Check A2AProtocol.__init__() + """ + protocol = A2AProtocol( + agent_id="test-agent", + agent_name="Test Agent", + public_url="http://localhost:6000" + ) + + assert hasattr(protocol, 'httpx_client'), \ + "Protocol missing 'httpx_client' attribute" + + assert protocol.httpx_client is not None, \ + "HTTP client is None. Check A2AProtocol.__init__()" + + assert not protocol.httpx_client.is_closed, \ + "HTTP client should not be closed after initialization" + + def test_a2a_protocol_agent_card(self): + """ + Test that A2A protocol creates AgentCard + + Expected: AgentCard with correct metadata + + Failure Reasons: + - AgentCard not created → Check a2a.types import + - Metadata mismatch → Check AgentCard construction + """ + protocol = A2AProtocol( + agent_id="test-agent", + agent_name="Test Agent", + public_url="http://localhost:6000", + domain="testing", + description="Test description" + ) + + assert hasattr(protocol, 'agent_card'), \ + "Protocol missing 'agent_card' attribute" + + assert protocol.agent_card.name == "Test Agent", \ + f"AgentCard name mismatch: {protocol.agent_card.name}" + + assert protocol.agent_card.description == "Test description", \ + f"AgentCard description mismatch: {protocol.agent_card.description}" + + def test_a2a_protocol_metadata(self): + """ + Test A2A protocol metadata generation + + Expected: Metadata dict with agent info and capabilities + + Failure Reasons: + - get_metadata() returns wrong format + - Missing required fields + """ + protocol = A2AProtocol( + agent_id="test-agent", + agent_name="Test Agent", + public_url="http://localhost:6000", + capabilities=["text", "image"] + ) + + metadata = protocol.get_metadata() + + assert "agent_id" in metadata, "Metadata missing 'agent_id'" + assert metadata["agent_id"] == "test-agent" + + assert "name" in metadata, "Metadata missing 'name'" + assert metadata["name"] == "Test Agent" + + assert "capabilities" in metadata, "Metadata missing 'capabilities'" + assert "text" in metadata["capabilities"], "'text' not in capabilities" + assert "image" in metadata["capabilities"], "'image' not in capabilities" + + @pytest.mark.asyncio + async def test_a2a_protocol_incoming_handler(self): + """ + Test setting incoming message handler + + Expected: Server components should be initialized after setting handler + + Failure Reasons: + - Handler not set → Check set_incoming_handler() + - Server components not initialized → Check A2AStarletteApplication creation + """ + protocol = A2AProtocol( + agent_id="test-agent", + agent_name="Test Agent", + public_url="http://localhost:6000" + ) + + # Set handler + async def test_handler(message: dict) -> dict: + return {"response": "test"} + + protocol.set_incoming_handler(test_handler) + + # Should initialize server components + assert protocol.incoming_handler is not None, \ + "Handler not set after set_incoming_handler()" + + assert protocol.agent_executor is not None, \ + "AgentExecutor not initialized. Check NANDAAgentExecutor creation" + + assert protocol.request_handler is not None, \ + "RequestHandler not initialized. Check DefaultRequestHandler creation" + + assert protocol.server_app is not None, \ + "Server app not initialized. Check A2AStarletteApplication creation" + + +class TestResourceCleanup: + """ + Test resource cleanup functionality (PR #12 fix) + + Purpose: Verify HTTP clients and other resources are properly released + + Critical: Resource leaks can cause: + - Socket exhaustion + - Memory leaks + - Connection pool issues + """ + + @pytest.mark.asyncio + async def test_a2a_protocol_cleanup(self): + """ + Test that A2A protocol cleanup closes HTTP client + + Expected: HTTP client should be closed after cleanup() + + Failure Reasons: + - cleanup() not implemented → Check A2AProtocol.cleanup() + - aclose() not called → Check httpx_client.aclose() call + """ + protocol = A2AProtocol( + agent_id="test-cleanup", + agent_name="Test Cleanup", + public_url="http://localhost:6000" + ) + + # Client should be open + assert protocol.httpx_client is not None, "HTTP client is None" + assert not protocol.httpx_client.is_closed, \ + "HTTP client should not be closed before cleanup" + + # Cleanup + await protocol.cleanup() + + # Client should be closed + assert protocol.httpx_client.is_closed, \ + "HTTP client not closed after cleanup(). Check A2AProtocol.cleanup() implementation" + + @pytest.mark.asyncio + async def test_protocol_router_cleanup_all(self): + """ + Test that ProtocolRouter cleans up all protocols + + Expected: All registered protocols should be cleaned up + + Failure Reasons: + - cleanup_all() not implemented → Check ProtocolRouter.cleanup_all() + - Not iterating all protocols → Check iteration logic + """ + router = ProtocolRouter() + + # Register A2A protocol + protocol = A2AProtocol( + agent_id="test-router-cleanup", + agent_name="Test Router Cleanup", + public_url="http://localhost:6000" + ) + router.register(protocol) + + # Client should be open + assert not protocol.httpx_client.is_closed, \ + "HTTP client should not be closed before router cleanup" + + # Cleanup all + await router.cleanup_all() + + # Client should be closed + assert protocol.httpx_client.is_closed, \ + "HTTP client not closed after router.cleanup_all(). " \ + "Check ProtocolRouter.cleanup_all() implementation" + + def test_nanda_stop_calls_cleanup(self): + """ + Test that NANDA.stop() triggers protocol cleanup + + Expected: stop() should cleanup all protocol resources + + Failure Reasons: + - stop() doesn't call cleanup → Check NANDA.stop() + - Event loop handling issue → Check asyncio.get_running_loop() logic + - asyncio.run() not called in sync context → Check RuntimeError handling + + Context: This test runs in pytest (sync context), so stop() should + create a temporary event loop to run cleanup. + """ + agent = NANDA( + agent_id="test-stop-cleanup", + agent_logic=simple_agent_logic, + port=6000, + host="127.0.0.1", + public_url="http://127.0.0.1:6000", + enable_telemetry=False + ) + + # Get protocol + protocol = agent.router.get_protocol("a2a") + assert protocol is not None, "A2A protocol not found" + assert not protocol.httpx_client.is_closed, \ + "HTTP client should not be closed before stop()" + + # Stop agent (should trigger cleanup) + try: + agent.stop() + except Exception as e: + pytest.fail(f"agent.stop() raised exception: {e}\n" + f"Check NANDA.stop() error handling") + + # HTTP client should be closed + assert protocol.httpx_client.is_closed, \ + "HTTP client not closed after stop(). " \ + "Possible causes:\n" \ + "1. cleanup() not called → Check NANDA.stop() implementation\n" \ + "2. Event loop issue → Check asyncio.get_running_loop() / asyncio.run() logic\n" \ + "3. RuntimeError not caught → Check try-except around get_running_loop()" + + @pytest.mark.asyncio + async def test_cleanup_handles_errors_gracefully(self): + """ + Test that cleanup handles errors without crashing + + Expected: cleanup_all() should continue even if one protocol fails + + Failure Reasons: + - Exception not caught → Check try-except in cleanup_all() + - Error propagates → Should print warning, not raise + """ + router = ProtocolRouter() + + # Create a mock protocol that raises error on cleanup + class MockProtocol: + def get_protocol_name(self): + return "mock" + + async def cleanup(self): + raise Exception("Test cleanup error") + + router.protocols["mock"] = MockProtocol() + + # Cleanup should not raise exception + try: + await router.cleanup_all() + success = True + except Exception as e: + success = False + pytest.fail(f"cleanup_all() should handle errors gracefully, but raised: {e}\n" + f"Check ProtocolRouter.cleanup_all() error handling") + + assert success, "cleanup_all() should not raise exceptions" + + @pytest.mark.asyncio + async def test_cleanup_in_async_context(self): + """ + Test cleanup when called from async context + + Expected: Should use create_task() instead of asyncio.run() + + Note: This test verifies the async context path in NANDA.stop() + """ + agent = NANDA( + agent_id="test-async-cleanup", + agent_logic=simple_agent_logic, + port=6000, + host="127.0.0.1", + public_url="http://127.0.0.1:6000", + enable_telemetry=False + ) + + protocol = agent.router.get_protocol("a2a") + + # In async context, stop() should work without errors + agent.stop() + + # Wait a bit for the task to complete + await asyncio.sleep(0.1) + + # Note: In async context, create_task() is used, so cleanup happens asynchronously + # We can't guarantee client is closed immediately + + +class TestIntegration: + """ + Integration tests for complete agent lifecycle + + Purpose: Test real-world usage scenarios end-to-end + """ + + def test_full_agent_lifecycle(self): + """ + Test complete agent lifecycle: create → use → stop + + Expected: Agent should initialize, work, and cleanup properly + + This is the most important integration test. If this fails, check: + 1. Agent initialization + 2. Protocol registration + 3. Resource cleanup + """ + # Create + try: + agent = NANDA( + agent_id="test-lifecycle", + agent_logic=simple_agent_logic, + agent_name="Lifecycle Test", + domain="testing", + port=6000, + host="127.0.0.1", + public_url="http://127.0.0.1:6000", + enable_telemetry=False + ) + except Exception as e: + pytest.fail(f"Agent creation failed: {e}") + + # Verify initialization + assert agent.agent_id == "test-lifecycle" + assert "a2a" in agent.router.get_all_protocols(), \ + "A2A protocol not registered during initialization" + + # Get protocol + protocol = agent.router.get_protocol("a2a") + assert protocol is not None, "A2A protocol not retrievable" + assert not protocol.httpx_client.is_closed, \ + "HTTP client closed prematurely" + + # Stop (cleanup) + try: + agent.stop() + except Exception as e: + pytest.fail(f"Agent stop failed: {e}\n" + f"Check NANDA.stop() implementation and error handling") + + # Verify cleanup + assert protocol.httpx_client.is_closed, \ + "HTTP client not closed after stop().\n" \ + f"Test environment: Python {sys.version}\n" \ + f"This indicates the resource cleanup fix (PR #12) may not be working.\n" \ + f"Check NANDA.stop() event loop handling." + + def test_multiple_agents_cleanup(self): + """ + Test cleanup with multiple agents + + Expected: Each agent should cleanup its own resources + """ + agents = [] + + # Create multiple agents + for i in range(3): + agent = NANDA( + agent_id=f"test-agent-{i}", + agent_logic=simple_agent_logic, + port=6000 + i, + host="127.0.0.1", + public_url=f"http://127.0.0.1:{6000 + i}", + enable_telemetry=False + ) + agents.append(agent) + + # Get all protocols + protocols = [agent.router.get_protocol("a2a") for agent in agents] + + # All clients should be open + for protocol in protocols: + assert not protocol.httpx_client.is_closed + + # Stop all agents + for agent in agents: + agent.stop() + + # All clients should be closed + for i, protocol in enumerate(protocols): + assert protocol.httpx_client.is_closed, \ + f"Agent {i} HTTP client not closed" + + +if __name__ == "__main__": + # Run with pytest + pytest.main([__file__, "-v", "--tb=short", "-s"])