Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 63 additions & 12 deletions crates/agent-protocol/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,34 +35,85 @@ pub enum EventType {
GuardrailInspect,
}

/// Agent response decision indicating how to handle a request or response.
///
/// This enum represents the decision an agent makes when processing a request or response.
/// It allows agents to allow, block, redirect, or challenge requests based on their
/// processing logic.
///
/// Variant names are serialized in `snake_case` (`allow`, `block`, `redirect`,
/// `challenge`) because of the `#[serde(rename_all = "snake_case")]` attribute.
///
/// # Variants
///
/// - **Allow**: Continue normal processing without modification (the `Default` value)
/// - **Block**: Reject the request/response with a custom error response
/// - **Redirect**: Send the client to a different URL
/// - **Challenge**: Request additional verification from the client
///
/// # Examples
///
/// ```rust
/// use std::collections::HashMap;
/// use zentinel_agent_protocol::Decision;
///
/// // Allow request to proceed normally
/// let allow = Decision::Allow;
///
/// // Block with 403 and custom body
/// let block = Decision::Block {
///     status: 403,
///     body: Some("Access denied".to_string()),
///     headers: None,
/// };
///
/// // Redirect to login page
/// let redirect = Decision::Redirect {
///     url: "https://example.com/login".to_string(),
///     status: 302,
/// };
///
/// // Challenge with CAPTCHA
/// let mut params = HashMap::new();
/// params.insert("type".to_string(), "recaptcha".to_string());
/// let challenge = Decision::Challenge {
///     challenge_type: "captcha".to_string(),
///     params,
/// };
/// ```
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum Decision {
/// Allow the request/response to continue without modification.
///
/// This is the default decision that indicates normal processing should proceed.
#[default]
Allow,
/// Block the request/response with a custom error response.
///
/// The proxy will return the specified status code and optional body/headers
/// instead of forwarding the request to the upstream or returning the original response.
Block {
/// HTTP status code to return (typically 4xx or 5xx)
status: u16,
/// Optional response body content
body: Option<String>,
/// Optional response headers to include
headers: Option<HashMap<String, String>>,
},
/// Redirect the client to a different URL.
///
/// The proxy will return a redirect response with the specified URL and status code.
Redirect {
/// Target URL for the redirect
url: String,
/// HTTP redirect status code (301, 302, 303, 307, or 308)
status: u16,
},
/// Request additional verification from the client.
///
/// This can be used to implement CAPTCHA, multi-factor authentication,
/// or other challenge-response mechanisms.
Challenge {
/// Type of challenge (e.g., "captcha", "otp", "totp")
challenge_type: String,
/// Challenge-specific parameters and configuration
params: HashMap<String, String>,
},
}
Expand Down
40 changes: 37 additions & 3 deletions crates/agent-protocol/src/v2/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -519,16 +519,50 @@ impl AgentEntry {
}
}

