Skip to content

Commit cef0ea6

Browse files
authored
Ignore leading SECURITY_PROTOCOL:// in bootstrap_servers (#2608)
1 parent b6036f2 commit cef0ea6

File tree

4 files changed

+86
-78
lines changed

4 files changed

+86
-78
lines changed

kafka/cluster.py

+24-1
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,15 @@
33
import collections
44
import copy
55
import logging
6+
import random
7+
import re
68
import threading
79
import time
810

911
from kafka.vendor import six
1012

1113
from kafka import errors as Errors
12-
from kafka.conn import collect_hosts
14+
from kafka.conn import get_ip_port_afi
1315
from kafka.future import Future
1416
from kafka.structs import BrokerMetadata, PartitionMetadata, TopicPartition
1517

@@ -422,3 +424,24 @@ def with_partitions(self, partitions_to_add):
422424
def __str__(self):
    """Return a one-line summary of this metadata object.

    The summary reports how many entries are held in ``_brokers``,
    ``_partitions`` (labelled "topics") and ``_coordinators``.
    """
    counts = (
        len(self._brokers),
        len(self._partitions),
        len(self._coordinators),
    )
    return 'ClusterMetadata(brokers: %d, topics: %d, coordinators: %d)' % counts
427+
428+
429+
def collect_hosts(hosts, randomize=True):
    """
    Turn a bootstrap host specification into a list of
    (host, port, afi) tuples.

    Arguments:
        hosts: either a single comma-separated string of entries, or an
            iterable of entry strings. Anything up to and including a
            '://' at the start of an entry is dropped before parsing.
        randomize: when true (the default), the list is shuffled before
            it is returned.

    Returns:
        list of (host, port, afi) tuples, one per entry, each obtained
        from get_ip_port_afi.
    """

    def _parse(entry):
        # ignore leading SECURITY_PROTOCOL:// to mimic java client
        bare = re.sub('^.*://', '', entry)
        host, port, afi = get_ip_port_afi(bare)
        return (host, port, afi)

    if isinstance(hosts, six.string_types):
        entries = hosts.strip().split(',')
    else:
        entries = hosts

    parsed = [_parse(entry) for entry in entries]

    if randomize:
        random.shuffle(parsed)
    return parsed

kafka/conn.py

+1-27
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import errno
55
import io
66
import logging
7-
from random import shuffle, uniform
7+
from random import uniform
88

99
# selectors in stdlib as of py3.4
1010
try:
@@ -1496,32 +1496,6 @@ def get_ip_port_afi(host_and_port_str):
14961496
return host, port, af
14971497

14981498

1499-
def collect_hosts(hosts, randomize=True):
1500-
"""
1501-
Collects a comma-separated set of hosts (host:port) and optionally
1502-
randomize the returned list.
1503-
"""
1504-
1505-
if isinstance(hosts, six.string_types):
1506-
hosts = hosts.strip().split(',')
1507-
1508-
result = []
1509-
afi = socket.AF_INET
1510-
for host_port in hosts:
1511-
1512-
host, port, afi = get_ip_port_afi(host_port)
1513-
1514-
if port < 0:
1515-
port = DEFAULT_KAFKA_PORT
1516-
1517-
result.append((host, port, afi))
1518-
1519-
if randomize:
1520-
shuffle(result)
1521-
1522-
return result
1523-
1524-
15251499
def is_inet_4_or_6(gai):
    """Report whether a getaddrinfo entry is for IPv4 or IPv6.

    Only the first element of ``gai`` is examined; it is compared
    against socket.AF_INET and socket.AF_INET6.
    """
    family = gai[0]
    inet_families = (socket.AF_INET, socket.AF_INET6)
    return family in inet_families

test/test_cluster.py

+60-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
# pylint: skip-file
22
from __future__ import absolute_import
33

4-
from kafka.cluster import ClusterMetadata
4+
import socket
5+
6+
from kafka.cluster import ClusterMetadata, collect_hosts
57
from kafka.protocol.metadata import MetadataResponse
68

79

@@ -132,3 +134,60 @@ def test_metadata_v7():
132134
assert cluster.cluster_id == 'cluster-foo'
133135
assert cluster._partitions['topic-1'][0].offline_replicas == [12]
134136
assert cluster._partitions['topic-1'][0].leader_epoch == 0
137+
138+
139+
def test_collect_hosts__happy_path():
    """IPv4 entries with and without a port; the port-less one expects 9092."""
    expected = {
        ('127.0.0.1', 1234, socket.AF_INET),
        ('127.0.0.1', 9092, socket.AF_INET),
    }
    parsed = collect_hosts("127.0.0.1:1234,127.0.0.1")
    assert set(parsed) == expected
