Skip to content

Latest commit

 

History

History
886 lines (658 loc) · 25.6 KB

File metadata and controls

886 lines (658 loc) · 25.6 KB

Redis Streams for Notification Delivery (beta)

Redis Streams provides persistent, reliable message delivery for Agent notifications with guaranteed delivery, message acknowledgment, and automatic recovery.

Table of Contents

Overview

Redis Streams extends Redis with a log data structure that provides:

  • Persistent storage - Messages survive Redis restarts
  • Guaranteed delivery - Messages are acknowledged only after successful processing
  • Consumer groups - Load distribution across multiple Agent instances
  • Automatic recovery - Unacknowledged messages are redelivered
  • Batching - Efficient processing of multiple messages

Prerequisites

Redis Version: Redis 5.0 or higher is required for Redis Streams support.

  • Redis Streams were introduced in Redis 5.0
  • Recommended: Redis 6.0+ for improved performance and stability
  • Verify your version: redis-cli --version

Redis Streams vs Redis Pub/Sub

Agent automatically chooses the best implementation based on your Redis version:

Redis Streams (Redis >= 5.0):

  • Message delivery is critical (notifications must reach clients)
  • Running multiple Agent instances (high availability)
  • Need to recover from Agent restarts without message loss
  • Want visibility into message delivery status

Redis Pub/Sub (Redis < 5.0 or detection fails):

  • Message loss is acceptable (fire-and-forget)
  • Running single Agent instance
  • Need absolute minimum latency (no persistence overhead)

Note: You don't need to choose - Agent detects your Redis version and uses the appropriate implementation automatically.

Why Redis for Notifications?

The Load Balancer Subscription Problem

When running multiple Agent pods behind a load balancer in Kubernetes, you can only subscribe to ONE pod's notifications:

Client subscribes:
  /v1/notifications/event-stream → Load Balancer → Agent Pod 1 (sticky connection)

Decision requests (load balanced):
  /v1/decide → Load Balancer → Agent Pod 1 → Client receives notification
  /v1/decide → Load Balancer → Agent Pod 2 → Client MISSES notification!
  /v1/decide → Load Balancer → Agent Pod 3 → Client MISSES notification!

The Problem:

  1. Client subscribes to /v1/notifications/event-stream via load balancer
  2. Load balancer routes SSE connection to one specific Agent pod (e.g., Pod 1)
  3. Client is now subscribed only to Pod 1's notifications
  4. Decision requests are load-balanced across all pods (Pod 1, 2, 3)
  5. When decision happens on Pod 2 or Pod 3, client never receives notification

Why you can't subscribe to all pods:

  • SSE connections are sticky - once connected to a pod, you stay connected to that pod
  • Load balancer routes to ONE pod - you can't subscribe to multiple pods simultaneously
  • Subscribing directly to pod IPs is not feasible - pods are ephemeral in Kubernetes

Alternative considered (Push model):

  • Configure Agent pods to push notifications to an external endpoint
  • Problem: This would completely change the subscribe-based SSE model
  • Decision: Keep the subscribe model, use Redis as central hub instead

Redis Solution: Central Notification Hub

Redis acts as a shared notification hub that all Agent pods write to and read from:

Decision Flow (all pods publish to Redis):
  /v1/decide → Load Balancer → Agent Pod 1 → Publishes notification → Redis
  /v1/decide → Load Balancer → Agent Pod 2 → Publishes notification → Redis
  /v1/decide → Load Balancer → Agent Pod 3 → Publishes notification → Redis

Subscription Flow (any pod reads from Redis):
  Client → /v1/notifications/event-stream → Load Balancer → Agent Pod 1
                                                             ↓
                                          Agent Pod 1 reads Redis Stream
                                                             ↓
                                          Gets notifications from ALL pods
                                                             ↓
                                          Sends to client via SSE connection

