Skip to content

Commit

Permalink
Polish up kafka,nats,activemq,rabbitmq plugins to set peer at comsume…
Browse files Browse the repository at this point in the history
…r side (#377)
  • Loading branch information
pg-yang authored Nov 6, 2022
1 parent f17a4ea commit d4d21ca
Show file tree
Hide file tree
Showing 15 changed files with 29 additions and 17 deletions.
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ Release Notes.
* Support mannual propagation of tracing context to next operators for webflux.
* Add MQ_TOPIC and MQ_BROKER tags for RocketMQ consumer's span.
* Polish up Pulsar plugins to remove unnecessary dynamic value , set peer at consumer side
* Polish Kafka plugin to set peer at the consumer side.
* Polish NATS plugin to set peer at the consumer side.
* Polish ActiveMQ plugin to set peer at the consumer side.
* Polish RabbitMQ plugin to set peer at the consumer side.

#### Documentation

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public Object afterMethod(final EnhancedInstance objInst,
if (activeSpan == null) {
return ret;
}
activeSpan.setPeer(url);
activeSpan.setComponent(ComponentsDefine.ACTIVEMQ_CONSUMER);
SpanLayer.asMQ(activeSpan);
CarrierItem next = contextCarrier.items();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allA
SpanLayer.asMQ(activeSpan);
Tags.MQ_BROKER.set(activeSpan, requiredInfo.getBrokerServers());
Tags.MQ_TOPIC.set(activeSpan, requiredInfo.getTopics());

activeSpan.setPeer(requiredInfo.getBrokerServers());
for (List<ConsumerRecord<?, ?>> consumerRecords : records.values()) {
for (ConsumerRecord<?, ?> record : consumerRecords) {
ContextCarrier contextCarrier = new ContextCarrier();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ public class DeliverReplyInterceptor implements InstanceMethodsAroundInterceptor
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
final AbstractSpan entrySpan = createEntrySpan((Message) allArguments[0]);
Tags.MQ_BROKER.set(entrySpan, buildServers((Connection) objInst));
final String servers = buildServers((Connection) objInst);
Tags.MQ_BROKER.set(entrySpan, servers);
entrySpan.setPeer(servers);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ static MessageHandler buildTraceMsgHandler(String servers, MessageHandler msgHan
}
AbstractSpan span = NatsCommons.createEntrySpan(msg);
Tags.MQ_BROKER.set(span, servers);
span.setPeer(servers);
try {
msgHandler.onMessage(msg);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allA
Message msg = (Message) ret;
AbstractSpan span = createEntrySpan(msg);
Tags.MQ_BROKER.set(span , servers);
span.setPeer(servers);
// Close the span immediately , as no chance to trace what user want to do
ContextManager.stopSpan(span);
return ret;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public void handleDelivery(final String consumerTag,
Tags.MQ_TOPIC.set(activeSpan, envelope.getExchange());
Tags.MQ_QUEUE.set(activeSpan, envelope.getRoutingKey());
activeSpan.setComponent(ComponentsDefine.RABBITMQ_CONSUMER);
activeSpan.setPeer(serverUrl);
SpanLayer.asMQ(activeSpan);
CarrierItem next = contextCarrier.items();
while (next.hasNext()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,8 @@ SkyWalking analysis MQ performance related metrics through the following tags.
* `mq.topic` indicates MQ topic name , It's optional as some MQ don't hava concept of `topic`
* `transmission.latency` The transmission latency from consumer to producer. Usually you needn't to record this tag manually, instead to call `contextCarrier.extensionInjector().injectSendingTimestamp();` to record tag `sendingTimestamp` on producer side , and SkyWalking would record this tag on consumer side if `sw8-x` context carrier(from producer side) contains `sendingTimestamp`

Notice , you should set `peer` at both sides(producer and consumer). And the value of peer should represent the MQ server cluster.


### Advanced APIs

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ segmentItems:
componentId: 45
isError: false
spanType: Exit
peer: not null
peer: not blank
tags:
- {key: mq.broker, value: not blank}
- {key: mq.queue, value: test}
Expand Down Expand Up @@ -59,7 +59,7 @@ segmentItems:
componentId: 46
isError: false
spanType: Entry
peer: ''
peer: not blank
tags:
- {key: mq.broker, value: not blank}
- {key: mq.queue, value: test}
Expand Down
6 changes: 3 additions & 3 deletions test/plugin/scenarios/kafka-scenario/config/expectedData.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ segmentItems:
componentId: 41
isError: false
spanType: Entry
peer: ''
peer: kafka-server:9092
tags:
- {key: mq.broker, value: 'kafka-server:9092'}
- {key: mq.topic, value: test}
Expand Down Expand Up @@ -194,7 +194,7 @@ segmentItems:
componentId: 41
isError: false
spanType: Entry
peer: ''
peer: kafka-server:9092
tags:
- {key: mq.broker, value: 'kafka-server:9092'}
- {key: mq.topic, value: test.}
Expand Down Expand Up @@ -241,7 +241,7 @@ segmentItems:
componentId: 41
isError: false
spanType: Entry
peer: ''
peer: kafka-server:9092
tags:
- { key: mq.broker, value: 'kafka-server:9092' }
- { key: mq.topic, value: assign }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ segmentItems:
componentId: 132
isError: false
spanType: Entry
peer: ''
peer: nats://nats-server:4222
skipAnalysis: false
tags:
- {key: transmission.latency, value: ge 0}
Expand Down Expand Up @@ -514,7 +514,7 @@ segmentItems:
componentId: 132
isError: false
spanType: Entry
peer: ''
peer: nats://nats-server:4222
skipAnalysis: false
tags:
- {key: transmission.latency, value: ge 0}
Expand All @@ -539,7 +539,7 @@ segmentItems:
componentId: 132
isError: false
spanType: Entry
peer: ''
peer: nats://nats-server:4222
skipAnalysis: false
tags:
- {key: transmission.latency, value: ge 0}
Expand Down Expand Up @@ -602,7 +602,7 @@ segmentItems:
componentId: 132
isError: false
spanType: Entry
peer: ''
peer: nats://nats-server:4222
skipAnalysis: false
tags:
- {key: transmission.latency, value: ge 0}
Expand Down Expand Up @@ -696,7 +696,7 @@ segmentItems:
componentId: 132
isError: false
spanType: Entry
peer: ''
peer: nats://nats-server:4222
skipAnalysis: false
tags:
- {key: transmission.latency, value: ge 0}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ segmentItems:
componentId: 53
isError: false
spanType: Entry
peer: ''
peer: not blank
tags:
- {key: mq.broker, value: not blank}
- {key: mq.topic, value: ''}
Expand All @@ -50,7 +50,7 @@ segmentItems:
componentId: 52
isError: false
spanType: Exit
peer: not null
peer: not blank
tags:
- {key: mq.broker, value: not blank}
- {key: mq.queue, value: test}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ segmentItems:
componentId: 41
isError: false
spanType: Entry
peer: ''
peer: kafka-server:9092
skipAnalysis: false
tags:
- {key: mq.broker, value: 'kafka-server:9092'}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ segmentItems:
componentId: 41
isError: false
spanType: Entry
peer: ''
peer: kafka-server:9092
skipAnalysis: false
tags:
- {key: mq.broker, value: 'kafka-server:9092'}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ segmentItems:
componentId: 41
isError: false
spanType: Entry
peer: ''
peer: kafka-server:9092
skipAnalysis: false
tags:
- {key: mq.broker, value: 'kafka-server:9092'}
Expand Down

0 comments on commit d4d21ca

Please sign in to comment.