146+
147+
148+
def test_collect_hosts__ipv6():
    """Bracketed entries are expected to come back as AF_INET6, brackets removed."""
    spec = "[localhost]:1234,[2001:1000:2000::1],[2001:1000:2000::1]:1234"
    expected = {
        ('localhost', 1234, socket.AF_INET6),
        ('2001:1000:2000::1', 9092, socket.AF_INET6),
        ('2001:1000:2000::1', 1234, socket.AF_INET6),
    }
    assert set(collect_hosts(spec)) == expected
156+
157+
158+
def test_collect_hosts__string_list():
    """A list of entry strings is accepted in place of a comma-separated string."""
    entries = [
        'localhost:1234',
        'localhost',
        '[localhost]',
        '2001::1',
        '[2001::1]',
        '[2001::1]:1234',
    ]
    # Six inputs, five distinct results: '2001::1' and '[2001::1]' are
    # expected to parse to the same tuple.
    expected = {
        ('localhost', 1234, socket.AF_UNSPEC),
        ('localhost', 9092, socket.AF_UNSPEC),
        ('localhost', 9092, socket.AF_INET6),
        ('2001::1', 9092, socket.AF_INET6),
        ('2001::1', 1234, socket.AF_INET6),
    }
    assert set(collect_hosts(entries)) == expected
176+
177+
178+
def test_collect_hosts__with_spaces():
    """A space after the separating comma must not end up in the host name."""
    expected = {
        ('localhost', 1234, socket.AF_UNSPEC),
        ('localhost', 9092, socket.AF_UNSPEC),
    }
    parsed = collect_hosts("localhost:1234, localhost")
    assert set(parsed) == expected
185+
186+
187+
def test_collect_hosts__protocol():
    """A leading SASL_SSL:// on each entry is dropped before host and port are parsed."""
    spec = "SASL_SSL://foo.bar:1234,SASL_SSL://fizz.buzz:5678"
    expected = {
        ('foo.bar', 1234, socket.AF_UNSPEC),
        ('fizz.buzz', 5678, socket.AF_UNSPEC),
    }
    assert set(collect_hosts(spec)) == expected

test/test_conn.py

+1-49
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
import mock
1111
import pytest
1212

13-
from kafka.conn import BrokerConnection, ConnectionStates, collect_hosts
13+
from kafka.conn import BrokerConnection, ConnectionStates
1414
from kafka.future import Future
1515
from kafka.protocol.api import RequestHeader
1616
from kafka.protocol.group import HeartbeatResponse
@@ -280,54 +280,6 @@ def test_close(conn):
280280
pass # TODO
281281

282282

283-
def test_collect_hosts__happy_path():
284-
hosts = "127.0.0.1:1234,127.0.0.1"
285-
results = collect_hosts(hosts)
286-
assert set(results) == set([
287-
('127.0.0.1', 1234, socket.AF_INET),
288-
('127.0.0.1', 9092, socket.AF_INET),
289-
])
290-
291-
292-
def test_collect_hosts__ipv6():
293-
hosts = "[localhost]:1234,[2001:1000:2000::1],[2001:1000:2000::1]:1234"
294-
results = collect_hosts(hosts)
295-
assert set(results) == set([
296-
('localhost', 1234, socket.AF_INET6),
297-
('2001:1000:2000::1', 9092, socket.AF_INET6),
298-
('2001:1000:2000::1', 1234, socket.AF_INET6),
299-
])
300-
301-
302-
def test_collect_hosts__string_list():
303-
hosts = [
304-
'localhost:1234',
305-
'localhost',
306-
'[localhost]',
307-
'2001::1',
308-
'[2001::1]',
309-
'[2001::1]:1234',
310-
]
311-
results = collect_hosts(hosts)
312-
assert set(results) == set([
313-
('localhost', 1234, socket.AF_UNSPEC),
314-
('localhost', 9092, socket.AF_UNSPEC),
315-
('localhost', 9092, socket.AF_INET6),
316-
('2001::1', 9092, socket.AF_INET6),
317-
('2001::1', 9092, socket.AF_INET6),
318-
('2001::1', 1234, socket.AF_INET6),
319-
])
320-
321-
322-
def test_collect_hosts__with_spaces():
323-
hosts = "localhost:1234, localhost"
324-
results = collect_hosts(hosts)
325-
assert set(results) == set([
326-
('localhost', 1234, socket.AF_UNSPEC),
327-
('localhost', 9092, socket.AF_UNSPEC),
328-
])
329-
330-
331283
def test_lookup_on_connect():
332284
hostname = 'example.org'
333285
port = 9092

0 commit comments

Comments
 (0)