How it works:

  1. All Agent pods publish to Redis:

    • Decision on Pod 1 → notification published to Redis
    • Decision on Pod 2 → notification published to Redis
    • Decision on Pod 3 → notification published to Redis
  2. Client subscribes to one pod (via load balancer):

    • Client → /v1/notifications/event-stream → routed to Pod 1
    • Long-lived SSE connection established to Pod 1
  3. Pod 1 reads from Redis Stream:

    • Pod 1 subscribes to Redis (using consumer groups)
    • Receives notifications from ALL pods (including its own)
  4. Pod 1 forwards to client:

    • Sends all notifications to client over SSE connection
    • Client receives notifications from all Agent pods, not just Pod 1

Key Insight: Client connects to ONE pod, but that pod reads from Redis which aggregates notifications from ALL pods. This solves the load balancer problem without changing the subscribe model.

Why Not Use Event Dispatcher?

Event Dispatcher (SDK events → Optimizely servers):

  • Each Agent sends events independently
  • No coordination needed between Agents

Notifications (datafile updates → SSE clients):

  • Need to sync updates across ALL Agents
  • SSE clients connected to different Agents must receive same updates
  • Redis provides the broadcast mechanism

This architecture was designed to ensure datafile consistency across Agent clusters in production environments.

Architecture

┌─────────────┐     XADD      ┌──────────────┐
│   Decide    ├──────────────►│ Redis Stream │
│   Handler   │               │ (persistent) │
└─────────────┘               └──────┬───────┘
                                     │
                              XREADGROUP
                              (batch_size: 5)
                                     │
                                     ▼
                            ┌──-──────────────┐
                            │ Consumer Group  │
                            │  "notifications"│
                            └────────┬────────┘
                                     │
                              ┌──────┴──────┐
                              │   Batch     │
                              │ (5 messages)│
                              └──────┬──────┘
                                     │
                              Send to SSE Client
                                     │
                                     ▼
                                   XACK
                            (acknowledge delivery)

Message Flow

  1. Publish - Decide handler adds notification to stream (XADD)
  2. Read - Consumer reads batch of messages (XREADGROUP)
  3. Process - Messages sent to SSE client
  4. Acknowledge - Successfully delivered messages acknowledged (XACK)
  5. Retry - Unacknowledged messages automatically redelivered

Configuration

⚠️ Prerequisites: Requires Redis 5.0 or higher. Redis Streams are not available in Redis 4.x or earlier.

Quick Start Setup

Step 1 - Enable notifications in config.yaml:

api:
    enableNotifications: true

Step 2 - Enable synchronization:

synchronization:
    notification:
        enable: true
        default: "redis"  # Agent auto-detects Redis version and uses best option

Note: Agent automatically detects your Redis version:

  • Redis >= 5.0: Uses Redis Streams (persistent, batched delivery)
  • Redis < 5.0: Falls back to Redis Pub/Sub (fire-and-forget)
  • Detection fails: Safely falls back to Redis Pub/Sub

Step 3 - Configure Redis connection:

synchronization:
    pubsub:
        redis:
            host: "localhost:6379"
            auth_token: ""          # Recommended: use auth_token or redis_secret
            # password: ""          # Alternative: password (may trigger security scanners)
            database: 0

Step 4 - (Optional) Tune Redis Streams performance:

Note: These parameters only apply when Redis Streams is used (Redis >= 5.0). They are ignored if Redis Pub/Sub is used. Leave these out to use sensible defaults.

synchronization:
    pubsub:
        redis:
            # Batching configuration (optional - defaults shown)
            batch_size: 10          # Messages per batch (default: 10)
            flush_interval: 5s      # Max wait for partial batch (default: 5s)

            # Retry configuration (optional - defaults shown)
            max_retries: 3          # Retry attempts (default: 3)
            retry_delay: 100ms      # Initial retry delay (default: 100ms)
            max_retry_delay: 5s     # Max retry delay (default: 5s)
            connection_timeout: 10s # Connection timeout (default: 10s)

Step 5 - (Optional) Increase HTTP timeouts to prevent SSE disconnects:

