Skip to content

Commit

Permalink
Support for tracing the callbacks of async methods in elastiicsearch-…
Browse files Browse the repository at this point in the history
…6.x/7.x-plugin (#694)
  • Loading branch information
CzyerChen authored May 27, 2024
1 parent ffbd90c commit f736b37
Show file tree
Hide file tree
Showing 32 changed files with 1,223 additions and 160 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ Release Notes.
* Fix NPE in Redisson plugin since Redisson 3.20.0.
* Support for showing batch command details and ignoring PING commands in Redisson plugin.
* Fix peer value of Master-Slave mode in Redisson plugin.
* Support for tracing the callbacks of asynchronous methods in elasticsearch-6.x-plugin/elasticsearch-7.x-plugin.

All issues and pull requests are [here](https://github.com/apache/skywalking/milestone/213?closed=1)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public String getMethodsInterceptor() {

@Override
public boolean isOverrideArgs() {
return false;
return true;
}
},
new InstanceMethodsInterceptPoint() {
Expand All @@ -85,7 +85,7 @@ public String getMethodsInterceptor() {

@Override
public boolean isOverrideArgs() {
return false;
return true;
}
},
new InstanceMethodsInterceptPoint() {
Expand All @@ -102,7 +102,7 @@ public String getMethodsInterceptor() {

@Override
public boolean isOverrideArgs() {
return false;
return true;
}
},
new InstanceMethodsInterceptPoint() {
Expand All @@ -119,7 +119,7 @@ public String getMethodsInterceptor() {

@Override
public boolean isOverrideArgs() {
return false;
return true;
}
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public String getMethodsInterceptor() {

@Override
public boolean isOverrideArgs() {
return false;
return true;
}
},
new InstanceMethodsInterceptPoint() {
Expand All @@ -102,7 +102,7 @@ public String getMethodsInterceptor() {

@Override
public boolean isOverrideArgs() {
return false;
return true;
}
},
new InstanceMethodsInterceptPoint() {
Expand All @@ -118,7 +118,7 @@ public String getMethodsInterceptor() {

@Override
public boolean isOverrideArgs() {
return false;
return true;
}
},
new InstanceMethodsInterceptPoint() {
Expand All @@ -134,7 +134,7 @@ public String getMethodsInterceptor() {

@Override
public boolean isOverrideArgs() {
return false;
return true;
}
},
new InstanceMethodsInterceptPoint() {
Expand Down Expand Up @@ -182,7 +182,7 @@ public String getMethodsInterceptor() {

@Override
public boolean isOverrideArgs() {
return false;
return true;
}
},
new InstanceMethodsInterceptPoint() {
Expand All @@ -198,7 +198,7 @@ public String getMethodsInterceptor() {

@Override
public boolean isOverrideArgs() {
return false;
return true;
}
},
new InstanceMethodsInterceptPoint() {
Expand All @@ -214,7 +214,7 @@ public String getMethodsInterceptor() {

@Override
public boolean isOverrideArgs() {
return false;
return true;
}
},
new InstanceMethodsInterceptPoint() {
Expand All @@ -230,7 +230,7 @@ public String getMethodsInterceptor() {

@Override
public boolean isOverrideArgs() {
return false;
return true;
}
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,7 @@ public class Constants {
public static final AbstractTag<String> ES_TOOK_MILLIS = Tags.ofKey("es.took_millis");
public static final AbstractTag<String> ES_TOTAL_HITS = Tags.ofKey("es.total_hits");
public static final AbstractTag<String> ES_INGEST_TOOK_MILLIS = Tags.ofKey("es.ingest_took_millis");

public static final String ON_RESPONSE_SUFFIX = "/onResponse";
public static final String ON_FAILURE_SUFFIX = "/onFailure";
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
import org.apache.skywalking.apm.plugin.elasticsearch.common.RestClientEnhanceInfo;
import org.apache.skywalking.apm.plugin.elasticsearch.v6.support.AdapterUtil;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.analyze.AnalyzeRequest;

import java.lang.reflect.Method;
Expand Down Expand Up @@ -58,6 +60,10 @@ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allAr
Tags.DB_STATEMENT.set(span, analyzeRequest.text()[0]);
}
SpanLayer.asDB(span);
if (allArguments.length > 2 && allArguments[2] != null && allArguments[2] instanceof ActionListener) {
allArguments[2] = AdapterUtil.wrapActionListener(restClientEnhanceInfo, Constants.ANALYZE_OPERATOR_NAME,
(ActionListener) allArguments[2]);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
import org.apache.skywalking.apm.plugin.elasticsearch.common.RestClientEnhanceInfo;
import org.apache.skywalking.apm.plugin.elasticsearch.v6.support.AdapterUtil;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.indices.CreateIndexRequest;

import static org.apache.skywalking.apm.plugin.elasticsearch.v6.ElasticsearchPluginConfig.Plugin.Elasticsearch.TRACE_DSL;
Expand All @@ -52,6 +54,10 @@ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allAr
Tags.DB_STATEMENT.set(span, createIndexRequest.mappings().utf8ToString());
}
SpanLayer.asDB(span);
if (allArguments.length > 2 && allArguments[2] != null && allArguments[2] instanceof ActionListener) {
allArguments[2] = AdapterUtil.wrapActionListener(restClientEnhanceInfo, Constants.CREATE_OPERATOR_NAME,
(ActionListener) allArguments[2]);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
import org.apache.skywalking.apm.plugin.elasticsearch.common.RestClientEnhanceInfo;
import org.apache.skywalking.apm.plugin.elasticsearch.v6.support.AdapterUtil;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;

import static org.apache.skywalking.apm.plugin.elasticsearch.v6.interceptor.Constants.DB_TYPE;
Expand All @@ -48,6 +50,10 @@ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allAr
Tags.DB_TYPE.set(span, DB_TYPE);
Tags.DB_INSTANCE.set(span, Arrays.asList(deleteIndexRequest.indices()).toString());
SpanLayer.asDB(span);
if (allArguments.length > 2 && allArguments[2] != null && allArguments[2] instanceof ActionListener) {
allArguments[2] = AdapterUtil.wrapActionListener(restClientEnhanceInfo, Constants.DELETE_OPERATOR_NAME,
(ActionListener) allArguments[2]);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
import org.apache.skywalking.apm.plugin.elasticsearch.common.RestClientEnhanceInfo;
import org.apache.skywalking.apm.plugin.elasticsearch.v6.support.AdapterUtil;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;

import java.lang.reflect.Method;
Expand All @@ -48,6 +50,10 @@ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allAr
Tags.DB_TYPE.set(span, DB_TYPE);
Tags.DB_INSTANCE.set(span, buildIndicesString(request.indices()));
SpanLayer.asDB(span);
if (allArguments.length > 2 && allArguments[2] != null && allArguments[2] instanceof ActionListener) {
allArguments[2] = AdapterUtil.wrapActionListener(restClientEnhanceInfo, Constants.REFRESH_OPERATOR_NAME,
(ActionListener) allArguments[2]);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
import org.apache.skywalking.apm.plugin.elasticsearch.common.RestClientEnhanceInfo;
import org.apache.skywalking.apm.plugin.elasticsearch.v6.support.AdapterUtil;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.ClearScrollRequest;

import java.lang.reflect.Method;
Expand All @@ -52,6 +54,10 @@ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allAr
}

SpanLayer.asDB(span);
if (allArguments.length > 2 && allArguments[2] != null && allArguments[2] instanceof ActionListener) {
allArguments[2] = AdapterUtil.wrapActionListener(restClientEnhanceInfo, Constants.CLEAR_SCROLL_OPERATOR_NAME,
(ActionListener) allArguments[2]);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
import org.apache.skywalking.apm.plugin.elasticsearch.common.RestClientEnhanceInfo;
import org.apache.skywalking.apm.plugin.elasticsearch.v6.support.AdapterUtil;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;

import java.lang.reflect.Method;
Expand Down Expand Up @@ -58,6 +60,10 @@ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allAr
}

SpanLayer.asDB(span);
if (allArguments.length > 2 && allArguments[2] != null && allArguments[2] instanceof ActionListener) {
allArguments[2] = AdapterUtil.wrapActionListener(restClientEnhanceInfo, Constants.DELETE_BY_QUERY_OPERATOR_NAME,
(ActionListener) allArguments[2]);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
import org.apache.skywalking.apm.plugin.elasticsearch.common.RestClientEnhanceInfo;
import org.apache.skywalking.apm.plugin.elasticsearch.v6.support.AdapterUtil;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.GetRequest;

import static org.apache.skywalking.apm.plugin.elasticsearch.v6.ElasticsearchPluginConfig.Plugin.Elasticsearch.TRACE_DSL;
Expand All @@ -51,6 +53,10 @@ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allAr
}

SpanLayer.asDB(span);
if (allArguments.length > 2 && allArguments[2] != null && allArguments[2] instanceof ActionListener) {
allArguments[2] = AdapterUtil.wrapActionListener(restClientEnhanceInfo, Constants.GET_OPERATOR_NAME,
(ActionListener) allArguments[2]);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
import org.apache.skywalking.apm.plugin.elasticsearch.common.RestClientEnhanceInfo;
import org.apache.skywalking.apm.plugin.elasticsearch.v6.support.AdapterUtil;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexRequest;

import static org.apache.skywalking.apm.plugin.elasticsearch.v6.ElasticsearchPluginConfig.Plugin.Elasticsearch.TRACE_DSL;
Expand All @@ -51,6 +53,10 @@ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allAr
}

SpanLayer.asDB(span);
if (allArguments.length > 2 && allArguments[2] != null && allArguments[2] instanceof ActionListener) {
allArguments[2] = AdapterUtil.wrapActionListener(restClientEnhanceInfo, Constants.INDEX_OPERATOR_NAME,
(ActionListener) allArguments[2]);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
import org.apache.skywalking.apm.plugin.elasticsearch.common.RestClientEnhanceInfo;
import org.apache.skywalking.apm.plugin.elasticsearch.v6.support.AdapterUtil;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequest;

import static org.apache.skywalking.apm.plugin.elasticsearch.v6.ElasticsearchPluginConfig.Plugin.Elasticsearch.TRACE_DSL;
Expand All @@ -52,6 +54,10 @@ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allAr
}

SpanLayer.asDB(span);
if (allArguments.length > 2 && allArguments[2] != null && allArguments[2] instanceof ActionListener) {
allArguments[2] = AdapterUtil.wrapActionListener(restClientEnhanceInfo, Constants.SEARCH_OPERATOR_NAME,
(ActionListener) allArguments[2]);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
import org.apache.skywalking.apm.plugin.elasticsearch.common.RestClientEnhanceInfo;
import org.apache.skywalking.apm.plugin.elasticsearch.v6.support.AdapterUtil;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchScrollRequest;

import java.lang.reflect.Method;
Expand All @@ -51,6 +53,10 @@ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allAr
}

SpanLayer.asDB(span);
if (allArguments.length > 2 && allArguments[2] != null && allArguments[2] instanceof ActionListener) {
allArguments[2] = AdapterUtil.wrapActionListener(restClientEnhanceInfo, Constants.SEARCH_SCROLL_OPERATOR_NAME,
(ActionListener) allArguments[2]);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
import org.apache.skywalking.apm.plugin.elasticsearch.common.RestClientEnhanceInfo;
import org.apache.skywalking.apm.plugin.elasticsearch.v6.support.AdapterUtil;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.script.mustache.SearchTemplateRequest;

import java.lang.reflect.Method;
Expand Down Expand Up @@ -55,6 +57,10 @@ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allAr
}

SpanLayer.asDB(span);
if (allArguments.length > 2 && allArguments[2] != null && allArguments[2] instanceof ActionListener) {
allArguments[2] = AdapterUtil.wrapActionListener(restClientEnhanceInfo, Constants.SEARCH_TEMPLATE_OPERATOR_NAME,
(ActionListener) allArguments[2]);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
import org.apache.skywalking.apm.plugin.elasticsearch.common.RestClientEnhanceInfo;
import org.apache.skywalking.apm.plugin.elasticsearch.v6.support.AdapterUtil;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.update.UpdateRequest;

import static org.apache.skywalking.apm.plugin.elasticsearch.v6.ElasticsearchPluginConfig.Plugin.Elasticsearch.TRACE_DSL;
Expand All @@ -51,6 +53,10 @@ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allAr
}

SpanLayer.asDB(span);
if (allArguments.length > 2 && allArguments[2] != null && allArguments[2] instanceof ActionListener) {
allArguments[2] = AdapterUtil.wrapActionListener(restClientEnhanceInfo, Constants.UPDATE_OPERATOR_NAME,
(ActionListener) allArguments[2]);
}
}

@Override
Expand Down
Loading

0 comments on commit f736b37

Please sign in to comment.