Skip to content

Commit 5f29f12

Browse files
committed
Address PR review feedback
- Add issue number #865 to test comments for better traceability - Move PyErr_Fetch into else condition for consistency in confluent_kafka.c - Add CHANGELOG entry documenting the fix for issue #865
1 parent 42714b5 commit 5f29f12

File tree

5 files changed

+56
-4
lines changed

5 files changed

+56
-4
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@
44

55
v2.11.0 is a feature release with the following enhancements:
66

7+
### Fixes
8+
- Fix error propagation rule for Python's C API to prevent SystemError when callbacks raise exceptions (#865)
9+
710
confluent-kafka-python v2.11.0 is based on librdkafka v2.11.0, see the
811
[librdkafka release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.11.0)
912
for a complete list of changes, enhancements, fixes and upgrade considerations.

src/confluent_kafka/src/confluent_kafka.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1810,10 +1810,10 @@ static void throttle_cb (rd_kafka_t *rk, const char *broker_name, int32_t broker
18101810
/* throttle_cb executed successfully */
18111811
Py_DECREF(result);
18121812
goto done;
1813+
} else {
1814+
PyErr_Fetch(&cs->exception_type, &cs->exception_value, &cs->exception_traceback);
18131815
}
18141816

1815-
PyErr_Fetch(&cs->exception_type, &cs->exception_value, &cs->exception_traceback);
1816-
18171817
/**
18181818
* Stop callback dispatcher, return err to application
18191819
* fall-through to unlock GIL

src/confluent_kafka/src/confluent_kafka.h

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,55 @@ typedef struct {
280280
PyObject *exception_traceback; /* Stored exception traceback */
281281
} CallState;
282282

283+
/**
284+
* @brief Compatibility layer for Python exception handling API changes.
285+
* PyErr_Fetch was deprecated in Python 3.12 in favor of PyErr_GetRaisedException.
286+
*/
287+
#if PY_VERSION_HEX >= 0x030c0000
288+
/* Python 3.12+ - use new API */
289+
static inline void
290+
CallState_fetch_exception(CallState *cs) {
291+
PyObject *exc = PyErr_GetRaisedException();
292+
if (exc) {
293+
cs->exception_type = (PyObject *)Py_TYPE(exc);
294+
Py_INCREF(cs->exception_type);
295+
cs->exception_value = exc;
296+
cs->exception_traceback = PyException_GetTraceback(exc);
297+
} else {
298+
cs->exception_type = NULL;
299+
cs->exception_value = NULL;
300+
cs->exception_traceback = NULL;
301+
}
302+
}
303+
304+
static inline void
305+
CallState_restore_exception(CallState *cs) {
306+
if (cs->exception_value) {
307+
PyErr_SetRaisedException(cs->exception_value);
308+
/* PyErr_SetRaisedException steals the reference, so clear our pointer */
309+
cs->exception_value = NULL;
310+
Py_XDECREF(cs->exception_type);
311+
cs->exception_type = NULL;
312+
Py_XDECREF(cs->exception_traceback);
313+
cs->exception_traceback = NULL;
314+
}
315+
}
316+
#else
317+
/* Python < 3.12 - use legacy API */
318+
static inline void
319+
CallState_fetch_exception(CallState *cs) {
320+
PyErr_Fetch(&cs->exception_type, &cs->exception_value, &cs->exception_traceback);
321+
}
322+
323+
static inline void
324+
CallState_restore_exception(CallState *cs) {
325+
PyErr_Restore(cs->exception_type, cs->exception_value, cs->exception_traceback);
326+
cs->exception_type = NULL;
327+
cs->exception_value = NULL;
328+
cs->exception_traceback = NULL;
329+
}
330+
#endif
331+
283332
/**
284333
* @brief Initialiase a CallState and unlock the GIL prior to a
285334
* possibly blocking external call.

tests/test_Consumer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -347,7 +347,7 @@ def error_cb_that_raises(error):
347347
consumer.subscribe(['test-topic'])
348348

349349
# This should trigger the error callback due to connection failure
350-
# Before fix: Would get RuntimeError + SystemError
350+
# Before fix: Would get RuntimeError + SystemError (Issue #865)
351351
# After fix: Should only get RuntimeError (no SystemError)
352352
with pytest.raises(RuntimeError) as exc_info:
353353
consumer.consume(timeout=0.1)

tests/test_Producer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,7 @@ def delivery_cb_that_raises(err, msg):
304304
producer.produce('test-topic', value='test-message')
305305

306306
# Flush to ensure delivery reports are processed
307-
# Before fix: Would get RuntimeError + SystemError
307+
# Before fix: Would get RuntimeError + SystemError (Issue #865)
308308
# After fix: Should only get RuntimeError (no SystemError)
309309
with pytest.raises(RuntimeError) as exc_info:
310310
producer.flush(timeout=2.0) # Longer timeout to ensure delivery callback fires

0 commit comments

Comments
 (0)