Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
update

fix checkstyl

fix

More accurate display of predicates on storage layer

Revert "More accurate display of predicates on storage layer"

This reverts commit 67a9c2c.
  • Loading branch information
BiteTheDDDDt committed Mar 4, 2025
1 parent 625b77e commit ee2ffae
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -748,7 +748,7 @@ protected void execInternal() throws Exception {
toArrowFlightHost(param.host), toBrpcHost(param.host), fragments.get(0).getOutputExprs()));
}
}
receiverConsumer = new ResultReceiverConsumer(receivers);
receiverConsumer = new ResultReceiverConsumer(receivers, timeoutDeadline);

LOG.info("dispatch result sink of query {} to {}", DebugUtil.printId(queryId),
topParams.instanceExecParams.get(0).host);
Expand Down
29 changes: 19 additions & 10 deletions fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TUniqueId;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TDeserializer;
Expand Down Expand Up @@ -76,20 +78,31 @@ Types.PUniqueId getRealFinstId() {
return finstId;
}

public void createFuture(
FutureCallback<InternalService.PFetchDataResult> callback) throws RpcException {
InternalService.PFetchDataRequest request = InternalService.PFetchDataRequest.newBuilder()
.setFinstId(getRealFinstId())
.setRespInAttachment(false)
.build();
try {
fetchDataAsyncFuture = BackendServiceProxy.getInstance().fetchDataAsyncWithCallback(address, request,
callback);
} catch (RpcException e) {
LOG.warn("fetch result rpc exception, finstId={}", DebugUtil.printId(finstId), e);
SimpleScheduler.addToBlacklist(backendId, e.getMessage());
throw e;
}
}

