diff --git a/common/src/main/java/org/apache/uniffle/common/audit/RpcAuditContext.java b/common/src/main/java/org/apache/uniffle/common/audit/RpcAuditContext.java index fbe85efd0b..030388e5e4 100644 --- a/common/src/main/java/org/apache/uniffle/common/audit/RpcAuditContext.java +++ b/common/src/main/java/org/apache/uniffle/common/audit/RpcAuditContext.java @@ -18,6 +18,7 @@ package org.apache.uniffle.common.audit; import java.io.Closeable; +import java.util.Optional; import org.slf4j.Logger; @@ -25,17 +26,25 @@ /** Context for rpc audit logging. */ public abstract class RpcAuditContext implements Closeable { + private static final ThreadLocal RPC_AUDIT_CONTEXT_THREAD_LOCAL = + new ThreadLocal<>(); private final Logger log; private String command; private String statusCode; private String args; private String returnValue; + private String context; private String from; private long creationTimeNs; protected long executionTimeNs; public RpcAuditContext(Logger log) { this.log = log; + RPC_AUDIT_CONTEXT_THREAD_LOCAL.set(this); + } + + public static final Optional getRpcAuditContext() { + return Optional.ofNullable(RPC_AUDIT_CONTEXT_THREAD_LOCAL.get()); } protected abstract String content(); @@ -119,6 +128,21 @@ public RpcAuditContext withFrom(String from) { return this; } + /** + * Sets context field, context can be concat by invoke multiply time. + * + * @param contextPart the new context part + * @return this {@link RpcAuditContext} instance + */ + public RpcAuditContext withContext(String contextPart) { + if (context == null) { + context = contextPart; + } else { + this.context += ", " + contextPart; + } + return this; + } + @Override public void close() { if (log == null) { @@ -140,6 +164,9 @@ public String toString() { if (returnValue != null) { line += String.format("\treturn{%s}", returnValue); } + if (context != null) { + line += String.format("\tcontext{%s}", context); + } return line; } } diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java index 5cce3a3a8b..346abe5211 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java @@ -806,6 +806,8 @@ public void reportShuffleResult( appId, shuffleId); } + auditContext.withContext("updatedBlockCount=" + updatedBlockCount); + auditContext.withContext("expectedBlockCount=" + expectedBlockCount); } catch (Exception e) { status = StatusCode.INTERNAL_ERROR; msg = "error happened when report shuffle result, check shuffle server for detail"; @@ -853,7 +855,6 @@ public void getShuffleResult( } String msg = "OK"; - GetShuffleResultResponse reply; byte[] serializedBlockIds = null; String requestInfo = "appId[" + appId + "], shuffleId[" + shuffleId + "], partitionId[" + partitionId + "]"; @@ -878,7 +879,8 @@ public void getShuffleResult( } auditContext.withStatusCode(status); - reply = + auditContext.withReturnValue("serializedBlockIdsBytes=" + serializedBlockIdsBytes.size()); + GetShuffleResultResponse reply = GetShuffleResultResponse.newBuilder() .setStatus(status.toProto()) .setRetMsg(msg) @@ -922,7 +924,6 @@ public void getShuffleResultForMultiPart( } String msg = "OK"; - GetShuffleResultForMultiPartResponse reply; byte[] serializedBlockIds = null; String requestInfo = "appId[" + appId + "], shuffleId[" + shuffleId + "], partitions" + partitionsList; @@ -947,8 +948,9 @@ public void getShuffleResultForMultiPart( LOG.error("Error happened when get shuffle result for {}", requestInfo, e); } + auditContext.withReturnValue("serializedBlockIdsBytes=" + serializedBlockIdsBytes.size()); auditContext.withStatusCode(status); - reply = + GetShuffleResultForMultiPartResponse reply = GetShuffleResultForMultiPartResponse.newBuilder() .setStatus(status.toProto()) .setRetMsg(msg) diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java index 226682e638..e3280dd06f 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java @@ -58,6 +58,7 @@ import org.apache.uniffle.common.ShuffleIndexResult; import org.apache.uniffle.common.ShufflePartitionedBlock; import org.apache.uniffle.common.ShufflePartitionedData; +import org.apache.uniffle.common.audit.RpcAuditContext; import org.apache.uniffle.common.config.RssBaseConf; import org.apache.uniffle.common.exception.FileNotFoundException; import org.apache.uniffle.common.exception.InvalidRequestException; @@ -641,9 +642,20 @@ public byte[] getFinishedBlockIds( Roaring64NavigableMap res = Roaring64NavigableMap.bitmapOf(); for (Map.Entry> entry : bitmapIndexToPartitions.entrySet()) { Set requestPartitions = entry.getValue(); - Roaring64NavigableMap bitmap = blockIds[entry.getKey()]; + int bitmapIndex = entry.getKey(); + Roaring64NavigableMap bitmap = blockIds[bitmapIndex]; getBlockIdsByPartitionId(requestPartitions, bitmap, res, blockIdLayout); + RpcAuditContext.getRpcAuditContext() + .ifPresent( + context -> + context.withContext( + String.format( + "bitmap[%d].=<%d,%d>", + bitmapIndex, bitmap.getLongCardinality(), bitmap.getLongSizeInBytes()))); } + RpcAuditContext.getRpcAuditContext() + .ifPresent( + context -> context.withContext("partitionBlockCount=" + res.getLongCardinality())); if (res.getLongCardinality() != expectedBlockNumber) { throw new RssException(