Merged
82 changes: 81 additions & 1 deletion docs/utilities/batch.md
@@ -484,7 +484,9 @@ used with SQS FIFO. In that case, an `UnsupportedOperationException` is thrown.
in most cases the defaults work well, and changing them is more likely to decrease performance
(see [here](https://www.baeldung.com/java-when-to-use-parallel-stream#fork-join-framework)
and [here](https://dzone.com/articles/be-aware-of-forkjoinpoolcommonpool)).
In situations where this may be useful - such as performing IO-bound work in parallel - make sure to measure before and after!
In situations where this may be useful, such as performing IO-bound work in parallel, make sure to measure before and after!

When using parallel processing with X-Ray tracing enabled, the Tracing utility automatically propagates the trace context to worker threads. This ensures that subsegments created during parallel message processing appear under the correct parent segment in your X-Ray trace, preserving the trace hierarchy and visibility into your batch processing performance.


=== "Example with SQS"
@@ -536,6 +538,84 @@ used with SQS FIFO. In that case, an `UnsupportedOperationException` is thrown.
}
```

=== "Example with X-Ray Tracing"

```java hl_lines="12 17"
public class SqsBatchHandler implements RequestHandler<SQSEvent, SQSBatchResponse> {

private final BatchMessageHandler<SQSEvent, SQSBatchResponse> handler;

public SqsBatchHandler() {
handler = new BatchMessageHandlerBuilder()
.withSqsBatchHandler()
.buildWithMessageHandler(this::processMessage, Product.class);
}

@Override
@Tracing
public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) {
return handler.processBatchInParallel(sqsEvent, context);
}

@Tracing // This will appear correctly under the handleRequest subsegment
private void processMessage(Product p, Context c) {
// Process the product - subsegments will appear under handleRequest
}
}
```

### Choosing the right concurrency model

The `processBatchInParallel` method has two overloads with different concurrency characteristics:

#### Without custom executor (parallelStream)

When you call `processBatchInParallel(event, context)` without providing an executor, the implementation uses Java's `parallelStream()`, which runs tasks on the shared common `ForkJoinPool`.

**Best for: CPU-bound workloads**

- Thread pool size matches available CPU cores
- Optimized for computational tasks (data transformation, calculations, parsing)
- Main thread participates in work-stealing
- Simple to use with no configuration needed

```java
// Good for CPU-intensive processing
return handler.processBatchInParallel(sqsEvent, context);
```
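As a generic illustration of why `parallelStream()` suits CPU-bound work, here is a self-contained sketch (unrelated to the Powertools API; `expensiveTransform` is a hypothetical placeholder) that fans a computation out across the common pool:

```java
import java.util.List;
import java.util.stream.Collectors;

public final class CpuBoundSketch {

    // Hypothetical stand-in for CPU-heavy work such as parsing or transformation.
    static int expensiveTransform(int n) {
        return n * n;
    }

    static List<Integer> transformAll(List<Integer> inputs) {
        // parallelStream() splits the work across the common ForkJoinPool;
        // the calling thread also participates via work-stealing. Collecting
        // an ordered stream preserves the input order of the results.
        return inputs.parallelStream()
                .map(CpuBoundSketch::expensiveTransform)
                .collect(Collectors.toList());
    }
}
```

Because the pool is sized to the CPU core count, this approach saturates the cores without oversubscribing them — which is exactly why it is a poor fit when threads mostly sit waiting on I/O.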

#### With custom executor (CompletableFuture)

When you call `processBatchInParallel(event, context, executor)` with a custom executor, the implementation uses `CompletableFuture` which gives you full control over the thread pool.

**Best for: I/O-bound workloads**

- You control thread pool size and characteristics
- Ideal for I/O operations (HTTP calls, database queries, S3 operations)
- Can use larger thread pools since threads spend time waiting, not computing
- Main thread only waits; worker threads do all processing

```java
// Good for I/O-intensive processing (API calls, DB queries, etc.)
ExecutorService executor = Executors.newFixedThreadPool(50);
return handler.processBatchInParallel(sqsEvent, context, executor);
```
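One caveat worth noting: Lambda creates the handler object once per execution environment and reuses it across warm invocations, so the executor should live in a field rather than being created on every request. A minimal sketch of that pattern, with a hypothetical `slowIoCall` standing in for the real I/O work:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

// Sketch: keep the executor in a field so warm invocations reuse the same
// threads instead of paying pool-creation cost on every request.
public class ParallelIoSketch {

    // Created once per execution environment, alongside the handler itself.
    private final ExecutorService executor = Executors.newFixedThreadPool(50);

    public List<String> processAll(List<String> messages) {
        List<CompletableFuture<String>> futures = messages.stream()
                .map(m -> CompletableFuture.supplyAsync(() -> slowIoCall(m), executor))
                .collect(Collectors.toList());
        // Collecting the futures first lets all tasks run concurrently;
        // joining in stream order preserves the input order of results.
        return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }

    // Hypothetical stand-in for an HTTP call, DB query, or S3 write.
    private String slowIoCall(String message) {
        return "processed:" + message;
    }

    public void shutdown() {
        executor.shutdown();
    }
}
```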

**For Java 21+: Virtual Threads**

If you're using Java 21 or later, virtual threads are ideal for I/O-bound workloads:

```java
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
return handler.processBatchInParallel(sqsEvent, context, executor);
```

Virtual threads are lightweight and can handle thousands of concurrent I/O operations efficiently without the overhead of platform threads.

**Recommendation for typical Lambda SQS processing:**

Most Lambda functions processing SQS messages perform I/O (calling APIs, querying databases, writing to S3). For these workloads, use the custom executor approach, either with a thread pool sized appropriately for your I/O operations or with virtual threads on Java 21+.
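If you size a fixed pool yourself, a common heuristic (from *Java Concurrency in Practice*) is threads ≈ cores × (1 + wait time / compute time). The sketch below applies it; the 50:1 wait-to-compute ratio is an illustrative assumption, not a measured value:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch of a sizing heuristic for I/O-bound pools:
// threads ~= cores * (1 + waitTime / computeTime).
public final class PoolSizing {

    static int ioBoundPoolSize(double waitComputeRatio) {
        int cores = Runtime.getRuntime().availableProcessors();
        return (int) (cores * (1 + waitComputeRatio));
    }

    public static void main(String[] args) {
        // Assumption: each message spends ~50x longer waiting on I/O than computing.
        ExecutorService executor = Executors.newFixedThreadPool(ioBoundPoolSize(50.0));
        executor.shutdown();
    }
}
```

Treat this as a starting point and measure with your real workload; at very high wait ratios, virtual threads remove the need to pick a number at all.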


## Handling Messages

@@ -14,21 +14,25 @@

package software.amazon.lambda.powertools.batch.handler;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;

import software.amazon.lambda.powertools.batch.internal.MultiThreadMDC;
import software.amazon.lambda.powertools.batch.internal.XRayTraceEntityPropagator;

/**
* A batch message processor for DynamoDB Streams batches.
@@ -43,8 +47,8 @@ public class DynamoDbBatchMessageHandler implements BatchMessageHandler<Dynamodb
private final BiConsumer<DynamodbEvent.DynamodbStreamRecord, Context> rawMessageHandler;

public DynamoDbBatchMessageHandler(Consumer<DynamodbEvent.DynamodbStreamRecord> successHandler,
BiConsumer<DynamodbEvent.DynamodbStreamRecord, Throwable> failureHandler,
BiConsumer<DynamodbEvent.DynamodbStreamRecord, Context> rawMessageHandler) {
BiConsumer<DynamodbEvent.DynamodbStreamRecord, Throwable> failureHandler,
BiConsumer<DynamodbEvent.DynamodbStreamRecord, Context> rawMessageHandler) {
this.successHandler = successHandler;
this.failureHandler = failureHandler;
this.rawMessageHandler = rawMessageHandler;
@@ -65,14 +69,23 @@ public StreamsEventResponse processBatch(DynamodbEvent event, Context context) {
@Override
public StreamsEventResponse processBatchInParallel(DynamodbEvent event, Context context) {
MultiThreadMDC multiThreadMDC = new MultiThreadMDC();
Object capturedSubsegment = XRayTraceEntityPropagator.captureTraceEntity();

List<StreamsEventResponse.BatchItemFailure> batchItemFailures = event.getRecords()
.parallelStream() // Parallel processing
.map(eventRecord -> {
multiThreadMDC.copyMDCToThread(Thread.currentThread().getName());
Optional<StreamsEventResponse.BatchItemFailure> failureOpt = processBatchItem(eventRecord, context);
multiThreadMDC.removeThread(Thread.currentThread().getName());
return failureOpt;
AtomicReference<Optional<StreamsEventResponse.BatchItemFailure>> result = new AtomicReference<>();

XRayTraceEntityPropagator.runWithEntity(capturedSubsegment, () -> {
multiThreadMDC.copyMDCToThread(Thread.currentThread().getName());
try {
result.set(processBatchItem(eventRecord, context));
} finally {
multiThreadMDC.removeThread(Thread.currentThread().getName());
}
});

return result.get();
})
.filter(Optional::isPresent)
.map(Optional::get)
@@ -84,21 +97,29 @@ public StreamsEventResponse processBatchInParallel(DynamodbEvent event, Context
@Override
public StreamsEventResponse processBatchInParallel(DynamodbEvent event, Context context, Executor executor) {
MultiThreadMDC multiThreadMDC = new MultiThreadMDC();
Object capturedSubsegment = XRayTraceEntityPropagator.captureTraceEntity();

List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new ArrayList<>();
List<CompletableFuture<Void>> futures = event.getRecords().stream()
.map(eventRecord -> CompletableFuture.runAsync(() -> {
multiThreadMDC.copyMDCToThread(Thread.currentThread().getName());
Optional<StreamsEventResponse.BatchItemFailure> failureOpt = processBatchItem(eventRecord, context);
failureOpt.ifPresent(batchItemFailures::add);
multiThreadMDC.removeThread(Thread.currentThread().getName());
XRayTraceEntityPropagator.runWithEntity(capturedSubsegment, () -> {
multiThreadMDC.copyMDCToThread(Thread.currentThread().getName());
try {
Optional<StreamsEventResponse.BatchItemFailure> failureOpt = processBatchItem(eventRecord,
context);
failureOpt.ifPresent(batchItemFailures::add);
} finally {
multiThreadMDC.removeThread(Thread.currentThread().getName());
}
});
}, executor))
.collect(Collectors.toList());
futures.forEach(CompletableFuture::join);
return StreamsEventResponse.builder().withBatchItemFailures(batchItemFailures).build();
}

private Optional<StreamsEventResponse.BatchItemFailure> processBatchItem(DynamodbEvent.DynamodbStreamRecord streamRecord, Context context) {
private Optional<StreamsEventResponse.BatchItemFailure> processBatchItem(
DynamodbEvent.DynamodbStreamRecord streamRecord, Context context) {
try {
LOGGER.debug("Processing item {}", streamRecord.getEventID());

@@ -124,7 +145,8 @@ private Optional<StreamsEventResponse.BatchItemFailure> processBatchItem(Dynamod
LOGGER.warn("failureHandler threw handling failure", e2);
}
}
return Optional.of(StreamsEventResponse.BatchItemFailure.builder().withItemIdentifier(sequenceNumber).build());
return Optional
.of(StreamsEventResponse.BatchItemFailure.builder().withItemIdentifier(sequenceNumber).build());
}
}
}
@@ -14,22 +14,25 @@

package software.amazon.lambda.powertools.batch.handler;


import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;

import software.amazon.lambda.powertools.batch.internal.MultiThreadMDC;
import software.amazon.lambda.powertools.batch.internal.XRayTraceEntityPropagator;
import software.amazon.lambda.powertools.utilities.EventDeserializer;

/**
@@ -49,10 +52,10 @@ public class KinesisStreamsBatchMessageHandler<M> implements BatchMessageHandler
private final BiConsumer<KinesisEvent.KinesisEventRecord, Throwable> failureHandler;

public KinesisStreamsBatchMessageHandler(BiConsumer<KinesisEvent.KinesisEventRecord, Context> rawMessageHandler,
BiConsumer<M, Context> messageHandler,
Class<M> messageClass,
Consumer<KinesisEvent.KinesisEventRecord> successHandler,
BiConsumer<KinesisEvent.KinesisEventRecord, Throwable> failureHandler) {
BiConsumer<M, Context> messageHandler,
Class<M> messageClass,
Consumer<KinesisEvent.KinesisEventRecord> successHandler,
BiConsumer<KinesisEvent.KinesisEventRecord, Throwable> failureHandler) {

this.rawMessageHandler = rawMessageHandler;
this.messageHandler = messageHandler;
@@ -76,14 +79,23 @@ public StreamsEventResponse processBatch(KinesisEvent event, Context context) {
@Override
public StreamsEventResponse processBatchInParallel(KinesisEvent event, Context context) {
MultiThreadMDC multiThreadMDC = new MultiThreadMDC();
Object capturedSubsegment = XRayTraceEntityPropagator.captureTraceEntity();

List<StreamsEventResponse.BatchItemFailure> batchItemFailures = event.getRecords()
.parallelStream() // Parallel processing
.map(eventRecord -> {
multiThreadMDC.copyMDCToThread(Thread.currentThread().getName());
Optional<StreamsEventResponse.BatchItemFailure> failureOpt = processBatchItem(eventRecord, context);
multiThreadMDC.removeThread(Thread.currentThread().getName());
return failureOpt;
AtomicReference<Optional<StreamsEventResponse.BatchItemFailure>> result = new AtomicReference<>();

XRayTraceEntityPropagator.runWithEntity(capturedSubsegment, () -> {
multiThreadMDC.copyMDCToThread(Thread.currentThread().getName());
try {
result.set(processBatchItem(eventRecord, context));
} finally {
multiThreadMDC.removeThread(Thread.currentThread().getName());
}
});

return result.get();
})
.filter(Optional::isPresent)
.map(Optional::get)
@@ -95,21 +107,29 @@ public StreamsEventResponse processBatchInParallel(KinesisEvent event, Context c
@Override
public StreamsEventResponse processBatchInParallel(KinesisEvent event, Context context, Executor executor) {
MultiThreadMDC multiThreadMDC = new MultiThreadMDC();
Object capturedSubsegment = XRayTraceEntityPropagator.captureTraceEntity();

List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new ArrayList<>();
List<CompletableFuture<Void>> futures = event.getRecords().stream()
.map(eventRecord -> CompletableFuture.runAsync(() -> {
multiThreadMDC.copyMDCToThread(Thread.currentThread().getName());
Optional<StreamsEventResponse.BatchItemFailure> failureOpt = processBatchItem(eventRecord, context);
failureOpt.ifPresent(batchItemFailures::add);
multiThreadMDC.removeThread(Thread.currentThread().getName());
XRayTraceEntityPropagator.runWithEntity(capturedSubsegment, () -> {
multiThreadMDC.copyMDCToThread(Thread.currentThread().getName());
try {
Optional<StreamsEventResponse.BatchItemFailure> failureOpt = processBatchItem(eventRecord,
context);
failureOpt.ifPresent(batchItemFailures::add);
} finally {
multiThreadMDC.removeThread(Thread.currentThread().getName());
}
});
}, executor))
.collect(Collectors.toList());
futures.forEach(CompletableFuture::join);
return StreamsEventResponse.builder().withBatchItemFailures(batchItemFailures).build();
}

private Optional<StreamsEventResponse.BatchItemFailure> processBatchItem(KinesisEvent.KinesisEventRecord eventRecord, Context context) {
private Optional<StreamsEventResponse.BatchItemFailure> processBatchItem(
KinesisEvent.KinesisEventRecord eventRecord, Context context) {
try {
LOGGER.debug("Processing item {}", eventRecord.getEventID());

@@ -141,8 +161,8 @@ private Optional<StreamsEventResponse.BatchItemFailure> processBatchItem(Kinesis
}
}

return Optional.of(StreamsEventResponse.BatchItemFailure.builder().withItemIdentifier(eventRecord.getKinesis().getSequenceNumber()).build());
return Optional.of(StreamsEventResponse.BatchItemFailure.builder()
.withItemIdentifier(eventRecord.getKinesis().getSequenceNumber()).build());
}
}
}
