Skip to content

Commit 36bdcfb

Browse files
committed
Add round robin partitioner
1 parent 1862620 commit 36bdcfb

File tree

3 files changed

+68
-3
lines changed

3 files changed

+68
-3
lines changed

aiokafka/partitioner.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
1+
import logging
12
import random
23

4+
log = logging.getLogger(__name__)
5+
36

47
class DefaultPartitioner:
58
"""Default partitioner.
@@ -94,3 +97,37 @@ def murmur2(data):
9497
h &= 0xFFFFFFFF
9598

9699
return h
100+
101+
102+
class RoundRobinPartitioner:
103+
"""
104+
Round robin partitioner.
105+
106+
If partial ordering is not needed in your use case, round robin partitioning
107+
achieves a more even distribution of messages across partitions and enables
108+
a higher rate of consumption through a more uniform temporal spacing between
109+
messages in a single partition.
110+
"""
111+
112+
def __init__(self):
113+
self._index = 0
114+
115+
def __call__(self, key, all_partitions, available_partitions):
116+
"""
117+
Get the next partition according to the round robin algorithm.
118+
:param key: partitioning key, expects `None`
119+
:param all_partitions: list of all partitions
120+
:param available_partitions: list of available partitions
121+
:return: one of the values from available_partitions or all_partitions
122+
"""
123+
if key:
124+
log.warning(
125+
"Partitioning key is ignored by RoundRobinPartitioner - "
126+
"use DefaultPartitioner instead."
127+
)
128+
partitions = available_partitions or all_partitions
129+
if self._index >= len(partitions):
130+
self._index = 0
131+
partition = partitions[self._index]
132+
self._index += 1
133+
return partition

benchmark/simple_produce_bench.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
from collections import Counter
66
import random
77

8+
from aiokafka.partitioner import DefaultPartitioner, RoundRobinPartitioner
9+
810

911
class Benchmark:
1012

@@ -26,9 +28,14 @@ def __init__(self, args):
2628
else:
2729
self._is_transactional = False
2830

31+
if args.partitioner == "default":
32+
self._producer_kwargs["partitioner"] = DefaultPartitioner()
33+
elif args.partitioner == "round-robin":
34+
self._producer_kwargs["partitioner"] = RoundRobinPartitioner()
35+
2936
self.transaction_size = args.transaction_size
3037

31-
self._partition = args.partition
38+
self._partition = args.partition if args.partition != -1 else None
3239
self._stats_interval = 1
3340
self._stats = [Counter()]
3441

@@ -117,7 +124,7 @@ def parse_args():
117124
help='Topic to produce messages to. Default {default}.')
118125
parser.add_argument(
119126
'--partition', type=int, default=0,
120-
help='Partition to produce messages to. Default {default}.')
127+
help='Partition to produce messages to. Default {default}. Set to -1 to omit.')
121128
parser.add_argument(
122129
'--uvloop', action='store_true',
123130
help='Use uvloop instead of asyncio default loop.')
@@ -130,6 +137,9 @@ def parse_args():
130137
parser.add_argument(
131138
'--transaction-size', type=int, default=100,
132139
help='Number of messages in transaction')
140+
parser.add_argument(
141+
'--partitioner', type=str, default="default", choices=["default", "round-robin"],
142+
help='Partitioner, either `default` or `round-robin`')
133143
return parser.parse_args()
134144

135145

tests/test_partitioner.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import pytest
22

3-
from aiokafka.partitioner import DefaultPartitioner, murmur2
3+
from aiokafka.partitioner import DefaultPartitioner, RoundRobinPartitioner, murmur2
44

55

66
def test_default_partitioner():
@@ -41,3 +41,21 @@ def test_murmur2_not_ascii():
4141
# Verify no regression of murmur2() bug encoding py2 bytes that don't ascii encode
4242
murmur2(b"\xa4")
4343
murmur2(b"\x81" * 1000)
44+
45+
46+
def test_round_robin_partitioner():
47+
partitioner = RoundRobinPartitioner()
48+
49+
all_partitions = available_partitions = list(range(2))
50+
assert partitioner(None, all_partitions, available_partitions) == 0
51+
assert partitioner(None, all_partitions, available_partitions) == 1
52+
assert partitioner(None, all_partitions, available_partitions) == 0
53+
assert partitioner(None, all_partitions, available_partitions) == 1
54+
55+
all_partitions = available_partitions = list(range(4))
56+
assert partitioner(None, all_partitions, available_partitions) == 2
57+
assert partitioner(None, all_partitions, available_partitions) == 3
58+
59+
all_partitions = [50]
60+
available_partitions = [70]
61+
assert partitioner(None, all_partitions, available_partitions) == 70

0 commit comments

Comments
 (0)