server:
    readTimeout: 300s   # 5 minutes
    writeTimeout: 300s  # 5 minutes

Step 6 - (Optional) Enable TLS/HTTPS:

server:
    keyFile: /path/to/key.pem
    certFile: /path/to/cert.pem

Full Configuration Example

api:
    enableNotifications: true

server:
    readTimeout: 300s
    writeTimeout: 300s
    # Optional: Enable HTTPS
    # keyFile: /path/to/key.pem
    # certFile: /path/to/cert.pem

synchronization:
    pubsub:
        redis:
            host: "localhost:6379"
            auth_token: ""  # Supports: auth_token, redis_secret, password
                           # Fallback: REDIS_PASSWORD environment variable
            database: 0

            # Optional: Redis Streams tuning (only applies if Redis >= 5.0)
            # Uncomment to override defaults
            batch_size: 5           # Messages per batch (default: 10)
            flush_interval: 2s      # Max wait before sending (default: 5s)
            max_retries: 3          # Retry attempts (default: 3)
            retry_delay: 100ms      # Initial retry delay (default: 100ms)
            max_retry_delay: 5s     # Max retry delay (default: 5s)
            connection_timeout: 10s # Connection timeout (default: 10s)

    notification:
        enable: true
        default: "redis"  # Agent auto-detects best option based on Redis version

Security: Password Configuration

To avoid security scanner alerts, use alternative field names:

# Preferred (no security scanner alerts)
auth_token: "your-redis-password"

# Alternative
redis_secret: "your-redis-password"

# Fallback to environment variable (if config field empty)
# export REDIS_PASSWORD="your-redis-password"

# Not recommended (triggers security scanners)
password: "your-redis-password"

The Agent checks fields in this order: auth_tokenredis_secretpasswordREDIS_PASSWORD env var.

Automatic Redis Version Detection

Agent automatically detects your Redis version at startup and chooses the best notification implementation:

Detection Flow:

  1. Agent connects to Redis
  2. Runs INFO server command to get Redis version
  3. Parses redis_version field (e.g., 6.2.5)
  4. If major version >= 5: Uses Redis Streams
  5. If major version < 5: Uses Redis Pub/Sub
  6. If detection fails: Falls back to Redis Pub/Sub (safe default)

Logging Examples:

Redis 6.x detected:

INFO Auto-detecting Redis version to choose best notification implementation...
INFO Redis Streams supported - will use Streams for notifications redis_version=6

Redis 4.x detected:

INFO Auto-detecting Redis version to choose best notification implementation...
INFO Redis Streams not supported - will use Pub/Sub for notifications redis_version=4 min_required=5

Detection failed:

INFO Auto-detecting Redis version to choose best notification implementation...
WARN Could not detect Redis version - will use Pub/Sub as safe fallback error="NOPERM"

Note: If auto-detection fails, Agent safely falls back to Redis Pub/Sub (compatible with all Redis versions).

Configuration Parameters

Note: These parameters only apply when Redis Streams is used (Redis >= 5.0).

Parameter Default Description
batch_size 10 Number of messages to batch before sending
flush_interval 5s Maximum time to wait before sending partial batch
max_retries 3 Maximum retry attempts for failed operations
retry_delay 100ms Initial delay between retry attempts
max_retry_delay 5s Maximum delay with exponential backoff
connection_timeout 10s Timeout for Redis connections

Performance Tuning

For low-latency (real-time notifications):

batch_size: 5
flush_interval: 500ms  # 0.5s max latency

For high-throughput (batch processing):

batch_size: 100
flush_interval: 5s

For burst traffic:

batch_size: 50
flush_interval: 1s

Redis Pub/Sub vs Redis Streams

Comparison

Feature Redis Pub/Sub Redis Streams
Delivery guarantee Fire-and-forget Guaranteed with ACK
Persistence No (in-memory only) Yes (survives restarts)
Message recovery No Yes (redelivery)
Consumer groups No Yes
Latency Lowest (~1ms) Low (~2-5ms)
Memory usage Minimal Higher (persistence)
Complexity Simple Moderate
Redis version 2.0+ 5.0+ required
Selection Auto-detected (< 5.0) Auto-detected (>= 5.0)

