Skip to content

Commit df59b45

Browse files
committed
Only consume the last message once
The simple consumer repeatedly consumed the last message in a partition.
1 parent 2e58c30 commit df59b45

File tree

2 files changed

+3
-19
lines changed

2 files changed

+3
-19
lines changed

examples/simple-consumer.rb

+2-18
Original file line numberDiff line numberDiff line change
@@ -27,22 +27,6 @@
2727
logger: logger,
2828
)
2929

30-
begin
31-
offset = :latest
32-
partition = 0
33-
34-
loop do
35-
messages = kafka.fetch_messages(
36-
topic: topic,
37-
partition: partition,
38-
offset: offset
39-
)
40-
41-
messages.each do |message|
42-
puts message.value
43-
offset = message.offset + 1
44-
end
45-
end
46-
ensure
47-
kafka.close
30+
kafka.each_message(topic: topic) do |message|
31+
puts message.value
4832
end

lib/kafka/client.rb

+1-1
Original file line numberDiff line numberDiff line change
@@ -357,7 +357,7 @@ def each_message(topic:, start_from_beginning: true, max_wait_time: 5, min_bytes
357357

358358
batches.each do |batch|
359359
batch.messages.each(&block)
360-
offsets[batch.partition] = batch.last_offset
360+
offsets[batch.partition] = batch.last_offset + 1
361361
end
362362
end
363363
end

0 commit comments

Comments
 (0)