[improve](routine load) introduce routine load abnormal job monitor #48171

Open · wants to merge 1 commit into master
24 changes: 24 additions & 0 deletions fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1277,6 +1277,30 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, masterOnly = true)
public static int max_get_kafka_meta_timeout_second = 60;

/**
* The multiplier applied to a job's maxBatchInterval to derive the interval (in seconds)
* between checks for abnormal routine load jobs; the result is floored at min_abnormal_check_interval_sec.
*/
@ConfField(mutable = true, masterOnly = true)
public static int abnormal_check_interval_multiplier = 10;

/**
* The minimum interval (in seconds) between two checks for abnormal routine load jobs.
*/
@ConfField(mutable = true, masterOnly = true)
public static int min_abnormal_check_interval_sec = 10 * 60;

/**
* The aborted-transaction ratio threshold for routine load jobs; if the ratio of aborted to total
* tasks within a check window exceeds it, the job is considered abnormal.
*/
@ConfField(mutable = true, masterOnly = true)
public static double min_abnormal_abort_txn_ratio_threshold = 0.8;

/**
* The auto-resume count threshold for routine load jobs; if a job has been automatically resumed
* at least this many times, it is considered abnormal.
*/
@ConfField(mutable = true, masterOnly = true)
public static int min_abnormal_auto_resume_count_threshold = 5;

/**
* The max number of files store in SmallFileMgr
*/
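
Taken together, these knobs bound the check window: a job's maxBatchInterval is scaled by the multiplier, then floored at the minimum interval. A minimal sketch of the arithmetic with the defaults above, assuming a hypothetical job with maxBatchIntervalS = 10 (it mirrors the computation in RoutineLoadJob.checkAbnormalJob() further down):

long maxBatchIntervalS = 10; // assumed job setting, for illustration only
long checkIntervalMillis = Math.max(
        maxBatchIntervalS * Config.abnormal_check_interval_multiplier, // 10 * 10 = 100 s
        Config.min_abnormal_check_interval_sec)                        // floored at 600 s
        * 1000;                                                        // -> 600000 ms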
@@ -904,4 +904,35 @@ public Object updateIngestionLoad(HttpServletRequest request, HttpServletResponse response) {

}

@RequestMapping(path = "/api/routine_load/abnormal_jobs", method = RequestMethod.GET)
public Object getRoutineLoadAbnormalJobs(HttpServletRequest request, HttpServletResponse response) {
if (needRedirect(request.getScheme())) {
return redirectToHttps(request);
}

executeCheckPassword(request, response);

Map<Long, String> abnormalJobs = Env.getCurrentEnv().getRoutineLoadManager().getAbnormalJobs();
List<Map<String, String>> result = new LinkedList<>();
for (Map.Entry<Long, String> entry : abnormalJobs.entrySet()) {
Map<String, String> jobInfo = new HashMap<>();
jobInfo.put("routineload_job_name", Env.getCurrentEnv().getRoutineLoadManager().fullJobName(entry.getKey()));
jobInfo.put("reason", entry.getValue());
result.add(jobInfo);
}
return ResponseEntityBuilder.ok(result);
}

@RequestMapping(path = "/api/routine_load/clean_abnormal_jobs", method = RequestMethod.POST)
public Object cleanRoutineLoadAbnormalJobs(HttpServletRequest request, HttpServletResponse response) {
if (needRedirect(request.getScheme())) {
return redirectToHttps(request);
}

executeCheckPassword(request, response);

Env.getCurrentEnv().getRoutineLoadManager().cleanAbnormalJobs();
return ResponseEntityBuilder.ok();
}
}
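
For reference, a minimal client sketch against these two endpoints using java.net.http from Java 11. The FE address (127.0.0.1:8030) and a root user with an empty password are assumptions for illustration; the response shape follows getRoutineLoadAbnormalJobs() above:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Base64;

public class AbnormalJobsClient {
    public static void main(String[] args) throws Exception {
        // Basic auth for the assumed "root" user with an empty password.
        String auth = Base64.getEncoder().encodeToString("root:".getBytes());
        HttpClient client = HttpClient.newHttpClient();

        // GET the abnormal job list; expected body, per the handler above:
        // [{"routineload_job_name": "db.job", "reason": "..."}]
        HttpRequest list = HttpRequest.newBuilder()
                .uri(URI.create("http://127.0.0.1:8030/api/routine_load/abnormal_jobs"))
                .header("Authorization", "Basic " + auth)
                .GET().build();
        System.out.println(client.send(list, HttpResponse.BodyHandlers.ofString()).body());

        // POST to clear the recorded abnormal jobs.
        HttpRequest clean = HttpRequest.newBuilder()
                .uri(URI.create("http://127.0.0.1:8030/api/routine_load/clean_abnormal_jobs"))
                .header("Authorization", "Basic " + auth)
                .POST(HttpRequest.BodyPublishers.noBody()).build();
        System.out.println(client.send(clean, HttpResponse.BodyHandlers.ofString()).statusCode());
    }
}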
@@ -71,6 +71,8 @@
import java.util.Map;
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;


/**
@@ -110,6 +112,10 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
// Will be updated periodically by calling updateKafkaPartitions();
private List<Integer> newCurrentKafkaPartition = Lists.newArrayList();

// committed offsets captured at the previous abnormal-progress check
private ConcurrentMap<Integer, Long> snapshotPartitionIdToOffset = Maps.newConcurrentMap();

private String abnormalProgressReason = "";

public KafkaRoutineLoadJob() {
// for serialization, id is dummy
super(-1, LoadDataSourceType.KAFKA);
@@ -925,4 +931,48 @@ public TFileCompressType getCompressType() {
public double getMaxFilterRatio() {
return maxFilterRatio;
}

@Override
protected boolean checkAbnormalJobByProgress() {
boolean isAbnormalJob = isAbnormalProgress();
refreshPartitionIdToOffsetSnapshot();
return isAbnormalJob;
}

@Override
protected String abnormalProgressReason() {
return abnormalProgressReason;
}

public boolean isAbnormalProgress() {
Map<Integer, Long> abnormalPartitionMap = Maps.newHashMap();

for (Map.Entry<Integer, Long> entry : ((KafkaProgress) progress).getOffsetByPartition().entrySet()) {
Integer partitionId = entry.getKey();
Long partitionOffset = entry.getValue();

if (snapshotPartitionIdToOffset.containsKey(partitionId)) {
Long snapshotOffset = snapshotPartitionIdToOffset.get(partitionId);
if (partitionOffset <= snapshotOffset) {
if (cachedPartitionWithLatestOffsets.containsKey(partitionId)
&& cachedPartitionWithLatestOffsets.get(partitionId) - partitionOffset != 0) {
// progress has not advanced, and the lag is not zero
abnormalPartitionMap.put(partitionId, partitionOffset);
}
}
}
}

if (!abnormalPartitionMap.isEmpty()) {
abnormalProgressReason = "The progress of certain partitions has not advanced and their lag is not zero,"
+ " abnormal partitions: " + abnormalPartitionMap;
return true;
}

return false;
}

public void refreshPartitionIdToOffsetSnapshot() {
this.snapshotPartitionIdToOffset = new ConcurrentHashMap<>(((KafkaProgress) progress).getOffsetByPartition());
}
}
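
The detection rule above flags a partition whose committed offset has not advanced since the previous snapshot while the broker still holds newer data. A self-contained sketch of the same condition with hypothetical offsets (the names are illustrative, not the class's fields):

import java.util.Map;

public class AbnormalProgressSketch {
    public static void main(String[] args) {
        // committed offsets at the previous snapshot, committed offsets now, latest broker offsets
        Map<Integer, Long> snapshot = Map.of(0, 100L, 1, 200L);
        Map<Integer, Long> current = Map.of(0, 100L, 1, 250L);
        Map<Integer, Long> latest = Map.of(0, 180L, 1, 250L);

        for (Integer p : current.keySet()) {
            boolean noProgress = current.get(p) <= snapshot.get(p);
            boolean hasLag = !latest.get(p).equals(current.get(p));
            // partition 0: stuck at 100 with lag 80 -> abnormal; partition 1: advanced to 250 -> normal
            System.out.println("partition " + p + " abnormal: " + (noProgress && hasLag));
        }
    }
}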
@@ -295,6 +295,8 @@ public boolean isFinalState() {
protected String qualifiedUser;
protected String cloudCluster;

// start time of the current abnormal-job check window
protected long latestCheckAbnormalJobTime = System.currentTimeMillis();

public void setTypeRead(boolean isTypeRead) {
this.isTypeRead = isTypeRead;
}
@@ -1103,6 +1105,7 @@ public void afterCommitted(TransactionState txnState, boolean txnOperated) throw
taskBeId = routineLoadTaskInfo.getBeId();
executeTaskOnTxnStatusChanged(routineLoadTaskInfo, txnState, TransactionStatus.COMMITTED, null);
++this.jobStatistic.committedTaskNum;
++this.jobStatistic.currentCommittedTaskNum;
if (LOG.isDebugEnabled()) {
LOG.debug("routine load task committed. task id: {}, job id: {}", txnState.getLabel(), id);
}
@@ -1228,6 +1231,7 @@ public void afterAborted(TransactionState txnState, boolean txnOperated, String
.build());
}
++this.jobStatistic.abortedTaskNum;
++this.jobStatistic.currentAbortedTaskNum;
TransactionState.TxnStatusChangeReason txnStatusChangeReason = null;
if (txnStatusChangeReasonString != null) {
txnStatusChangeReason =
@@ -1452,12 +1456,14 @@ private void executeNeedSchedule() {
}

private void executeStop() {
Env.getCurrentEnv().getRoutineLoadManager().removeAbnormalJob(this.id);
state = JobState.STOPPED;
routineLoadTaskInfoList.clear();
endTimestamp = System.currentTimeMillis();
}

private void executeCancel(ErrorReason reason) {
Env.getCurrentEnv().getRoutineLoadManager().removeAbnormalJob(this.id);
cancelReason = reason;
state = JobState.CANCELLED;
routineLoadTaskInfoList.clear();
@@ -1521,6 +1527,13 @@ public void update() throws UserException {
}
}

writeLock();
try {
checkAbnormalJob();
} finally {
writeUnlock();
}

boolean needAutoResume = needAutoResume();

if (!refreshKafkaPartitions(needAutoResume)) {
@@ -1542,6 +1555,49 @@
}
}

private void checkAbnormalJob() {
// 1. check auto resume count
if (this.autoResumeCount >= Config.min_abnormal_auto_resume_count_threshold) {
Env.getCurrentEnv().getRoutineLoadManager().addAbnormalJob(this.id,
"The auto resume count reaches the threshold: " + Config.min_abnormal_auto_resume_count_threshold);
[Review comment · Contributor] Automatic resume has failed multiple times.

return;
}

// define a check window: maxBatchInterval scaled by the multiplier, floored at the min interval
long checkIntervalMillis = Math.max(maxBatchIntervalS * Config.abnormal_check_interval_multiplier,
Config.min_abnormal_check_interval_sec) * 1000;
if (System.currentTimeMillis() - latestCheckAbnormalJobTime > checkIntervalMillis) {
// start a new check window
latestCheckAbnormalJobTime = System.currentTimeMillis();
// 2. check abort txn ratio; guard against division by zero when no task has finished yet
long currentTotalTaskNum = this.jobStatistic.currentAbortedTaskNum + this.jobStatistic.currentCommittedTaskNum;
if (currentTotalTaskNum > 0
&& (double) this.jobStatistic.currentAbortedTaskNum / currentTotalTaskNum
> Config.min_abnormal_abort_txn_ratio_threshold) {
this.jobStatistic.currentAbortedTaskNum = 0;
this.jobStatistic.currentCommittedTaskNum = 0;
Env.getCurrentEnv().getRoutineLoadManager().addAbnormalJob(this.id,
"The ratio of aborted tasks to total tasks in the current window exceeds: "
+ Config.min_abnormal_abort_txn_ratio_threshold);
return;
}

// 3. check progress and lag
if (checkAbnormalJobByProgress()) {
Env.getCurrentEnv().getRoutineLoadManager().addAbnormalJob(this.id, abnormalProgressReason());
return;
}

// remove the job from the abnormal list if all checks pass
Env.getCurrentEnv().getRoutineLoadManager().removeAbnormalJob(this.id);
}
}

protected boolean checkAbnormalJobByProgress() {
return false;
}

protected String abnormalProgressReason() {
return "";
}

// Call this before calling unprotectUpdateProgress().
// Because unprotectUpdateProgress() is protected by writelock.
// So if there are time-consuming operations, they should be done in this method.
@@ -91,6 +91,9 @@ public class RoutineLoadManager implements Writable {

private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);

// key: job id, value: abnormal reason
private Map<Long, String> abnormalJobs = new ConcurrentHashMap<>();

private void readLock() {
lock.readLock().lock();
}
@@ -136,6 +139,31 @@ public int getTotalMaxConcurrentTaskNum() {
return beIdToMaxConcurrentTasks.values().stream().mapToInt(i -> i).sum();
}

public Map<Long, String> getAbnormalJobs() {
return abnormalJobs;
}

public void cleanAbnormalJobs() {
abnormalJobs.clear();
}

public void addAbnormalJob(Long id, String reason) {
abnormalJobs.put(id, reason);
}

public void removeAbnormalJob(Long id) {
abnormalJobs.remove(id);
}

public String fullJobName(Long id) {
RoutineLoadJob job = idToRoutineLoadJob.get(id);
if (job == null) {
return "";
}
Database db = Env.getCurrentEnv().getInternalCatalog().getDbNullable(job.getDbId());
if (db == null) {
return job.getName();
}
return db.getName() + "." + job.getName();
}

// return the map of be id -> running tasks num
private Map<Long, Integer> getBeCurrentTasksNumMap() {
Map<Long, Integer> beCurrentTaskNumMap = Maps.newHashMap();
@@ -61,6 +61,8 @@ public class RoutineLoadStatistic {
public long committedTaskNum = 0;
@SerializedName(value = "abortedTaskNum")
public long abortedTaskNum = 0;
// per-window counters for the abnormal-job check (no @SerializedName, so not persisted)
public long currentCommittedTaskNum = 0;
public long currentAbortedTaskNum = 0;

// Save all transactions current running. Including PREPARE, COMMITTED.
// No need to persist, only for tracing txn of routine load job.
@@ -137,6 +137,7 @@ public final class MetricRepo {

public static LongCounterMetric COUNTER_ROUTINE_LOAD_ROWS;
public static LongCounterMetric COUNTER_ROUTINE_LOAD_RECEIVED_BYTES;
public static GaugeMetric<Long> GAUGE_ROUTINE_LOAD_ABNORMAL_JOB_NUMS;
public static LongCounterMetric COUNTER_ROUTINE_LOAD_ERROR_ROWS;
public static LongCounterMetric COUNTER_HIT_SQL_BLOCK_RULE;

@@ -541,6 +542,14 @@ public Long getValue() {
COUNTER_ROUTINE_LOAD_ERROR_ROWS = new LongCounterMetric("routine_load_error_rows", MetricUnit.ROWS,
"total error rows of routine load");
DORIS_METRIC_REGISTER.addMetrics(COUNTER_ROUTINE_LOAD_ERROR_ROWS);
GAUGE_ROUTINE_LOAD_ABNORMAL_JOB_NUMS = new GaugeMetric<Long>("routine_load_abnormal_job_nums",
MetricUnit.NOUNIT, "number of abnormal routine load jobs") {
@Override
public Long getValue() {
return (long) Env.getCurrentEnv().getRoutineLoadManager().getAbnormalJobs().size();
}
};
DORIS_METRIC_REGISTER.addMetrics(GAUGE_ROUTINE_LOAD_ABNORMAL_JOB_NUMS);

COUNTER_HIT_SQL_BLOCK_RULE = new LongCounterMetric("counter_hit_sql_block_rule", MetricUnit.ROWS,
"total hit sql block rule query");
@@ -594,6 +594,7 @@ public void testPauseRoutineLoadJob(@Injectable PauseRoutineLoadStmt pauseRoutin
Map<String, List<RoutineLoadJob>> nameToRoutineLoadJob = Maps.newHashMap();
List<RoutineLoadJob> routineLoadJobList = Lists.newArrayList();
RoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob();
Deencapsulation.setField(routineLoadJob, "progress", new KafkaProgress());
routineLoadJobList.add(routineLoadJob);
nameToRoutineLoadJob.put("", routineLoadJobList);
dbToNameToRoutineLoadJob.put(1L, nameToRoutineLoadJob);
3 changes: 3 additions & 0 deletions regression-test/pipeline/cloud_p0/conf/fe_custom.conf
@@ -47,3 +47,6 @@ workload_group_max_num = 25
enable_advance_next_id = true

check_table_lock_leaky = true

min_abnormal_auto_resume_count_threshold = 2
min_abnormal_check_interval_sec = 0
3 changes: 3 additions & 0 deletions regression-test/pipeline/p0/conf/fe.conf
@@ -92,3 +92,6 @@ max_query_profile_num = 2000
max_spilled_profile_num = 2000

check_table_lock_leaky=true

min_abnormal_auto_resume_count_threshold = 2
min_abnormal_check_interval_sec = 0