Conversation

peter-lucia

@peter-lucia peter-lucia commented Aug 29, 2025

What

Before this change:

  • No producer.close() method existed for producers.

After this change:

  • Adds producer.close() so the producer can be shut down on demand (see the usage sketch below).
  • Unit tests updated to validate that producer.close() works as intended.
  • Developer readme updated with instructions on how to set up a local environment for testing using uv and Python 3.11.
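
A minimal usage sketch of the new method (illustrative only; the broker address is a placeholder, and close(timeout=...) returning a bool reflects the behavior exercised by the updated unit tests):

from confluent_kafka import Producer

producer = Producer({'bootstrap.servers': 'localhost:9092'})
producer.produce('mytopic', value='payload')
producer.flush(10)                    # deliver (or time out) any outstanding messages
closed = producer.close(timeout=5.0)  # proposed API: shut the producer down on demand
assert closed, "producer resources were not released within the timeout"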

Checklist

  • Contains customer facing changes? Including API/behavior changes
  • Did you add sufficient unit test and/or integration test coverage for this PR?
    • If not, please explain why it is not required

Test & Review

  • Unit tests updated and passing

Other Information

  • Other libraries such as kafka-python also give the option to close the producer, though the internals of such libraries are different.
  • The consumer already has a close() method available.

@Copilot Copilot AI review requested due to automatic review settings August 29, 2025 23:35
@peter-lucia peter-lucia requested review from MSeal and a team as code owners August 29, 2025 23:35
@confluent-cla-assistant

confluent-cla-assistant bot commented Aug 29, 2025

🎉 All Contributor License Agreements have been signed. Ready to merge.
✅ peter-lucia
Please push an empty commit if you would like to re-run the checks to verify CLA status for all contributors.


@Copilot Copilot AI left a comment


Pull Request Overview

Adds a close() method to the Kafka Producer class to enable explicit cleanup of producer resources. This change supports proper resource management, live credential rotation use cases, and prevents memory leaks by allowing users to explicitly destroy producer instances.

  • Implements Producer_close() function with proper resource cleanup
  • Adds method definition to Producer_methods array with documentation


@peter-lucia peter-lucia changed the title Work in Progress - Add close() to producer Add close() to producer Sep 3, 2025
@emasab
Contributor

emasab commented Sep 4, 2025

Hi @peter-lucia thanks for the PR and the checks.
A producer close() isn't necessary as long as there are no network calls to make before destroying. Since you can flush before deallocating the producer, and if you don't, the messages are simply discarded, there were no remaining calls to make; however, we have to check the recently introduced telemetry push.

Going through the comments:

The RD_KAFKA_DESTROY_F_IMMEDIATE flag doesn't seem to be a problem, as it's not used in Producer_dealloc. Yes, it leaks some memory, but it should be used only when you have to terminate an application immediately.

PyObject_GC_UnTrack(self) isn't disabling the GC; it removes self from GC tracking, so it avoids cycles that could prevent collection.

librdkafka internally keeps a reference count for some C objects, so even if you called destroy in a different place and references were left, it would hang; we'd have any of those problems even before this API change.

rkid is ever-increasing and that's OK; it's meant to tell one producer instance apart from another in the logs.

rd_kafka_global_cnt should reach zero if you force the GC.
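
As a generic Python illustration of that last point (nothing librdkafka-specific): an object caught in a reference cycle isn't freed when its name goes away, only when the cyclic collector runs:

import gc

class Holder:
    def __del__(self):
        print("finalized")

h = Holder()
h.self_ref = h   # reference cycle: refcount never reaches zero on `del h`
del h            # nothing printed yet; the cycle keeps the object alive
gc.collect()     # forcing the cyclic GC breaks the cycle and prints "finalized"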

I tried reproducing it with your example but couldn't; this is the code I tried:

import gc
import sys
import json
from time import sleep
from confluent_kafka import Producer

class Kafka:
    def __init__(self, conf):
        def error_cb(*args, **kwargs):
            print(args, kwargs)

        self.conf = {
            'socket.timeout.ms': 10,
            'error_cb': error_cb,
            'message.timeout.ms': 10,
            **conf
        }
        self.producer = Producer(self.conf)

    def recreate_producer(self):
        print('Create new producer')
        prior_producer = self.producer
        prior_producer.flush(10)
        # prior_producer.close() # This would be preferred
        del prior_producer
        gc.collect()  # technically redundant, since garbage collection will happen after the method returns
        self.producer = Producer(self.conf)

while True:
    handler = Kafka({"bootstrap.servers": sys.argv[1]})
    msg = {"test": "test"}
    handler.producer.produce(json.dumps(msg))
    handler.recreate_producer()
    sleep(0.5)

A simple and reliable way to see if there are active librdkafka instances is to check whether the process threads are growing; after many iterations they stay the same:

ps -T -p $(pgrep -f reproduce)
    PID    SPID TTY          TIME CMD
3739260 3739260 pts/13   00:00:18 python
3739260 3819989 pts/13   00:00:00 rdk:main
3739260 3819990 pts/13   00:00:00 rdk:broker-1
3739260 3819992 pts/13   00:00:00 rdk:broker0
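
A rough programmatic equivalent of that check, runnable from inside the reproducer (a Linux-only sketch, not part of the PR):

import os

def native_thread_count() -> int:
    # Count the OS threads of the current process, including librdkafka's
    # rdk:* threads, by listing /proc/<pid>/task (Linux only).
    return len(os.listdir(f"/proc/{os.getpid()}/task"))

print("threads:", native_thread_count())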

If you find changes to the reproducer code that make it happen, or if it needs to run for longer, please tell us.

@peter-lucia
Author

peter-lucia commented Sep 4, 2025

Hi @emasab,

Thank you for taking a look at this PR and going through the investigation details. The early investigatory notes on librdkafka internals were included to give background on what's happening under the hood in librdkafka and how that relates to the Python package as we dug into it. They were meant to document possible reasons why producer resources could live longer than they should, how memory leaks could happen, and how Python garbage collection of the producer object is affected.

Your explanations and comments on our investigation notes generally match our latest understanding as we are digging into this. Going through them:

The RD_KAFKA_DESTROY_F_IMMEDIATE flag doesn't seem to be a problem, as it's not used in Producer_dealloc. Yes, it leaks some memory, but it should be used only when you have to terminate an application immediately.

Yes, initially it was more of a concern, but given that it doesn't appear to be used by the dealloc, we're in agreement: while it leaks memory, as long as it isn't used now or in the future it should be okay.

PyObject_GC_UnTrack(self) isn't disabling the GC; it removes self from GC tracking, so it avoids cycles that could prevent collection.

Yes, we didn't mean that it was disabling the GC entirely (if so, we'd have bigger issues!). We're aligned on this: the Producer object (self) is removed from GC tracking.

Following up on this: the dealloc method doesn't confirm the Kafka objects are destroyed. Waiting for that confirmation can block for up to the provided timeout, so we can see why it wasn't done there; deallocs generally aren't given timeouts, and a default would have to be chosen that might not fit all use cases. For the most certainty during app-initiated cleanup where timing matters, confirmation that the cleanup actually succeeded is helpful, which is why we added it to Producer.close().

rkid is ever increasing and that's OK, it's meant to tell a producer instance from a different one in the logs.

Yes, makes sense.

rd_kafka_global_cnt should reach zero if you force the GC

The newly added unit test test_producer_close exposes the issue we're seeing when relying on GC alone. It fails when there are lingering producer resources not cleaned up by other tests, which calls into question when, or whether, the dealloc is actually being called and running successfully. More deterministic behavior here would be helpful, and that motivates adding producer.close().

Can you reproduce this by running the unit tests with the p.close() calls we've added commented out in every unit test except test_producer_close? This causes test_producer_close to fail. You might have to run it a few times to reproduce the failure; we had to run it a couple of times today to reproduce it again. Our adjustments to the other tests in this PR make test_producer_close pass more reliably, but we may want to add p.close() to even more of the producer unit tests to be 100% sure the resources are always cleaned up.

Expanding on this:

One of the additions in producer.close() is a call to rd_kafka_wait_destroyed, which blocks until all the underlying Kafka objects are confirmed destroyed or until the default (5 s) or provided timeout is reached. This, combined with being able to clean up the producer on demand, provides more deterministic behavior, especially for context managers (e.g. an __exit__ clause), when the producer needs to be fully deleted because credentials are expiring, or when there is uncertainty about recent changes (e.g. the telemetry push) that could cause the producer to live on. It gives the producer the same capability as the Consumer, which already has a close() method, with added clarity on whether all underlying Kafka objects were actually destroyed.

rd_kafka_wait_destroyed returns 0 if all Kafka objects are destroyed; producer.close() then returns True.
rd_kafka_wait_destroyed returns -1 if not all Kafka objects are destroyed by the time the timeout is reached; producer.close() then returns False.
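
For example, the kind of context-manager wrapper we have in mind (a sketch on top of the proposed close(timeout) API, not code from this PR):

from contextlib import contextmanager

from confluent_kafka import Producer


@contextmanager
def managed_producer(conf, close_timeout=5.0):
    p = Producer(conf)
    try:
        yield p
    finally:
        p.flush(close_timeout)
        # Proposed semantics: close() returns True only if all underlying
        # Kafka objects were confirmed destroyed within the timeout.
        if not p.close(timeout=close_timeout):
            raise RuntimeError("producer did not shut down cleanly")

Usage would then be: with managed_producer(conf) as p: p.produce('mytopic', value='...').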

Please let us know if you can reproduce our findings on your end through the unit tests or if you see any discrepancies. We will look into additional mechanisms for detecting lingering librdkafka instances, but believe the rd_kafka_wait_destroyed check should also be sufficient.

@emasab
Contributor

emasab commented Sep 5, 2025

Some observations:

  • Which version are you testing? There are some fixes that could block client destruction.
  • There needs to be a test that reproduces the issue before any change.
  • You cannot call rd_kafka_wait_destroyed, because it waits for all instances to be destroyed and that's not what's needed: you could have a consumer and a producer, deallocate only the producer and create a new one while the consumer stays active (illustrated below).
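
A hypothetical scenario illustrating that last point (placeholder configs; close() here refers to the method proposed in this PR):

from confluent_kafka import Consumer, Producer

consumer = Consumer({'group.id': 'g'})       # long-lived instance, stays active

old_producer = Producer({'socket.timeout.ms': 10})
old_producer.flush(10)
# A close() built on rd_kafka_wait_destroyed would block here for the full
# timeout, because the consumer's librdkafka instance is intentionally alive.
old_producer.close(timeout=5.0)

new_producer = Producer({'socket.timeout.ms': 10})  # recreate while the consumer runs on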

@peter-lucia
Author

peter-lucia commented Sep 5, 2025

  • which version are you testing? There are some fixes that could block client destruction

We're testing with 2.11.1 and Python 3.11.

  • there's need for a test to reproduce it before any change

Yes, the unit tests are the easiest way to do that with the changes in this branch. Here's what I was describing:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import gc
import json
import pytest
from confluent_kafka import Producer, KafkaError, KafkaException, \
    TopicPartition, libversion
from struct import pack
from time import sleep

from tests.common import TestConsumer


def error_cb(err):
    print('error_cb', err)


class MockHandler:
    def __init__(self, *args, **kwargs):
        self.conf = {
            'debug': 'all',
            'socket.timeout.ms': 10,
            'error_cb': error_cb,
            'message.timeout.ms': 10
        }
        self.producer = Producer(self.conf)

    def recreate_producer(self, timeout: float) -> bool:
        prior_producer = self.producer
        prior_producer.flush(10)
        destroyed = prior_producer.close(timeout=timeout)
        self.producer = Producer(self.conf)
        return destroyed


def test_basic_api():
    """ Basic API tests, these wont really do anything since there is no
        broker configured. """

    with pytest.raises(TypeError) as ex:
        p = Producer()
    assert ex.match('expected configuration dict')

    p = Producer({'socket.timeout.ms': 10,
                  'error_cb': error_cb,
                  'message.timeout.ms': 10})

    p.produce('mytopic')
    p.produce('mytopic', value='somedata', key='a key')

    def on_delivery(err, msg):
        print('delivery', err, msg)
        # Since there is no broker, produced messages should time out.
        assert err.code() == KafkaError._MSG_TIMED_OUT
        print('message latency', msg.latency())

    p.produce(topic='another_topic', value='testing', partition=9,
              callback=on_delivery)

    p.poll(0.001)

    p.flush(0.002)
    p.flush()

    try:
        p.list_topics(timeout=0.2)
    except KafkaException as e:
        assert e.args[0].code() in (KafkaError._TIMED_OUT, KafkaError._TRANSPORT)

    # assert p.close(), "Failed to validate that producer was closed."


def test_produce_timestamp():
    """ Test produce() with timestamp arg """
    p = Producer({'socket.timeout.ms': 10,
                  'error_cb': error_cb,
                  'message.timeout.ms': 10})

    # Requires librdkafka >=v0.9.4

    try:
        p.produce('mytopic', timestamp=1234567)
    except NotImplementedError:
        # Should only fail on non-supporting librdkafka
        if libversion()[1] >= 0x00090400:
            raise

    p.flush()


# Should be updated to 0.11.4 when it is released
@pytest.mark.skipif(libversion()[1] < 0x000b0400,
                    reason="requires librdkafka >=0.11.4")
def test_produce_headers():
    """ Test produce() with timestamp arg """
    p = Producer({'socket.timeout.ms': 10,
                  'error_cb': error_cb,
                  'message.timeout.ms': 10})

    binval = pack('hhl', 1, 2, 3)

    headers_to_test = [
        [('headerkey', 'headervalue')],
        [('dupkey', 'dupvalue'), ('empty', ''), ('dupkey', 'dupvalue')],
        [('dupkey', 'dupvalue'), ('dupkey', 'diffvalue')],
        [('key_with_null_value', None)],
        [('binaryval', binval)],
        [('alreadyutf8', u'Småland'.encode('utf-8'))],
        [('isunicode', 'Jämtland')],

        {'headerkey': 'headervalue'},
        {'dupkey': 'dupvalue', 'empty': '', 'dupkey': 'dupvalue'},  # noqa: F601
        {'dupkey': 'dupvalue', 'dupkey': 'diffvalue'},  # noqa: F601
        {'key_with_null_value': None},
        {'binaryval': binval},
        {'alreadyutf8': u'Småland'.encode('utf-8')},
        {'isunicode': 'Jämtland'}
        ]

    for headers in headers_to_test:
        print('headers', type(headers), headers)
        p.produce('mytopic', value='somedata', key='a key', headers=headers)
        p.produce('mytopic', value='somedata', headers=headers)

    with pytest.raises(TypeError):
        p.produce('mytopic', value='somedata', key='a key', headers=('a', 'b'))

    with pytest.raises(TypeError):
        p.produce('mytopic', value='somedata', key='a key', headers=[('malformed_header')])

    with pytest.raises(TypeError):
        p.produce('mytopic', value='somedata', headers={'anint': 1234})

    p.flush()


# Should be updated to 0.11.4 when it is released
@pytest.mark.skipif(libversion()[1] >= 0x000b0400,
                    reason="Old versions should fail when using headers")
def test_produce_headers_should_fail():
    """ Test produce() with timestamp arg """
    p = Producer({'socket.timeout.ms': 10,
                  'error_cb': error_cb,
                  'message.timeout.ms': 10})

    with pytest.raises(NotImplementedError) as ex:
        p.produce('mytopic', value='somedata', key='a key', headers=[('headerkey', 'headervalue')])
    assert ex.match('Producer message headers requires confluent-kafka-python built for librdkafka version >=v0.11.4')


def test_subclassing():
    class SubProducer(Producer):
        def __init__(self, conf, topic):
            super(SubProducer, self).__init__(conf)
            self.topic = topic

        def produce_hi(self):
            super(SubProducer, self).produce(self.topic, value='hi')

    sp = SubProducer(dict(), 'atopic')
    assert isinstance(sp, SubProducer)

    # Invalid config should fail
    with pytest.raises(KafkaException):
        sp = SubProducer({'should.fail': False}, 'mytopic')

    sp = SubProducer({'log.thread.name': True}, 'mytopic')
    sp.produce('someother', value='not hello')
    sp.produce_hi()


def test_dr_msg_errstr():
    """
    Test that the error string for failed messages works (issue #129).
    The underlying problem is that librdkafka reuses the message payload
    for error value on Consumer messages, but on Producer messages the
    payload is the original payload and no rich error string exists.
    """
    p = Producer({"message.timeout.ms": 10})

    def handle_dr(err, msg):
        # The message payloads must not affect the error string.
        assert err is not None
        assert err.code() == KafkaError._MSG_TIMED_OUT
        assert "Message timed out" in err.str()

    # Unicode safe string
    p.produce('mytopic', "This is the message payload", on_delivery=handle_dr)

    # Invalid unicode sequence
    p.produce('mytopic', "\xc2\xc2", on_delivery=handle_dr)

    # p.flush()


def test_set_partitioner_murmur2():
    """
    Test ability to set built-in partitioner type murmur
    """
    Producer({'partitioner': 'murmur2'})


def test_set_partitioner_murmur2_random():
    """
    Test ability to set built-in partitioner type murmur2_random
    """
    Producer({'partitioner': 'murmur2_random'})


def test_set_invalid_partitioner_murmur():
    """
    Assert invalid partitioner raises KafkaException
    """
    with pytest.raises(KafkaException) as ex:
        Producer({'partitioner': 'murmur'})
    assert ex.match('Invalid value for configuration property "partitioner": murmur')


def test_transaction_api():
    """ Excercise the transactional API """
    p = Producer({"transactional.id": "test"})

    with pytest.raises(KafkaException) as ex:
        p.init_transactions(0.5)
    assert ex.value.args[0].code() == KafkaError._TIMED_OUT
    assert ex.value.args[0].retriable() is True
    assert ex.value.args[0].fatal() is False
    assert ex.value.args[0].txn_requires_abort() is False

    # Any subsequent APIs will fail since init did not succeed.
    with pytest.raises(KafkaException) as ex:
        p.begin_transaction()
    assert ex.value.args[0].code() == KafkaError._CONFLICT
    assert ex.value.args[0].retriable() is True
    assert ex.value.args[0].fatal() is False
    assert ex.value.args[0].txn_requires_abort() is False

    consumer = TestConsumer({"group.id": "testgroup"})
    group_metadata = consumer.consumer_group_metadata()
    consumer.close()

    with pytest.raises(KafkaException) as ex:
        p.send_offsets_to_transaction([TopicPartition("topic", 0, 123)],
                                      group_metadata)
    assert ex.value.args[0].code() == KafkaError._CONFLICT
    assert ex.value.args[0].retriable() is True
    assert ex.value.args[0].fatal() is False
    assert ex.value.args[0].txn_requires_abort() is False

    with pytest.raises(KafkaException) as ex:
        p.commit_transaction(0.5)
    assert ex.value.args[0].code() == KafkaError._CONFLICT
    assert ex.value.args[0].retriable() is True
    assert ex.value.args[0].fatal() is False
    assert ex.value.args[0].txn_requires_abort() is False

    with pytest.raises(KafkaException) as ex:
        p.abort_transaction(0.5)
    assert ex.value.args[0].code() == KafkaError._CONFLICT
    assert ex.value.args[0].retriable() is True
    assert ex.value.args[0].fatal() is False
    assert ex.value.args[0].txn_requires_abort() is False

    # assert p.close(), "The producer was not closed"


def test_purge():
    """
    Verify that when we have a higher message.timeout.ms timeout, we can use purge()
    to stop waiting for messages and get delivery reports
    """
    p = Producer(
        {"socket.timeout.ms": 10, "error_cb": error_cb, "message.timeout.ms": 30000}
    )  # 30 seconds

    # Hack to detect on_delivery was called because inner functions can modify nonlocal objects.
    # When python2 support is dropped, we can use the "nonlocal" keyword instead
    cb_detector = {"on_delivery_called": False}

    def on_delivery(err, msg):
        cb_detector["on_delivery_called"] = True
        # Because we are purging messages, we should see a PURGE_QUEUE kafka error
        assert err.code() == KafkaError._PURGE_QUEUE

    # Our message won't be delivered, but also won't timeout yet because our timeout is 30s.
    p.produce(topic="some_topic", value="testing", partition=9, callback=on_delivery)
    p.flush(0.002)
    assert not cb_detector["on_delivery_called"]

    # When in_queue set to false, we won't purge the message and get delivery callback
    p.purge(in_queue=False)
    p.flush(0.002)
    assert not cb_detector["on_delivery_called"]

    # When we purge including the queue, the message should have delivered a delivery report
    # with a PURGE_QUEUE error
    p.purge()
    p.flush(0.002)
    assert cb_detector["on_delivery_called"]

    # assert p.close(), "The producer was not closed"


def test_producer_bool_value():
    """
    Make sure producer has a truth-y bool value
    See https://github.com/confluentinc/confluent-kafka-python/issues/1427
    """

    p = Producer({})
    assert bool(p)
    # assert p.close(), "The producer was not fully closed"


@pytest.mark.parametrize("timeout,destroyed_expected,exception_expected", [
    (10.0, True, None),
    (5.0, True, None),
    (-100, None, ValueError),
    ("wrong", None, ValueError)
])
def test_producer_close(timeout, destroyed_expected, exception_expected):
    """
    Ensures the producer is fully cleaned up when closed before being recreated
    """

    handler = MockHandler()
    msg = {"test": "test"}
    handler.producer.produce(json.dumps(msg))
    if exception_expected is not None:
        with pytest.raises(exception_expected):
            _ = handler.recreate_producer(timeout)
    else:
        destroyed_actual = handler.recreate_producer(timeout)
        assert destroyed_actual == destroyed_expected

We have reproduced this on multiple workstations at this point. It should be reproducible after also updating your Producer.c with what's added in this PR. The two failing tests indicate that the global ref count did not reach 0 within the provided timeout (10 s and 5 s) when producer.close() is not called in the other unit tests (you can see it's commented out).

  • you cannot call rd_kafka_wait_destroyed, because it waits for all instances to be destroyed and that's not what's needed: you could have a consumer and a producer, deallocate only the producer and create a new one while the consumer stays active

If an overall shutdown of both the consumer and the producer is required before we can call rd_kafka_wait_destroyed, where would a combined consumer+producer shutdown function best live?

@emasab
Contributor

emasab commented Sep 8, 2025

I checked the failing tests and they're failing because the new code calls rd_kafka_wait_destroyed, which waits for termination of all other instances. That means that until the GC runs, the close call cannot return.

There's no need to call rd_kafka_wait_destroyed (which is deprecated): when rd_kafka_destroy returns, all librdkafka threads of that instance are joined and all its memory is freed.

So if you don't call rd_kafka_wait_destroyed in producer.close(), or if you call gc.collect() or gc.set_threshold(1, 1, 1) at the beginning of test_producer_close, or if you don't call producer.close() before replacing the producer, the tests pass.

That doesn't mean rd_kafka_destroy cannot block for other reasons, but in that case it's something to look for in the librdkafka code, which is why I'm asking for a test that reproduces the missing instance destruction without the new code.

About the telemetry push in rd_kafka_destroy, we have to investigate more as it can last up to 1 s; but if an rd_kafka_producer_close() is to be added, it should go into librdkafka before confluent-kafka-python. The Producer_dealloc code releases the GIL, so it shouldn't block other Python threads from continuing, only the garbage collection.

@peter-lucia
Author

I checked the failing tests and they're failing because the new code calls rd_kafka_wait_destroyed, which waits for termination of all other instances. That means that until the GC runs, the close call cannot return.

Let's dig into this a bit more. The close() call returns once all underlying Kafka objects are destroyed, or once the timeout (default 5 s) is reached, whichever comes first. In the two failing tests the timeout was reached first, indicating that lingering producers were still running, i.e. the number of active instances was greater than 0 (not cleaned up).

GC will run right after the completion of the other test functions. GC should be handling the cleanup of any lingering consumers or producers left over from other tests before the start of new tests. The fact that these two tests fail fairly consistently when we don't call producer.close() at the end of the other tests suggests that the GC alone is not cleaning up all producers.

Relying on the GC alone may allow the Python code to move on and asynchronously create new producers before the previous ones have been cleared. This may explain why GC alone doesn't clean up all underlying producers.

@emasab
Contributor

emasab commented Sep 9, 2025

GC should be handling the cleanup of any lingering consumers or producers left over from other tests before the start of new tests.

That isn't happening, for sure: the cyclic garbage collector isn't triggered automatically there, nor at fixed time intervals; it runs once a certain number of allocations minus deallocations is reached. That's why it isn't called while the close is waiting, and why calling it explicitly with gc.collect(), or lowering the generation thresholds with gc.set_threshold(1, 1, 1), makes the close not block and the test pass.
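
A standalone illustration of that trigger in plain Python (nothing confluent-kafka-specific):

import gc

print(gc.get_threshold())    # defaults to (700, 10, 10): generation 0 is only
                             # collected after ~700 net allocations, never on a timer

class Node:
    pass

n = Node()
n.ref = n                    # reference cycle
del n                        # unreachable, but survives until the collector runs

gc.set_threshold(1, 1, 1)    # make collections far more frequent ...
gc.collect()                 # ... or simply force a full collection right now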

rd_kafka_wait_destroyed isn't currently used in confluent-kafka-python, and it shouldn't be, as it awaits all other existing instances and causes these deadlocks when the GC hasn't run before it.

But there's one additional thing: the cyclic GC is needed because the producer isn't automatically destroyed when its reference count reaches zero. I'll check whether there's some cycle that prevents destruction by refcount, or some dangling reference that keeps the count from reaching zero and calling the destructor immediately after the previous tests return.
That could make the producer survive for longer, until collection happens.

@peter-lucia
Author

Yes, I think our understandings align here: the GC isn't happening, and, as you say, it isn't triggered automatically after the previous tests complete.

the cyclic GC is needed because the producer isn't automatically destroyed when its reference count reaches zero

Yes

I'll check whether there's some cycle that prevents destruction by refcount, or some dangling reference that keeps the count from reaching zero and calling the destructor immediately after the previous tests return.
That could make the producer survive for longer, until collection happens.

Did your checks reveal anything or is this still an open question?

I have updated the PR so it no longer includes the rd_kafka_wait_destroyed call. This still enables the producer to be closed on demand, which is the core of what we're after here.

@emasab
Contributor

emasab commented Sep 10, 2025

@peter-lucia I finished my checks and found that, in the failing tests, what was preventing immediate garbage collection was the exceptions raised in test_basic_api and test_transaction_api, given that they keep a traceback with all local variables, including the producer. If you delete the local variable with del ex, the tests that fail with rd_kafka_wait_destroyed (kept just for debugging purposes) pass.
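
Roughly what that change looks like, using test_transaction_api as the example (a simplified sketch of the fix described above):

import pytest
from confluent_kafka import Producer, KafkaError, KafkaException

def test_transaction_api():
    p = Producer({"transactional.id": "test"})

    with pytest.raises(KafkaException) as ex:
        p.init_transactions(0.5)
    assert ex.value.args[0].code() == KafkaError._TIMED_OUT

    # `ex` holds the traceback, the traceback holds this frame, and the frame
    # holds `p`: a reference cycle that keeps the producer alive until the
    # cyclic GC eventually runs.  Dropping the name breaks the cycle so the
    # producer can be destroyed by refcount as soon as the test returns.
    del ex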

I understand the need for more predictable producer disposal; it's just that, done this way, calling any other method after close(), e.g. closed_producer.poll(), causes a segmentation fault. We have to add null checks to the API, like for the consumer:

        if (!self->rk) {
                PyErr_SetString(PyExc_RuntimeError,
                                "Consumer closed");
                return NULL;
        }
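
On the Python side the expectation would then mirror the consumer: using a closed producer raises instead of segfaulting (a sketch of the intended behavior; the exact exception type is an assumption based on the consumer's RuntimeError above):

import pytest
from confluent_kafka import Producer

p = Producer({})
assert p.close()

with pytest.raises(RuntimeError):
    p.poll(0)   # with the null checks in place, this should raise rather than crash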

If you prefer we can continue and finish these changes.

@peter-lucia
Author

If you prefer we can continue and finish these changes.

That is fine by me, go ahead!
