Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ Examples:

"my-other-topic" -> "my-topic-myRetrySuffix-1000", "my-topic-myRetrySuffix-2000", ..., "my-topic-myDltSuffix"

NOTE: The default behavior is to create separate retry topics for each attempt, appended with an index value: retry-0, retry-1, ..., retry-n.
Therefore, by default the number of retry topics is the configured `maxAttempts` minus 1.
NOTE: Starting with version 4.1, the default behavior is to reuse a single retry topic for the same delay intervals.
To create separate retry topics for each attempt, set `sameIntervalTopicReuseStrategy` to `MULTIPLE_TOPICS`.

You can xref:retrytopic/topic-naming.adoc#retry-topics-and-dlt-suffixes[configure the suffixes], choose whether to append xref:retrytopic/topic-naming.adoc#append-index-or-delay[the attempt index or delay], use a xref:retrytopic/topic-naming.adoc#single-topic-fixed-delay[single retry topic when using fixed backoff], and use a xref:retrytopic/topic-naming.adoc#single-topic-maxinterval-delay[single retry topic for the attempts with the maxInterval] when using exponential backoffs.

Expand Down Expand Up @@ -99,7 +99,8 @@ public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> templa
}
----

NOTE: The default behavior is creating separate retry topics for each attempt, appended with their index values: retry-0, retry-1, ...
NOTE: Starting with version 4.1, the default behavior is to use a single topic for fixed delay retries.
To use multiple topics, set `sameIntervalTopicReuseStrategy` to `MULTIPLE_TOPICS`.


[[single-topic-maxinterval-delay]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,9 @@ You can set a custom recoverer on the factory or container.

`ShareAcknowledgment` now supports `renew()` to extend the acquisition lock when processing exceeds the broker's lock duration (KIP-1222, Kafka 4.2).
See xref:kafka/kafka-queues.adoc#share-acknowledgment-api[ShareAcknowledgment API] in the Kafka Queues documentation for details.

[[x41-retry-topic-builder-default]]
=== `RetryTopicConfigurationBuilder` Default Strategy Change

The default value of `sameIntervalTopicReuseStrategy` in `RetryTopicConfigurationBuilder` has been changed from `MULTIPLE_TOPICS` to `SINGLE_TOPIC` to align with the `@RetryableTopic` annotation default.
See xref:retrytopic/topic-naming.adoc[Topic Naming] for more information.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
* @author Adrian Chlebosz
* @author Wang Zhiyang
* @author Stephane Nicoll
* @author Heejin Jeon
*
* @since 2.7
*
Expand Down Expand Up @@ -93,7 +94,7 @@ public class RetryTopicConfigurationBuilder {

private TopicSuffixingStrategy topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_DELAY_VALUE;

private SameIntervalTopicReuseStrategy sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.MULTIPLE_TOPICS;
private SameIntervalTopicReuseStrategy sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.SINGLE_TOPIC;

@Nullable
private Boolean autoStartDltHandler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
/**
* @author Tomaz Fernandes
* @author Adrian Chlebosz
* @author Heejin Jeon
* @since 2.7
*/
@ExtendWith(MockitoExtension.class)
Expand Down Expand Up @@ -73,7 +74,8 @@ void shouldSetFixedBackOffPolicy() {

// setup
RetryTopicConfigurationBuilder builder = new RetryTopicConfigurationBuilder();
builder.fixedBackOff(1000);
builder.fixedBackOff(1000)
.sameIntervalTopicReuseStrategy(SameIntervalTopicReuseStrategy.MULTIPLE_TOPICS);

//when
RetryTopicConfiguration configuration = builder.create(kafkaOperations);
Expand All @@ -91,7 +93,8 @@ void shouldSetNoBackoffPolicy() {

// setup
RetryTopicConfigurationBuilder builder = new RetryTopicConfigurationBuilder();
builder.noBackoff();
builder.noBackoff()
.sameIntervalTopicReuseStrategy(SameIntervalTopicReuseStrategy.MULTIPLE_TOPICS);

//when
RetryTopicConfiguration configuration = builder.create(kafkaOperations);
Expand Down Expand Up @@ -201,6 +204,7 @@ void shouldSetDltRoutingRules() {

//when
RetryTopicConfiguration configuration = builder
.sameIntervalTopicReuseStrategy(SameIntervalTopicReuseStrategy.MULTIPLE_TOPICS)
.dltRoutingRules(Map.of("-deserialization", Set.of(DeserializationException.class)))
.create(kafkaOperations);

Expand Down