Note: Agent automatically detects your Redis version and uses the appropriate implementation. You don't need to choose manually.

Migration Path

Currently using Redis Pub/Sub? Switching to Redis Streams is automatic if you upgrade Redis:

Scenario 1: Already using default: "redis" (auto-detect)

synchronization:
    notification:
        default: "redis"  # Already using auto-detection
  • Redis 4.x: Currently using Pub/Sub
  • Upgrade Redis to 6.x: Automatically switches to Streams (no config change needed!)

Scenario 2: Explicitly set to default: "redis" (legacy Pub/Sub)

# Old config (explicit Pub/Sub, no auto-detection)
synchronization:
    notification:
        default: "redis"
  • This now uses auto-detection
  • Redis 5+ will automatically use Streams
  • No breaking changes

All Redis Streams configuration is backward compatible - existing pubsub.redis settings are reused.

Testing

Test 1: Batching Behavior

Send burst traffic to trigger batching:

# Send 20 requests instantly (in parallel)
for i in {1..20}; do
    curl -H "X-Optimizely-SDK-Key: YOUR_SDK_KEY" \
         -H "Content-Type: application/json" \
         -d "{\"userId\":\"burst-$i\"}" \
         "localhost:8080/v1/decide" &
done
wait

Verify batching in Redis Monitor:

redis-cli monitor | grep -E "xack|xreadgroup"

Expected patterns:

Multiple XACKs with same timestamp prefix (batch of 5):

"xack" ... "1759461708595-1"
"xack" ... "1759461708595-2"
"xack" ... "1759461708595-3"
"xack" ... "1759461708595-4"
"xack" ... "1759461708595-5"

Test 2: Flush Interval

Send messages slower than batch size:

# Send 3 messages (less than batch_size)
for i in {1..3}; do
    curl -H "X-Optimizely-SDK-Key: YOUR_SDK_KEY" \
         -H "Content-Type: application/json" \
         -d "{\"userId\":\"flush-test-$i\"}" \
         "localhost:8080/v1/decide"
done

Expected: Messages delivered after flush_interval (e.g., 2s) even though batch isn't full.

Test 3: Message Recovery

Test that messages survive Agent restarts:

Step 1 - Send messages:

for i in {1..5}; do
    curl -H "X-Optimizely-SDK-Key: YOUR_SDK_KEY" \
         -H "Content-Type: application/json" \
         -d "{\"userId\":\"recovery-test-$i\"}" \
         "localhost:8080/v1/decide"
done

Step 2 - Kill Agent:

# Stop the agent process
pkill -f optimizely

Step 3 - Verify messages in Redis:

redis-cli
> XLEN stream:optimizely-sync-YOUR_SDK_KEY
(integer) 20  # 5 users × 4 flags

> XRANGE stream:optimizely-sync-YOUR_SDK_KEY - + COUNT 5
# Shows pending messages

Step 4 - Restart Agent:

./bin/optimizely

Expected: All messages automatically redelivered to SSE clients.

Redis CLI Inspection Commands

# List all streams
KEYS stream:*

# Check stream length
XLEN stream:optimizely-sync-{SDK_KEY}

# View messages in stream
XRANGE stream:optimizely-sync-{SDK_KEY} - + COUNT 10

# View consumer group info
XINFO GROUPS stream:optimizely-sync-{SDK_KEY}

# View pending messages (unacknowledged)
XPENDING stream:optimizely-sync-{SDK_KEY} notifications

# View consumer info
XINFO CONSUMERS stream:optimizely-sync-{SDK_KEY} notifications

# Clear stream (for testing)
DEL stream:optimizely-sync-{SDK_KEY}

Migration Guide

Upgrading Redis Version (4.x → 5.x+)

When you upgrade your Redis server from version 4.x to 5.x or higher, Agent will automatically start using Redis Streams on the next restart—no configuration changes needed.

