-
Notifications
You must be signed in to change notification settings - Fork 30
Official a2a switch #12
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
ryanRfox
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here are a few suggested changes for this PR.
| @@ -0,0 +1,101 @@ | |||
| #!/usr/bin/env python3 | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove from PR. retain copies locally.
| @@ -0,0 +1,252 @@ | |||
| #!/usr/bin/env python3 | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove from PR. retain copies locally.
| self.session.verify = False # For development with self-signed certs | ||
| # Use async httpx client instead of requests | ||
| self.client = httpx.AsyncClient(timeout=30.0, verify=False) # verify=False for dev with self-signed certs | ||
| logger.info(f"Registry client initialized with URL: {self.registry_url}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggest to log a WARNING if verify=FALSE to raise the potential security vulnerability (no SSL cert verification) and mitigate the impacts if used in production.
| traceback.print_exc() | ||
| # Send error message | ||
| error_message = new_agent_text_message(f"Error: {str(e)}") | ||
| event_queue.enqueue_event(error_message) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should await here, like in L61
|
|
||
| async def cancel(self, context: RequestContext, event_queue: EventQueue): | ||
| """Handle task cancellation""" | ||
| event_queue.enqueue_event(new_agent_text_message("Task cancelled")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should await here, like in L61
| @@ -0,0 +1,73 @@ | |||
| from typing import Callable | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rename file to agent_executor.py to match python naming conventions.
this will require updates to imports and init
# Change from:
from ..AgentExecutor import NANDAAgentExecutor
# Change to:
from ..agent_executor import NANDAAgentExecutor
ryanRfox
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing Resource Cleanup
Existing Code
a2a/protocol.py:78:
class A2AProtocol(AgentProtocol):
def __init__(self, ...):
# ... init ...
# HTTP client for A2A client
self.httpx_client = httpx.AsyncClient() # ❌ Never closedadapter.py:170-174:
def stop(self):
"""Stop the agent and cleanup telemetry"""
if self.telemetry:
self.telemetry.stop() # ✅ Cleans up telemetry
print(f"🛑 Stopping agent '{self.agent_id}'")
# ❌ But doesn't close HTTP clients or other resources| self.server_app = None | ||
|
|
||
| # HTTP client for A2A client | ||
| self.httpx_client = httpx.AsyncClient() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
client never closed. Suggest to make A2AProtocol a proper async context manager.
import httpx
from typing import Optional
class A2AProtocol(AgentProtocol):
"""A2A SDK protocol adapter"""
def __init__(self, ...):
"""Initialize A2A protocol adapter"""
self.agent_id = agent_id
self.agent_name = agent_name
self.public_url = public_url
# ... other init ...
# HTTP client (will be initialized on first use or in __aenter__)
self.httpx_client: Optional[httpx.AsyncClient] = None
self._client_managed_externally = False
async def _ensure_client(self) -> httpx.AsyncClient:
"""Ensure HTTP client is initialized"""
if self.httpx_client is None:
self.httpx_client = httpx.AsyncClient()
return self.httpx_client
async def send_message(self, target_url: str, message: Dict[str, Any]) -> Dict[str, Any]:
"""Send message using A2A client"""
client = await self._ensure_client()
try:
# ... existing send logic using client ...
pass
except Exception as e:
logger.exception(f"Error sending message: {e}")
raise
async def close(self) -> None:
"""Close HTTP client and cleanup resources"""
if self.httpx_client is not None and not self._client_managed_externally:
await self.httpx_client.aclose()
self.httpx_client = None
logger.debug(f"Closed HTTP client for {self.agent_id}")
async def __aenter__(self):
"""Async context manager entry"""
self.httpx_client = httpx.AsyncClient()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Async context manager exit"""
await self.close()
def get_protocol_name(self) -> str:
"""Return protocol identifier"""
return "a2a"Update ProtocolRouter to manage protocol lifecycle:
class ProtocolRouter:
"""Manages multiple protocol adapters and routes messages"""
def __init__(self) -> None:
self.protocols: Dict[str, AgentProtocol] = {}
self.default_protocol: Optional[str] = None
# ... existing methods ...
async def close_all(self) -> None:
"""Close all protocol adapters and cleanup resources"""
logger.info("Closing all protocol adapters...")
for name, protocol in self.protocols.items():
try:
if hasattr(protocol, 'close'):
await protocol.close()
logger.debug(f"Closed {name} protocol")
except Exception as e:
logger.error(f"Error closing {name} protocol: {e}")
async def __aenter__(self):
"""Async context manager entry"""
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Async context manager exit"""
await self.close_all()Update NANDA adapter to properly cleanup:
class NANDA:
"""Simple NANDA class for clean agent deployment"""
# ... existing __init__ ...
async def stop(self) -> None:
"""Stop the agent and cleanup all resources"""
logger.info(f"Stopping agent '{self.agent_id}'...")
# Close registry client
if self.registry and hasattr(self.registry, 'close'):
try:
await self.registry.close()
logger.debug("Registry client closed")
except Exception as e:
logger.error(f"Error closing registry: {e}")
# Close all protocol adapters
if self.router:
try:
await self.router.close_all()
logger.debug("All protocols closed")
except Exception as e:
logger.error(f"Error closing protocols: {e}")
# Stop telemetry
if self.telemetry:
try:
self.telemetry.stop()
logger.debug("Telemetry stopped")
except Exception as e:
logger.error(f"Error stopping telemetry: {e}")
logger.info(f"Agent '{self.agent_id}' stopped successfully")
async def __aenter__(self):
"""Async context manager entry"""
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Async context manager exit"""
await self.stop()Addresses review feedback from @ryanRfox about missing resource cleanup. Changes: - Added abstract cleanup() method to AgentProtocol base class - Implemented cleanup() in A2AProtocol to properly close httpx_client - Added cleanup_all() method to ProtocolRouter to cleanup all protocols - Updated NANDA.stop() to call protocol cleanup before stopping This ensures all HTTP clients and protocol resources are properly released when agents are stopped, preventing resource leaks. Fixes: PR #12 review feedback 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
Addresses review feedback from @ryanRfox about missing resource cleanup. Changes: - Added abstract cleanup() method to AgentProtocol base class - Implemented cleanup() in A2AProtocol to properly close httpx_client - Added cleanup_all() method to ProtocolRouter to cleanup all protocols - Updated NANDA.stop() to call protocol cleanup before stopping This ensures all HTTP clients and protocol resources are properly released when agents are stopped, preventing resource leaks. Fixes: PR #12 review feedback Reviewed-by: Jin Gao <[email protected]>
2700d40 to
02eac88
Compare
Fix: - Replace deprecated asyncio.get_event_loop() with get_running_loop() - Properly handle both async and sync contexts in NANDA.stop() - Prevents "no current event loop" errors in Python 3.10+ Test: - Add 15 comprehensive tests covering backward compatibility, A2A protocol, resource cleanup, and integration scenarios - All tests passing (15/15)
Official A2A Protocol Support
This PR introduces official Google's A2A protocol support to NANDA, implementing a standardized protocol architecture that enables multi-protocol communication and prepares the framework for future protocol additions.
🎯 Key Changes
Protocol Architecture
ProtocolRouterfor managing multiple communication protocolsEnhanced Core Components
AgentBridgeto work with protocol router instead of direct A2A couplingRegistryClientwith async support and improved registration handlingRegistration & Discovery
supported_protocolsand endpoints metadataExample Agent Improvements
📦 Architecture
NANDA (adapter.py)
├── ProtocolRouter (protocols/router.py)
│ ├── A2AProtocol (protocols/a2a/protocol.py)
│ └── [Future: SLIMProtocol, CustomProtocol]
├── AgentBridge (agent_bridge.py)
├── RegistryClient (registry_client.py)
└── AgentExecutor (protocols/AgentExecutor.py)
✅ Benefits
🚀 Next Steps