Skip to content

Commit

Permalink
Support async profiler feature (#720)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhengziyi0117 authored Oct 30, 2024
1 parent 576550a commit 2027a98
Show file tree
Hide file tree
Showing 16 changed files with 866 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.apm.network.trace.component.command;

import org.apache.skywalking.apm.network.common.v3.Command;
import org.apache.skywalking.apm.network.common.v3.KeyStringValuePair;

import java.util.List;
import java.util.Objects;

public class AsyncProfilerTaskCommand extends BaseCommand implements Serializable, Deserializable<AsyncProfilerTaskCommand> {
public static final Deserializable<AsyncProfilerTaskCommand> DESERIALIZER = new AsyncProfilerTaskCommand("", "", 0, null, "", 0);
public static final String NAME = "AsyncProfilerTaskQuery";

/**
* async-profiler taskId
*/
private final String taskId;
/**
* run profiling for duration (second)
*/
private final int duration;
/**
* async profiler extended parameters. Here is a table of optional parameters.
*
* <p>lock[=DURATION] - profile contended locks overflowing the DURATION ns bucket (default: 10us)</p>
* <p>alloc[=BYTES] - profile allocations with BYTES interval</p>
* <p>interval=N - sampling interval in ns (default: 10'000'000, i.e. 10 ms)</p>
* <p>jstackdepth=N - maximum Java stack depth (default: 2048)</p>
* <p>chunksize=N - approximate size of JFR chunk in bytes (default: 100 MB) </p>
* <p>chunktime=N - duration of JFR chunk in seconds (default: 1 hour) </p>
* details @see <a href="https://github.com/async-profiler/async-profiler/blob/master/src/arguments.cpp#L44">async-profiler argument</a>
*/
private final String execArgs;
/**
* task create time
*/
private final long createTime;

public AsyncProfilerTaskCommand(String serialNumber, String taskId, int duration,
List<String> events, String execArgs, long createTime) {
super(NAME, serialNumber);
this.taskId = taskId;
this.duration = duration;
this.createTime = createTime;
String comma = ",";
StringBuilder sb = new StringBuilder();
if (Objects.nonNull(events) && !events.isEmpty()) {
sb.append("event=")
.append(String.join(comma, events))
.append(comma);
}
if (execArgs != null && !execArgs.isEmpty()) {
sb.append(execArgs);
}
this.execArgs = sb.toString();
}

public AsyncProfilerTaskCommand(String serialNumber, String taskId, int duration,
String execArgs, long createTime) {
super(NAME, serialNumber);
this.taskId = taskId;
this.duration = duration;
this.execArgs = execArgs;
this.createTime = createTime;
}

@Override
public AsyncProfilerTaskCommand deserialize(Command command) {
final List<KeyStringValuePair> argsList = command.getArgsList();
String taskId = null;
int duration = 0;
String execArgs = null;
long createTime = 0;
String serialNumber = null;
for (final KeyStringValuePair pair : argsList) {
if ("SerialNumber".equals(pair.getKey())) {
serialNumber = pair.getValue();
} else if ("TaskId".equals(pair.getKey())) {
taskId = pair.getValue();
} else if ("Duration".equals(pair.getKey())) {
duration = Integer.parseInt(pair.getValue());
} else if ("ExecArgs".equals(pair.getKey())) {
execArgs = pair.getValue();
} else if ("CreateTime".equals(pair.getKey())) {
createTime = Long.parseLong(pair.getValue());
}
}
return new AsyncProfilerTaskCommand(serialNumber, taskId, duration, execArgs, createTime);
}

@Override
public Command.Builder serialize() {
final Command.Builder builder = commandBuilder();
builder.addArgs(KeyStringValuePair.newBuilder().setKey("TaskId").setValue(taskId))
.addArgs(KeyStringValuePair.newBuilder().setKey("Duration").setValue(String.valueOf(duration)))
.addArgs(KeyStringValuePair.newBuilder().setKey("ExecArgs").setValue(execArgs))
.addArgs(KeyStringValuePair.newBuilder().setKey("CreateTime").setValue(String.valueOf(createTime)));
return builder;
}

public String getTaskId() {
return taskId;
}

public int getDuration() {
return duration;
}

public String getExecArgs() {
return execArgs;
}

public long getCreateTime() {
return createTime;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ public static BaseCommand deserialize(final Command command) {
return ProfileTaskCommand.DESERIALIZER.deserialize(command);
} else if (ConfigurationDiscoveryCommand.NAME.equals(commandName)) {
return ConfigurationDiscoveryCommand.DESERIALIZER.deserialize(command);
} else if (AsyncProfilerTaskCommand.NAME.equals(commandName)) {
return AsyncProfilerTaskCommand.DESERIALIZER.deserialize(command);
}

throw new UnsupportedCommandException(command);
}

Expand Down
2 changes: 1 addition & 1 deletion apm-protocol/apm-network/src/main/proto
4 changes: 4 additions & 0 deletions apm-sniffer/apm-agent-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@
<artifactId>jmh-generator-annprocess</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>tools.profiler</groupId>
<artifactId>async-profiler</artifactId>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.apm.agent.core.asyncprofiler;

import com.google.protobuf.ByteString;
import io.grpc.Channel;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.StreamObserver;
import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
import org.apache.skywalking.apm.agent.core.conf.Config;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.agent.core.remote.GRPCChannelListener;
import org.apache.skywalking.apm.agent.core.remote.GRPCChannelManager;
import org.apache.skywalking.apm.agent.core.remote.GRPCChannelStatus;
import org.apache.skywalking.apm.agent.core.remote.GRPCStreamServiceStatus;
import org.apache.skywalking.apm.network.language.asyncprofiler.v10.AsyncProfilerCollectionResponse;
import org.apache.skywalking.apm.network.language.asyncprofiler.v10.AsyncProfilerData;
import org.apache.skywalking.apm.network.language.asyncprofiler.v10.AsyncProfilerMetaData;
import org.apache.skywalking.apm.network.language.asyncprofiler.v10.AsyncProfilerTaskGrpc;
import org.apache.skywalking.apm.network.language.asyncprofiler.v10.AsyncProfilingStatus;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

import static org.apache.skywalking.apm.agent.core.conf.Config.AsyncProfiler.DATA_CHUNK_SIZE;
import static org.apache.skywalking.apm.agent.core.conf.Config.Collector.GRPC_UPSTREAM_TIMEOUT;

@DefaultImplementor
public class AsyncProfilerDataSender implements BootService, GRPCChannelListener {
private static final ILog LOGGER = LogManager.getLogger(AsyncProfilerDataSender.class);

private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT;

private volatile AsyncProfilerTaskGrpc.AsyncProfilerTaskStub asyncProfilerTaskStub;

@Override
public void prepare() throws Throwable {
ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(this);
}

@Override
public void boot() throws Throwable {

}

@Override
public void onComplete() throws Throwable {

}

@Override
public void shutdown() throws Throwable {

}

@Override
public void statusChanged(GRPCChannelStatus status) {
if (GRPCChannelStatus.CONNECTED.equals(status)) {
Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel();
asyncProfilerTaskStub = AsyncProfilerTaskGrpc.newStub(channel);
} else {
asyncProfilerTaskStub = null;
}
this.status = status;
}

public void sendData(AsyncProfilerTask task, FileChannel channel) throws IOException, InterruptedException {
if (status != GRPCChannelStatus.CONNECTED || Objects.isNull(channel) || !channel.isOpen()) {
return;
}

int size = Math.toIntExact(channel.size());
final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);
StreamObserver<AsyncProfilerData> dataStreamObserver = asyncProfilerTaskStub.withDeadlineAfter(
GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS
).collect(new ClientResponseObserver<AsyncProfilerData, AsyncProfilerCollectionResponse>() {
ClientCallStreamObserver<AsyncProfilerData> requestStream;

@Override
public void beforeStart(ClientCallStreamObserver<AsyncProfilerData> requestStream) {
this.requestStream = requestStream;
}

@Override
public void onNext(AsyncProfilerCollectionResponse value) {
if (AsyncProfilingStatus.TERMINATED_BY_OVERSIZE.equals(value.getType())) {
LOGGER.warn("JFR is too large to be received by the oap server");
} else {
ByteBuffer buf = ByteBuffer.allocateDirect(DATA_CHUNK_SIZE);
try {
while (channel.read(buf) > 0) {
buf.flip();
AsyncProfilerData asyncProfilerData = AsyncProfilerData.newBuilder()
.setContent(ByteString.copyFrom(buf))
.build();
requestStream.onNext(asyncProfilerData);
buf.clear();
}
} catch (IOException e) {
LOGGER.error("Failed to read JFR file and failed to upload to oap", e);
}
}

requestStream.onCompleted();
}

@Override
public void onError(Throwable t) {
status.finished();
LOGGER.error(t, "Send async profiler task data to collector fail with a grpc internal exception.");
ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(t);
}

@Override
public void onCompleted() {
status.finished();
}
});
AsyncProfilerMetaData metaData = AsyncProfilerMetaData.newBuilder()
.setService(Config.Agent.SERVICE_NAME)
.setServiceInstance(Config.Agent.INSTANCE_NAME)
.setType(AsyncProfilingStatus.PROFILING_SUCCESS)
.setContentSize(size)
.setTaskId(task.getTaskId())
.build();
AsyncProfilerData asyncProfilerData = AsyncProfilerData.newBuilder().setMetaData(metaData).build();
dataStreamObserver.onNext(asyncProfilerData);

status.wait4Finish();
}

public void sendError(AsyncProfilerTask task, String errorMessage) {
if (status != GRPCChannelStatus.CONNECTED) {
return;
}
final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);
StreamObserver<AsyncProfilerData> dataStreamObserver = asyncProfilerTaskStub.withDeadlineAfter(
GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS
).collect(new StreamObserver<AsyncProfilerCollectionResponse>() {
@Override
public void onNext(AsyncProfilerCollectionResponse value) {
}

@Override
public void onError(Throwable t) {
status.finished();
LOGGER.error(t, "Send async profiler task execute error fail with a grpc internal exception.");
ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(t);
}

@Override
public void onCompleted() {
status.finished();
}
});
AsyncProfilerMetaData metaData = AsyncProfilerMetaData.newBuilder()
.setService(Config.Agent.SERVICE_NAME)
.setServiceInstance(Config.Agent.INSTANCE_NAME)
.setTaskId(task.getTaskId())
.setType(AsyncProfilingStatus.EXECUTION_TASK_ERROR)
.setContentSize(-1)
.build();
AsyncProfilerData asyncProfilerData = AsyncProfilerData.newBuilder()
.setMetaData(metaData)
.setErrorMessage(errorMessage)
.build();
dataStreamObserver.onNext(asyncProfilerData);
dataStreamObserver.onCompleted();
status.wait4Finish();
}
}
Loading

0 comments on commit 2027a98

Please sign in to comment.