Skip to content

Commit 2cf71a3

Browse files
committed
Check stream message is confirmed
1 parent 1d55bb5 commit 2cf71a3

File tree

3 files changed

+12
-6
lines changed

3 files changed

+12
-6
lines changed

Diff for: go-stream/offset_tracking_receive.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ func main() {
4040
if string(message.GetData()) == "marker" {
4141
lastOffset.Store(consumerContext.Consumer.GetOffset())
4242
_ = consumerContext.Consumer.StoreOffset()
43-
_ = consumerContext.Consumer.Close()
43+
_ = consumerContext.Consumer.Close()
4444
ch <- true
4545
}
4646
}

Diff for: go-stream/offset_tracking_send.go

+6-4
Original file line numberDiff line numberDiff line change
@@ -54,10 +54,12 @@ func handlePublishConfirm(confirms stream.ChannelPublishConfirm, messageCount in
5454
go func() {
5555
confirmedCount := 0
5656
for confirmed := range confirms {
57-
for _, _ = range confirmed {
58-
confirmedCount++
59-
if confirmedCount == messageCount {
60-
ch <- true
57+
for _, msg := range confirmed {
58+
if msg.IsConfirmed() {
59+
confirmedCount++
60+
if confirmedCount == messageCount {
61+
ch <- true
62+
}
6163
}
6264
}
6365
}

Diff for: java-stream-mvn/src/main/java/OffsetTrackingSend.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,11 @@ public static void main(String[] args) throws InterruptedException {
2323
IntStream.range(0, messageCount).forEach(i -> {
2424
String body = i == messageCount - 1 ? "marker" : "hello";
2525
producer.send(producer.messageBuilder().addData(body.getBytes(UTF_8)).build(),
26-
ctx -> confirmedLatch.countDown());
26+
ctx -> {
27+
if (ctx.isConfirmed()) {
28+
confirmedLatch.countDown();
29+
}
30+
});
2731
});
2832

2933
boolean completed = confirmedLatch.await(60, TimeUnit.SECONDS);

0 commit comments

Comments
 (0)