Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/workflows/python-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ jobs:
- "3.9.0"
- "4.0.0"
python:
- "3.13"
- "3.14"
include:
#- python: "pypy3.9"
# kafka: "2.6.0"
Expand All @@ -50,6 +50,8 @@ jobs:
kafka: "4.0.0"
- python: "3.12"
kafka: "4.0.0"
- python: "3.13"
kafka: "4.0.0"

steps:
- uses: actions/checkout@v6
Expand Down
3 changes: 2 additions & 1 deletion docs/compatibility.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ Although kafka-python is tested and expected to work on recent broker versions,
not all features are supported. Please see github open issues for feature tracking.
PRs welcome!

kafka-python is tested on python 2.7, and 3.8-3.13.
kafka-python is tested on python 3.8-3.14.
python 2.7 was supported through kafka-python release 2.3.

Builds and tests via Github Actions Workflows. See https://github.com/dpkp/kafka-python/actions
10 changes: 1 addition & 9 deletions kafka/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from __future__ import absolute_import

__title__ = 'kafka'
from kafka.version import __version__
__author__ = 'Dana Powers'
Expand All @@ -8,14 +6,8 @@

# Set default logging handler to avoid "No handler found" warnings.
import logging
try: # Python 2.7+
from logging import NullHandler
except ImportError:
class NullHandler(logging.Handler):
def emit(self, record):
pass

logging.getLogger(__name__).addHandler(NullHandler())
logging.getLogger(__name__).addHandler(logging.NullHandler())


from kafka.admin import KafkaAdminClient
Expand Down
2 changes: 0 additions & 2 deletions kafka/admin/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from __future__ import absolute_import