1. Upgrade Redis:

# Example: Docker upgrade from Redis 4.x to 6.x
docker stop my-redis
docker run -d --name my-redis -p 6379:6379 redis:6.2

2. Restart Agent:

Agent will detect the new Redis version and automatically use Streams:

INFO Auto-detecting Redis version to choose best notification implementation...
INFO Redis Streams supported - will use Streams for notifications redis_version=6

3. (Optional) Add performance tuning:

If you want to customize batch size or flush interval for high-traffic scenarios:

synchronization:
    pubsub:
        redis:
            batch_size: 50          # Larger batches for high traffic
            flush_interval: 10s     # Longer interval for efficiency

4. Verify operation:

# Check streams are created
redis-cli KEYS "stream:*"

# Monitor activity
redis-cli monitor | grep -E "xadd|xreadgroup|xack"

5. Clean up old pub/sub channels (optional):

# List old channels from previous Pub/Sub usage
redis-cli PUBSUB CHANNELS "optimizely-sync-*"

# They will expire naturally when no longer used

Troubleshooting

Messages Not Delivered

Check 1 - Verify stream exists:

redis-cli KEYS "stream:optimizely-sync-*"

Check 2 - Check consumer group:

redis-cli XINFO GROUPS stream:optimizely-sync-{SDK_KEY}

Expected output:

1) "name"
2) "notifications"
3) "consumers"
4) (integer) 1
5) "pending"
6) (integer) 0

Check 3 - Check for pending messages:

redis-cli XPENDING stream:optimizely-sync-{SDK_KEY} notifications

If pending > 0, messages are stuck. Agent may have crashed before ACK.

Solution: Restart Agent to reprocess pending messages.

High Memory Usage

Cause: Streams not being trimmed.

Check stream length:

redis-cli XLEN stream:optimizely-sync-{SDK_KEY}

Solution 1 - Configure max length (future enhancement):

# Not currently implemented
max_len: 1000  # Keep only last 1000 messages

Solution 2 - Manual cleanup:

# Keep only last 100 messages
redis-cli XTRIM stream:optimizely-sync-{SDK_KEY} MAXLEN ~ 100

Connection Errors

Error: connection refused or timeout

Check Redis availability:

redis-cli ping

Verify configuration:

synchronization:
    pubsub:
        redis:
            host: "localhost:6379"  # Correct host?
            connection_timeout: 10s  # Increase if needed

Check Agent logs:

# Look for connection errors
grep -i "redis" agent.log

Performance Issues

Symptom: High latency for notifications

Solution 1 - Reduce batch size:

batch_size: 5        # Smaller batches
flush_interval: 500ms  # Faster flush

Solution 2 - Check Redis performance:

redis-cli --latency
redis-cli --stat

Solution 3 - Monitor batch metrics:

curl http://localhost:8088/metrics | grep redis_streams

Advanced Topics

Consumer Groups & Load Balancing

Redis Streams uses consumer groups to distribute messages across multiple Agent instances:

  • Stream name: stream:optimizely-sync-{SDK_KEY}
  • Consumer group: notifications (default)
  • Consumer name: consumer-{timestamp} (unique per Agent instance)

How it works:

Stream → Consumer Group "notifications" → Agent 1 (consumer-123) reads msg 1, 2, 3
                                       → Agent 2 (consumer-456) reads msg 4, 5, 6
                                       → Agent 3 (consumer-789) reads msg 7, 8, 9

Multiple Agents reading from same stream will load-balance messages automatically.

Multiple SDK Keys Support

Subscribe to notifications for multiple SDK keys using wildcards:

Single SDK key:

curl -N 'http://localhost:8080/v1/notifications/event-stream' \
     -H 'X-Optimizely-SDK-Key: ABC123'

All SDK keys (Redis pattern subscribe):

# Agent publishes to: stream:optimizely-sync-{sdk_key}
# Subscribe with pattern: stream:optimizely-sync-*

