Skip to content

Commit de7a93a

Browse files
committed
GH-1651 - Improve batching in DefaultEventPublicationRegistry.
1 parent 644c3f7 commit de7a93a

5 files changed

Lines changed: 97 additions & 5 deletions

File tree

spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/core/DefaultEventPublicationRegistry.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,9 +236,12 @@ public void processFailedPublications(ResubmissionOptions options, Consumer<Targ
236236
return;
237237
}
238238

239+
var remainingHeadroom = options.getMaxInFlight() - currentlyResubmitted;
240+
var itemsToRead = Math.min(options.getBatchSize(), remainingHeadroom);
241+
239242
var criteria = FailedCriteria.ALL
240243
.withPublicationsPublishedBefore(clock.instant().minus(options.getMinAge()))
241-
.withItemsToRead(Math.min(options.getBatchSize(), options.getBatchSize() - currentlyResubmitted));
244+
.withItemsToRead(itemsToRead);
242245

243246
processPublications(events.findFailedPublications(criteria), options.getFilter(), consumer);
244247
}

spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/core/EventPublicationRepository.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -211,12 +211,17 @@ default int countByStatus(Status status) {
211211

212212
static class FailedCriteria {
213213

214-
public static FailedCriteria ALL = new FailedCriteria(-1, null);
214+
public static final FailedCriteria ALL = new FailedCriteria(-1, null);
215215

216216
private final long maxItemsToRead;
217217
private final @Nullable Instant publicationDateReference;
218218

219219
private FailedCriteria(long maxItemsToRead, @Nullable Instant publicationDateReference) {
220+
221+
Assert.isTrue(maxItemsToRead == -1 || maxItemsToRead > 0,
222+
() -> "Maximum number of items to read must be -1 (unlimited) or positive, but was: %d!"
223+
.formatted(maxItemsToRead));
224+
220225
this.maxItemsToRead = maxItemsToRead;
221226
this.publicationDateReference = publicationDateReference;
222227
}
@@ -237,13 +242,19 @@ public FailedCriteria withPublicationsPublishedBefore(Instant reference) {
237242
}
238243

239244
/**
240-
* The number of {@link org.springframework.modulith.events.EventPublication}s to read. Return -1 to indicate you
241-
* want to read all items.
245+
* The number of {@link org.springframework.modulith.events.EventPublication}s to read. {@code -1} reads all
246+
* matching items (no limit); positive values cap how many rows are read. A limit of {@code 0} is not supported —
247+
* callers that have nothing to read should not invoke {@link #findFailedPublications(FailedCriteria)}.
248+
*
249+
* @return {@code -1} or a positive value
242250
*/
243251
public long getMaxItemsToRead() {
244252
return maxItemsToRead;
245253
}
246254

255+
/**
256+
* @param itemsToRead {@code -1} for no limit, or a positive value
257+
*/
247258
public FailedCriteria withItemsToRead(long itemsToRead) {
248259
return new FailedCriteria(itemsToRead, publicationDateReference);
249260
}

spring-modulith-events/spring-modulith-events-core/src/test/java/org/springframework/modulith/events/core/DefaultEventPublicationRegistryUnitTests.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,16 @@
2323
import java.time.Clock;
2424
import java.time.Instant;
2525
import java.time.ZoneId;
26+
import java.util.Collections;
2627
import java.util.function.Consumer;
2728
import java.util.stream.Stream;
2829

2930
import org.junit.jupiter.api.Test;
3031
import org.junit.jupiter.api.extension.ExtendWith;
3132
import org.mockito.Mock;
3233
import org.mockito.junit.jupiter.MockitoExtension;
34+
import org.springframework.modulith.events.EventPublication.Status;
35+
import org.springframework.modulith.events.ResubmissionOptions;
3336

3437
/**
3538
* Unit tests for {@link DefaultEventPublicationRegistry}.
@@ -99,6 +102,34 @@ void obtainsCorrectInProgressPublicationForIdenticalEvents() {
99102
assertThat(inProgress.getPublication(secondEvent, identifier)).containsSame(second);
100103
}
101104

105+
@Test // GH-1650
106+
void processFailedPublicationsCapsReadLimitByMaxInFlightHeadroomNotBatchSize() {
107+
108+
when(repository.countByStatus(Status.RESUBMITTED)).thenReturn(100);
109+
when(repository.findFailedPublications(any())).thenReturn(Collections.emptyList());
110+
111+
var registry = createRegistry(Instant.now());
112+
113+
registry.processFailedPublications(
114+
ResubmissionOptions.defaults().withMaxInFlight(Integer.MAX_VALUE).withBatchSize(100), __ -> {});
115+
116+
verify(repository).findFailedPublications(argThat(criteria -> criteria.getMaxItemsToRead() == 100));
117+
}
118+
119+
@Test // GH-1650
120+
void processFailedPublicationsUsesRemainingInFlightWhenLessThanBatchSize() {
121+
122+
when(repository.countByStatus(Status.RESUBMITTED)).thenReturn(100);
123+
when(repository.findFailedPublications(any())).thenReturn(Collections.emptyList());
124+
125+
var registry = createRegistry(Instant.now());
126+
127+
registry.processFailedPublications(
128+
ResubmissionOptions.defaults().withMaxInFlight(150).withBatchSize(100), __ -> {});
129+
130+
verify(repository).findFailedPublications(argThat(criteria -> criteria.getMaxItemsToRead() == 50));
131+
}
132+
102133
private DefaultEventPublicationRegistry createRegistry(Instant instant) {
103134

104135
var clock = Clock.fixed(instant, ZoneId.systemDefault());
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright 2026 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.modulith.events.core;
17+
18+
import static org.assertj.core.api.Assertions.*;
19+
import static org.springframework.modulith.events.core.EventPublicationRepository.FailedCriteria.*;
20+
21+
import org.junit.jupiter.api.Test;
22+
23+
/**
24+
* Unit tests for {@link EventPublicationRepository.FailedCriteria}.
25+
*
26+
* @author Oliver Drotbohm
27+
*/
28+
class FailedCriteriaUnitTests {
29+
30+
@Test // GH-1650
31+
void acceptsNegativeOneAndPositive() {
32+
assertThat(ALL.withItemsToRead(-1).getMaxItemsToRead()).isEqualTo(-1);
33+
assertThat(ALL.withItemsToRead(1).getMaxItemsToRead()).isEqualTo(1);
34+
}
35+
36+
@Test // GH-1650
37+
void rejectsMaxItemsToReadZero() {
38+
assertThatIllegalArgumentException().isThrownBy(() -> ALL.withItemsToRead(0));
39+
}
40+
41+
@Test // GH-1650
42+
void rejectsMaxItemsToReadSmallerThanNegativeOne() {
43+
assertThatIllegalArgumentException().isThrownBy(() -> ALL.withItemsToRead(-2));
44+
}
45+
}

spring-modulith-events/spring-modulith-events-mongodb/src/main/java/org/springframework/modulith/events/mongodb/MongoDbEventPublicationRepository.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,9 @@ public List<TargetEventPublication> findFailedPublications(FailedCriteria criter
248248
throw new IllegalArgumentException("Number of items to read needs to fit into an integer!");
249249
}
250250

251-
return readMapped(defaultQuery(baseCriteria).limit((int) limit));
251+
var query = defaultQuery(baseCriteria);
252+
253+
return readMapped(limit != -1 ? query.limit((int) limit) : query);
252254
}
253255

254256
/*

0 commit comments

Comments
 (0)