diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 60f59460a0a34b..e745ac310964e0 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1277,6 +1277,30 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, masterOnly = true)
     public static int max_get_kafka_meta_timeout_second = 60;
 
+    /**
+     * The multiplier applied to maxBatchInterval to get the interval (in seconds) for checking abnormal routine load jobs.
+     */
+    @ConfField(mutable = true, masterOnly = true)
+    public static int abnormal_check_interval_multiplier = 10;
+
+    /**
+     * The minimum interval (in seconds) between checks for abnormal routine load jobs.
+     */
+    @ConfField(mutable = true, masterOnly = true)
+    public static int min_abnormal_check_interval_sec = 10 * 60;
+
+    /**
+     * The aborted-transaction ratio threshold for routine load jobs; if exceeded, the job is considered abnormal.
+     */
+    @ConfField(mutable = true, masterOnly = true)
+    public static double min_abnormal_abort_txn_ratio_threshold = 0.8;
+
+    /**
+     * The auto-resume count threshold for routine load jobs; if exceeded, the job is considered abnormal.
+     */
+    @ConfField(mutable = true, masterOnly = true)
+    public static int min_abnormal_auto_resume_count_threshold = 5;
+
     /**
      * The max number of files store in SmallFileMgr
      */
diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
index 48776da1569316..d774217c57a531 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
@@ -904,4 +904,35 @@ public Object updateIngestionLoad(HttpServletRequest request, HttpServletRespons
     }
 
+    @RequestMapping(path = "/api/routine_load/abnormal_jobs", method = RequestMethod.GET)
+    public Object getRoutineLoadAbnormalJobs(HttpServletRequest request, HttpServletResponse response) {
+        if (needRedirect(request.getScheme())) {
+            return redirectToHttps(request);
+        }
+
+        executeCheckPassword(request, response);
+
+        Map<Long, String> abnormalJobs = Env.getCurrentEnv().getRoutineLoadManager().getAbnormalJobs();
+        List<Map<String, String>> result = new LinkedList<>();
+        for (Long id : abnormalJobs.keySet()) {
+            String fullJobName = Env.getCurrentEnv().getRoutineLoadManager().fullJobName(id);
+            Map<String, String> jobInfo = new HashMap<>();
+            jobInfo.put("routineload_job_name", fullJobName);
+            jobInfo.put("reason", abnormalJobs.get(id));
+            result.add(jobInfo);
+        }
+        return ResponseEntity.ok(result);
+    }
+
+    @RequestMapping(path = "/api/routine_load/clean_abnormal_jobs", method = RequestMethod.POST)
+    public Object cleanRoutineLoadAbnormalJobs(HttpServletRequest request, HttpServletResponse response) {
+        if (needRedirect(request.getScheme())) {
+            return redirectToHttps(request);
+        }
+
+        executeCheckPassword(request, response);
+
+        Env.getCurrentEnv().getRoutineLoadManager().cleanAbnormalJobs();
+        return ResponseEntityBuilder.ok();
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 6952a32b8b3c9f..74dfca2f2ff738 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -71,6 +71,8 @@ import java.util.Map;
 import java.util.TimeZone;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 /**
@@ -110,6 +112,10 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
     // Will be updated periodically by calling updateKafkaPartitions();
     private List<Integer> newCurrentKafkaPartition = Lists.newArrayList();
 
+    private ConcurrentMap<Integer, Long> snapshotPartitionIdToOffset = Maps.newConcurrentMap();
+
+    private String abnormalProgressReason = "";
+
     public KafkaRoutineLoadJob() {
         // for serialization, id is dummy
         super(-1, LoadDataSourceType.KAFKA);
@@ -925,4 +931,48 @@ public TFileCompressType getCompressType() {
     public double getMaxFilterRatio() {
         return maxFilterRatio;
     }
+
+    @Override
+    protected boolean checkAbnormalJobByProgress() {
+        boolean isAbnormalJob = isAbnormalProgress();
+        refreshPartitionIdToOffsetSnapshot();
+        return isAbnormalJob;
+    }
+
+    @Override
+    protected String abnormalProgressReason() {
+        return abnormalProgressReason;
+    }
+
+    public boolean isAbnormalProgress() {
+        Map<Integer, Long> abnormalPartitionMap = Maps.newHashMap();
+
+        for (Map.Entry<Integer, Long> entry : ((KafkaProgress) progress).getOffsetByPartition().entrySet()) {
+            Integer partitionId = entry.getKey();
+            Long partitionOffset = entry.getValue();
+
+            if (snapshotPartitionIdToOffset.containsKey(partitionId)) {
+                Long snapshotOffset = snapshotPartitionIdToOffset.get(partitionId);
+                if (partitionOffset <= snapshotOffset) {
+                    if (cachedPartitionWithLatestOffsets.containsKey(partitionId)
+                            && cachedPartitionWithLatestOffsets.get(partitionId) - partitionOffset != 0) {
+                        // progress has not advanced, and the lag is not zero
+                        abnormalPartitionMap.put(partitionId, partitionOffset);
+                    }
+                }
+            }
+        }
+
+        if (!abnormalPartitionMap.isEmpty()) {
+            abnormalProgressReason = "The progress of certain partitions has not advanced, and the lag is not zero,"
+                    + " abnormal partition: " + abnormalPartitionMap.toString();
+            return true;
+        }
+
+        return false;
+    }
+
+    public void refreshPartitionIdToOffsetSnapshot() {
+        this.snapshotPartitionIdToOffset = new ConcurrentHashMap<>(((KafkaProgress) progress).getOffsetByPartition());
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 867751d9661117..a8625b4804719d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -295,6 +295,8 @@ public boolean isFinalState() {
     protected String qualifiedUser;
     protected String cloudCluster;
 
+    protected long latestCheckAbnormalJobTime = System.currentTimeMillis();
+
     public void setTypeRead(boolean isTypeRead) {
         this.isTypeRead = isTypeRead;
     }
@@ -1103,6 +1105,7 @@ public void afterCommitted(TransactionState txnState, boolean txnOperated) throw
             taskBeId = routineLoadTaskInfo.getBeId();
             executeTaskOnTxnStatusChanged(routineLoadTaskInfo, txnState, TransactionStatus.COMMITTED, null);
             ++this.jobStatistic.committedTaskNum;
+            ++this.jobStatistic.currentCommittedTaskNum;
             if (LOG.isDebugEnabled()) {
                 LOG.debug("routine load task committed. task id: {}, job id: {}", txnState.getLabel(), id);
             }
@@ -1228,6 +1231,7 @@ public void afterAborted(TransactionState txnState, boolean txnOperated, String
                     .build());
         }
         ++this.jobStatistic.abortedTaskNum;
+        ++this.jobStatistic.currentAbortedTaskNum;
         TransactionState.TxnStatusChangeReason txnStatusChangeReason = null;
         if (txnStatusChangeReasonString != null) {
             txnStatusChangeReason =
@@ -1452,12 +1456,14 @@ private void executeNeedSchedule() {
     }
 
     private void executeStop() {
+        Env.getCurrentEnv().getRoutineLoadManager().removeAbnormalJob(this.id);
         state = JobState.STOPPED;
         routineLoadTaskInfoList.clear();
         endTimestamp = System.currentTimeMillis();
     }
 
     private void executeCancel(ErrorReason reason) {
+        Env.getCurrentEnv().getRoutineLoadManager().removeAbnormalJob(this.id);
         cancelReason = reason;
         state = JobState.CANCELLED;
         routineLoadTaskInfoList.clear();
@@ -1521,6 +1527,13 @@ public void update() throws UserException {
             }
         }
 
+        writeLock();
+        try {
+            checkAbnormalJob();
+        } finally {
+            writeUnlock();
+        }
+
         boolean needAutoResume = needAutoResume();
 
         if (!refreshKafkaPartitions(needAutoResume)) {
@@ -1542,6 +1555,49 @@
         }
     }
 
+    private void checkAbnormalJob() {
+        // 1. check auto resume count
+        if (this.autoResumeCount >= Config.min_abnormal_auto_resume_count_threshold) {
+            Env.getCurrentEnv().getRoutineLoadManager().addAbnormalJob(this.id,
+                    "The auto resume count reaches threshold: " + Config.min_abnormal_auto_resume_count_threshold);
+            return;
+        }
+
+        // define a check window
+        long checkIntervalMillis = Math.max(maxBatchIntervalS * Config.abnormal_check_interval_multiplier,
+                Config.min_abnormal_check_interval_sec) * 1000;
+        if (System.currentTimeMillis() - latestCheckAbnormalJobTime > checkIntervalMillis) {
+            // 2. check abort txn ratio
+            if ((double) this.jobStatistic.currentAbortedTaskNum
+                    / (this.jobStatistic.currentAbortedTaskNum + this.jobStatistic.currentCommittedTaskNum)
+                    > Config.min_abnormal_abort_txn_ratio_threshold) {
+                this.jobStatistic.currentAbortedTaskNum = 0;
+                this.jobStatistic.currentCommittedTaskNum = 0;
+                Env.getCurrentEnv().getRoutineLoadManager().addAbnormalJob(this.id,
+                        "The ratio of currentAbortedTaskNum to the currentTotalTaskNum of tasks reaches: "
+                                + Config.min_abnormal_abort_txn_ratio_threshold);
+                return;
+            }
+
+            // 3. check progress and lag
+            if (checkAbnormalJobByProgress()) {
+                Env.getCurrentEnv().getRoutineLoadManager().addAbnormalJob(this.id, abnormalProgressReason());
+                return;
+            }
+
+            // remove job if all checks pass.
+            Env.getCurrentEnv().getRoutineLoadManager().removeAbnormalJob(this.id);
+        }
+    }
+
+    protected boolean checkAbnormalJobByProgress() {
+        return false;
+    }
+
+    protected String abnormalProgressReason() {
+        return "";
+    }
+
     // Call this before calling unprotectUpdateProgress().
     // Because unprotectUpdateProgress() is protected by writelock.
     // So if there are time-consuming operations, they should be done in this method.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
index 97f6aba8c589c6..217d459805caab 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
@@ -91,6 +91,9 @@ public class RoutineLoadManager implements Writable {
 
     private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
 
+    // key: job id, value: abnormal reason
+    private Map<Long, String> abnormalJobs = new ConcurrentHashMap<>();
+
     private void readLock() {
         lock.readLock().lock();
     }
@@ -136,6 +139,31 @@ public int getTotalMaxConcurrentTaskNum() {
         return beIdToMaxConcurrentTasks.values().stream().mapToInt(i -> i).sum();
     }
 
+    public Map<Long, String> getAbnormalJobs() {
+        return abnormalJobs;
+    }
+
+    public void cleanAbnormalJobs() {
+        abnormalJobs.clear();
+    }
+
+    public void addAbnormalJob(Long id, String reason) {
+        abnormalJobs.put(id, reason);
+    }
+
+    public void removeAbnormalJob(Long id) {
+        abnormalJobs.remove(id);
+    }
+
+    public String fullJobName(Long id) {
+        RoutineLoadJob job = idToRoutineLoadJob.get(id);
+        Database db = Env.getCurrentEnv().getInternalCatalog().getDbNullable(idToRoutineLoadJob.get(id).getDbId());
+        if (db == null) {
+            return job.getName();
+        }
+        return db.getName() + "." + job.getName();
+    }
+
     // return the map of be id -> running tasks num
     private Map<Long, Integer> getBeCurrentTasksNumMap() {
         Map<Long, Integer> beCurrentTaskNumMap = Maps.newHashMap();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadStatistic.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadStatistic.java
index ad10367f9823d3..bd52c4d35180e2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadStatistic.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadStatistic.java
@@ -61,6 +61,8 @@ public class RoutineLoadStatistic {
     public long committedTaskNum = 0;
     @SerializedName(value = "abortedTaskNum")
     public long abortedTaskNum = 0;
+    public long currentCommittedTaskNum = 0;
+    public long currentAbortedTaskNum = 0;
 
     // Save all transactions current running. Including PREPARE, COMMITTED.
     // No need to persist, only for tracing txn of routine load job.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
index 4f64588b7cd5db..b8e033a63be629 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
@@ -137,6 +137,7 @@ public final class MetricRepo {
     public static LongCounterMetric COUNTER_ROUTINE_LOAD_ROWS;
     public static LongCounterMetric COUNTER_ROUTINE_LOAD_RECEIVED_BYTES;
+    public static GaugeMetric<Long> GAUGE_ROUTINE_LOAD_ABNORMAL_JOB_NUMS;
     public static LongCounterMetric COUNTER_ROUTINE_LOAD_ERROR_ROWS;
 
     public static LongCounterMetric COUNTER_HIT_SQL_BLOCK_RULE;
@@ -541,6 +542,14 @@ public Long getValue() {
         COUNTER_ROUTINE_LOAD_ERROR_ROWS = new LongCounterMetric("routine_load_error_rows", MetricUnit.ROWS, "total error rows of routine load");
         DORIS_METRIC_REGISTER.addMetrics(COUNTER_ROUTINE_LOAD_ERROR_ROWS);
+        GAUGE_ROUTINE_LOAD_ABNORMAL_JOB_NUMS = new GaugeMetric<Long>("routine_load_abnormal_job_nums",
+                MetricUnit.NOUNIT, "abnormal job nums of routine load") {
+            @Override
+            public Long getValue() {
+                return (long) Env.getCurrentEnv().getRoutineLoadManager().getAbnormalJobs().size();
+            }
+        };
+        DORIS_METRIC_REGISTER.addMetrics(GAUGE_ROUTINE_LOAD_ABNORMAL_JOB_NUMS);
 
         COUNTER_HIT_SQL_BLOCK_RULE = new LongCounterMetric("counter_hit_sql_block_rule", MetricUnit.ROWS, "total hit sql block rule query");
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
index d706c52557ee41..b4f7c398194f56 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
@@ -594,6 +594,7 @@ public void testPauseRoutineLoadJob(@Injectable PauseRoutineLoadStmt pauseRoutin
         Map<String, List<RoutineLoadJob>> nameToRoutineLoadJob = Maps.newHashMap();
         List<RoutineLoadJob> routineLoadJobList = Lists.newArrayList();
         RoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob();
+        Deencapsulation.setField(routineLoadJob, "progress", new KafkaProgress());
         routineLoadJobList.add(routineLoadJob);
         nameToRoutineLoadJob.put("", routineLoadJobList);
         dbToNameToRoutineLoadJob.put(1L, nameToRoutineLoadJob);
diff --git a/regression-test/pipeline/cloud_p0/conf/fe_custom.conf b/regression-test/pipeline/cloud_p0/conf/fe_custom.conf
index 0b53fa2244df77..aeab269adfc45d 100644
--- a/regression-test/pipeline/cloud_p0/conf/fe_custom.conf
+++ b/regression-test/pipeline/cloud_p0/conf/fe_custom.conf
@@ -47,3 +47,6 @@ workload_group_max_num = 25
 enable_advance_next_id = true
 check_table_lock_leaky = true
+
+min_abnormal_auto_resume_count_threshold = 2
+min_abnormal_check_interval_sec = 0
\ No newline at end of file
diff --git a/regression-test/pipeline/p0/conf/fe.conf b/regression-test/pipeline/p0/conf/fe.conf
index 3bab70ae3370be..e2c13730e30173 100644
--- a/regression-test/pipeline/p0/conf/fe.conf
+++ b/regression-test/pipeline/p0/conf/fe.conf
@@ -92,3 +92,6 @@ max_query_profile_num = 2000
 max_spilled_profile_num = 2000
 check_table_lock_leaky=true
+
+min_abnormal_auto_resume_count_threshold = 2
+min_abnormal_check_interval_sec = 0
diff --git a/regression-test/suites/load_p0/routine_load/data/test_abnormal_job_monitor.csv b/regression-test/suites/load_p0/routine_load/data/test_abnormal_job_monitor.csv
new file mode 100644
index 00000000000000..b58285ed575194
--- /dev/null
+++
b/regression-test/suites/load_p0/routine_load/data/test_abnormal_job_monitor.csv @@ -0,0 +1,20 @@ +57|2023-08-19|TRUE|2|-25462|-74112029|6458082754318544493|-7910671781690629051|-15205.859375|-306870797.484914|759730669.0|-628556336.0|2023-07-10 18:39:10|2023-02-12|2023-01-27 07:26:06|y||Xi9nDVrLv8m6AwEpUxmtzFAuK48sQ|{"name": "John", "age": 25, "city": "New York"} +49|2023-08-08|FALSE|\N|16275|-2144851675|-2303421957908954634|-46526938720058765|-13141.142578|-686632233.230200|229942298.0|-152553823.0|2022-09-01 00:16:01|2023-03-25|2022-09-07 14:59:03|s||yvuILR2iNxfe8RRml|{"student": true, "name": "Alice", "grade": 9, "subjects": ["math", "science", "history"]} +66|2023-08-15|TRUE|-91|28378|609923317|4872185586197131212|1207709464099378591|\N|-1863683325.985123|-783792012.0|-708986976.0|2022-09-24 10:39:23|2022-09-24|2022-10-16 18:36:43|Y|z|AI1BSPQdKiHJiQH1kguyLSWsDXkC7zwy7PwgWnyGSaa9tBKRex8vHBdxg2QSKZKL2mV2lHz7iI1PnsTd4MXDcIKhqiHyPuQPt2tEtgt0UgF6|{"book": {"title": "The Great Gatsby", "author": "F. Scott Fitzgerald"}, "year": 1925} +91|2023-08-27|TRUE|90|2465|702240964|6373830997821598984|305860046137409400|15991.356445|1599972327.386147|-165530947.0|\N|2023-04-26 19:31:10|2023-07-21|\N|2||B7YKYBYT8w0YC926bZ8Yz1VzyiWw2NWDAiTlEoPVyz9AXGti2Npg1FxWqWk4hEaALw0ZBSuiAIPj41lq36g5QRpPmAjNPK|{"fruit": "apple", "color": "red", "qty": 5, "price": 2.5} +80|2023-08-18|FALSE|-18|-8971|679027874|6535956962935330265|3960889045799757165|-13219.759766|1187161924.505394|-526615878.0|-947410627.0|2023-03-11 07:40:00|2022-11-29|2023-01-14 07:24:07|\N|D|3Nhx6xX1qdwaq7lxwLRSKMtJFbC03swWv12mpySSVysH3igGZTiGPuKMsYW7HAkf6CWc7c0nzqDsjuH3FYVMNCWRmfxMrmY8rykQCC4Ve|{"car": "BMW", "model": "X5", "year": 2020, "color": "black"} +85|2023-08-11|TRUE|-7|24304|-2043877415|-2024144417867729183|\N|5363.024414|-578615669.042831|-378574346.0|-810302932.0|2023-07-15 01:07:41|2023-08-13|2023-01-20 11:57:48|i||WQ9dh9ajPu0y|{"country": "France", "capital": "Paris", "population": 67081000} +31|2023-08-27|FALSE|17|-18849|1728109133|3266501886640700374|527195452623418935|-24062.328125|-1514348021.262435|-322205854.0|-278237157.0|2022-10-07 03:24:23|2022-09-25|\N|0|8|yKMiAntORoRa8svnMfcxlOPwwND1m5s2fdS26Xu6cfs6HK5SAibqIp9h8sZcpjHy4|{"team": "Manchester United", "players": ["Ronaldo", "Rooney", "Giggs"], "coach": "Ole Gunnar Solskjaer"} +20|2023-08-17|FALSE|-5|18158|784479801|1485484354598941738|-6632681928222776815|9708.430664|-330432620.706069|-816424174.0|571112646.0|2022-09-15 21:40:55|2023-02-23|2023-08-13 21:31:54|O|X|2pYmX2vAhfEEHZZYPsgAmda1G7otnwx5TmUC879FPhDeIjvWI79ksBZpfFG2gp7jhCSbpZiecKGklB5SvG8tm31i5SUqe1xrWgLt4HSq7lMJWp75tx2kxD7pRIOpn|{"name": "Sarah", "age": 30, "city": "London", "isMarried": false} +90|2023-08-27|TRUE|22|16456|-1476824962|-3279894870153540825|8990195191470116763|26651.906250|206860148.942546|-580959198.0|-210329147.0|2022-10-07 03:11:03|2023-03-18|2023-04-15 00:38:33|T|L|QW0GQ3GoMtHgxPQOWGfVaveynahNpsNs09siMFA1OtO6QEDBQTdivmGyq7bFzejAqwbbVQQpREAmeLjcFSXLnQuou2KbwYD|{"company": "Apple", "products": [{"name": "iPhone", "price": 1000}, {"name": "MacBook", "price": 1500}]} +8|2023-08-14|TRUE|109|-31573|-1362465190|3990845741226497177|2732763251146840270|-25698.552734|1312831962.567818|771983879.0|173937916.0|2023-03-07 14:13:19|2022-10-18|2023-07-16 05:03:13|D||PBn1wa6X8WneZYLMac11zzyhGl7tPXB5XgjmOV8L6uav9ja5oY433ktb2yhyQQIqBveZPkme|{"animal": "lion", "weight": 200, "habitat": ["savannah", "grassland"]} 
+65|2023-08-09|FALSE|94|31514|814994517|-297697460695940343|734910652450318597|-13061.891602|62750847.041706|-9808654.0|\N|2023-08-14 22:01:27|2023-05-19|2022-11-13 13:44:28|V||aGeMsI24O12chGlP5ak0AHghAz7bu5MargJBStHnt0yMnChH0JnfYhsfH1u59XIHkJKMsHYktBqORkGlovu8V47E74KeFpaqxn5yLyXfDbhhzUKf|{"language": "Python", "version": 3.9, "frameworks": ["Django", "Flask"]} +62|2023-08-21|FALSE|81|20302|-200761532|6365479976421007608|\N|-29916.533203|1709141750.828478|549873536.0|-119205359.0|2023-05-04 01:14:51|2022-09-17|2022-12-04 19:30:09|d|v|BKWy9dTNg1aZW7ancEJAmEDOPK5TwFsNSHbI78emu9gymeIlx5NoLmyii0QAqdzRvSQPZKiqKkwInGCTIBnK1yYkK7zD|{"username": "user123", "password": "pass123", "email": "user123@example.com"} +50|2023-08-06|TRUE|109|-6330|1479023892|-8630800697573159428|-1645095773540208759|17880.960938|-1453844792.013949|-158871820.0|-862940384.0|2022-09-22 02:03:21|2023-05-14|2023-03-25 02:18:34|m||JKnIgXvGVidGiWl9YRSi3mFI7wHKt1sBpWSadKF8VX3LAuElm4sdc9gtxREaUr57oikSYlU8We8h1MWqQlYNiJObl|{"city": "Tokyo", "temperature": 20.5, "humidity": 75} +58|2023-08-22|\N|0|-18231|1832867360|6997858407575297145|2480714305422728023|-5450.488770|1475901032.138386|-893480655.0|-607891858.0|2023-02-02 05:13:24|2022-09-18|2023-04-23 10:51:15|k||LdFXF7Kmfzgmnn2R6zLsXdmi3A2cLBLq4G4WDVNDhxvH7dYH8Kga2WA47uSIxp6NSrwPSdw0ssB1TS8RFJTDJAB0Uba3e05NL2Aiw0ja|{"restaurant": "Pizza Hut", "menu": ["pizza", "pasta", "salad"]} +60|2023-08-27|FALSE|-52|-2338|-757056972|1047567408607120856|6541476642780646552|6614.089355|-1204448798.517855|236657733.0|731515433.0|2022-12-29 14:47:30|2022-09-24|2023-08-01 12:41:59|O|F|RM4F1Ke7lkcnuxF2nK0j9VBW3MDcgyHR4pseBjtFnqS6GUkVFuzF6u3Cp9Nv7ab0O6UYrpP4DhU|{"game": "Chess", "players": 2, "time": "1 hour"} +68|2023-08-23|TRUE|-73|20117|1737338128|795638676048937749|-5551546237562433901|-30627.039062|68589475.684545|585022347.0|513722420.0|2022-12-28 20:26:51|2022-10-04|2023-07-30 00:20:06|y||keZ3JlWWpdnPBejf0cuiCQCVBBTd5gjvO08NVdcAFewqL7nRT4N9lnvSU6pWmletA5VbPQCeQapJdcnQCHfZUDCf4ulCnczyqr7SGrbGRT0XYcd7iktKM|{"country": "Brazil", "continent": "South America", "population": 211049527} +50|2023-08-24|TRUE|15|14403|\N|-6418906115745394180|9205303779366462513|-4331.548828|-615112179.557648|367305015.0|-551652958.0|2022-12-29 02:27:20|2023-06-01|2023-08-12 04:50:04|a||eCl38sztIvBQvGvGKyYZmyMXy9vIJx197iu3JwP9doJGcrYUl9Uova0rz4iCCgrjlAiZU18Fs9YtCq830nhM|{"band": "The Beatles", "members": ["John Lennon", "Paul McCartney", "George Harrison", "Ringo Starr"]} +81|2023-08-23|FALSE|106|11492|-667795397|4480250461471356146|-5346660566234294101|9082.750000|385167225.902608|-717553011.0|649146853.0|2023-03-20 03:33:16|2022-11-24|2023-02-16 18:29:41|G|9|Lk3eNVQNjucbekD1rZmUlGPiXS5JvcWr2LQzRU8GSGIbSag|{"flower": "rose", "color": "red", "fragrance": true} +41|2023-08-27|TRUE|-104|22750|\N|8527773271030840740|5554497317268279215|-5296.828125|-1715646888.013040|-306075962.0|897769189.0|2022-12-02 17:56:44|2022-10-12|2023-02-19 07:02:54|V|\N|E9GzQdTwX1ITUQz27IVznAs6Ca4WwprKk6Odjs6SH75D2F1089QiY3HQ52LXRD1V6xAWjhLE2hWgW3EdHuAOnUDVrb5V|{"food": "Sushi", "price": 10, "restaurant": "Sushi King"} +21|2023-08-18|FALSE|63|-27847|-35409596|8638201997392767650|4919963231735304178|-23382.541016|-1803403621.426313|-22009767.0|661750756.0|2023-03-31 10:56:14|2023-01-20|2023-02-18 13:37:52|N|T|PSiFwUEx3eVFNtjlnQ70YkgZNvKrGmQ2DN5K9yYHiSdFWeEDB1UpL3Frt8z1kEAIWRDWqXZuyi|{"city": "Sydney", "population": 5312000, "area": 2058.7} \ No newline at end of file diff --git 
a/regression-test/suites/load_p0/routine_load/test_routin_load_abnormal_job_monitor.groovy b/regression-test/suites/load_p0/routine_load/test_routin_load_abnormal_job_monitor.groovy new file mode 100644 index 00000000000000..c8fe459d33c7d5 --- /dev/null +++ b/regression-test/suites/load_p0/routine_load/test_routin_load_abnormal_job_monitor.groovy @@ -0,0 +1,280 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.apache.kafka.clients.admin.AdminClient +import org.apache.kafka.clients.producer.KafkaProducer +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.clients.producer.ProducerConfig + +import groovy.json.JsonSlurper + +suite("test_routine_load_abnormal_job_monitor","nonConcurrent") { + def kafkaCsvTpoics = [ + "test_user", + ] + + String enabled = context.config.otherConfigs.get("enableKafkaTest") + String kafka_port = context.config.otherConfigs.get("kafka_port") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + def kafka_broker = "${externalEnvIp}:${kafka_port}" + + if (enabled != null && enabled.equalsIgnoreCase("true")) { + // define kafka + def props = new Properties() + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString()) + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") + // Create kafka producer + def producer = new KafkaProducer<>(props) + + for (String kafkaCsvTopic in kafkaCsvTpoics) { + def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text + def lines = txt.readLines() + lines.each { line -> + logger.info("=====${line}========") + def record = new ProducerRecord<>(kafkaCsvTopic, null, line) + producer.send(record) + } + } + } + + def jobName1 = "test_abnormal_job_monitor1" + def jobName2 = "test_abnormal_job_monitor2" + def jobName3 = "test_abnormal_job_monitor3" + def tableName = "test_abnormal_job_monitor" + if (enabled != null && enabled.equalsIgnoreCase("true")) { + try { + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} + ( + k00 INT NOT NULL, + k01 DATE NOT NULL, + k02 BOOLEAN NULL, + k03 TINYINT NULL, + k04 SMALLINT NULL, + k05 INT NULL, + k06 BIGINT NULL, + k07 LARGEINT NULL, + k08 FLOAT NULL, + k09 DOUBLE NULL, + k10 DECIMAL(9,1) NULL, + k11 DECIMALV3(9,1) NULL, + k12 DATETIME NULL, + k13 DATEV2 NULL, + k14 DATETIMEV2 NULL, + k15 CHAR NULL, + k16 VARCHAR NULL, + k17 STRING NULL, + k18 JSON NULL, + kd01 BOOLEAN NOT NULL DEFAULT "TRUE", + kd02 TINYINT NOT NULL DEFAULT "1", + kd03 SMALLINT NOT NULL DEFAULT "2", + kd04 INT NOT NULL DEFAULT "3", + kd05 BIGINT NOT NULL DEFAULT "4", + kd06 LARGEINT NOT NULL DEFAULT "5", + 
kd07 FLOAT NOT NULL DEFAULT "6.0", + kd08 DOUBLE NOT NULL DEFAULT "7.0", + kd09 DECIMAL NOT NULL DEFAULT "888888888", + kd10 DECIMALV3 NOT NULL DEFAULT "999999999", + kd11 DATE NOT NULL DEFAULT "2023-08-24", + kd12 DATETIME NOT NULL DEFAULT "2023-08-24 12:00:00", + kd13 DATEV2 NOT NULL DEFAULT "2023-08-24", + kd14 DATETIMEV2 NOT NULL DEFAULT "2023-08-24 12:00:00", + kd15 CHAR(255) NOT NULL DEFAULT "我能吞下玻璃而不伤身体", + kd16 VARCHAR(300) NOT NULL DEFAULT "我能吞下玻璃而不伤身体", + kd17 STRING NOT NULL DEFAULT "我能吞下玻璃而不伤身体", + kd18 JSON NULL, + + INDEX idx_inverted_k104 (`k05`) USING INVERTED, + INDEX idx_inverted_k110 (`k11`) USING INVERTED, + INDEX idx_inverted_k113 (`k13`) USING INVERTED, + INDEX idx_inverted_k114 (`k14`) USING INVERTED, + INDEX idx_inverted_k117 (`k17`) USING INVERTED PROPERTIES("parser" = "english"), + INDEX idx_ngrambf_k115 (`k15`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256"), + INDEX idx_ngrambf_k116 (`k16`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256"), + INDEX idx_ngrambf_k117 (`k17`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256"), + + INDEX idx_bitmap_k104 (`k02`) USING BITMAP, + INDEX idx_bitmap_k110 (`kd01`) USING BITMAP + + ) + DUPLICATE KEY(k00) + PARTITION BY RANGE(k01) + ( + PARTITION p1 VALUES [('2023-08-01'), ('2023-08-11')), + PARTITION p2 VALUES [('2023-08-11'), ('2023-08-21')), + PARTITION p3 VALUES [('2023-08-21'), ('2023-09-01')) + ) + DISTRIBUTED BY HASH(k00) BUCKETS 32 + PROPERTIES ( + "bloom_filter_columns"="k05", + "replication_num" = "1" + ); + """ + sql "sync" + + // 1. create three abnormal jobs + sql """ + CREATE ROUTINE LOAD ${jobName1} on ${tableName} + COLUMNS(k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17,k18), + COLUMNS TERMINATED BY "|" + PROPERTIES + ( + "max_batch_interval" = "5", + "max_batch_rows" = "300000", + "max_batch_size" = "209715200" + ) + FROM KAFKA + ( + "kafka_broker_list" = "${externalEnvIp}:${kafka_port}", + "kafka_topic" = "test_abnormal_job_monitor_invaild", + "property.kafka_default_offsets" = "OFFSET_BEGINNING" + ); + """ + sql "sync" + sql """ + CREATE ROUTINE LOAD ${jobName2} on ${tableName} + COLUMNS(k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17,k18), + COLUMNS TERMINATED BY "|" + PROPERTIES + ( + "max_batch_interval" = "5", + "max_batch_rows" = "300000", + "max_batch_size" = "209715200" + ) + FROM KAFKA + ( + "kafka_broker_list" = "${externalEnvIp}:${kafka_port}", + "kafka_topic" = "test_abnormal_job_monitor_invaild", + "property.kafka_default_offsets" = "OFFSET_BEGINNING" + ); + """ + sql "sync" + sql """ + CREATE ROUTINE LOAD ${jobName3} on ${tableName} + COLUMNS(k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17,k18), + COLUMNS TERMINATED BY "|" + PROPERTIES + ( + "max_batch_interval" = "5", + "max_batch_rows" = "300000", + "max_batch_size" = "209715200" + ) + FROM KAFKA + ( + "kafka_broker_list" = "${externalEnvIp}:${kafka_port}", + "kafka_topic" = "test_abnormal_job_monitor", + "property.kafka_default_offsets" = "OFFSET_BEGINNING" + ); + """ + sql "sync" + GetDebugPoint().enableDebugPointForAllBEs("FragmentMgr.exec_plan_fragment.failed") + + // 2. 
check abnormal jobs + def count = 0 + while (true) { + def command = "curl -X GET --location-trusted -u ${context.config.feHttpUser}:${context.config.feHttpPassword} http://${context.config.feHttpAddress}/api/routine_load/abnormal_jobs" + log.info("abnormal jobs: ${command}") + def process = command.execute() + def code = process.waitFor() + def out = process.text + log.info("result: ${out}".toString()) + def jsonSlurper = new JsonSlurper() + def result = jsonSlurper.parseText(out) + log.info("result: ${result}".toString()) + + def name1 = context.config.getDbNameByFile(context.file) + "." + jobName1 + def name2 = context.config.getDbNameByFile(context.file) + "." + jobName2 + def name3 = context.config.getDbNameByFile(context.file) + "." + jobName3 + log.info("except: ${name1}, ${name2}, ${name3}".toString()) + + if (result.toString().contains(name1.toString()) + && result.toString().contains(name2.toString()) + && result.toString().contains(name3.toString())) { + break; + } + + count++ + sleep(1000) + if (count > 120) { + assertEquals(1, 2) + } + } + + // 3. make job normal + try { + sql "pause routine load for ${jobName1}" + } catch (Exception e) { + log.info("exception: ${e}".toString()) + } + sql "ALTER ROUTINE LOAD FOR ${jobName1} FROM KAFKA(\"kafka_topic\" = \"test_abnormal_job_monitor\");" + sql "resume routine load for ${jobName1}" + sql "stop routine load for ${jobName2}" + GetDebugPoint().disableDebugPointForAllBEs("FragmentMgr.exec_plan_fragment.failed") + + // 4. check doris_fe_routine_load_abnormal_job_nums is zero + count = 0 + while (true) { + def end = false + httpTest { + endpoint context.config.feHttpAddress + uri "/metrics?type=json" + op "get" + check { code, body -> + def jsonSlurper = new JsonSlurper() + def result = jsonSlurper.parseText(body) + def entry = result.find { it.tags?.metric == "doris_fe_routine_load_abnormal_job_nums" } + def value = entry ? entry.value : null + log.info("Contains routine_load_abnormal_job_nums: ${entry != null}".toString()) + log.info("Value of routine_load_abnormal_job_nums: ${value}".toString()) + if (value == 0) { + end = true + } + } + } + if (end) { + break + } + count++ + sleep(1000) + if (count > 60) { + assertEquals(1, 2) + } + } + def command = "curl -X GET --location-trusted -u ${context.config.feHttpUser}:${context.config.feHttpPassword} http://${context.config.feHttpAddress}/api/routine_load/abnormal_jobs" + log.info("abnormal jobs: ${command}") + def process = command.execute() + def code = process.waitFor() + def out = process.text + log.info("result: ${out}".toString()) + def jsonSlurper = new JsonSlurper() + def result = jsonSlurper.parseText(out) + log.info("result: ${result}".toString()) + assertTrue(result.isEmpty()) + } finally { + sql "stop routine load for ${jobName1}" + try { + sql "stop routine load for ${jobName2}" + } catch (Exception e) { + log.info("exception: ${e}".toString()) + } + sql "stop routine load for ${jobName3}" + sql "DROP TABLE IF EXISTS ${tableName}" + GetDebugPoint().disableDebugPointForAllBEs("FragmentMgr.exec_plan_fragment.failed") + } + } +} \ No newline at end of file
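
For reference, the two HTTP endpoints added in LoadAction.java can be exercised outside the regression test in the same way the test itself shells out to curl. The sketch below is illustrative only and is not part of the patch: the FE address and credentials are placeholders, and the expected response shape (a JSON array of objects carrying routineload_job_name and reason) follows from how getRoutineLoadAbnormalJobs builds its result.

import groovy.json.JsonSlurper

// Placeholders, not values taken from this patch.
def fe = "127.0.0.1:8030"
def auth = "root:"

// List the jobs currently flagged as abnormal,
// e.g. [{"routineload_job_name":"db.job","reason":"..."}].
def listCmd = "curl -X GET --location-trusted -u ${auth} http://${fe}/api/routine_load/abnormal_jobs"
def abnormalJobs = new JsonSlurper().parseText(listCmd.execute().text)
abnormalJobs.each { println "${it.routineload_job_name}: ${it.reason}" }

// Clear the recorded entries; a job that is still abnormal is expected to be
// re-added by checkAbnormalJob() on its next periodic update.
def cleanCmd = "curl -X POST --location-trusted -u ${auth} http://${fe}/api/routine_load/clean_abnormal_jobs"
println cleanCmd.execute().text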