Skip to content

Commit

Permalink
add support for kafka3.7.x, because kafka3.7 changed the pull message…
Browse files Browse the repository at this point in the history
… method's name from pollForFetches to poll
  • Loading branch information
darkness-2nd committed Jul 23, 2024
1 parent d875bd3 commit ef2193b
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 61 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,15 @@ public class KafkaConsumerInstrumentation extends AbstractKafkaInstrumentation {
public static final String MAP_CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.ConstructorWithMapInterceptPoint";
public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.KafkaConsumerInterceptor";
public static final String INTERCEPTOR_CLASS_KAFKA3_2 = "org.apache.skywalking.apm.plugin.kafka.Kafka3ConsumerInterceptor";

public static final String INTERCEPTOR_CLASS_37 = "org.apache.skywalking.apm.plugin.kafka.Kafka37ConsumerInterceptor";

// Kafka 3.7.x's pull message method's name is "poll"
public static final String ENHANCE_METHOD_37 = "poll";

// Kafka 3.7.x's pull message method's return type is "ConsumerRecords"
public static final String ENHANCE_RETURN_TYPE_37 = "org.apache.kafka.clients.consumer.ConsumerRecords";

public static final String ENHANCE_METHOD = "pollOnce";
public static final String ENHANCE_COMPATIBLE_METHOD = "pollForFetches";
public static final String ENHANCE_CLASS = "org.apache.kafka.clients.consumer.KafkaConsumer";
Expand Down Expand Up @@ -112,7 +121,7 @@ public boolean isOverrideArgs() {
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
// targeting Kafka Client >= 3.2
// targeting Kafka Client >= 3.2 and < 3.7.x
return named(ENHANCE_COMPATIBLE_METHOD).and(returns(named("org.apache.kafka.clients.consumer.internals.Fetch")));
}

Expand All @@ -126,6 +135,22 @@ public boolean isOverrideArgs() {
return false;
}
},
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named(ENHANCE_METHOD_37).and(returns(named(ENHANCE_RETURN_TYPE_37)));
}

@Override
public String getMethodsInterceptor() {
return INTERCEPTOR_CLASS_37;
}

@Override
public boolean isOverrideArgs() {
return false;
}
},
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,3 @@ kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaConsumer
kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaProducerInstrumentation
kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaProducerMapInstrumentation
kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaTemplateCallbackInstrumentation
kafka-3.7.x=org.apache.skywalking.apm.plugin.kafka.define.Kafka37ConsumerInstrumentation

0 comments on commit ef2193b

Please sign in to comment.