redis-cli PSUBSCRIBE "stream:optimizely-sync-*"

Message Claiming & Fault Tolerance

If an Agent crashes before acknowledging a message, another Agent can claim it:

Step 1 - Agent 1 reads message:

XREADGROUP GROUP notifications consumer1 STREAMS stream:name ">"

Step 2 - Agent 1 crashes (message pending, not acknowledged)

Step 3 - Check pending messages:

XPENDING stream:name notifications
# Shows message owned by consumer1 (dead)

Step 4 - Agent 2 claims abandoned message:

XCLAIM stream:name notifications consumer2 60000 <message-id>
# Claims messages pending > 60 seconds

Step 5 - Agent 2 processes and acknowledges:

XACK stream:name notifications <message-id>

Benefits:

  • Load balancing: Multiple workers process different messages
  • Fault tolerance: Dead workers' messages claimed by others
  • Exactly-once delivery: Messages stay pending until acknowledged

Message Format

Messages stored in streams contain:

{
  "data": "{\"type\":\"decision\",\"message\":{...}}",
  "timestamp": 1759461274
}
  • data: JSON-encoded notification payload
  • timestamp: Unix timestamp of message creation

Retry Logic

Failed operations use exponential backoff:

  1. Initial delay: retry_delay (default: 100ms)
  2. Each retry: delay × 2
  3. Max delay: max_retry_delay (default: 5s)
  4. Max retries: max_retries (default: 3)

Retryable errors:

  • Connection errors (refused, reset, timeout)
  • Redis LOADING, READONLY, CLUSTERDOWN states

Non-retryable errors:

  • Authentication errors
  • Invalid commands
  • Memory limit exceeded

FAQ

Does Agent support TLS/HTTPS?

Yes, TLS is configurable in config.yaml:

server:
    keyFile: /path/to/key.pem   # TLS private key
    certFile: /path/to/cert.pem # TLS certificate

Uncomment and set paths to enable HTTPS for the Agent server.

Can I subscribe to multiple SDK keys?

Yes, use Redis pattern subscribe:

# Subscribe to all SDK keys
redis-cli PSUBSCRIBE "stream:optimizely-sync-*"

Agent publishes to channels: stream:optimizely-sync-{sdk_key}

Are large messages a problem?

Redis Streams: Can handle up to 512MB messages (Redis max string size)

SQS comparison: Only 256KB limit

Considerations:

  • Redis memory usage increases with message size
  • Network bandwidth for large payloads
  • Serialization/deserialization overhead

For production, keep notifications < 1MB for optimal performance.

How do I avoid "password" security scanner alerts?

Use alternative field names in config.yaml:

auth_token: "your-redis-password"  # Preferred
# or
redis_secret: "your-redis-password"
# or
# export REDIS_PASSWORD="your-redis-password"  # Environment variable

Avoid using password: field name which triggers security scanners.

Why use Redis instead of direct event dispatching?

Event dispatching (SDK → Optimizely):

  • Each Agent sends events independently ✓

Redis notifications (Agent ↔ Agent):

  • Syncs datafile updates across all Agent instances
  • Solves the load balancer problem (webhook → random Agent)
  • Ensures all Agents serve consistent data

See Why Redis for Notifications? for details.

Can multiple consumers read the same message?

Consumer groups: No - messages distributed across consumers (load balancing)

Msg 1 → Consumer A
Msg 2 → Consumer B  (different message)
Msg 3 → Consumer A

Multiple consumer groups: Yes - different groups get same messages

Group "notifications" → Consumer A gets Msg 1
Group "analytics"     → Consumer X gets Msg 1 (same message)

What happens if a consumer crashes?

Messages become pending (unacknowledged). Another consumer can claim them:

# Check pending messages
XPENDING stream:name notifications

# Claim abandoned messages (60s timeout)
XCLAIM stream:name notifications consumer2 60000 <message-id>

# Process and acknowledge
XACK stream:name notifications <message-id>

This ensures no message loss even when Agents crash.

See Also