Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 16 additions & 10 deletions src/consumer/base_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,16 +241,22 @@ where
fn handle_error_event(&self, event: NativePtr<RDKafkaEvent>) -> Option<KafkaError> {
let rdkafka_err = unsafe { rdsys::rd_kafka_event_error(event.ptr()) };
if rdkafka_err.is_error() {
if rdkafka_err == rdsys::rd_kafka_resp_err_t::RD_KAFKA_RESP_ERR__PARTITION_EOF {
let tp_ptr = unsafe { rdsys::rd_kafka_event_topic_partition(event.ptr()) };
let partition = unsafe { (*tp_ptr).partition };
unsafe { rdsys::rd_kafka_topic_partition_destroy(tp_ptr) };
Some(KafkaError::PartitionEOF(partition))
} else if unsafe { rdsys::rd_kafka_event_error_is_fatal(event.ptr()) } != 0 {
Some(KafkaError::MessageConsumptionFatal(rdkafka_err.into()))
} else {
Some(KafkaError::MessageConsumption(rdkafka_err.into()))
}
let error =
if rdkafka_err == rdsys::rd_kafka_resp_err_t::RD_KAFKA_RESP_ERR__PARTITION_EOF {
let tp_ptr = unsafe { rdsys::rd_kafka_event_topic_partition(event.ptr()) };
let partition = unsafe { (*tp_ptr).partition };
unsafe { rdsys::rd_kafka_topic_partition_destroy(tp_ptr) };
KafkaError::PartitionEOF(partition)
} else if unsafe { rdsys::rd_kafka_event_error_is_fatal(event.ptr()) } != 0 {
KafkaError::MessageConsumptionFatal(rdkafka_err.into())
} else {
KafkaError::MessageConsumption(rdkafka_err.into())
};
let reason = unsafe {
CStr::from_ptr(rdsys::rd_kafka_event_error_string(event.ptr())).to_string_lossy()
};
self.context().error(error.clone(), reason.trim());
Some(error)
} else {
None
}
Expand Down
10 changes: 10 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ pub enum KafkaError {
MessageConsumptionFatal(RDKafkaErrorCode),
/// Message production error.
MessageProduction(RDKafkaErrorCode),
/// Message production failed with fatal error.
MessageProductionFatal(RDKafkaErrorCode),
/// Metadata fetch error.
MetadataFetch(RDKafkaErrorCode),
/// No message was received.
Expand Down Expand Up @@ -225,6 +227,9 @@ impl fmt::Debug for KafkaError {
KafkaError::MessageProduction(err) => {
write!(f, "KafkaError (Message production error: {})", err)
}
KafkaError::MessageProductionFatal(err) => {
write!(f, "(Fatal) KafkaError (Message production error: {})", err)
}
KafkaError::MetadataFetch(err) => {
write!(f, "KafkaError (Metadata fetch error: {})", err)
}
Expand Down Expand Up @@ -274,6 +279,9 @@ impl fmt::Display for KafkaError {
write!(f, "(Fatal) Message consumption error: {}", err)
}
KafkaError::MessageProduction(err) => write!(f, "Message production error: {}", err),
KafkaError::MessageProductionFatal(err) => {
write!(f, "(Fatal) Message production error: {}", err)
}
KafkaError::MetadataFetch(err) => write!(f, "Meta data fetch error: {}", err),
KafkaError::NoMessageReceived => {
write!(f, "No message received within the given poll interval")
Expand Down Expand Up @@ -309,6 +317,7 @@ impl Error for KafkaError {
KafkaError::MessageConsumption(err) => Some(err),
KafkaError::MessageConsumptionFatal(err) => Some(err),
KafkaError::MessageProduction(err) => Some(err),
KafkaError::MessageProductionFatal(err) => Some(err),
KafkaError::MetadataFetch(err) => Some(err),
KafkaError::NoMessageReceived => None,
KafkaError::Nul(_) => None,
Expand Down Expand Up @@ -350,6 +359,7 @@ impl KafkaError {
KafkaError::MessageConsumption(err) => Some(*err),
KafkaError::MessageConsumptionFatal(err) => Some(*err),
KafkaError::MessageProduction(err) => Some(*err),
KafkaError::MessageProductionFatal(err) => Some(*err),
KafkaError::MetadataFetch(err) => Some(*err),
KafkaError::NoMessageReceived => None,
KafkaError::Nul(_) => None,
Expand Down
9 changes: 7 additions & 2 deletions src/producer/base_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,11 +408,16 @@ where

fn handle_error_event(&self, event: NativePtr<RDKafkaEvent>) {
let rdkafka_err = unsafe { rdsys::rd_kafka_event_error(event.ptr()) };
let error = KafkaError::Global(rdkafka_err.into());
let rdkafka_err_is_fatal = unsafe { rdsys::rd_kafka_event_error_is_fatal(event.ptr()) };
let error = if rdkafka_err_is_fatal != 0 {
KafkaError::MessageProductionFatal(rdkafka_err.into())
} else {
KafkaError::MessageProduction(rdkafka_err.into())
};
let reason = unsafe {
CStr::from_ptr(rdsys::rd_kafka_event_error_string(event.ptr())).to_string_lossy()
};
self.context().error(error, reason.trim());
self.context().error(error.clone(), reason.trim());
}

/// Returns a pointer to the native Kafka client.
Expand Down