Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
update
  • Loading branch information
BiteTheDDDDt committed Mar 4, 2025
1 parent 625b77e commit 75afeac
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -748,7 +748,7 @@ protected void execInternal() throws Exception {
toArrowFlightHost(param.host), toBrpcHost(param.host), fragments.get(0).getOutputExprs()));
}
}
receiverConsumer = new ResultReceiverConsumer(receivers);
receiverConsumer = new ResultReceiverConsumer(receivers, timeoutDeadline);

LOG.info("dispatch result sink of query {} to {}", DebugUtil.printId(queryId),
topParams.instanceExecParams.get(0).host);
Expand Down
30 changes: 20 additions & 10 deletions fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;

import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -76,20 +79,31 @@ Types.PUniqueId getRealFinstId() {
return finstId;
}

public void createFuture(
FutureCallback<InternalService.PFetchDataResult> callback) throws RpcException {
InternalService.PFetchDataRequest request = InternalService.PFetchDataRequest.newBuilder()
.setFinstId(getRealFinstId())
.setRespInAttachment(false)
.build();
try {
fetchDataAsyncFuture = BackendServiceProxy.getInstance().fetchDataAsyncWithCallback(address, request,
callback);
} catch (RpcException e) {
LOG.warn("fetch result rpc exception, finstId={}", DebugUtil.printId(finstId), e);
SimpleScheduler.addToBlacklist(backendId, e.getMessage());
throw e;
}
}