/// Agent connection pool.
/// Agent connection pool for managing multiple connections to external processing agents.
///
/// Manages multiple connections to multiple agents with load balancing,
/// health tracking, automatic reconnection, and metrics collection.
/// `AgentPool` provides production-ready connection pooling, load balancing, health tracking,
/// automatic reconnection, and metrics collection for robust agent communication.
///
/// # Features
///
/// - **Connection pooling**: Maintains multiple connections per agent for better throughput
/// - **Load balancing**: Routes requests using round-robin, least-connections, or health-based strategies
/// - **Health monitoring**: Tracks agent health and routes requests to healthy connections
/// - **Automatic reconnection**: Reconnects failed connections transparently
/// - **Flow control**: Manages backpressure and request queuing
/// - **Metrics**: Collects detailed metrics on performance and health
/// - **Configuration management**: Distributes config updates to agents
///
/// # Performance
///
/// Uses `DashMap` for lock-free reads in the hot path. Agent lookup is O(1)
/// without contention. Connection selection uses cached health state to avoid
/// async I/O per request.
///
/// # Example
///
/// ```rust,ignore
/// use std::time::Duration;
/// use zentinel_agent_protocol::v2::{AgentPool, AgentPoolConfig, LoadBalanceStrategy};
/// use zentinel_agent_protocol::RequestHeadersEvent;
///
/// // Create pool with custom config
/// let config = AgentPoolConfig {
///     connections_per_agent: 4,
///     load_balance_strategy: LoadBalanceStrategy::LeastConnections,
///     health_check_interval: Duration::from_secs(30),
///     ..Default::default()
/// };
/// let pool = AgentPool::with_config(config);
///
/// // Add agent with Unix domain socket endpoint
/// // (must be called from an async context that returns a `Result`)
/// pool.add_agent("waf", "unix:/tmp/waf.sock").await?;
///
/// // Send request headers to an agent
/// let headers = RequestHeadersEvent { /* ... */ };
/// let response = pool.send_request_headers("waf", "correlation-123", &headers).await?;
/// ```
pub struct AgentPool {
config: AgentPoolConfig,
/// Lock-free concurrent map for agent lookup.
Expand Down
99 changes: 91 additions & 8 deletions crates/agent-protocol/src/v2/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,63 @@ use crate::{
WebSocketFrameEvent,
};

/// v2 agent handler trait.
/// Trait for implementing agent handlers in Protocol v2.
///
/// Implement this trait to create a v2 agent. The v2 handler adds:
/// - Capability reporting
/// - Health reporting
/// - Flow control awareness
/// - Metrics export
/// - Configuration updates
/// `AgentHandlerV2` defines the interface that agent implementations must provide
/// to handle various types of events from the proxy. This includes request/response
/// processing, WebSocket handling, health monitoring, and configuration management.
///
/// The trait provides sensible defaults for all methods, allowing agents to implement
/// only the events they need to handle. All methods are async to support I/O operations.
///
/// # Features
///
/// - **Capability reporting**: Declare what the agent can process
/// - **Health reporting**: Report current health status to the proxy
/// - **Flow control awareness**: Handle backpressure and flow control
/// - **Metrics export**: Provide metrics about agent performance
/// - **Configuration updates**: Handle dynamic configuration changes
///
/// # Event Lifecycle
///
/// 1. **Handshake**: Agent declares capabilities when connecting
/// 2. **Headers**: Process request/response headers first
/// 3. **Body chunks**: Handle streaming body data if needed
/// 4. **Completion**: Final processing when request/response is complete
/// 5. **WebSocket**: Handle WebSocket frames for upgraded connections
///
/// # Example
///
/// ```rust
/// use async_trait::async_trait;
/// use zentinel_agent_protocol::v2::{AgentHandlerV2, AgentCapabilities, AgentResponse};
/// use zentinel_agent_protocol::{EventType, RequestHeadersEvent};
///
/// pub struct MyWafAgent;
///
/// #[async_trait]
/// impl AgentHandlerV2 for MyWafAgent {
///     fn capabilities(&self) -> AgentCapabilities {
///         AgentCapabilities::new("my-waf", "My WAF Agent", "1.0.0")
///             .with_event(EventType::RequestHeaders)
///     }
///
///     async fn on_request_headers(&self, event: RequestHeadersEvent) -> AgentResponse {
///         // Inspect headers for malicious patterns
///         if event.headers.contains_key("x-malicious") {
///             AgentResponse::block(403, Some("Blocked by WAF".to_string()))
///         } else {
///             AgentResponse::default_allow()
///         }
///     }
/// }
/// ```
///
/// # Errors
///
/// Agent methods should return `AgentResponse` with appropriate `Decision` variants.
/// Runtime errors should be logged internally rather than propagated, as the proxy
/// needs to maintain high availability.
#[async_trait]
pub trait AgentHandlerV2: Send + Sync {
/// Get agent capabilities.
Expand Down Expand Up @@ -121,7 +170,41 @@ pub enum DrainReason {
Manual,
}

/// v2 gRPC agent server.
/// gRPC-based agent server implementation for Protocol v2.
///
/// `GrpcAgentServerV2` provides a gRPC transport for agents that need to communicate
/// with the Zentinel proxy over the network. This is ideal for agents running in
/// separate processes, containers, or on different machines.
///
/// # Features
///
/// - **Network transport**: Communicates over TCP with HTTP/2 and TLS support
/// - **Language agnostic**: Works with any gRPC client implementation
/// - **Scalability**: Can handle multiple concurrent proxy connections
/// - **Monitoring**: Integrates with gRPC ecosystem tools for observability
///
/// # Example
///
/// ```rust,ignore
/// use zentinel_agent_protocol::v2::{GrpcAgentServerV2, AgentHandlerV2};
///
/// // Create server with your handler
/// // (`MyAgent` stands in for your own `AgentHandlerV2` implementation)
/// let handler = Box::new(MyAgent::new());
/// let server = GrpcAgentServerV2::new("my-agent", handler);
///
/// // Serve on a specific address (must be called from an async context)
/// let addr = "127.0.0.1:8080".parse()?;
/// server.run(addr).await?;
/// ```
///
/// # Transport Details
///
/// The gRPC transport uses the standard Agent Protocol v2 service definition:
/// - Bidirectional streaming for event processing
/// - Capability negotiation during handshake
/// - Health check integration
/// - Configuration update support
/// - Metrics collection
pub struct GrpcAgentServerV2 {
id: String,
handler: Arc<dyn AgentHandlerV2>,
Expand Down
49 changes: 46 additions & 3 deletions crates/agent-protocol/src/v2/uds_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,53 @@ use crate::{
RequestHeadersEvent, ResponseBodyChunkEvent, ResponseHeadersEvent, WebSocketFrameEvent,
};

/// v2 agent server over Unix Domain Socket.
/// Unix Domain Socket agent server implementation for Protocol v2.
///
/// Listens on a Unix socket, accepts connections, and dispatches events to an
/// [`AgentHandlerV2`] implementation using the v2 binary wire format.
/// `UdsAgentServerV2` provides a high-performance local transport for agents running
/// on the same machine as the Zentinel proxy. It uses Unix Domain Sockets for
/// inter-process communication with minimal overhead.
///
/// This transport is recommended for agents that need the lowest possible latency
/// and are co-located with the proxy process.
///
/// # Features
///
/// - **High performance**: Minimal overhead for local communication
/// - **Binary protocol**: Efficient binary encoding for reduced CPU usage
/// - **File system permissions**: Uses Unix socket permissions for access control
/// - **Socket cleanup**: Removes stale socket files on startup
///
/// # Example
///
/// ```rust,ignore
/// use zentinel_agent_protocol::v2::{UdsAgentServerV2, AgentHandlerV2};
/// use std::path::Path;
///
/// // Create server with your handler
/// // (`MyAgent` stands in for your own `AgentHandlerV2` implementation)
/// let handler = Box::new(MyAgent::new());
/// let server = UdsAgentServerV2::new(
///     "my-agent",
///     Path::new("/var/run/my-agent.sock"),
///     handler,
/// );
///
/// // Start listening (must be called from an async context)
/// server.run().await?;
/// ```
///
/// # Socket Management
///
/// The server automatically:
/// - Creates the socket file with appropriate permissions
/// - Removes stale socket files on startup
/// - Handles multiple concurrent proxy connections
///
/// # Errors
///
/// May return errors for:
/// - Permission issues creating/accessing the socket file
/// - Invalid socket paths or filesystem issues
/// - Handler implementation errors
pub struct UdsAgentServerV2 {
id: String,
socket_path: PathBuf,
Expand Down
Loading