diff --git a/nanda_core/utils/README.md b/nanda_core/utils/README.md new file mode 100644 index 0000000..10e1f81 --- /dev/null +++ b/nanda_core/utils/README.md @@ -0,0 +1,493 @@ +# NANDA Infrastructure Utilities + +Generic infrastructure utilities for NANDA agents, providing rate limiting and event coordination capabilities. + +## Overview + +This package provides two key utilities for building robust, production-ready NANDA agents: + +1. **Rate Limiter** - Token bucket algorithm for DOS protection and request throttling +2. **Event Bus** - Async pub/sub system for agent event coordination + +## Rate Limiter + +### Purpose + +Protect NANDA agents from abuse, DOS attacks, and message floods using the industry-standard token bucket algorithm. Supports per-client rate limiting for multi-tenant systems. + +### Features + +- **Token Bucket Algorithm** - Allows burst traffic while maintaining average rate +- **Per-Client Tracking** - Separate buckets for each client/tenant +- **Configurable Limits** - Set rate and burst capacity via constructor or environment variables +- **Zero Dependencies** - Uses only Python standard library + +### Usage + +#### Basic Usage + +```python +from nanda_core.utils import RateLimiter, RateLimitError + +# Create rate limiter: 50 requests/sec, burst of 100 +limiter = RateLimiter(rate_per_sec=50.0, burst=100.0) + +# Check if client can make request +try: + limiter.check(client_id="user-123") + # Process request +except RateLimitError: + # Return 429 Too Many Requests + print("Rate limit exceeded") +``` + +#### Environment Variable Configuration + +```bash +# Set via environment variables +export RATE_LIMIT_RPS=50 # Requests per second +export RATE_LIMIT_BURST=100 # Burst capacity +``` + +```python +from nanda_core.utils import RateLimiter + +# Uses env vars automatically +limiter = RateLimiter() +``` + +#### Integration with NANDA Agent Bridge + +```python +from nanda_core.core.agent_bridge import SimpleAgentBridge +from nanda_core.utils import RateLimiter, RateLimitError +from python_a2a import Message + +class RateLimitedAgentBridge(SimpleAgentBridge): + def __init__(self, agent_id, agent_logic, **kwargs): + super().__init__(agent_id, agent_logic, **kwargs) + self.rate_limiter = RateLimiter() + + def handle_message(self, msg: Message) -> Message: + # Extract sender from message + sender = msg.metadata.custom_fields.get('from_agent_id', 'anonymous') + + # Check rate limit + try: + self.rate_limiter.check(sender) + except RateLimitError: + return self._create_response( + msg, + msg.conversation_id or str(uuid.uuid4()), + "⚠️ Rate limit exceeded. Please slow down." + ) + + # Continue normal processing + return super().handle_message(msg) +``` + +#### Utility Methods + +```python +# Check remaining tokens for a client +remaining = limiter.get_remaining("user-123") +print(f"Client has {remaining:.1f} tokens remaining") + +# Reset rate limit for specific client +limiter.reset("user-123") + +# Clear all client buckets +limiter.reset_all() +``` + +### Configuration + +| Parameter | Default | Environment Variable | Description | +|-----------|---------|---------------------|-------------| +| `rate_per_sec` | 50 | `RATE_LIMIT_RPS` | Average requests per second | +| `burst` | 100 | `RATE_LIMIT_BURST` | Maximum burst capacity | + +--- + +## Event Bus + +### Purpose + +Lightweight async pub/sub event dispatcher for agent-to-agent event coordination. Supports in-memory subscribers and pluggable external transports (NATS, webhooks). + +### Features + +- **Async Pub/Sub** - Async iterator-based subscription pattern +- **Pluggable Transports** - NATS, webhooks, or custom connectors +- **Backpressure Handling** - Drops old events if subscriber queue full +- **Sync/Async Bridge** - `publish_sync()` for non-async code +- **Environment Configuration** - Auto-configures from env vars + +### Usage + +#### Basic Pub/Sub + +```python +from nanda_core.utils import EventBus +import asyncio + +bus = EventBus() + +# Publisher +async def publish_events(): + await bus.publish({ + "type": "agent.registered", + "agent_id": "compliance-advisor", + "timestamp": "2025-11-12T10:30:00Z" + }) + +# Subscriber +async def monitor_events(): + async for event in bus.subscribe(): + print(f"Event: {event['type']}") + if event['type'] == 'shutdown': + break + +# Run +asyncio.run(asyncio.gather(publish_events(), monitor_events())) +``` + +#### Using Global Event Bus + +```python +from nanda_core.utils import event_bus + +# Publish from anywhere in your code +await event_bus.publish({"type": "test.event"}) + +# Subscribe from anywhere +async for event in event_bus.subscribe(): + handle_event(event) +``` + +#### Sync/Async Bridge + +```python +from nanda_core.utils import event_bus + +# Call from synchronous code +def sync_function(): + event_bus.publish_sync({ + "type": "metric.collected", + "value": 42 + }) +``` + +#### Integration with NANDA Telemetry + +```python +from nanda_core.telemetry.telemetry_system import TelemetrySystem +from nanda_core.utils import event_bus + +class EventPublishingTelemetry(TelemetrySystem): + def log_message_sent(self, to_agent_id: str, conversation_id: str): + # Existing telemetry logging + super().log_message_sent(to_agent_id, conversation_id) + + # Publish event for monitoring + event_bus.publish_sync({ + "type": "telemetry.message.sent", + "from_agent_id": self.agent_id, + "to_agent_id": to_agent_id, + "conversation_id": conversation_id, + "timestamp": datetime.now(timezone.utc).isoformat() + }) +``` + +#### Integration with NANDA Adapter + +```python +from nanda_core.core.adapter import NANDA +from nanda_core.utils import event_bus +from datetime import datetime, timezone + +class EventPublishingNANDA(NANDA): + def _register(self): + # Existing registration logic + response = super()._register() + + if response.status_code == 200: + # Publish registration event + event_bus.publish_sync({ + "type": "agent.registered", + "agent_id": self.agent_id, + "registry_url": self.registry_url, + "timestamp": datetime.now(timezone.utc).isoformat() + }) + + return response +``` + +### External Connectors + +#### NATS Connector + +Publish events to NATS for distributed systems. + +**Installation:** +```bash +pip install nest[events] # Installs nats-py +``` + +**Configuration:** +```bash +export NATS_URL=nats://localhost:4222 +export NATS_SUBJECT=agent-events +``` + +**Usage:** +```python +from nanda_core.utils import EventBus + +# Auto-configures NATS from env vars +bus = EventBus() + +# Events are published to both in-memory subscribers and NATS +await bus.publish({"type": "agent.heartbeat"}) +``` + +#### Webhook Connector + +Send events to HTTP webhooks. + +**Configuration:** +```bash +export EVENT_WEBHOOKS=https://webhook1.com,https://webhook2.com +export EVENT_WEBHOOK_TIMEOUT=5.0 +``` + +**Usage:** +```python +from nanda_core.utils import EventBus + +# Auto-configures webhooks from env vars +bus = EventBus() + +# Events are POSTed to all configured webhooks +await bus.publish({"type": "alert", "severity": "high"}) +``` + +#### Custom Connector + +```python +from nanda_core.utils import EventBus, EventConnector +from typing import Dict, Any + +class CustomConnector: + async def send(self, event: Dict[str, Any]) -> None: + # Your custom logic (database, queue, etc.) + print(f"Custom connector received: {event}") + +bus = EventBus() +await bus.register_connector(CustomConnector()) +``` + +### Configuration + +#### Environment Variables + +| Variable | Default | Description | +|----------|---------|-------------| +| `NATS_URL` | None | NATS server URL (e.g., nats://localhost:4222) | +| `NATS_SUBJECT` | "events" | NATS subject to publish to | +| `EVENT_WEBHOOKS` | None | Comma-separated webhook URLs | +| `EVENT_WEBHOOK_TIMEOUT` | 5.0 | Webhook timeout in seconds | + +#### Event Bus Methods + +```python +# Subscribe to events +async for event in bus.subscribe(): + process(event) + +# Publish event (async) +await bus.publish({"type": "event"}) + +# Publish event (sync) +bus.publish_sync({"type": "event"}) + +# Register custom connector +await bus.register_connector(my_connector) + +# Close all connectors +await bus.close() +``` + +--- + +## Use Cases + +### 1. Protect A2A Messages from Floods + +```python +from nanda_core.utils import RateLimiter, RateLimitError + +limiter = RateLimiter(rate_per_sec=10.0, burst=20.0) + +def handle_a2a_message(sender_id, message): + try: + limiter.check(sender_id) + return process_message(message) + except RateLimitError: + return {"error": "Rate limit exceeded"} +``` + +### 2. Monitor Agent Lifecycle Events + +```python +from nanda_core.utils import event_bus + +# Publisher (in adapter) +def register_agent(agent_id): + # ... registration logic ... + event_bus.publish_sync({ + "type": "agent.registered", + "agent_id": agent_id + }) + +# Subscriber (monitoring service) +async def monitor(): + async for event in event_bus.subscribe(): + if event['type'] == 'agent.registered': + log_registration(event['agent_id']) +``` + +### 3. Telemetry and Metrics Collection + +```python +from nanda_core.utils import event_bus + +# Publish metrics +event_bus.publish_sync({ + "type": "metric.collected", + "metric": "message_latency_ms", + "value": 42.5, + "agent_id": "compliance-advisor" +}) + +# Collect metrics +async def metrics_collector(): + metrics = [] + async for event in event_bus.subscribe(): + if event['type'] == 'metric.collected': + metrics.append(event) + if len(metrics) >= 100: + flush_to_database(metrics) + metrics.clear() +``` + +### 4. Distributed Agent Coordination + +```bash +# Set up NATS for multi-agent coordination +export NATS_URL=nats://nats.example.com:4222 +export NATS_SUBJECT=agent-coordination +``` + +```python +from nanda_core.utils import event_bus + +# Agent 1: Publish task completion +await event_bus.publish({ + "type": "task.completed", + "task_id": "compliance-check-123", + "agent_id": "compliance-advisor" +}) + +# Agent 2: Subscribe to task completions (via NATS) +async for event in event_bus.subscribe(): + if event['type'] == 'task.completed': + start_dependent_task(event['task_id']) +``` + +--- + +## Testing + +### Test Rate Limiter + +```python +import pytest +from nanda_core.utils import RateLimiter, RateLimitError + +def test_rate_limiter_burst(): + limiter = RateLimiter(rate_per_sec=5.0, burst=10.0) + client = "test-client" + + # Burst should work + for i in range(10): + limiter.check(client) + + # 11th should fail + with pytest.raises(RateLimitError): + limiter.check(client) +``` + +### Test Event Bus + +```python +import pytest +import asyncio +from nanda_core.utils import EventBus + +@pytest.mark.asyncio +async def test_event_bus(): + bus = EventBus() + received = [] + + async def subscriber(): + async for event in bus.subscribe(): + received.append(event) + if len(received) >= 2: + break + + sub_task = asyncio.create_task(subscriber()) + await asyncio.sleep(0.1) + + await bus.publish({"type": "test1"}) + await bus.publish({"type": "test2"}) + + await sub_task + assert len(received) == 2 +``` + +--- + +## Dependencies + +### Rate Limiter +- **Required**: None (stdlib only) + +### Event Bus +- **Required**: `httpx` (for webhook connector) +- **Optional**: `nats-py` (for NATS connector) - install with `pip install nest[events]` +- **Optional**: `anyio` (for sync/async bridge improvements) + +--- + +## Contributor + +**Astrocity Foundation - Space Industry Standards Initiative** + +- License: MIT +- Contact: standards@astrocity.foundation +- Based on: Standard algorithms (token bucket, pub/sub pattern) + +--- + +## Examples + +See the examples in each utility's module docstring for additional usage patterns: + +```python +# Run rate limiter example +python -m nanda_core.utils.rate_limiter + +# Run event bus example +python -m nanda_core.utils.event_bus +``` diff --git a/nanda_core/utils/__init__.py b/nanda_core/utils/__init__.py index d199fff..5a41a6d 100644 --- a/nanda_core/utils/__init__.py +++ b/nanda_core/utils/__init__.py @@ -3,5 +3,15 @@ Utility functions and helpers for the Streamlined NANDA Adapter """ -# Placeholder for future utilities -__all__ = [] \ No newline at end of file +from .rate_limiter import RateLimiter, RateLimitError +from .event_bus import EventBus, EventConnector, NATSConnector, WebhookConnector, event_bus + +__all__ = [ + "RateLimiter", + "RateLimitError", + "EventBus", + "EventConnector", + "NATSConnector", + "WebhookConnector", + "event_bus", +] \ No newline at end of file diff --git a/nanda_core/utils/event_bus.py b/nanda_core/utils/event_bus.py new file mode 100644 index 0000000..e7b5423 --- /dev/null +++ b/nanda_core/utils/event_bus.py @@ -0,0 +1,346 @@ +#!/usr/bin/env python3 +""" +Generic Event Bus - Async Pub/Sub System + +A lightweight, async publish/subscribe event dispatcher with pluggable +transport connectors (NATS, webhooks, etc.) for agent-to-agent communication. + +License: MIT +Contributor: Astrocity Foundation - Space Industry Standards Initiative +Based on: Standard pub-sub pattern +""" + +from __future__ import annotations + +import asyncio +import json +import os +from typing import Any, AsyncIterator, Dict, Iterable, List, Optional, Protocol, Set + +try: + import httpx +except ImportError: + httpx = None # Optional dependency + +try: + import anyio +except ImportError: + anyio = None # Optional dependency + +try: + import nats +except ImportError: + nats = None # Optional dependency + + +class EventConnector(Protocol): + """Protocol for pluggable event transport connectors.""" + + async def send(self, event: Dict[str, Any]) -> None: + """Send event to external system.""" + ... + + +class NATSConnector: + """ + NATS connector for distributed event streaming. + + Requires: pip install nats-py + Configuration: + NATS_URL: NATS server URL (e.g., nats://localhost:4222) + NATS_SUBJECT: Subject to publish to (default: events) + """ + + def __init__(self, url: str, subject: str = "events") -> None: + if nats is None: + raise RuntimeError( + "nats-py is not installed. Install with: pip install nats-py" + ) + self._url = url + self._subject = subject + self._client: Optional[nats.aio.client.Client] = None + self._lock = asyncio.Lock() + + async def send(self, event: Dict[str, Any]) -> None: + """Publish event to NATS.""" + async with self._lock: + if self._client is None: + try: + self._client = await nats.connect( + self._url, + allow_reconnect=False + ) + except Exception: + self._client = None + return + client = self._client + + if not client: + return + + payload = json.dumps(event, default=str).encode("utf-8") + try: + await client.publish(self._subject, payload) + except Exception: + # Swallow errors to avoid breaking event dispatch + pass + + async def close(self) -> None: + """Close NATS connection.""" + if self._client: + await self._client.close() + + +class WebhookConnector: + """ + Webhook connector for HTTP event delivery. + + Requires: pip install httpx + Configuration: + EVENT_WEBHOOKS: Comma-separated webhook URLs + EVENT_WEBHOOK_TIMEOUT: Timeout in seconds (default: 5.0) + """ + + def __init__(self, urls: Iterable[str], *, timeout: float = 5.0) -> None: + if httpx is None: + raise RuntimeError( + "httpx is not installed. Install with: pip install httpx" + ) + self._urls = [url.strip() for url in urls if url and url.strip()] + self._timeout = timeout + self._client: Optional[httpx.AsyncClient] = None + self._client_lock = asyncio.Lock() + + async def _ensure_client(self) -> Optional[httpx.AsyncClient]: + """Lazily create HTTP client.""" + async with self._client_lock: + if self._client is None: + self._client = httpx.AsyncClient(timeout=self._timeout) + return self._client + + async def send(self, event: Dict[str, Any]) -> None: + """POST event to all configured webhooks.""" + if not self._urls: + return + + client = await self._ensure_client() + if client is None: + return + + payload = json.dumps(event, default=str) + for url in self._urls: + try: + await client.post( + url, + content=payload, + headers={"content-type": "application/json"}, + ) + except Exception: + # Continue to next webhook on failure + continue + + async def close(self) -> None: + """Close HTTP client.""" + if self._client: + await self._client.aclose() + + +class EventBus: + """ + Async publish/subscribe event bus with pluggable transports. + + Features: + - In-memory subscribers via async iterators + - Pluggable external connectors (NATS, webhooks, etc.) + - Automatic backpressure handling (drops old events if queue full) + - Environment variable configuration + + Usage: + # Publisher + bus = EventBus() + await bus.publish({"type": "agent.registered", "agent_id": "foo"}) + + # Subscriber + async for event in bus.subscribe(): + print(f"Got event: {event}") + + Environment Variables: + NATS_URL: Enable NATS connector + NATS_SUBJECT: NATS subject (default: events) + EVENT_WEBHOOKS: Comma-separated webhook URLs + EVENT_WEBHOOK_TIMEOUT: Webhook timeout seconds (default: 5.0) + """ + + def __init__(self) -> None: + """Initialize event bus with default configuration.""" + self._subscribers: Set[asyncio.Queue[Dict[str, Any]]] = set() + self._lock = asyncio.Lock() + self._connectors: List[EventConnector] = [] + self._configure_default_connectors() + + def _configure_default_connectors(self) -> None: + """Auto-configure connectors from environment variables.""" + # NATS connector + nats_url = os.getenv("NATS_URL") + if nats_url: + subject = os.getenv("NATS_SUBJECT", "events") + try: + self._connectors.append(NATSConnector(nats_url, subject)) + except RuntimeError: + # nats-py not installed, skip + pass + + # Webhook connectors + webhook_urls = os.getenv("EVENT_WEBHOOKS", "") + if webhook_urls: + timeout = float(os.getenv("EVENT_WEBHOOK_TIMEOUT", "5.0")) + self._connectors.append( + WebhookConnector(webhook_urls.split(","), timeout=timeout) + ) + + async def subscribe(self) -> AsyncIterator[Dict[str, Any]]: + """ + Subscribe to events. + + Yields events as they are published. Automatically unsubscribes + when the async iterator is closed. + + Usage: + async for event in bus.subscribe(): + print(event) + """ + queue: asyncio.Queue[Dict[str, Any]] = asyncio.Queue(maxsize=256) + + async with self._lock: + self._subscribers.add(queue) + + try: + while True: + event = await queue.get() + yield event + finally: + async with self._lock: + self._subscribers.discard(queue) + + async def publish(self, event: Dict[str, Any]) -> None: + """ + Publish event to all subscribers and connectors. + + Args: + event: Event dictionary (must be JSON-serializable) + + Note: + If a subscriber's queue is full, the oldest event is dropped. + External connector errors are silently caught to avoid blocking. + """ + async with self._lock: + subscribers = list(self._subscribers) + connectors = list(self._connectors) + + # Publish to in-memory subscribers + for queue in subscribers: + if queue.full(): + # Drop oldest event to make room + try: + queue.get_nowait() + except asyncio.QueueEmpty: + pass + queue.put_nowait(event) + + # Publish to external connectors (don't block on errors) + if connectors: + await asyncio.gather( + *(connector.send(event) for connector in connectors), + return_exceptions=True + ) + + def publish_sync(self, event: Dict[str, Any]) -> None: + """ + Publish event from synchronous code. + + This method bridges sync/async worlds. If called from async context, + schedules as a task. Otherwise, attempts to run via anyio or asyncio. + + Args: + event: Event dictionary + """ + try: + # Check if we're in an async context + loop = asyncio.get_running_loop() + except RuntimeError: + # Not in async context, try anyio bridge or create new loop + if anyio: + try: + anyio.from_thread.run(self.publish, event) + return + except RuntimeError: + pass + # Last resort: create new event loop + asyncio.run(self.publish(event)) + else: + # In async context: schedule as task + loop.create_task(self.publish(event)) + + async def register_connector(self, connector: EventConnector) -> None: + """ + Register a custom event connector. + + Args: + connector: Object implementing EventConnector protocol + """ + async with self._lock: + self._connectors.append(connector) + + async def close(self) -> None: + """Close all connectors and clear subscribers.""" + for connector in self._connectors: + if hasattr(connector, 'close'): + await connector.close() + self._connectors.clear() + self._subscribers.clear() + + +# Global event bus instance (optional convenience) +event_bus = EventBus() + + +# Example usage +if __name__ == "__main__": + async def publisher(): + """Example publisher.""" + for i in range(5): + await event_bus.publish({ + "type": "test.event", + "seq": i, + "message": f"Event {i}" + }) + await asyncio.sleep(0.5) + + async def subscriber(name: str): + """Example subscriber.""" + async for event in event_bus.subscribe(): + print(f"{name} received: {event}") + if event.get("seq", -1) >= 4: + break + + async def main(): + """Run example.""" + print("Starting event bus example...") + print("Publisher will send 5 events, 2 subscribers will receive them\n") + + # Start subscribers + sub1 = asyncio.create_task(subscriber("Sub1")) + sub2 = asyncio.create_task(subscriber("Sub2")) + + # Give subscribers time to register + await asyncio.sleep(0.1) + + # Publish events + await publisher() + + # Wait for subscribers to finish + await asyncio.gather(sub1, sub2) + + print("\nExample complete!") + + asyncio.run(main()) diff --git a/nanda_core/utils/rate_limiter.py b/nanda_core/utils/rate_limiter.py new file mode 100644 index 0000000..952e59f --- /dev/null +++ b/nanda_core/utils/rate_limiter.py @@ -0,0 +1,202 @@ +#!/usr/bin/env python3 +""" +Generic Rate Limiter - Token Bucket Algorithm + +A simple, thread-safe rate limiter using the token bucket algorithm. +Useful for protecting agent systems from abuse and DOS attacks. + +License: MIT +Contributor: Astrocity Foundation - Space Industry Standards Initiative +Based on: Standard token bucket algorithm (public pattern) +""" + +from __future__ import annotations + +import os +import time +from dataclasses import dataclass +from typing import Dict, Optional + + +class RateLimitError(Exception): + """Raised when a client exceeds its configured rate limit.""" + pass + + +@dataclass +class _Bucket: + """Internal bucket state for token bucket algorithm.""" + tokens: float + last_refill: float + + +class RateLimiter: + """ + Token bucket rate limiter for multi-client systems. + + The token bucket algorithm allows for burst traffic while maintaining + an average rate limit over time. Each client gets their own bucket. + + Usage: + limiter = RateLimiter(rate_per_sec=50.0, burst=100.0) + + try: + limiter.check(client_id="user-123") + # Process request + except RateLimitError: + # Return 429 Too Many Requests + pass + + Configuration via environment variables: + RATE_LIMIT_RPS: Requests per second (default: 50) + RATE_LIMIT_BURST: Burst capacity (default: 100) + """ + + def __init__( + self, + *, + rate_per_sec: Optional[float] = None, + burst: Optional[float] = None + ) -> None: + """ + Initialize rate limiter. + + Args: + rate_per_sec: Average requests per second allowed (default: 50) + burst: Maximum burst capacity (default: 100) + """ + # Load from env vars if not specified + default_rate = rate_per_sec if rate_per_sec is not None else float( + os.getenv("RATE_LIMIT_RPS", "50") + ) + default_burst = burst if burst is not None else float( + os.getenv("RATE_LIMIT_BURST", "100") + ) + + # Enforce minimum values + self._rate = max(default_rate, 1.0) + self._capacity = max(default_burst, self._rate) + + # Per-client buckets + self._buckets: Dict[str, _Bucket] = {} + + def check(self, client_id: str) -> None: + """ + Check if client has capacity to make a request. + + Args: + client_id: Unique identifier for the client (tenant, user, API key, etc.) + + Raises: + RateLimitError: If client has exceeded rate limit + + Note: + This method modifies internal state. Call once per request. + """ + now = time.monotonic() + + # Get or create bucket for client + bucket = self._buckets.get(client_id) + if bucket is None: + bucket = _Bucket(tokens=self._capacity, last_refill=now) + self._buckets[client_id] = bucket + + # Refill tokens based on elapsed time + elapsed = now - bucket.last_refill + if elapsed > 0: + # Add tokens at configured rate, capped at capacity + bucket.tokens = min( + self._capacity, + bucket.tokens + elapsed * self._rate + ) + bucket.last_refill = now + + # Check if request can be served + if bucket.tokens < 1.0: + raise RateLimitError( + f"Client {client_id} exceeded rate limit " + f"({self._rate} req/sec, burst {self._capacity})" + ) + + # Consume one token + bucket.tokens -= 1.0 + + def reset(self, client_id: str) -> None: + """ + Reset rate limit for a specific client. + + Args: + client_id: Client identifier to reset + """ + self._buckets.pop(client_id, None) + + def reset_all(self) -> None: + """Clear all client buckets.""" + self._buckets.clear() + + def get_remaining(self, client_id: str) -> float: + """ + Get remaining token count for a client. + + Args: + client_id: Client identifier + + Returns: + Number of tokens remaining (float) + """ + bucket = self._buckets.get(client_id) + if bucket is None: + return self._capacity + + # Calculate current tokens with refill + now = time.monotonic() + elapsed = now - bucket.last_refill + if elapsed > 0: + return min( + self._capacity, + bucket.tokens + elapsed * self._rate + ) + return bucket.tokens + + +# Example usage +if __name__ == "__main__": + import time + + # Create rate limiter: 5 req/sec, burst of 10 + limiter = RateLimiter(rate_per_sec=5.0, burst=10.0) + + client = "test-client" + + print(f"Testing rate limiter (5 req/sec, burst 10)") + print(f"Remaining tokens: {limiter.get_remaining(client):.1f}") + + # Burst of 10 requests should work + print("\nSending burst of 10 requests...") + for i in range(10): + try: + limiter.check(client) + print(f" Request {i+1}: ✓ Allowed") + except RateLimitError as e: + print(f" Request {i+1}: ✗ Denied - {e}") + + # 11th request should fail + print("\nSending 11th request (should fail)...") + try: + limiter.check(client) + print(" ✓ Allowed (unexpected!)") + except RateLimitError as e: + print(f" ✗ Denied - Rate limit working!") + + # Wait and tokens should refill + print("\nWaiting 2 seconds for refill...") + time.sleep(2) + print(f"Remaining tokens: {limiter.get_remaining(client):.1f}") + + # Should work again + print("\nSending request after refill...") + try: + limiter.check(client) + print(" ✓ Allowed - Refill working!") + except RateLimitError as e: + print(f" ✗ Denied - {e}") diff --git a/setup.py b/setup.py index c2afdd5..1c0d4c1 100644 --- a/setup.py +++ b/setup.py @@ -17,7 +17,8 @@ def read_requirements(): "mcp", "python-dotenv", "flask-cors", - "psutil" # For system monitoring + "psutil", # For system monitoring + "httpx" # For async HTTP in event bus webhooks ] return requirements @@ -46,6 +47,7 @@ def read_readme(): extras_require={ "dev": ["pytest", "pytest-asyncio", "black", "flake8"], "monitoring": ["prometheus-client", "grafana-api"], + "events": ["nats-py"], # Optional NATS connector for event bus }, entry_points={ "console_scripts": [