Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor procedure framework (simplified StateMachineProcedure, and some other things) #14683

Merged
Prev Previous commit
Next Next commit
recover KAWorker but rename it to TemporaryWorker
liyuheng55555 committed Jan 13, 2025
commit 88b4c90644921b7e9a394b5dc07b49e6a3ad1741
Original file line number Diff line number Diff line change
@@ -31,6 +31,7 @@
import org.apache.iotdb.confignode.procedure.store.IProcedureStore;

import com.google.common.base.Preconditions;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -726,7 +727,7 @@ private class WorkerThread extends StoppableThread {
protected long keepAliveTime = -1;

public WorkerThread(ThreadGroup threadGroup) {
this(threadGroup, "ProcExecWorker-");
this(threadGroup, "ProcedureCoreWorker-");
}

public WorkerThread(ThreadGroup threadGroup, String prefix) {
@@ -746,26 +747,27 @@ public void run() {
while (isRunning() && keepAlive(lastUpdated)) {
Procedure<Env> procedure = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS);
if (procedure == null) {
Thread.sleep(1000);
continue;
}
this.activeProcedure.set(procedure);
int activeCount = activeExecutorCount.incrementAndGet();
activeExecutorCount.incrementAndGet();
startTime.set(System.currentTimeMillis());
executeProcedure(procedure);
activeCount = activeExecutorCount.decrementAndGet();
LOG.trace("Halt pid={}, activeCount={}", procedure.getProcId(), activeCount);
activeExecutorCount.decrementAndGet();
LOG.trace("Halt pid={}, activeCount={}", procedure.getProcId(), activeExecutorCount.get());
this.activeProcedure.set(null);
lastUpdated = System.currentTimeMillis();
startTime.set(lastUpdated);
}

} catch (Throwable throwable) {
} catch (Exception e) {
if (this.activeProcedure.get() != null) {
LOG.warn(
"Procedure Worker {} terminated {}",
"Exception happened when worker {} execute procedure {}",
getName(),
this.activeProcedure.get(),
throwable);
e);
}
} finally {
LOG.info("Procedure worker {} terminated.", getName());
@@ -791,12 +793,12 @@ public long getCurrentRunTime() {
}
}

// A worker thread which can be added when core workers are stuck. Will timeout after
// keepAliveTime if there is no procedure to run.
private final class KeepAliveWorkerThread extends WorkerThread {
// A temporary worker thread will be launched when too many core workers are stuck.
// They will timeout after keepAliveTime if there is no procedure to run.
private final class TemporaryWorkerThread extends WorkerThread {

public KeepAliveWorkerThread(ThreadGroup group) {
super(group, "KAProcExecWorker-");
public TemporaryWorkerThread(ThreadGroup group) {
super(group, "ProcedureTemporaryWorker-");
this.keepAliveTime = TimeUnit.SECONDS.toMillis(10);
}

@@ -818,22 +820,24 @@ public WorkerMonitor() {
updateTimestamp();
}

private int checkForStuckWorkers() {
private int calculateRunningAndStuckWorkers() {
// Check if any of the worker is stuck
int stuckCount = 0;
int runningCount = 0, stuckCount = 0;
for (WorkerThread worker : workerThreads) {
if (worker.activeProcedure.get() == null
|| worker.getCurrentRunTime() < DEFAULT_WORKER_STUCK_THRESHOLD) {
if (worker.activeProcedure.get() == null) {
continue;
}

runningCount++;
// WARN the worker is stuck
stuckCount++;
LOG.warn(
"Worker stuck {}({}), run time {} ms",
worker,
worker.activeProcedure.get().getProcType(),
worker.getCurrentRunTime());
if (worker.getCurrentRunTime() < DEFAULT_WORKER_STUCK_THRESHOLD) {
stuckCount++;
LOG.warn(
"Worker stuck {}({}), run time {} ms",
worker,
worker.activeProcedure.get().getProcType(),
worker.getCurrentRunTime());
}
LOG.info("Procedure workers: {} is running, {} is running and stuck", runningCount, stuckCount);
}
return stuckCount;
}
@@ -849,7 +853,7 @@ private void checkThreadCount(final int stuckCount) {
// Let's add new worker thread more aggressively, as they will timeout finally if there is no
// work to do.
if (stuckPerc >= DEFAULT_WORKER_ADD_STUCK_PERCENTAGE && workerThreads.size() < maxPoolSize) {
final KeepAliveWorkerThread worker = new KeepAliveWorkerThread(threadGroup);
final TemporaryWorkerThread worker = new TemporaryWorkerThread(threadGroup);
workerThreads.add(worker);
worker.start();
LOG.debug("Added new worker thread {}", worker);
@@ -858,7 +862,7 @@ private void checkThreadCount(final int stuckCount) {

@Override
protected void periodicExecute(Env env) {
final int stuckCount = checkForStuckWorkers();
final int stuckCount = calculateRunningAndStuckWorkers();
checkThreadCount(stuckCount);
updateTimestamp();
}