Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[MINOR] improvement(server) Add context to rpc audit log to output necessary context #2088

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,33 @@
package org.apache.uniffle.common.audit;

import java.io.Closeable;
import java.util.Optional;

import org.slf4j.Logger;

import org.apache.uniffle.common.rpc.StatusCode;

/** Context for rpc audit logging. */
public abstract class RpcAuditContext implements Closeable {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If having performance degression, can we disable this by config? @maobaolong I see some time consuming operations in the hotspot path.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we can disable the rpc audit log by rss.server.rpc.audit.log.enabled and rss.coordinator.rpc.audit.log.enabled

private static final ThreadLocal<RpcAuditContext> RPC_AUDIT_CONTEXT_THREAD_LOCAL =
new ThreadLocal<>();
private final Logger log;
private String command;
private String statusCode;
private String args;
private String returnValue;
private String context;
private String from;
private long creationTimeNs;
protected long executionTimeNs;

public RpcAuditContext(Logger log) {
this.log = log;
RPC_AUDIT_CONTEXT_THREAD_LOCAL.set(this);
}

public static final Optional<RpcAuditContext> getRpcAuditContext() {
return Optional.ofNullable(RPC_AUDIT_CONTEXT_THREAD_LOCAL.get());
}

protected abstract String content();
Expand Down Expand Up @@ -119,6 +128,21 @@ public RpcAuditContext withFrom(String from) {
return this;
}

/**
* Sets context field, context can be concat by invoke multiply time.
*
* @param contextPart the new context part
* @return this {@link RpcAuditContext} instance
*/
public RpcAuditContext withContext(String contextPart) {
if (context == null) {
context = contextPart;
} else {
this.context += ", " + contextPart;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks strange, from my thought, If want to record the whole up/downstream context tips. Maybe we need to introduce the dedicated Context class to record the all children context?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like this?

class Context {
 string contextName;
 List<Context> children;
}

}
return this;
}

@Override
public void close() {
if (log == null) {
Expand All @@ -140,6 +164,9 @@ public String toString() {
if (returnValue != null) {
line += String.format("\treturn{%s}", returnValue);
}
if (context != null) {
line += String.format("\tcontext{%s}", context);
}
return line;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -806,6 +806,8 @@ public void reportShuffleResult(
appId,
shuffleId);
}
auditContext.withContext("updatedBlockCount=" + updatedBlockCount);
auditContext.withContext("expectedBlockCount=" + expectedBlockCount);
} catch (Exception e) {
status = StatusCode.INTERNAL_ERROR;
msg = "error happened when report shuffle result, check shuffle server for detail";
Expand Down Expand Up @@ -853,7 +855,6 @@ public void getShuffleResult(
}

String msg = "OK";
GetShuffleResultResponse reply;
byte[] serializedBlockIds = null;
String requestInfo =
"appId[" + appId + "], shuffleId[" + shuffleId + "], partitionId[" + partitionId + "]";
Expand All @@ -878,7 +879,8 @@ public void getShuffleResult(
}

auditContext.withStatusCode(status);
reply =
auditContext.withReturnValue("serializedBlockIdsBytes=" + serializedBlockIdsBytes.size());
GetShuffleResultResponse reply =
GetShuffleResultResponse.newBuilder()
.setStatus(status.toProto())
.setRetMsg(msg)
Expand Down Expand Up @@ -922,7 +924,6 @@ public void getShuffleResultForMultiPart(
}

String msg = "OK";
GetShuffleResultForMultiPartResponse reply;
byte[] serializedBlockIds = null;
String requestInfo =
"appId[" + appId + "], shuffleId[" + shuffleId + "], partitions" + partitionsList;
Expand All @@ -947,8 +948,9 @@ public void getShuffleResultForMultiPart(
LOG.error("Error happened when get shuffle result for {}", requestInfo, e);
}

auditContext.withReturnValue("serializedBlockIdsBytes=" + serializedBlockIdsBytes.size());
auditContext.withStatusCode(status);
reply =
GetShuffleResultForMultiPartResponse reply =
GetShuffleResultForMultiPartResponse.newBuilder()
.setStatus(status.toProto())
.setRetMsg(msg)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.uniffle.common.ShuffleIndexResult;
import org.apache.uniffle.common.ShufflePartitionedBlock;
import org.apache.uniffle.common.ShufflePartitionedData;
import org.apache.uniffle.common.audit.RpcAuditContext;
import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.exception.FileNotFoundException;
import org.apache.uniffle.common.exception.InvalidRequestException;
Expand Down Expand Up @@ -641,9 +642,20 @@ public byte[] getFinishedBlockIds(
Roaring64NavigableMap res = Roaring64NavigableMap.bitmapOf();
for (Map.Entry<Integer, Set<Integer>> entry : bitmapIndexToPartitions.entrySet()) {
Set<Integer> requestPartitions = entry.getValue();
Roaring64NavigableMap bitmap = blockIds[entry.getKey()];
int bitmapIndex = entry.getKey();
Roaring64NavigableMap bitmap = blockIds[bitmapIndex];
getBlockIdsByPartitionId(requestPartitions, bitmap, res, blockIdLayout);
RpcAuditContext.getRpcAuditContext()
.ifPresent(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ifPresent is a good practise, because this avoids unnessary cpu cost if having the much string operations.

context ->
context.withContext(
String.format(
"bitmap[%d].<size,byte>=<%d,%d>",
bitmapIndex, bitmap.getLongCardinality(), bitmap.getLongSizeInBytes())));
}
RpcAuditContext.getRpcAuditContext()
.ifPresent(
context -> context.withContext("partitionBlockCount=" + res.getLongCardinality()));

if (res.getLongCardinality() != expectedBlockNumber) {
throw new RssException(
Expand Down
Loading