diff --git a/CHANGELOG.md b/CHANGELOG.md index b5f8c123b..87fc15f2e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,9 @@ v2.11.0 is a feature release with the following enhancements: +### Fixes +- Fix error propagation rule for Python's C API to prevent SystemError when callbacks raise exceptions (#865) + confluent-kafka-python v2.11.0 is based on librdkafka v2.11.0, see the [librdkafka release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.11.0) for a complete list of changes, enhancements, fixes and upgrade considerations. diff --git a/src/confluent_kafka/src/Admin.c b/src/confluent_kafka/src/Admin.c index 451017ad6..f198c5404 100644 --- a/src/confluent_kafka/src/Admin.c +++ b/src/confluent_kafka/src/Admin.c @@ -5082,7 +5082,7 @@ static void Admin_background_event_cb (rd_kafka_t *rk, rd_kafka_event_t *rkev, PyObject *trace = NULL; /* Fetch (and clear) currently raised exception */ - PyErr_Fetch(&exctype, &error, &trace); + cfl_exception_fetch(&exctype, &error, &trace); Py_XDECREF(trace); } diff --git a/src/confluent_kafka/src/Consumer.c b/src/confluent_kafka/src/Consumer.c index 7376c6d84..7dc8bb23d 100644 --- a/src/confluent_kafka/src/Consumer.c +++ b/src/confluent_kafka/src/Consumer.c @@ -1575,6 +1575,8 @@ static void Consumer_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err, if (result) Py_DECREF(result); else { + + CallState_fetch_exception(cs); CallState_crash(cs); rd_kafka_yield(rk); } diff --git a/src/confluent_kafka/src/Producer.c b/src/confluent_kafka/src/Producer.c index b6a51f510..c1f5bfc4a 100644 --- a/src/confluent_kafka/src/Producer.c +++ b/src/confluent_kafka/src/Producer.c @@ -163,6 +163,8 @@ static void dr_msg_cb (rd_kafka_t *rk, const rd_kafka_message_t *rkm, if (result) Py_DECREF(result); else { + + CallState_fetch_exception(cs); CallState_crash(cs); rd_kafka_yield(rk); } diff --git a/src/confluent_kafka/src/confluent_kafka.c b/src/confluent_kafka/src/confluent_kafka.c index bbc335d46..4bdebee14 100644 --- a/src/confluent_kafka/src/confluent_kafka.c +++ b/src/confluent_kafka/src/confluent_kafka.c @@ -1755,6 +1755,8 @@ static void error_cb (rd_kafka_t *rk, int err, const char *reason, void *opaque) if (result) Py_DECREF(result); else { + + CallState_fetch_exception(cs); crash: CallState_crash(cs); rd_kafka_yield(h->rk); @@ -1808,6 +1810,8 @@ static void throttle_cb (rd_kafka_t *rk, const char *broker_name, int32_t broker /* throttle_cb executed successfully */ Py_DECREF(result); goto done; + } else { + CallState_fetch_exception(cs); } /** @@ -1839,6 +1843,7 @@ static int stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque) { if (result) Py_DECREF(result); else { + CallState_fetch_exception(cs); CallState_crash(cs); rd_kafka_yield(h->rk); } @@ -1874,6 +1879,7 @@ static void log_cb (const rd_kafka_t *rk, int level, if (result) Py_DECREF(result); else { + CallState_fetch_exception(cs); CallState_crash(cs); rd_kafka_yield(h->rk); } @@ -2572,6 +2578,9 @@ void CallState_begin (Handle *h, CallState *cs) { cs->thread_state = PyEval_SaveThread(); assert(cs->thread_state != NULL); cs->crashed = 0; + cs->exception_type = NULL; + cs->exception_value = NULL; + cs->exception_traceback = NULL; #ifdef WITH_PY_TSS PyThread_tss_set(&h->tlskey, cs); #else @@ -2592,8 +2601,16 @@ int CallState_end (Handle *h, CallState *cs) { PyEval_RestoreThread(cs->thread_state); - if (PyErr_CheckSignals() == -1 || cs->crashed) + if (PyErr_CheckSignals() == -1) + return 0; + + if (cs->crashed) { + /* Restore the saved exception if we have one */ + if (cs->exception_type) { + CallState_restore_exception(cs); + } return 0; + } return 1; } diff --git a/src/confluent_kafka/src/confluent_kafka.h b/src/confluent_kafka/src/confluent_kafka.h index ac7474c9e..e4b1f3d54 100644 --- a/src/confluent_kafka/src/confluent_kafka.h +++ b/src/confluent_kafka/src/confluent_kafka.h @@ -275,8 +275,66 @@ int Handle_traverse (Handle *h, visitproc visit, void *arg); typedef struct { PyThreadState *thread_state; int crashed; /* Callback crashed */ + PyObject *exception_type; /* Stored exception type */ + PyObject *exception_value; /* Stored exception value */ + PyObject *exception_traceback; /* Stored exception traceback */ } CallState; +/** + * @brief Compatibility layer for Python exception handling API changes. + * PyErr_Fetch/PyErr_Restore were deprecated in Python 3.12 in favor of + * PyErr_GetRaisedException/PyErr_SetRaisedException. + */ + +static inline void +cfl_exception_fetch(PyObject **exc_type, PyObject **exc_value, PyObject **exc_traceback) { +#if PY_VERSION_HEX >= 0x030c0000 + /* Python 3.12+ - use new API */ + PyObject *exc = PyErr_GetRaisedException(); + if (exc) { + *exc_type = (PyObject *)Py_TYPE(exc); + Py_INCREF(*exc_type); + *exc_value = exc; + *exc_traceback = PyException_GetTraceback(exc); + } else { + *exc_type = *exc_value = *exc_traceback = NULL; + } +#else + /* Python < 3.12 - use legacy API */ + PyErr_Fetch(exc_type, exc_value, exc_traceback); +#endif +} + +static inline void +cfl_exception_restore(PyObject *exc_type, PyObject *exc_value, PyObject *exc_traceback) { +#if PY_VERSION_HEX >= 0x030c0000 + /* Python 3.12+ - use new API */ + if (exc_value) { + PyErr_SetRaisedException(exc_value); + Py_XDECREF(exc_type); + Py_XDECREF(exc_traceback); + } +#else + /* Python < 3.12 - use legacy API */ + PyErr_Restore(exc_type, exc_value, exc_traceback); +#endif +} + +static inline void +CallState_fetch_exception(CallState *cs) { + cfl_exception_fetch(&cs->exception_type, &cs->exception_value, &cs->exception_traceback); +} + +static inline void +CallState_restore_exception(CallState *cs) { + if (cs->exception_type) { + cfl_exception_restore(cs->exception_type, cs->exception_value, cs->exception_traceback); + cs->exception_type = NULL; + cs->exception_value = NULL; + cs->exception_traceback = NULL; + } +} + /** * @brief Initialiase a CallState and unlock the GIL prior to a * possibly blocking external call. diff --git a/tests/test_Consumer.py b/tests/test_Consumer.py index bfe03dffc..850fa5ac5 100644 --- a/tests/test_Consumer.py +++ b/tests/test_Consumer.py @@ -324,3 +324,38 @@ def test_consumer_without_groupid(): with pytest.raises(ValueError) as ex: TestConsumer({'bootstrap.servers': "mybroker:9092"}) assert ex.match('group.id must be set') + + +def test_callback_exception_no_system_error(): + + exception_raised = [] + + def error_cb_that_raises(error): + """Error callback that raises an exception""" + exception_raised.append(error) + raise RuntimeError("Test exception from error_cb") + + # Create consumer with error callback that raises exception + consumer = TestConsumer({ + 'group.id': 'test-callback-systemerror-fix', + 'bootstrap.servers': 'nonexistent-broker:9092', # Will trigger error + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'error_cb': error_cb_that_raises + }) + + consumer.subscribe(['test-topic']) + + # This should trigger the error callback due to connection failure + # Before fix: Would get RuntimeError + SystemError (Issue #865) + # After fix: Should only get RuntimeError (no SystemError) + with pytest.raises(RuntimeError) as exc_info: + consumer.consume(timeout=0.1) + + # Verify we got the expected exception message + assert "Test exception from error_cb" in str(exc_info.value) + + # Verify the error callback was actually called + assert len(exception_raised) > 0 + + consumer.close() diff --git a/tests/test_Producer.py b/tests/test_Producer.py index 0f0d69e1d..11049e0a6 100644 --- a/tests/test_Producer.py +++ b/tests/test_Producer.py @@ -283,3 +283,34 @@ def test_producer_bool_value(): p = Producer({}) assert bool(p) + + +def test_callback_exception_no_system_error(): + delivery_reports = [] + + def delivery_cb_that_raises(err, msg): + """Delivery report callback that raises an exception""" + delivery_reports.append((err, msg)) + raise RuntimeError("Test exception from delivery_cb") + + producer = Producer({ + 'bootstrap.servers': 'nonexistent-broker:9092', # Will cause delivery failures + 'socket.timeout.ms': 100, + 'message.timeout.ms': 10, # Very short timeout to trigger delivery failure quickly + 'on_delivery': delivery_cb_that_raises + }) + + # Produce a message - this will trigger delivery report callback when it fails + producer.produce('test-topic', value='test-message') + + # Flush to ensure delivery reports are processed + # Before fix: Would get RuntimeError + SystemError (Issue #865) + # After fix: Should only get RuntimeError (no SystemError) + with pytest.raises(RuntimeError) as exc_info: + producer.flush(timeout=2.0) # Longer timeout to ensure delivery callback fires + + # Verify we got an exception from our callback + assert "Test exception from delivery_cb" in str(exc_info.value) + + # Verify the delivery callback was actually called + assert len(delivery_reports) > 0