Skip to content

Commit

Permalink
[branch-2.0] Picks "[opt](merge-on-write) Reduce the version not cont…
Browse files Browse the repository at this point in the history
…inuous logs for merge-on-write table #40946" (#40997)

picks #40946
  • Loading branch information
bobhan1 authored Sep 20, 2024
1 parent f4674f6 commit 761a8eb
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 13 deletions.
4 changes: 4 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1112,6 +1112,10 @@ DEFINE_mBool(enable_missing_rows_correctness_check, "false");
// When the number of missing versions is more than this value, do not directly
// retry the publish and handle it through async publish.
DEFINE_mInt32(mow_publish_max_discontinuous_version_num, "20");
// When the version is not continuous for MOW table in publish phase and the gap between
// current txn's publishing version and the max version of the tablet exceeds this value,
// don't print warning log
DEFINE_mInt32(publish_version_gap_logging_threshold, "200");

// The secure path with user files, used in the `local` table function.
DEFINE_mString(user_files_secure_path, "${DORIS_HOME}");
Expand Down
4 changes: 4 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1155,6 +1155,10 @@ DECLARE_mBool(enable_missing_rows_correctness_check);
// When the number of missing versions is more than this value, do not directly
// retry the publish and handle it through async publish.
DECLARE_mInt32(mow_publish_max_discontinuous_version_num);
// When the version is not continuous for MOW table in publish phase and the gap between
// current txn's publishing version and the max version of the tablet exceeds this value,
// don't print warning log
DECLARE_mInt32(publish_version_gap_logging_threshold);

// The secure path with user files, used in the `local` table function.
DECLARE_mString(user_files_secure_path);
Expand Down
26 changes: 16 additions & 10 deletions be/src/olap/task/engine_publish_version_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -224,16 +224,22 @@ Status EnginePublishVersionTask::finish() {
int64_t missed_txn_id =
StorageEngine::instance()->txn_manager()->get_txn_by_tablet_version(
tablet->tablet_id(), missed_version);
auto msg = fmt::format(
"uniq key with merge-on-write version not continuous, "
"missed version={}, it's transaction_id={}, current publish "
"version={}, tablet_id={}, transaction_id={}",
missed_version, missed_txn_id, version.second, tablet->tablet_id(),
_publish_version_req.transaction_id);
if (first_time_update) {
LOG(INFO) << msg;
} else {
LOG_EVERY_SECOND(INFO) << msg;
bool need_log =
(config::publish_version_gap_logging_threshold < 0 ||
max_version + config::publish_version_gap_logging_threshold >=
version.second);
if (need_log) {
auto msg = fmt::format(
"uniq key with merge-on-write version not continuous, "
"missed version={}, it's transaction_id={}, current publish "
"version={}, tablet_id={}, transaction_id={}",
missed_version, missed_txn_id, version.second,
tablet->tablet_id(), _publish_version_req.transaction_id);
if (first_time_update) {
LOG(INFO) << msg;
} else {
LOG_EVERY_SECOND(INFO) << msg;
}
}
};
// The versions during the schema change period need to be also continuous
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,10 @@ public class Config extends ConfigBase {
"print log interval for publish transaction failed interval"})
public static long publish_fail_log_interval_second = 5 * 60;

@ConfField(mutable = true, masterOnly = true, description = {"一个 PUBLISH_VERSION 任务打印失败日志的次数上限",
"the upper limit of failure logs of PUBLISH_VERSION task"})
public static long publish_version_task_failed_log_threshold = 80;

@ConfField(mutable = true, masterOnly = true, description = {"提交事务的最大超时时间,单位是秒。"
+ "该参数仅用于事务型 insert 操作中。",
"Maximal waiting time for all data inserted before one transaction to be committed, in seconds. "
Expand Down
14 changes: 11 additions & 3 deletions fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.common.Config;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.load.DeleteJob;
import org.apache.doris.load.loadv2.SparkLoadJob;
Expand Down Expand Up @@ -86,11 +87,13 @@ public TMasterResult finishTask(TFinishTaskRequest request) {
// check task status
// retry task by report process
TStatus taskStatus = request.getTaskStatus();
TTaskType taskType = request.getTaskType();
long signature = request.getSignature();
if (LOG.isDebugEnabled()) {
LOG.debug("get task report: {}", request);
}

if (taskStatus.getStatusCode() != TStatusCode.OK) {
if (taskStatus.getStatusCode() != TStatusCode.OK && taskType != TTaskType.PUBLISH_VERSION) {
LOG.warn("finish task reports bad. request: {}", request);
}

Expand All @@ -109,8 +112,6 @@ public TMasterResult finishTask(TFinishTaskRequest request) {
}

long backendId = backend.getId();
TTaskType taskType = request.getTaskType();
long signature = request.getSignature();

AgentTask task = AgentTaskQueue.getTask(backendId, taskType, signature);
if (task == null) {
Expand All @@ -128,6 +129,13 @@ public TMasterResult finishTask(TFinishTaskRequest request) {
} else {
if (taskStatus.getStatusCode() != TStatusCode.OK) {
task.failed();
if (taskType == TTaskType.PUBLISH_VERSION) {
boolean needLog = (Config.publish_version_task_failed_log_threshold < 0
|| task.getFailedTimes() <= Config.publish_version_task_failed_log_threshold);
if (needLog) {
LOG.warn("finish task reports bad. request: {}", request);
}
}
String errMsg = "task type: " + taskType + ", status_code: " + taskStatus.getStatusCode().toString()
+ (taskStatus.isSetErrorMsgs() ? (", status_message: " + taskStatus.getErrorMsgs()) : "")
+ ", backendId: " + backend + ", signature: " + signature;
Expand Down

0 comments on commit 761a8eb

Please sign in to comment.