Skip to content

Commit b974ccd

Browse files
KAFKA-19096: Added checks for the number of dropped records to foreign key join tests. (#21111)
Follow up to #20605 to improve test coverage. Reviewers: Matthias J. Sax <matthias@confluent.io>
1 parent 172aa3c commit b974ccd

File tree

3 files changed

+127
-1
lines changed

3 files changed

+127
-1
lines changed

streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplierTests.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import static org.hamcrest.MatcherAssert.assertThat;
4949
import static org.hamcrest.Matchers.empty;
5050
import static org.hamcrest.Matchers.is;
51+
import static org.junit.jupiter.api.Assertions.assertEquals;
5152

5253
public class ForeignTableJoinProcessorSupplierTests {
5354

@@ -113,6 +114,9 @@ public void shouldPropagateRightRecordForEachMatchingPrimaryKey() {
113114
context.forwarded().get(1).record(),
114115
is(new Record<>(pk2, new SubscriptionResponseWrapper<>(hash, "new_value", null), 0))
115116
);
117+
118+
// test dropped-records sensors
119+
assertEquals(0.0, getDroppedRecordsTotalMetric(context));
116120
}
117121

118122
@Test
@@ -125,6 +129,9 @@ public void shouldPropagateNothingIfNoMatchingPrimaryKey() {
125129
processor.process(record);
126130

127131
assertThat(context.forwarded(), empty());
132+
133+
// test dropped-records sensors
134+
assertEquals(0.0, getDroppedRecordsTotalMetric(context));
128135
}
129136

130137
@Test
@@ -145,6 +152,9 @@ public void shouldPropagateTombstoneRightRecordForEachMatchingPrimaryKey() {
145152
context.forwarded().get(1).record(),
146153
is(new Record<>(pk2, new SubscriptionResponseWrapper<>(hash, null, null), 0))
147154
);
155+
156+
// test dropped-records sensors
157+
assertEquals(0.0, getDroppedRecordsTotalMetric(context));
148158
}
149159

150160
@Test
@@ -162,6 +172,9 @@ public void shouldNotMatchForeignKeysHavingThisFKAsPrefix() {
162172
context.forwarded().get(0).record(),
163173
is(new Record<>(pk2, new SubscriptionResponseWrapper<>(hash, "new_value", null), 0))
164174
);
175+
176+
// test dropped-records sensors
177+
assertEquals(0.0, getDroppedRecordsTotalMetric(context));
165178
}
166179

167180
@Test
@@ -175,7 +188,7 @@ public void shouldIgnoreRecordWithNullKey() {
175188
assertThat(context.forwarded(), empty());
176189

177190
// test dropped-records sensors
178-
Assertions.assertEquals(1.0, getDroppedRecordsTotalMetric(context));
191+
assertEquals(1.0, getDroppedRecordsTotalMetric(context));
179192
Assertions.assertNotEquals(0.0, getDroppedRecordsRateMetric(context));
180193
}
181194

streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplierTest.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,9 @@ public void shouldForwardWhenHashMatches() {
164164
final List<MockProcessorContext.CapturedForward<? extends String, ? extends String>> forwarded = context.forwarded();
165165
assertThat(forwarded.size(), is(1));
166166
assertThat(forwarded.get(0).record(), is(new Record<>("lhs1", "(lhsValue,rhsValue)", 0)));
167+
168+
// test dropped-records sensors
169+
assertEquals(0.0, getDroppedRecordsTotalMetric(context));
167170
}
168171

169172
@Test
@@ -190,6 +193,9 @@ public void shouldEmitTombstoneForInnerJoinWhenRightIsNull() {
190193
final List<MockProcessorContext.CapturedForward<? extends String, ? extends String>> forwarded = context.forwarded();
191194
assertThat(forwarded.size(), is(1));
192195
assertThat(forwarded.get(0).record(), is(new Record<>("lhs1", null, 0)));
196+
197+
// test dropped-records sensors
198+
assertEquals(0.0, getDroppedRecordsTotalMetric(context));
193199
}
194200

195201
@Test
@@ -216,6 +222,9 @@ public void shouldEmitResultForLeftJoinWhenRightIsNull() {
216222
final List<MockProcessorContext.CapturedForward<? extends String, ? extends String>> forwarded = context.forwarded();
217223
assertThat(forwarded.size(), is(1));
218224
assertThat(forwarded.get(0).record(), is(new Record<>("lhs1", "(lhsValue,null)", 0)));
225+
226+
// test dropped-records sensors
227+
assertEquals(0.0, getDroppedRecordsTotalMetric(context));
219228
}
220229

221230
@Test
@@ -242,6 +251,9 @@ public void shouldEmitTombstoneForLeftJoinWhenRightIsNullAndLeftIsNull() {
242251
final List<MockProcessorContext.CapturedForward<? extends String, ? extends String>> forwarded = context.forwarded();
243252
assertThat(forwarded.size(), is(1));
244253
assertThat(forwarded.get(0).record(), is(new Record<>("lhs1", null, 0)));
254+
255+
// test dropped-records sensors
256+
assertEquals(0.0, getDroppedRecordsTotalMetric(context));
245257
}
246258

247259
static Object getDroppedRecordsTotalMetric(final InternalProcessorContext<String, ?> context) {

0 commit comments

Comments
 (0)