Skip to content

Commit 0f7f45a

Browse files
Implement RdKafka getAssignment Method
1 parent 8cdbc1b commit 0f7f45a

File tree

2 files changed

+83
-0
lines changed

2 files changed

+83
-0
lines changed

pkg/rdkafka/RdKafkaConsumer.php

+17
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
use Interop\Queue\Queue;
1111
use RdKafka\KafkaConsumer;
1212
use RdKafka\TopicPartition;
13+
use RdKafka\Exception as RdKafkaException;
1314

1415
class RdKafkaConsumer implements Consumer
1516
{
@@ -88,6 +89,22 @@ public function getQueue(): Queue
8889
return $this->topic;
8990
}
9091

92+
/**
93+
* @return RdKafkaTopic[]
94+
*/
95+
public function getAssignment(): array
96+
{
97+
try {
98+
return array_map(function (TopicPartition $partition) {
99+
$topic = new RdKafkaTopic($partition->getTopic());
100+
$topic->setPartition($partition->getPartition());
101+
return $topic;
102+
}, $this->consumer->getAssignment());
103+
} catch (RdKafkaException) {
104+
return [];
105+
}
106+
}
107+
91108
/**
92109
* @return RdKafkaMessage
93110
*/

pkg/rdkafka/Tests/RdKafkaConsumerTest.php

+66
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
use PHPUnit\Framework\TestCase;
1111
use RdKafka\KafkaConsumer;
1212
use RdKafka\Message;
13+
use RdKafka\TopicPartition;
14+
use RdKafka\Exception as RdKafkaException;
1315

1416
class RdKafkaConsumerTest extends TestCase
1517
{
@@ -258,6 +260,62 @@ public function testShouldAllowGetPreviouslySetSerializer()
258260
$this->assertSame($expectedSerializer, $consumer->getSerializer());
259261
}
260262

263+
public function testShouldGetAssignmentWhenThereAreNoPartitions(): void
264+
{
265+
$rdKafka = $this->createKafkaConsumerMock();
266+
$rdKafka->expects($this->once())
267+
->method('getAssignment')
268+
->willReturn([]);
269+
270+
$consumer = new RdKafkaConsumer(
271+
$rdKafka,
272+
$this->createContextMock(),
273+
new RdKafkaTopic(''),
274+
$this->createSerializerMock()
275+
);
276+
277+
$this->assertEquals([], $consumer->getAssignment());
278+
}
279+
280+
public function testShouldGetAssignmentWhenThereArePartitions(): void
281+
{
282+
$partition = new TopicPartition('', 0);
283+
284+
$rdKafka = $this->createKafkaConsumerMock();
285+
$rdKafka->expects($this->once())
286+
->method('getAssignment')
287+
->willReturn([$partition]);
288+
289+
$consumer = new RdKafkaConsumer(
290+
$rdKafka,
291+
$this->createContextMock(),
292+
new RdKafkaTopic(''),
293+
$this->createSerializerMock()
294+
);
295+
296+
$expected = new RdKafkaTopic('');
297+
$expected->setPartition(0);
298+
299+
$this->assertEquals([$expected], $consumer->getAssignment());
300+
}
301+
302+
public function testShouldGetAssignmentReturnEmptyArrayWhenThrowException(): void
303+
{
304+
$rdKafka = $this->createKafkaConsumerMock();
305+
$rdKafka->expects($this->once())
306+
->method('getAssignment')
307+
->willThrowException($this->createExceptionMock());
308+
309+
$consumer = new RdKafkaConsumer(
310+
$rdKafka,
311+
$this->createContextMock(),
312+
new RdKafkaTopic(''),
313+
$this->createSerializerMock()
314+
);
315+
316+
$this->assertEquals([], $consumer->getAssignment());
317+
}
318+
261319
/**
262320
* @return \PHPUnit\Framework\MockObject\MockObject|KafkaConsumer
263321
*/
@@ -281,4 +339,12 @@ private function createSerializerMock()
281339
{
282340
return $this->createMock(Serializer::class);
283341
}
342+
343+
/**
344+
* @return \PHPUnit\Framework\MockObject\MockObject|RdKafkaException
345+
*/
346+
private function createExceptionMock()
347+
{
348+
return $this->createMock(RdKafkaException::class);
349+
}
284350
}

0 commit comments

Comments
 (0)