from kafka.admin.config_resource import ConfigResource, ConfigResourceType
from kafka.admin.client import KafkaAdminClient
from kafka.admin.acl_resource import (ACL, ACLFilter, ResourcePattern, ResourcePatternFilter, ACLOperation,
Expand Down
2 changes: 0 additions & 2 deletions kafka/admin/__main__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from __future__ import absolute_import

import sys

from kafka.cli.admin import run_cli
Expand Down
9 changes: 1 addition & 8 deletions kafka/admin/acl_resource.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,4 @@
from __future__ import absolute_import

# enum in stdlib as of py3.4
try:
from enum import IntEnum # pylint: disable=import-error
except ImportError:
# vendored backport module
from kafka.vendor.enum34 import IntEnum
from enum import IntEnum

from kafka.errors import IllegalArgumentError

Expand Down
20 changes: 4 additions & 16 deletions kafka/admin/client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from __future__ import absolute_import, division

from collections import defaultdict
import copy
import itertools
Expand All @@ -8,7 +6,6 @@
import time

from . import ConfigResourceType
from kafka.vendor import six

from kafka.admin.acl_resource import ACLOperation, ACLPermissionType, ACLFilter, ACL, ResourcePattern, ResourceType, \
ACLResourcePatternType, valid_acl_operations
Expand Down Expand Up @@ -122,8 +119,7 @@ class KafkaAdminClient(object):
ssl_crlfile (str): Optional filename containing the CRL to check for
certificate expiration. By default, no CRL check is done. When
providing a file, only the leaf certificate will be checked against
this CRL. The CRL can only be checked with Python 3.4+ or 2.7.9+.
Default: None.
this CRL. Default: None.
api_version (tuple): Specify which Kafka API version to use. If set
to None, KafkaClient will attempt to infer the broker version by
probing various APIs. Example: (0, 10, 2). Default: None
Expand Down Expand Up @@ -420,11 +416,7 @@ def _send_request_to_controller(self, request):
raise RuntimeError("This should never happen, please file a bug with full stacktrace if encountered")

def _parse_topic_request_response(self, topic_error_tuples, request, response, tries):
# Also small py2/py3 compatibility -- py3 can ignore extra values
# during unpack via: for x, y, *rest in list_of_values. py2 cannot.
# So for now we have to map across the list and explicitly drop any
# extra values (usually the error_message)
for topic, error_code in map(lambda e: e[:2], topic_error_tuples):
for topic, error_code, *_ in topic_error_tuples:
error_type = Errors.for_code(error_code)
if tries and error_type is Errors.NotControllerError:
# No need to inspect the rest of the errors for
Expand All @@ -439,12 +431,8 @@ def _parse_topic_request_response(self, topic_error_tuples, request, response, t
return True

def _parse_topic_partition_request_response(self, request, response, tries):
# Also small py2/py3 compatibility -- py3 can ignore extra values
# during unpack via: for x, y, *rest in list_of_values. py2 cannot.
# So for now we have to map across the list and explicitly drop any
# extra values (usually the error_message)
for topic, partition_results in response.replication_election_results:
for partition_id, error_code in map(lambda e: e[:2], partition_results):
for partition_id, error_code, *_ in partition_results:
error_type = Errors.for_code(error_code)
if tries and error_type is Errors.NotControllerError:
# No need to inspect the rest of the errors for
Expand Down Expand Up @@ -1418,7 +1406,7 @@ def _list_consumer_group_offsets_request(self, group_id, partitions=None):
topics_partitions_dict = defaultdict(set)
for topic, partition in partitions:
topics_partitions_dict[topic].add(partition)
topics_partitions = list(six.iteritems(topics_partitions_dict))
topics_partitions = list(topics_partitions_dict.items())
return OffsetFetchRequest[version](group_id, topics_partitions)

def _list_consumer_group_offsets_process_response(self, response):
Expand Down
9 changes: 1 addition & 8 deletions kafka/admin/config_resource.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,4 @@
from __future__ import absolute_import

# enum in stdlib as of py3.4
try:
from enum import IntEnum # pylint: disable=import-error
except ImportError:
# vendored backport module
from kafka.vendor.enum34 import IntEnum
from enum import IntEnum


class ConfigResourceType(IntEnum):
Expand Down
3 changes: 0 additions & 3 deletions kafka/admin/new_partitions.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
from __future__ import absolute_import


class NewPartitions(object):
"""A class for new partition creation on existing topics. Note that the length of new_assignments, if specified,
must be the difference between the new total number of partitions and the existing number of partitions.
Expand Down
3 changes: 0 additions & 3 deletions kafka/admin/new_topic.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
from __future__ import absolute_import


class NewTopic(object):
""" A class for new topic creation
Arguments:
Expand Down
2 changes: 0 additions & 2 deletions kafka/benchmarks/consumer_performance.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
#!/usr/bin/env python
# Adapted from https://github.com/mrafayaleem/kafka-jython

from __future__ import absolute_import, print_function

import argparse
import pprint
import sys
Expand Down
1 change: 0 additions & 1 deletion kafka/benchmarks/load_example.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#!/usr/bin/env python
from __future__ import print_function

import argparse
import logging
Expand Down
4 changes: 0 additions & 4 deletions kafka/benchmarks/producer_performance.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
#!/usr/bin/env python
# Adapted from https://github.com/mrafayaleem/kafka-jython

from __future__ import absolute_import, print_function

import argparse
import pprint
import sys
import threading
import time
import traceback

from kafka.vendor.six.moves import range

from kafka import KafkaProducer


Expand Down
1 change: 0 additions & 1 deletion kafka/benchmarks/record_batch_compose.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#!/usr/bin/env python3
from __future__ import print_function
import hashlib
import itertools
import os
Expand Down
1 change: 0 additions & 1 deletion kafka/benchmarks/record_batch_read.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#!/usr/bin/env python
from __future__ import print_function
import hashlib
import itertools
import os
Expand Down
30 changes: 11 additions & 19 deletions kafka/benchmarks/varint_speed.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
#!/usr/bin/env python
from __future__ import print_function
import pyperf
from kafka.vendor import six


test_data = [
Expand Down Expand Up @@ -114,7 +112,10 @@ def encode_varint_1(num):
return buf[:i + 1]


def encode_varint_2(value, int2byte=six.int2byte):
def int2byte(i):
return bytes((i,))

def encode_varint_2(value):
value = (value << 1) ^ (value >> 63)

bits = value & 0x7f
Expand All @@ -141,7 +142,7 @@ def encode_varint_3(value, buf):
return value


def encode_varint_4(value, int2byte=six.int2byte):
def encode_varint_4(value):
value = (value << 1) ^ (value >> 63)

if value <= 0x7f: # 1 byte
Expand Down Expand Up @@ -269,22 +270,13 @@ def size_of_varint_2(value):
return 10


if six.PY3:
def _read_byte(memview, pos):
""" Read a byte from memoryview as an integer

Raises:
IndexError: if position is out of bounds
"""
return memview[pos]
else:
def _read_byte(memview, pos):
""" Read a byte from memoryview as an integer
def _read_byte(memview, pos):
""" Read a byte from memoryview as an integer

Raises:
IndexError: if position is out of bounds
"""
return ord(memview[pos])
Raises:
IndexError: if position is out of bounds
"""
return memview[pos]


def decode_varint_1(buffer, pos=0):
Expand Down
2 changes: 0 additions & 2 deletions kafka/cli/admin/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from __future__ import absolute_import

import argparse
import json
import logging
Expand Down
2 changes: 0 additions & 2 deletions kafka/cli/admin/cluster/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from __future__ import absolute_import

import sys

from .describe import DescribeCluster
Expand Down
3 changes: 0 additions & 3 deletions kafka/cli/admin/cluster/describe.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
from __future__ import absolute_import


class DescribeCluster:

@classmethod
Expand Down
2 changes: 0 additions & 2 deletions kafka/cli/admin/configs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from __future__ import absolute_import

import sys

from .describe import DescribeConfigs
Expand Down
2 changes: 0 additions & 2 deletions kafka/cli/admin/configs/describe.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from __future__ import absolute_import

from kafka.admin.config_resource import ConfigResource


Expand Down
2 changes: 0 additions & 2 deletions kafka/cli/admin/consumer_groups/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from __future__ import absolute_import

import sys

from .delete import DeleteConsumerGroups
Expand Down
3 changes: 0 additions & 3 deletions kafka/cli/admin/consumer_groups/delete.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
from __future__ import absolute_import


class DeleteConsumerGroups:

@classmethod
Expand Down
3 changes: 0 additions & 3 deletions kafka/cli/admin/consumer_groups/describe.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
from __future__ import absolute_import


class DescribeConsumerGroups:

@classmethod
Expand Down
3 changes: 0 additions & 3 deletions kafka/cli/admin/consumer_groups/list.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
from __future__ import absolute_import


class ListConsumerGroups:

@classmethod
Expand Down
3 changes: 0 additions & 3 deletions kafka/cli/admin/consumer_groups/list_offsets.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
from __future__ import absolute_import


class ListConsumerGroupOffsets:

@classmethod
Expand Down
2 changes: 0 additions & 2 deletions kafka/cli/admin/log_dirs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from __future__ import absolute_import

import sys

from .describe import DescribeLogDirs
Expand Down
3 changes: 0 additions & 3 deletions kafka/cli/admin/log_dirs/describe.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
from __future__ import absolute_import


class DescribeLogDirs:

@classmethod
Expand Down
2 changes: 0 additions & 2 deletions kafka/cli/admin/topics/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from __future__ import absolute_import

import sys

from .create import CreateTopic
Expand Down
2 changes: 0 additions & 2 deletions kafka/cli/admin/topics/create.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from __future__ import absolute_import

from kafka.admin.new_topic import NewTopic


Expand Down
3 changes: 0 additions & 3 deletions kafka/cli/admin/topics/delete.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
from __future__ import absolute_import


class DeleteTopic:

@classmethod
Expand Down
3 changes: 0 additions & 3 deletions kafka/cli/admin/topics/describe.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
from __future__ import absolute_import


class DescribeTopics:

@classmethod
Expand Down
3 changes: 0 additions & 3 deletions kafka/cli/admin/topics/list.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
from __future__ import absolute_import


class ListTopics:

@classmethod
Expand Down
2 changes: 0 additions & 2 deletions kafka/cli/consumer/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from __future__ import absolute_import, print_function

import argparse
import logging

Expand Down
Loading
Loading