diff --git a/application_sdk/server/MESSAGE_PROCESSOR.md b/application_sdk/server/MESSAGE_PROCESSOR.md new file mode 100644 index 000000000..59caff1db --- /dev/null +++ b/application_sdk/server/MESSAGE_PROCESSOR.md @@ -0,0 +1,384 @@ +# Message Processor Implementation + +## Overview + +The message processor provides functionality for processing Kafka messages via Dapr input bindings. It supports both per-message and batch processing modes, with optional Temporal workflow integration. + +**Key Design Decision**: The processing mode is automatically determined by `batch_size`: +- `batch_size = 1` → Per-message mode (immediate processing) +- `batch_size > 1` → Batch mode (accumulate and process) + +This eliminates the need for a separate `process_mode` parameter and provides a more intuitive API. + +## Architecture + +### Core Components + +1. **MessageProcessorConfig** (`application_sdk/server/messaging.py`) + - Configuration for message processor behavior + - Specifies processing mode, batch settings, and workflow integration + +2. **MessageProcessor** (`application_sdk/server/messaging.py`) + - Handles message processing logic + - Supports per-message and batch modes + - Optional workflow triggering + - Built-in metrics and error handling + +3. **APIServer Integration** (`application_sdk/server/fastapi/__init__.py`) + - `register_message_processor()` - Register a processor for a Kafka binding + - `start_message_processors()` - Start all registered processors + - `stop_message_processors()` - Stop all registered processors + +## Processing Modes + +The processing mode is automatically determined by the `batch_size` parameter: + +### Per-Message Mode (batch_size = 1) +```python +config = MessageProcessorConfig( + binding_name="kafka-input", + batch_size=1 +) +``` +- Processes each message immediately upon arrival +- No batching or delays +- Suitable for low-latency requirements +- `batch_timeout` is ignored in this mode + +### Batch Mode (batch_size > 1) +```python +config = MessageProcessorConfig( + binding_name="kafka-input", + batch_size=50, + batch_timeout=5.0 +) +``` +- Accumulates messages up to `batch_size` +- Processes batch when size threshold or timeout is reached +- More efficient for high-throughput scenarios + +## Usage Examples + +### Example 1: Basic Per-Message Processing (batch_size = 1) + +```python +from application_sdk.server.messaging import MessageProcessorConfig + +# Register processor in your APIServer setup +config = MessageProcessorConfig( + binding_name="kafka-input", + batch_size=1 # Per-message mode +) + +async def process_messages(messages: List[dict]): + for msg in messages: + logger.info(f"Processing: {msg}") + # Your custom processing logic here + +server.register_message_processor(config, process_callback=process_messages) +``` + +### Example 2: Batch Processing with Custom Callback + +```python +config = MessageProcessorConfig( + binding_name="kafka-input", + batch_size=100, # Batch mode + batch_timeout=10.0 +) + +async def process_batch(messages: List[dict]): + # Process entire batch + logger.info(f"Processing batch of {len(messages)} messages") + # Bulk insert to database, etc. 
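+    # If this callback raises, the processor records the failure and the
+    # batch's messages are not acknowledged to Dapr, so they may be redelivered.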
+ await bulk_insert_to_db(messages) + +server.register_message_processor(config, process_callback=process_batch) +``` + +### Example 3: Workflow Integration (Per-Message) + +```python +from app.workflows import MessageProcessorWorkflow + +config = MessageProcessorConfig( + binding_name="kafka-input", + batch_size=1, # Per-message: each message triggers a workflow + trigger_workflow=True, + workflow_class=MessageProcessorWorkflow +) + +server.register_message_processor(config) +``` + +### Example 4: Workflow Integration (Batch) + +```python +config = MessageProcessorConfig( + binding_name="kafka-input", + batch_size=50, # Batch mode + batch_timeout=5.0, + trigger_workflow=True, + workflow_class=BatchProcessorWorkflow +) + +# Each batch triggers a single workflow with all messages +server.register_message_processor(config) +``` + +## Dapr Integration + +### Binding Configuration + +Create a Dapr Kafka input binding in `components/kafka-input-binding.yaml`: + +```yaml +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: kafka-input # Must match config.binding_name +spec: + type: bindings.kafka + version: v1 + metadata: + - name: brokers + value: "localhost:9092" + - name: topics + value: "events-topic" + - name: consumerGroup + value: "my-consumer-group" + - name: initialOffset + value: "newest" + - name: direction + value: "input" + - name: authType + value: "none" +``` + +### Endpoint Convention + +- Dapr calls `POST /{binding-name}` for each message +- Example: binding named "kafka-input" creates endpoint `POST /kafka-input` +- Statistics available at `GET /messages/v1/stats/{binding-name}` + +## Monitoring & Metrics + +### Built-in Metrics + +The message processor automatically records: + +- `kafka_messages_processed_total` - Total messages processed (counter) + - Labels: `status` (success/error), `mode` (per_message/batch) +- `kafka_binding_requests_total` - Total binding requests (counter) + - Labels: `status` (success/error), `binding` (binding name) +- `kafka_binding_duration_seconds` - Request duration (histogram) + - Labels: `binding` (binding name) +- `kafka_batch_size` - Batch size distribution (histogram) + +### Statistics Endpoint + +Get processor stats: +```bash +curl http://localhost:3000/messages/v1/stats/kafka-input +``` + +Response: +```json +{ + "is_running": true, + "current_batch_size": 15, + "batch_size_threshold": 50, + "batch_timeout": 5.0, + "is_batch_mode": true, + "total_processed": 1250, + "total_errors": 3, + "time_since_last_process": 2.5 +} +``` + +## Error Handling + +### Automatic Error Handling + +- All exceptions are logged with full stack traces +- Error metrics are automatically recorded +- Failed messages are counted in `total_errors` +- Batch processing continues even if one message fails + +### Custom Error Handling + +```python +async def process_with_error_handling(messages: List[dict]): + for msg in messages: + try: + await process_message(msg) + except ValidationError as e: + logger.error(f"Validation failed: {e}") + # Send to dead letter queue + await send_to_dlq(msg, str(e)) + except Exception as e: + logger.error(f"Processing failed: {e}") + # Retry logic + await retry_message(msg) + +config = MessageProcessorConfig( + binding_name="kafka-input", + batch_size=1 # Per-message mode +) +server.register_message_processor(config, process_callback=process_with_error_handling) +``` + +## Lifecycle Management + +### Starting Processors + +```python +# Start all registered message processors +await server.start_message_processors() +``` + 
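+Processors must be registered before they are started. A minimal sketch of the
+wiring, assuming an `APIServer` instance named `server` and reusing the
+`kafka-input` binding and `process_batch` callback from Example 2:
+
+```python
+from application_sdk.server.messaging import MessageProcessorConfig
+
+# Registration creates the POST /kafka-input endpoint and the stats route.
+config = MessageProcessorConfig(
+    binding_name="kafka-input",
+    batch_size=50,
+    batch_timeout=5.0,
+)
+server.register_message_processor(config, process_callback=process_batch)
+
+# Starting launches the background batch timer for each batch-mode processor.
+# For per-message processors (batch_size=1) starting is a no-op.
+await server.start_message_processors()
+```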
+### Stopping Processors + +```python +# Stop all processors and process remaining messages +await server.stop_message_processors() +``` + +### Integration with Application Lifecycle + +```python +from contextlib import asynccontextmanager + +@asynccontextmanager +async def lifespan(app: FastAPI): + # Startup + await server.start_message_processors() + yield + # Shutdown + await server.stop_message_processors() + +app = FastAPI(lifespan=lifespan) +``` + +## Performance Considerations + +### Per-Message Mode (batch_size = 1) +- **Pros**: Low latency, immediate processing +- **Cons**: Higher overhead per message +- **Best for**: Real-time processing, low message volume + +### Batch Mode (batch_size > 1) +- **Pros**: Better throughput, more efficient bulk operations +- **Cons**: Higher latency (up to batch_timeout) +- **Best for**: High volume, analytics, bulk database operations + +### Tuning Parameters + +```python +# Per-message: instant processing +batch_size=1 # batch_timeout ignored + +# Low latency with small batching +batch_size=10 +batch_timeout=1.0 + +# Balanced +batch_size=100 +batch_timeout=5.0 + +# High throughput, can tolerate latency +batch_size=1000 +batch_timeout=30.0 +``` + +## Testing + +### Unit Testing + +```python +import pytest +from application_sdk.server.messaging import MessageProcessor, MessageProcessorConfig + +@pytest.mark.asyncio +async def test_per_message_processing(): + config = MessageProcessorConfig( + binding_name="test-binding", + batch_size=1 # Per-message mode + ) + + processed = [] + async def callback(messages): + processed.extend(messages) + + processor = MessageProcessor(config, process_callback=callback) + + message = {"event_type": "test", "data": {"id": 1}} + result = await processor.add_message(message) + + assert result["status"] == "processed" + assert len(processed) == 1 + assert processor.total_processed == 1 +``` + +### Integration Testing + +```python +@pytest.mark.asyncio +async def test_batch_processing(): + config = MessageProcessorConfig( + binding_name="test-binding", + batch_size=3, # Batch mode + batch_timeout=1.0 + ) + + batches = [] + async def callback(messages): + batches.append(messages) + + processor = MessageProcessor(config, process_callback=callback) + await processor.start() + + # Add messages + for i in range(3): + await processor.add_message({"id": i}) + + # Wait for batch processing + await asyncio.sleep(0.1) + + assert len(batches) == 1 + assert len(batches[0]) == 3 + + await processor.stop() +``` + +## Complete Application Example + +See `/atlan-sample-apps/quickstart/simple-message-processor/` for a complete example application using the message processor with Temporal workflows. + +## Troubleshooting + +### Messages not being processed + +1. Check Dapr is running: `dapr list` +2. Verify binding configuration matches `binding_name` +3. Check Kafka is accessible and topic exists +4. Review application logs for errors + +### Batch not processing at expected time + +1. Verify `batch_timeout` configuration +2. Check at least one message was received +3. Ensure processor was started: `await server.start_message_processors()` + +### High memory usage + +1. Reduce `batch_size` for batch mode +2. Ensure messages are being processed (check for stuck processors) +3. 
Monitor `current_batch_size` in stats endpoint + +## References + +- [Dapr Kafka Binding](https://docs.dapr.io/reference/components-reference/supported-bindings/kafka/) +- [Temporal Workflows](https://docs.temporal.io/) +- [Application SDK Documentation](../docs/) + diff --git a/application_sdk/server/fastapi/__init__.py b/application_sdk/server/fastapi/__init__.py index c2b536a06..87b3ba4dc 100644 --- a/application_sdk/server/fastapi/__init__.py +++ b/application_sdk/server/fastapi/__init__.py @@ -1,6 +1,6 @@ import os import time -from typing import Any, Callable, List, Optional, Type +from typing import Any, Callable, Coroutine, List, Optional, Type # Import with full paths to avoid naming conflicts from fastapi import status @@ -32,6 +32,7 @@ from application_sdk.server import ServerInterface from application_sdk.server.fastapi.middleware.logmiddleware import LogMiddleware from application_sdk.server.fastapi.middleware.metrics import MetricsMiddleware +from application_sdk.server.messaging import MessageProcessor, MessageProcessorConfig from application_sdk.server.fastapi.models import ( ConfigMapResponse, EventWorkflowRequest, @@ -90,6 +91,7 @@ class APIServer(ServerInterface): workflow_router: APIRouter dapr_router: APIRouter events_router: APIRouter + messages_router: APIRouter handler: Optional[HandlerInterface] templates: Jinja2Templates duckdb_ui: DuckDBUI @@ -101,6 +103,7 @@ class APIServer(ServerInterface): workflows: List[WorkflowInterface] = [] event_triggers: List[EventWorkflowTrigger] = [] + message_processors: dict[str, MessageProcessor] = {} ui_enabled: bool = True @@ -138,6 +141,7 @@ def __init__( self.workflow_router = APIRouter() self.dapr_router = APIRouter() self.events_router = APIRouter() + self.messages_router = APIRouter() # Set up the application error_handler = internal_server_error_handler # Store as local variable @@ -214,6 +218,7 @@ def register_routers(self): self.app.include_router(self.workflow_router, prefix="/workflows/v1") self.app.include_router(self.dapr_router, prefix="/dapr") self.app.include_router(self.events_router, prefix="/events/v1") + self.app.include_router(self.messages_router, prefix="/messages/v1") def fallback_home(self, request: Request) -> HTMLResponse: return self.templates.TemplateResponse( @@ -337,6 +342,145 @@ async def start_workflow_event( self.app.include_router(self.events_router, prefix="/events/v1") + def register_message_processor( + self, + config: MessageProcessorConfig, + process_callback: Optional[ + Callable[[List[dict]], Coroutine[Any, Any, None]] + ] = None, + ): + """Register a message processor. + + Args: + config: Message processor configuration + process_callback: Optional custom callback for processing messages + + Raises: + ValueError: If binding name already registered or configuration invalid + """ + if config.binding_name in self.message_processors: + raise ValueError(f"Message processor for binding '{config.binding_name}' already registered") + + # Create message processor instance + processor = MessageProcessor( + config=config, + process_callback=process_callback, + workflow_client=self.workflow_client if config.trigger_workflow else None, + ) + + # Store processor + self.message_processors[config.binding_name] = processor + + # Create endpoint handler for this binding + async def handle_message_binding(request: Request) -> JSONResponse: + """Handle incoming messages from Dapr Messaging input binding. + + This endpoint is called by Dapr when messages arrive on the binding. 
+ Endpoint convention: POST /{binding-name} + """ + start_time = time.time() + metrics = get_metrics() + + try: + # Get message from request + body = await request.json() + + logger.info( + f"Received message from Dapr Messaging input binding: {config.binding_name}" + ) + logger.debug(f"Message body: {body}") + + # Process the message + result = await processor.add_message(body) + + # Record success metric + duration = time.time() - start_time + metrics.record_metric( + name="messaging_binding_requests_total", + value=1.0, + metric_type=MetricType.COUNTER, + labels={"status": "success", "binding": config.binding_name}, + description="Total Messaging input binding requests", + ) + metrics.record_metric( + name="message_processing_duration_seconds", + value=duration, + metric_type=MetricType.HISTOGRAM, + labels={"binding": config.binding_name}, + description="Message processing request duration", + ) + + return JSONResponse(status_code=status.HTTP_200_OK, content=result) + + except Exception as e: + logger.error( + f"Error handling Messaging input binding {config.binding_name}: {e}", + exc_info=True, + ) + + # Record error metric + metrics.record_metric( + name="message_processing_requests_total", + value=1.0, + metric_type=MetricType.COUNTER, + labels={"status": "error", "binding": config.binding_name}, + description="Total Message processing requests", + ) + + return JSONResponse( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + content={"error": str(e)}, + ) + + # Register the endpoint - Dapr calls POST /{binding-name} + self.app.post(f"/{config.binding_name}")(handle_message_binding) + + # Also register stats endpoint + async def get_processor_stats() -> JSONResponse: + """Get statistics for this message processor.""" + return JSONResponse( + status_code=status.HTTP_200_OK, + content=processor.get_stats(), + ) + + self.messages_router.add_api_route( + f"/stats/{config.binding_name}", + get_processor_stats, + methods=["GET"], + ) + + mode = "batch" if config.is_batch_mode else "per-message" + logger.info( + f"Registered message processor for binding '{config.binding_name}' " + f"(mode={mode}, batch_size={config.batch_size}, " + f"timeout={config.batch_timeout}s)" + ) + + async def start_message_processors(self): + """Start all registered message processors.""" + for binding_name, processor in self.message_processors.items(): + try: + await processor.start() + logger.info(f"Started message processor for binding: {binding_name}") + except Exception as e: + logger.error( + f"Error starting message processor {binding_name}: {e}", + exc_info=True, + ) + raise + + async def stop_message_processors(self): + """Stop all registered message processors.""" + for binding_name, processor in self.message_processors.items(): + try: + await processor.stop() + logger.info(f"Stopped message processor for binding: {binding_name}") + except Exception as e: + logger.error( + f"Error stopping message processor {binding_name}: {e}", + exc_info=True, + ) + def register_routes(self): """ Method to register the routes for the FastAPI application diff --git a/application_sdk/server/messaging.py b/application_sdk/server/messaging.py new file mode 100644 index 000000000..ca80f91d1 --- /dev/null +++ b/application_sdk/server/messaging.py @@ -0,0 +1,548 @@ +"""Message processing functionality for Message Processor input bindings. + +This module provides classes for processing messages via Dapr input bindings, +supporting both per-message and batch processing modes. 
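+
+The processing mode is selected by batch_size: a value of 1 gives per-message
+processing, while values greater than 1 enable batch processing.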
+""" + +import asyncio +from datetime import datetime +from typing import Any, Callable, Coroutine, Dict, List, Optional, Type +from uuid import uuid4 + +from pydantic import BaseModel, Field + +from application_sdk.clients.workflow import WorkflowClient +from application_sdk.observability.logger_adaptor import get_logger +from application_sdk.observability.metrics_adaptor import MetricType, get_metrics +from application_sdk.workflows import WorkflowInterface + +logger = get_logger(__name__) + + +class MessageProcessorConfig(BaseModel): + """Configuration for message processor. + + The processing mode is automatically determined by batch_size: + - batch_size = 1: Per-message processing (immediate) + - batch_size > 1: Batch processing (accumulate and process) + + Attributes: + binding_name: Name of the Messaging input binding (must match component metadata.name) + batch_size: Number of messages to accumulate before processing (1 = per-message mode) + batch_timeout: Maximum time in seconds to wait before processing accumulated messages + trigger_workflow: Whether to trigger a Temporal workflow for each message/batch + workflow_class: Workflow class to trigger (required if trigger_workflow=True) + + Examples: + >>> # Per-message processing (batch_size=1) + >>> config = MessageProcessorConfig( + ... binding_name="message-processor-input", + ... batch_size=1 + ... ) + + >>> # Batch processing with workflow trigger + >>> config = MessageProcessorConfig( + ... binding_name="message-processor-input", + ... batch_size=50, + ... batch_timeout=5.0, + ... trigger_workflow=True, + ... workflow_class=MyWorkflowClass + ... ) + """ + + binding_name: str = Field(..., description="Name of the Messaging input binding") + batch_size: int = Field( + default=1, + gt=0, + description="Number of messages per batch (1 = per-message, >1 = batch mode)", + ) + batch_timeout: float = Field( + default=5.0, + gt=0, + description="Maximum time to wait for batch in seconds (ignored if batch_size=1)", + ) + trigger_workflow: bool = Field( + default=False, description="Whether to trigger a workflow for each message/batch" + ) + workflow_class: Optional[Type[WorkflowInterface]] = Field( + default=None, description="Workflow class to trigger (if trigger_workflow=True)" + ) + + @property + def is_batch_mode(self) -> bool: + """Check if processor is in batch mode. + + Returns: + bool: True if batch_size > 1, False for per-message mode + """ + return self.batch_size > 1 + + +class MessageProcessor: + """Handles message processing from Message Processing input bindings. + + This processor supports two modes: + - per_message: Process each message immediately as it arrives + - batch: Accumulate messages and process in batches (based on size or timeout) + + The processor can optionally trigger Temporal workflows for processing, or use a + custom callback function. + + Attributes: + config: Processor configuration + process_callback: Optional custom callback for message processing + workflow_client: Temporal workflow client for workflow triggering + batch: Current accumulated batch of messages + lock: Async lock for thread-safe batch operations + last_process_time: Timestamp of last batch processing + is_running: Whether the batch timer is running + process_task: Background task for batch timeout checking + total_processed: Total number of messages processed + total_errors: Total number of processing errors + + Examples: + >>> # Per-message processing with callback + >>> async def process_messages(messages: List[dict]): + ... 
for msg in messages: + ... print(f"Processing: {msg}") + ... + >>> config = MessageProcessorConfig( + ... binding_name="message-processor-input", + ... process_mode="per_message" + ... ) + >>> processor = MessageProcessor(config, process_callback=process_messages) + >>> await processor.start() + + >>> # Batch processing with workflow trigger + >>> config = MessageProcessorConfig( + ... binding_name="message-processor-input", + ... batch_size=50, + ... batch_timeout=5.0, + ... process_mode="batch", + ... trigger_workflow=True, + ... workflow_class=MyWorkflow + ... ) + >>> processor = MessageProcessor(config, workflow_client=client) + >>> await processor.start() + """ + + def __init__( + self, + config: MessageProcessorConfig, + process_callback: Optional[ + Callable[[List[dict]], Coroutine[Any, Any, None]] + ] = None, + workflow_client: Optional[WorkflowClient] = None, + ): + """Initialize the message processor. + + Args: + config: Processor configuration + process_callback: Async callback function for processing messages + workflow_client: Temporal workflow client (required if trigger_workflow=True) + + Raises: + ValueError: If workflow_client is not provided when trigger_workflow=True + ValueError: If workflow_class is not provided when trigger_workflow=True + """ + self.config = config + self.process_callback = process_callback + self.workflow_client = workflow_client + self.batch: List[tuple[str, dict]] = [] # List of (message_id, message) tuples + self.lock = asyncio.Lock() + self.last_process_time = datetime.now() + self.is_running = False + self.process_task: Optional[asyncio.Task] = None + self.total_processed = 0 + self.total_errors = 0 + self.message_futures: Dict[str, asyncio.Future] = {} # Track message processing completion + + if config.trigger_workflow and not workflow_client: + raise ValueError("workflow_client is required when trigger_workflow=True") + + if config.trigger_workflow and not config.workflow_class: + raise ValueError("workflow_class is required when trigger_workflow=True") + + async def start(self): + """Start the batch processor timer. + + For batch mode (batch_size > 1), starts a background timer that processes batches on timeout. + For per-message mode (batch_size = 1), this is a no-op. + """ + if self.config.is_batch_mode and not self.is_running: + self.is_running = True + self.process_task = asyncio.create_task(self._batch_timer()) + logger.info( + f"Message processor started in batch mode " + f"(size={self.config.batch_size}, timeout={self.config.batch_timeout}s)" + ) + + async def stop(self): + """Stop the batch processor and process remaining messages. + + Cancels the background timer and processes any remaining messages in the batch. + """ + self.is_running = False + if self.process_task: + self.process_task.cancel() + try: + await self.process_task + except asyncio.CancelledError: + pass + + if self.batch: + await self._process_batch() + + logger.info("Message processor stopped") + + async def add_message(self, message: dict) -> dict: + """Add a message to the processor. + + This method only acknowledges the message after successful processing. + If processing fails, it retries after 1 second before raising an exception. 
+ + Args: + message: The message data to process + + Returns: + dict: Processing result containing status and details + + Raises: + Exception: If message processing fails after retry + """ + retry_count = 0 + max_retries = 1 + retry_delay = 1.0 # 1 second + + while retry_count <= max_retries: + try: + if not self.config.is_batch_mode: + result = await self._process_single_message(message) + logger.info("✓ Message processed successfully, acknowledging to Dapr") + return result + else: + result = await self._add_to_batch_and_wait(message) + logger.info("✓ Message batch processed successfully, acknowledging to Dapr") + return result + except Exception as e: + retry_count += 1 + self.total_errors += 1 + + if retry_count <= max_retries: + logger.warning( + f"✗ Message processing failed (attempt {retry_count}/{max_retries + 1}), " + f"retrying in {retry_delay}s: {e}" + ) + await asyncio.sleep(retry_delay) + else: + logger.error( + f"✗ Message processing failed after {max_retries + 1} attempts, " + f"NOT acknowledging to Dapr (message will be redelivered): {e}", + exc_info=True + ) + # Raise exception to signal Dapr not to acknowledge + raise + + async def _process_single_message(self, message: dict) -> dict: + """Process a single message immediately. + + Args: + message: The message to process + + Returns: + dict: Processing result + + Raises: + Exception: If processing fails + """ + try: + logger.info(f"Processing message: {message.get('event_type', 'unknown')}") + + if self.config.trigger_workflow: + # Trigger workflow for this message + result = await self._trigger_workflow_for_message(message) + elif self.process_callback: + # Use custom callback + await self.process_callback([message]) + result = { + "status": "processed", + "message": "Message processed successfully", + } + else: + # Default processing - just log + logger.info(f"Processed message: {message}") + result = {"status": "processed", "message": "Message logged"} + + self.total_processed += 1 + + # Record success metric + metrics = get_metrics() + metrics.record_metric( + name="messages_processed_total", + value=1.0, + metric_type=MetricType.COUNTER, + labels={"status": "success", "mode": "per_message"}, + description="Total messages processed", + ) + + return result + + except Exception as e: + logger.error(f"Error processing single message: {e}", exc_info=True) + self.total_errors += 1 + + # Record error metric + metrics = get_metrics() + metrics.record_metric( + name="messages_processed_total", + value=1.0, + metric_type=MetricType.COUNTER, + labels={"status": "error", "mode": "per_message"}, + description="Total messages processed", + ) + raise + + async def _add_to_batch_and_wait(self, message: dict) -> dict: + """Add message to batch and wait for processing to complete. + + This ensures the message is only acknowledged after its batch is successfully processed. 
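+        If batch processing fails, the exception raised by the batch is
+        re-raised to the caller.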
+ + Args: + message: The message to add + + Returns: + dict: Status of batch processing + + Raises: + Exception: If batch processing fails + """ + message_id = str(uuid4()) + future = asyncio.Future() + + async with self.lock: + # Register future for this message + self.message_futures[message_id] = future + + # Add message with ID to batch + self.batch.append((message_id, message)) + current_size = len(self.batch) + + logger.info( + f"✓ Received: {message.get('event_type', 'unknown')}/{message.get('event_name', 'unknown')} " + f"| Data keys: {list(message.get('data', {}).keys())}" + ) + logger.info(f" Added to batch (current: {current_size}/{self.config.batch_size})") + + # Process immediately if batch is full + if current_size >= self.config.batch_size: + logger.info( + f"⚡ Batch full ({current_size}/{self.config.batch_size}), processing now..." + ) + await self._process_batch() + + # Wait for this message's batch to be processed + # This will block until _process_batch completes and resolves the future + try: + result = await future + return result + except Exception as e: + # Clean up future on error + async with self.lock: + self.message_futures.pop(message_id, None) + raise + + async def _batch_timer(self): + """Timer to process batch on timeout. + + Runs as a background task checking batch timeout periodically. + """ + while self.is_running: + try: + await asyncio.sleep(self.config.batch_timeout) + + async with self.lock: + if self.batch: + time_since_last = ( + datetime.now() - self.last_process_time + ).total_seconds() + if time_since_last >= self.config.batch_timeout: + logger.info( + f"Batch timeout reached ({time_since_last:.1f}s), " + f"processing {len(self.batch)} messages" + ) + await self._process_batch() + except asyncio.CancelledError: + break + except Exception as e: + logger.error(f"Error in batch timer: {e}", exc_info=True) + + async def _process_batch(self): + """Process the current batch (must be called with lock held). + + Resolves or rejects all message futures based on processing result. 
+ + Raises: + Exception: If batch processing fails + """ + if not self.batch: + return + + batch_to_process = self.batch.copy() + self.batch.clear() + self.last_process_time = datetime.now() + + # Extract messages and their IDs + message_ids = [msg_id for msg_id, _ in batch_to_process] + messages = [msg for _, msg in batch_to_process] + + try: + logger.info(f"📦 Processing batch of {len(messages)} messages...") + + if self.config.trigger_workflow: + # Trigger workflow for the batch + await self._trigger_workflow_for_batch(messages) + elif self.process_callback: + # Use custom callback + await self.process_callback(messages) + else: + # Default processing - just log + for message in messages: + logger.info(f"Processed message: {message}") + + self.total_processed += len(messages) + + # Record success metric + metrics = get_metrics() + metrics.record_metric( + name="messages_processed_total", + value=float(len(messages)), + metric_type=MetricType.COUNTER, + labels={"status": "success", "mode": "batch"}, + description="Total messages processed", + ) + metrics.record_metric( + name="message_processor_batch_size", + value=float(len(messages)), + metric_type=MetricType.HISTOGRAM, + labels={}, + description="Message processing batch size distribution", + ) + + logger.info(f"✅ Batch processing completed successfully ({len(messages)} messages)") + + # Resolve all futures with success + success_result = { + "status": "processed", + "message": f"Batch of {len(messages)} messages processed successfully", + "batch_size": len(messages) + } + for msg_id in message_ids: + future = self.message_futures.pop(msg_id, None) + if future and not future.done(): + future.set_result(success_result) + + except Exception as e: + logger.error(f"❌ Error processing batch: {e}", exc_info=True) + self.total_errors += 1 + + # Record error metric + metrics = get_metrics() + metrics.record_metric( + name="messages_processed_total", + value=float(len(messages)), + metric_type=MetricType.COUNTER, + labels={"status": "error", "mode": "batch"}, + description="Total messages processed", + ) + + # Reject all futures with the error + for msg_id in message_ids: + future = self.message_futures.pop(msg_id, None) + if future and not future.done(): + future.set_exception(e) + + raise + + async def _trigger_workflow_for_message(self, message: dict) -> dict: + """Trigger a workflow for a single message. + + Args: + message: The message to process + + Returns: + dict: Workflow trigger result containing workflow_id and run_id + + Raises: + ValueError: If workflow client or class not configured + Exception: If workflow trigger fails + """ + if not self.workflow_client or not self.config.workflow_class: + raise ValueError("Workflow client and class must be configured") + + try: + workflow_data = await self.workflow_client.start_workflow( + {"message": message}, workflow_class=self.config.workflow_class + ) + + return { + "status": "workflow_started", + "workflow_id": workflow_data.get("workflow_id", ""), + "run_id": workflow_data.get("run_id", ""), + } + except Exception as e: + logger.error(f"Error triggering workflow for message: {e}", exc_info=True) + raise + + async def _trigger_workflow_for_batch(self, batch: List[dict]): + """Trigger a workflow for a batch of messages. 
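+
+        The workflow receives a single argument of the form
+        {"messages": batch, "batch_size": len(batch)}.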
+ + Args: + batch: The batch of messages to process + + Raises: + ValueError: If workflow client or class not configured + Exception: If workflow trigger fails + """ + if not self.workflow_client or not self.config.workflow_class: + raise ValueError("Workflow client and class must be configured") + + try: + workflow_data = await self.workflow_client.start_workflow( + {"messages": batch, "batch_size": len(batch)}, + workflow_class=self.config.workflow_class, + ) + + logger.info( + f"Started workflow for batch: workflow_id={workflow_data.get('workflow_id')}" + ) + except Exception as e: + logger.error(f"Error triggering workflow for batch: {e}", exc_info=True) + raise + + def get_stats(self) -> dict: + """Get processor statistics. + + Returns: + dict: Processor statistics including: + - is_running: Whether processor is active + - current_batch_size: Current number of messages in batch + - batch_size_threshold: Configured batch size limit + - batch_timeout: Configured timeout in seconds + - is_batch_mode: True if batch_size > 1, False otherwise + - total_processed: Total messages processed + - total_errors: Total processing errors + - time_since_last_process: Seconds since last batch processing + """ + return { + "is_running": self.is_running, + "current_batch_size": len(self.batch), + "batch_size_threshold": self.config.batch_size, + "batch_timeout": self.config.batch_timeout, + "is_batch_mode": self.config.is_batch_mode, + "total_processed": self.total_processed, + "total_errors": self.total_errors, + "time_since_last_process": ( + datetime.now() - self.last_process_time + ).total_seconds(), + } + diff --git a/docs/docs/guides/eventstore-non-temporal.md b/docs/docs/guides/eventstore-non-temporal.md new file mode 100644 index 000000000..e9852e653 --- /dev/null +++ b/docs/docs/guides/eventstore-non-temporal.md @@ -0,0 +1,322 @@ +# Using EventStore Without Temporal Workflows + +This guide explains how to use the EventStore service in non-Temporal contexts, such as dapr bindings, event processors, and standalone applications. + +## Overview + +The EventStore service provides two sets of methods: + +1. **Temporal Methods** (original): For use within Temporal workflows and activities +2. **Simple Methods** (new): For use in non-Temporal contexts + +## Methods + +### For Temporal Workflows (Original) + +These methods are designed for use within Temporal workflows and activities. They automatically capture workflow/activity context. + +#### `enrich_event_metadata(event: Event) -> Event` + +Enriches event metadata with Temporal workflow and activity context. + +```python +from application_sdk.services.eventstore import EventStore +from application_sdk.events.models import Event + +# Inside a Temporal workflow or activity +event = Event( + event_type="data.processed", + event_name="batch_completed", + data={"count": 100} +) + +# Automatically captures workflow_id, activity_id, etc. +enriched = EventStore.enrich_event_metadata(event) +``` + +#### `publish_event(event: Event) -> None` + +Publishes an event with Temporal context metadata. + +```python +# Inside a Temporal workflow or activity +await EventStore.publish_event(event) +``` + +### For Non-Temporal Applications (New) + +These methods are designed for standalone applications, processors, and dapr bindings that don't use Temporal. + +#### `enrich_event_metadata_simple(event: Event, custom_metadata: dict = None) -> Event` + +Enriches event metadata with custom information without requiring Temporal context. 
+ +```python +from application_sdk.services.eventstore import EventStore +from application_sdk.events.models import Event + +# In a processor or standalone application +event = Event( + event_type="batch.processed", + event_name="batch_completed", + data={"batch_size": 50} +) + +# Add custom metadata +enriched = EventStore.enrich_event_metadata_simple( + event, + custom_metadata={ + "processor_id": "kafka-batch-1", + "batch_id": "abc123", + "processor_type": "kafka" + } +) +``` + +#### `publish_event_simple(event: Event, custom_metadata: dict = None) -> None` + +Publishes an event with custom metadata, without Temporal context. + +```python +# In a processor or standalone application +await EventStore.publish_event_simple( + event, + custom_metadata={ + "processor_id": "kafka-batch-1", + "source": "batch_processor" + } +) +``` + +## Use Cases + +### 1. Batch Event Processor + +```python +from typing import List +from application_sdk.events.models import Event +from application_sdk.services.eventstore import EventStore +from application_sdk.observability.logger_adaptor import get_logger + +logger = get_logger(__name__) + +async def process_batch(events: List[Event]): + """Process a batch of events.""" + logger.info(f"Processing {len(events)} events") + + # Process events + for event in events: + # Your processing logic + process_event(event) + + # Publish summary event + summary = Event( + event_type="processor_events", + event_name="batch_processed", + data={ + "batch_size": len(events), + "event_types": list(set(e.event_type for e in events)) + } + ) + + await EventStore.publish_event_simple( + summary, + custom_metadata={ + "processor_type": "kafka_batch", + "batch_id": generate_batch_id() + } + ) +``` + +### 2. Dapr Binding Handler + +```python +from fastapi import Request +from application_sdk.events.models import Event +from application_sdk.services.eventstore import EventStore + +async def handle_kafka_event(request: Request): + """Handle event from Kafka via dapr binding.""" + body = await request.json() + event = Event(**body) + + # Process the event + result = await process_event(event) + + # Publish result event + result_event = Event( + event_type="processing_results", + event_name="event_processed", + data=result + ) + + await EventStore.publish_event_simple( + result_event, + custom_metadata={"handler": "kafka_binding"} + ) +``` + +### 3. Standalone Event Publisher + +```python +import asyncio +from application_sdk.events.models import Event +from application_sdk.services.eventstore import EventStore + +async def publish_metrics(): + """Publish metrics as events.""" + metrics = collect_metrics() + + event = Event( + event_type="system_metrics", + event_name="metrics_collected", + data=metrics + ) + + await EventStore.publish_event_simple( + event, + custom_metadata={ + "source": "metrics_collector", + "hostname": get_hostname() + } + ) + +# Run standalone +asyncio.run(publish_metrics()) +``` + +## Custom Metadata Fields + +The `custom_metadata` parameter can include any fields that exist on the `EventMetadata` model: + +```python +custom_metadata = { + # Standard fields + "workflow_id": "custom-workflow-123", + "workflow_type": "custom_processor", + "workflow_state": "running", + + # Custom fields (if added to EventMetadata) + "processor_id": "kafka-1", + "batch_id": "batch-123", + "source_system": "kafka", +} +``` + +## Best Practices + +### 1. 
Use Appropriate Method + +- **Temporal context**: Use `publish_event()` and `enrich_event_metadata()` +- **Non-Temporal context**: Use `publish_event_simple()` and `enrich_event_metadata_simple()` + +### 2. Include Meaningful Metadata + +```python +# Good - descriptive metadata +custom_metadata = { + "processor_type": "kafka_batch", + "processor_id": f"kafka-{instance_id}", + "batch_id": batch_id, + "processing_duration_ms": duration +} + +# Not ideal - minimal context +custom_metadata = {"id": "123"} +``` + +### 3. Handle Failures Gracefully + +```python +try: + await EventStore.publish_event_simple(event, custom_metadata) +except Exception as e: + # Log but don't fail the main operation + logger.warning(f"Failed to publish event: {e}") +``` + +### 4. Use Consistent Naming + +```python +# Event types should be consistent +event_type = "processor_events" # Good +event_type = "ProcessorEvents" # Inconsistent + +# Event names should be descriptive +event_name = "batch_processed" # Good +event_name = "done" # Too vague +``` + +## Differences from Temporal Methods + +| Feature | Temporal Methods | Simple Methods | +|---------|-----------------|----------------| +| Workflow context | Automatically captured | Not captured | +| Activity context | Automatically captured | Not captured | +| Custom metadata | Not supported | Supported via parameter | +| Use case | Temporal workflows/activities | Processors, bindings, standalone | +| Dependencies | Requires Temporal | No Temporal required | + +## Migration Example + +If you have a processor that was using Temporal methods: + +**Before (requires Temporal):** +```python +# Won't work outside Temporal context +await EventStore.publish_event(event) +``` + +**After (works everywhere):** +```python +# Works in any context +await EventStore.publish_event_simple( + event, + custom_metadata={"processor_id": "my-processor"} +) +``` + +## Troubleshooting + +### Event Not Published + +Check if the eventstore component is registered: + +```python +from application_sdk.common.dapr_utils import is_component_registered +from application_sdk.constants import EVENT_STORE_NAME + +if not is_component_registered(EVENT_STORE_NAME): + logger.warning("EventStore component not registered") +``` + +### Missing Metadata + +Verify your custom metadata fields exist on EventMetadata: + +```python +from application_sdk.events.models import EventMetadata + +# Check available fields +print(EventMetadata.model_fields.keys()) +``` + +### Authentication Errors + +Ensure your application has proper authentication configured: + +```python +from application_sdk.clients.atlan_auth import AtlanAuthClient + +auth_client = AtlanAuthClient() +headers = await auth_client.get_authenticated_headers() +``` + +## See Also + +- [EventStore API Reference](../concepts/eventstore.md) +- [Event Models](../concepts/events.md) +- [Dapr Bindings](https://docs.dapr.io/developing-applications/building-blocks/bindings/) +- [Simple Kafka Processor Example](../../../atlan-sample-apps/quickstart/simple-kafka-processor/) + +