public RowBatch getNext(Status status) throws TException {
if (isDone) {
return null;
}
final RowBatch rowBatch = new RowBatch();
try {
while (!isDone && runStatus.ok()) {
InternalService.PFetchDataRequest request = InternalService.PFetchDataRequest.newBuilder()
.setFinstId(getRealFinstId())
.setRespInAttachment(false)
.build();

currentThread = Thread.currentThread();
fetchDataAsyncFuture = BackendServiceProxy.getInstance().fetchDataAsync(address, request);
Preconditions.checkNotNull(fetchDataAsyncFuture);
InternalService.PFetchDataResult pResult = null;

while (pResult == null) {
Expand Down Expand Up @@ -172,10 +185,6 @@ public RowBatch getNext(Status status) throws TException {
return rowBatch;
}
}
} catch (RpcException e) {
LOG.warn("fetch result rpc exception, finstId={}", DebugUtil.printId(finstId), e);
status.updateStatus(TStatusCode.THRIFT_RPC_ERROR, e.getMessage());
SimpleScheduler.addToBlacklist(backendId, e.getMessage());
} catch (ExecutionException e) {
LOG.warn("fetch result execution exception, finstId={}", DebugUtil.printId(finstId), e);
if (e.getMessage().contains("time out")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@

import org.apache.doris.common.Status;
import org.apache.doris.common.UserException;
import org.apache.doris.proto.InternalService;
import org.apache.doris.rpc.RpcException;

import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import org.apache.thrift.TException;

import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ResultReceiverConsumer {
class ReceiverContext {
Expand All @@ -43,48 +43,50 @@ public void createFuture() {
return;
}
try {
future = executor.submit(() -> {
RowBatch rowBatch = null;
try {
rowBatch = receiver.getNext(status);
} catch (TException e) {
setErrMsg(e.getMessage());
receiver.createFuture(new FutureCallback<InternalService.PFetchDataResult>() {
@Override
public void onSuccess(InternalService.PFetchDataResult result) {
readyOffsets.offer(offset);
}

@Override
public void onFailure(Throwable t) {
readyOffsets.offer(offset);
}
readyOffsets.offer(offset);
return rowBatch;
});
} catch (Throwable e) {
} catch (RpcException e) {
setErrMsg(e.getMessage());
readyOffsets.offer(offset);
}
}

ResultReceiver receiver;
Status status = new Status();
Future<RowBatch> future;
final int offset;
}

private final ExecutorService executor;
private List<ReceiverContext> contexts = Lists.newArrayList();
private boolean futureInitialized = false;
private String errMsg;
private final long timeoutTs;

void setErrMsg(String errMsg) {
this.errMsg = errMsg;
executor.shutdownNow();
}

BlockingQueue<Integer> readyOffsets;
int finishedReceivers = 0;

public ResultReceiverConsumer(List<ResultReceiver> resultReceivers) {
public ResultReceiverConsumer(List<ResultReceiver> resultReceivers, long timeoutDeadline) {
for (int i = 0; i < resultReceivers.size(); i++) {
ReceiverContext context = new ReceiverContext(resultReceivers.get(i), i);
contexts.add(context);
}
this.executor = Executors.newFixedThreadPool(resultReceivers.size());
this.readyOffsets = new ArrayBlockingQueue<>(resultReceivers.size());
timeoutTs = timeoutDeadline;
}

public boolean isEos() {
return finishedReceivers == contexts.size();
}

public RowBatch getNext(Status status) throws TException, InterruptedException, ExecutionException, UserException {
Expand All @@ -95,30 +97,33 @@ public RowBatch getNext(Status status) throws TException, InterruptedException,
}
}

ReceiverContext context = contexts.get(readyOffsets.take());
if (errMsg != null) {
throw new UserException(errMsg);
Integer offset = readyOffsets.poll(timeoutTs - System.currentTimeMillis(),
java.util.concurrent.TimeUnit.MILLISECONDS);
if (offset == null) {
throw new TException("query timeout");
}
RowBatch rowBatch = context.future.get();
if (errMsg != null) {
throw new UserException(errMsg);
}
if (!context.status.ok()) {
setErrMsg(context.status.getErrorMsg());
status.updateStatus(context.status.getErrorCode(), context.status.getErrorMsg());

ReceiverContext context = contexts.get(offset);
RowBatch rowBatch = context.receiver.getNext(status);
if (!status.ok() || rowBatch == null) {
return rowBatch;
}
if (rowBatch.isEos()) {
finishedReceivers++;
if (finishedReceivers != contexts.size()) {
rowBatch.setEos(false);
} else {
executor.shutdownNow();
}
rowBatch.setEos(isEos());
} else {
context.createFuture();
}

return rowBatch;
}

public synchronized void cancel(Status reason) {
for (ReceiverContext context : contexts) {
context.receiver.cancel(reason);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;

public class QueryProcessor extends AbstractJobProcessor {
Expand All @@ -56,17 +54,13 @@ public class QueryProcessor extends AbstractJobProcessor {
private final long limitRows;

// mutable field
private final List<ResultReceiver> runningReceivers;
private ResultReceiverConsumer receiverConsumer;

private long numReceivedRows;

public QueryProcessor(CoordinatorContext coordinatorContext, List<ResultReceiver> runningReceivers) {
public QueryProcessor(CoordinatorContext coordinatorContext, ResultReceiverConsumer consumer) {
super(coordinatorContext);
this.runningReceivers = new CopyOnWriteArrayList<>(
Objects.requireNonNull(runningReceivers, "runningReceivers can not be null")
);
receiverConsumer = new ResultReceiverConsumer(runningReceivers);
receiverConsumer = consumer;

this.limitRows = coordinatorContext.fragments.get(0)
.getPlanRoot()
Expand Down Expand Up @@ -105,7 +99,9 @@ public static QueryProcessor build(CoordinatorContext coordinatorContext) {
)
);
}
return new QueryProcessor(coordinatorContext, receivers);
ResultReceiverConsumer consumer = new ResultReceiverConsumer(receivers,
coordinatorContext.timeoutDeadline.get());
return new QueryProcessor(coordinatorContext, consumer);
}

@Override
Expand All @@ -114,7 +110,7 @@ protected void doProcessReportExecStatus(TReportExecStatusParams params, SingleF
}

public boolean isEos() {
return runningReceivers.isEmpty();
return receiverConsumer.isEos();
}

public RowBatch getNext() throws UserException, InterruptedException, TException, RpcException, ExecutionException {
Expand Down Expand Up @@ -162,9 +158,7 @@ public RowBatch getNext() throws UserException, InterruptedException, TException
}

public void cancel(Status cancelReason) {
for (ResultReceiver receiver : runningReceivers) {
receiver.cancel(cancelReason);
}
receiverConsumer.cancel(cancelReason);

this.executionTask.ifPresent(sqlPipelineTask -> {
for (MultiFragmentsPipelineTask fragmentsTask : sqlPipelineTask.getChildrenTasks().values()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ public ListenableFuture<InternalService.PCancelPlanFragmentResult> cancelPlanFra
.cancelPlanFragment(request);
}

public Future<InternalService.PFetchDataResult> fetchDataAsync(InternalService.PFetchDataRequest request) {
public ListenableFuture<InternalService.PFetchDataResult> fetchDataAsync(
InternalService.PFetchDataRequest request) {
return stub.fetchData(request);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import org.apache.doris.thrift.TUniqueId;

import com.google.common.collect.Maps;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.ByteString;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -295,6 +297,23 @@ public Future<InternalService.PFetchDataResult> fetchDataAsync(
}
}

public Future<InternalService.PFetchDataResult> fetchDataAsyncWithCallback(
TNetworkAddress address, InternalService.PFetchDataRequest request,
FutureCallback<InternalService.PFetchDataResult> callback) throws RpcException {
try {
final BackendServiceClient client = getProxy(address);
ListenableFuture<InternalService.PFetchDataResult> future = client.fetchDataAsync(request);
Futures.addCallback(
future, callback,
grpcThreadPool);
return future;
} catch (Throwable e) {
LOG.warn("fetch data catch a exception, address={}:{}",
address.getHostname(), address.getPort(), e);
throw new RpcException(address.hostname, e.getMessage());
}
}

public Future<InternalService.PTabletKeyLookupResponse> fetchTabletDataAsync(
TNetworkAddress address, InternalService.PTabletKeyLookupRequest request) throws RpcException {
try {
Expand Down

0 comments on commit ee2ffae

Please sign in to comment.