Skip to content

Commit 35f5c36

Browse files
committed
Add internal poll to consumer.position() (#2696)
1 parent 3250395 commit 35f5c36

File tree

2 files changed

+27
-6
lines changed

2 files changed

+27
-6
lines changed

kafka/consumer/group.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -755,13 +755,12 @@ def position(self, partition, timeout_ms=None):
755755

756756
timer = Timer(timeout_ms)
757757
position = self._subscription.assignment[partition].position
758-
while position is None:
758+
while position is None and not timer.expired:
759759
# batch update fetch positions for any partitions without a valid position
760-
if self._update_fetch_positions(timeout_ms=timer.timeout_ms):
761-
position = self._subscription.assignment[partition].position
762-
if timer.expired:
763-
return None
764-
else:
760+
self._update_fetch_positions(timeout_ms=timer.timeout_ms)
761+
self._client.poll(timeout_ms=timer.timeout_ms)
762+
position = self._subscription.assignment[partition].position
763+
if position is not None:
765764
return position.offset
766765

767766
def highwater(self, partition):

test/integration/test_consumer_integration.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,3 +302,25 @@ def test_kafka_consumer_offsets_for_times_errors(kafka_consumer_factory, topic):
302302

303303
with pytest.raises(KafkaTimeoutError):
304304
consumer.offsets_for_times({bad_tp: 0})
305+
306+
307+
@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
308+
def test_kafka_consumer_position_after_seek_to_end(kafka_consumer_factory, topic, send_messages):
309+
send_messages(range(0, 10), partition=0)
310+
311+
# Start a consumer with manual partition assignment.
312+
consumer = kafka_consumer_factory(
313+
topics=(),
314+
group_id=None,
315+
enable_auto_commit=False,
316+
)
317+
tp = TopicPartition(topic, 0)
318+
consumer.assign([tp])
319+
320+
# Seek to the end of the partition, and call position() to synchronize the
321+
# partition's offset without calling poll().
322+
consumer.seek_to_end(tp)
323+
position = consumer.position(tp, timeout_ms=1000)
324+
325+
# Verify we got the expected position
326+
assert position == 10, f"Expected position 10, got {position}"

0 commit comments

Comments
 (0)