public RowBatch getNext(Status status) throws TException {
if (isDone) {
return null;
}
final RowBatch rowBatch = new RowBatch();
try {
while (!isDone && runStatus.ok()) {
InternalService.PFetchDataRequest request = InternalService.PFetchDataRequest.newBuilder()
.setFinstId(getRealFinstId())
.setRespInAttachment(false)
.build();

currentThread = Thread.currentThread();
fetchDataAsyncFuture = BackendServiceProxy.getInstance().fetchDataAsync(address, request);
Preconditions.checkNotNull(fetchDataAsyncFuture);
InternalService.PFetchDataResult pResult = null;

while (pResult == null) {
Expand Down Expand Up @@ -172,10 +186,6 @@ public RowBatch getNext(Status status) throws TException {
return rowBatch;
}
}
} catch (RpcException e) {
LOG.warn("fetch result rpc exception, finstId={}", DebugUtil.printId(finstId), e);
status.updateStatus(TStatusCode.THRIFT_RPC_ERROR, e.getMessage());
SimpleScheduler.addToBlacklist(backendId, e.getMessage());
} catch (ExecutionException e) {
LOG.warn("fetch result execution exception, finstId={}", DebugUtil.printId(finstId), e);
if (e.getMessage().contains("time out")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,18 @@

import org.apache.doris.common.Status;
import org.apache.doris.common.UserException;
import org.apache.doris.proto.InternalService;
import org.apache.doris.rpc.RpcException;

import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;

import org.apache.thrift.TException;

import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ResultReceiverConsumer {
class ReceiverContext {
Expand All @@ -43,48 +44,52 @@ public void createFuture() {
return;
}
try {
future = executor.submit(() -> {
RowBatch rowBatch = null;
try {
rowBatch = receiver.getNext(status);
} catch (TException e) {
setErrMsg(e.getMessage());
FutureCallback<InternalService.PFetchDataResult> callback = new FutureCallback<InternalService.PFetchDataResult>() {
@Override
public void onSuccess(InternalService.PFetchDataResult result) {
readyOffsets.offer(offset);
}

@Override
public void onFailure(Throwable t) {
setErrMsg(t.getMessage());
readyOffsets.offer(offset);
}
readyOffsets.offer(offset);
return rowBatch;
});
} catch (Throwable e) {
};
receiver.createFuture(callback);
} catch (RpcException e) {
setErrMsg(e.getMessage());
readyOffsets.offer(offset);
}
}

ResultReceiver receiver;
Status status = new Status();
Future<RowBatch> future;
final int offset;
}

private final ExecutorService executor;
private List<ReceiverContext> contexts = Lists.newArrayList();
private boolean futureInitialized = false;
private String errMsg;
private final long timeoutTs;

void setErrMsg(String errMsg) {
this.errMsg = errMsg;
executor.shutdownNow();
}

BlockingQueue<Integer> readyOffsets;
int finishedReceivers = 0;

public ResultReceiverConsumer(List<ResultReceiver> resultReceivers) {
public ResultReceiverConsumer(List<ResultReceiver> resultReceivers, long timeoutDeadline) {
for (int i = 0; i < resultReceivers.size(); i++) {
ReceiverContext context = new ReceiverContext(resultReceivers.get(i), i);
contexts.add(context);
}
this.executor = Executors.newFixedThreadPool(resultReceivers.size());
this.readyOffsets = new ArrayBlockingQueue<>(resultReceivers.size());
timeoutTs = timeoutDeadline;
}

public boolean isEos() {
return finishedReceivers == contexts.size();
}

public RowBatch getNext(Status status) throws TException, InterruptedException, ExecutionException, UserException {
Expand All @@ -95,30 +100,37 @@ public RowBatch getNext(Status status) throws TException, InterruptedException,
}
}

ReceiverContext context = contexts.get(readyOffsets.take());
Integer offset = readyOffsets.poll(timeoutTs - System.currentTimeMillis(),
java.util.concurrent.TimeUnit.MILLISECONDS);
if (offset == null) {
throw new TException("query timeout");
}

ReceiverContext context = contexts.get(offset);
if (errMsg != null) {
throw new UserException(errMsg);
}
RowBatch rowBatch = context.future.get();
RowBatch rowBatch = context.receiver.getNext(status);
if (errMsg != null) {
throw new UserException(errMsg);
}
if (!context.status.ok()) {
setErrMsg(context.status.getErrorMsg());
status.updateStatus(context.status.getErrorCode(), context.status.getErrorMsg());
if (!status.ok()) {
setErrMsg(status.getErrorMsg());
return rowBatch;
}
if (rowBatch.isEos()) {
finishedReceivers++;
if (finishedReceivers != contexts.size()) {
rowBatch.setEos(false);
} else {
executor.shutdownNow();
}
rowBatch.setEos(isEos());
} else {
context.createFuture();
}

return rowBatch;
}

public synchronized void cancel(Status reason) {
for (ReceiverContext context : contexts) {
context.receiver.cancel(reason);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,13 @@ public class QueryProcessor extends AbstractJobProcessor {
private final long limitRows;

// mutable field
private final List<ResultReceiver> runningReceivers;
private ResultReceiverConsumer receiverConsumer;

private long numReceivedRows;

public QueryProcessor(CoordinatorContext coordinatorContext, List<ResultReceiver> runningReceivers) {
public QueryProcessor(CoordinatorContext coordinatorContext, ResultReceiverConsumer consumer) {
super(coordinatorContext);
this.runningReceivers = new CopyOnWriteArrayList<>(
Objects.requireNonNull(runningReceivers, "runningReceivers can not be null")
);
receiverConsumer = new ResultReceiverConsumer(runningReceivers);
receiverConsumer = consumer;

this.limitRows = coordinatorContext.fragments.get(0)
.getPlanRoot()
Expand Down Expand Up @@ -105,7 +101,8 @@ public static QueryProcessor build(CoordinatorContext coordinatorContext) {
)
);
}
return new QueryProcessor(coordinatorContext, receivers);
ResultReceiverConsumer consumer = new ResultReceiverConsumer(receivers, coordinatorContext.timeoutDeadline.get());
return new QueryProcessor(coordinatorContext, consumer);
}

@Override
Expand All @@ -114,7 +111,7 @@ protected void doProcessReportExecStatus(TReportExecStatusParams params, SingleF
}

public boolean isEos() {
return runningReceivers.isEmpty();
return receiverConsumer.isEos();
}

public RowBatch getNext() throws UserException, InterruptedException, TException, RpcException, ExecutionException {
Expand Down Expand Up @@ -162,9 +159,7 @@ public RowBatch getNext() throws UserException, InterruptedException, TException
}

public void cancel(Status cancelReason) {
for (ResultReceiver receiver : runningReceivers) {
receiver.cancel(cancelReason);
}
receiverConsumer.cancel(cancelReason);

this.executionTask.ifPresent(sqlPipelineTask -> {
for (MultiFragmentsPipelineTask fragmentsTask : sqlPipelineTask.getChildrenTasks().values()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public ListenableFuture<InternalService.PCancelPlanFragmentResult> cancelPlanFra
.cancelPlanFragment(request);
}

public Future<InternalService.PFetchDataResult> fetchDataAsync(InternalService.PFetchDataRequest request) {
public ListenableFuture<InternalService.PFetchDataResult> fetchDataAsync(InternalService.PFetchDataRequest request) {
return stub.fetchData(request);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import org.apache.doris.thrift.TUniqueId;

import com.google.common.collect.Maps;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.ByteString;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -295,6 +297,23 @@ public Future<InternalService.PFetchDataResult> fetchDataAsync(
}
}

public Future<InternalService.PFetchDataResult> fetchDataAsyncWithCallback(
TNetworkAddress address, InternalService.PFetchDataRequest request,
FutureCallback<InternalService.PFetchDataResult> callback) throws RpcException {
try {
final BackendServiceClient client = getProxy(address);
ListenableFuture<InternalService.PFetchDataResult> future = client.fetchDataAsync(request);
Futures.addCallback(
future, callback,
grpcThreadPool);
return future;
} catch (Throwable e) {
LOG.warn("fetch data catch a exception, address={}:{}",
address.getHostname(), address.getPort(), e);
throw new RpcException(address.hostname, e.getMessage());
}
}

public Future<InternalService.PTabletKeyLookupResponse> fetchTabletDataAsync(
TNetworkAddress address, InternalService.PTabletKeyLookupRequest request) throws RpcException {
try {
Expand Down

0 comments on commit 75afeac

Please sign in to comment.