[improve](routine load) add more metrics to observe the routine load job #48209

Merged 1 commit on Mar 6, 2025
6 changes: 6 additions & 0 deletions be/src/runtime/routine_load/data_consumer.cpp
@@ -38,6 +38,7 @@
#include "util/blocking_queue.hpp"
#include "util/debug_points.h"
#include "util/defer_op.h"
#include "util/doris_metrics.h"
#include "util/stopwatch.hpp"
#include "util/string_util.h"
#include "util/uid_util.h"
@@ -219,6 +220,9 @@ Status KafkaDataConsumer::group_consume(BlockingQueue<RdKafka::Message*>* queue,
consumer_watch.start();
std::unique_ptr<RdKafka::Message> msg(_k_consumer->consume(1000 /* timeout, ms */));
consumer_watch.stop();
DorisMetrics::instance()->routine_load_get_msg_count->increment(1);
DorisMetrics::instance()->routine_load_get_msg_latency->increment(
consumer_watch.elapsed_time() / 1000 / 1000);
DBUG_EXECUTE_IF("KafkaDataConsumer.group_consume.out_of_range", {
done = true;
std::stringstream ss;
@@ -234,6 +238,7 @@ Status KafkaDataConsumer::group_consume(BlockingQueue<RdKafka::Message*>* queue,
if (_consuming_partition_ids.count(msg->partition()) <= 0) {
_consuming_partition_ids.insert(msg->partition());
}
DorisMetrics::instance()->routine_load_consume_bytes->increment(msg->len());
if (msg->len() == 0) {
// ignore msg with length 0.
// putting an empty msg into the queue would cause the load process to shut down.
@@ -246,6 +251,7 @@ Status KafkaDataConsumer::group_consume(BlockingQueue<RdKafka::Message*>* queue,
msg.release(); // release the ownership, msg will be deleted after being processed
}
++received_rows;
DorisMetrics::instance()->routine_load_consume_rows->increment(1);
break;
case RdKafka::ERR__TIMED_OUT:
// leave the status as OK, because this may happen
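Side note on the two new BE counters above: routine_load_get_msg_latency accumulates the time spent inside consume() in milliseconds and routine_load_get_msg_count counts the calls, so a scraper can derive the average Kafka fetch latency from the deltas between two scrapes. A minimal, self-contained sketch of that arithmetic (the scrape values below are made up, not taken from this PR):

```java
// Hypothetical illustration only: deriving average fetch latency from the two
// cumulative counters added in data_consumer.cpp. All values are invented.
public class RoutineLoadLatencySketch {
    public static void main(String[] args) {
        // counter values observed at scrape t1
        long latencyMsT1 = 120_000;  // routine_load_get_msg_latency
        long countT1 = 40_000;       // routine_load_get_msg_count
        // counter values observed at scrape t2
        long latencyMsT2 = 150_000;
        long countT2 = 52_000;

        long deltaLatencyMs = latencyMsT2 - latencyMsT1;
        long deltaCount = countT2 - countT1;
        // average time spent per consume() call during the interval
        double avgMs = deltaCount == 0 ? 0.0 : (double) deltaLatencyMs / deltaCount;
        System.out.printf("avg get-msg latency: %.2f ms%n", avgMs); // 2.50 ms
    }
}
```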
10 changes: 10 additions & 0 deletions be/src/util/doris_metrics.cpp
@@ -122,6 +122,11 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(stream_load_rows_total, MetricUnit::ROWS, "
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(load_rows, MetricUnit::ROWS);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(load_bytes, MetricUnit::BYTES);

DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(routine_load_get_msg_latency, MetricUnit::MILLISECONDS);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(routine_load_get_msg_count, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(routine_load_consume_rows, MetricUnit::ROWS);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(routine_load_consume_bytes, MetricUnit::BYTES);

DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(memtable_flush_total, MetricUnit::OPERATIONS);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(memtable_flush_duration_us, MetricUnit::MICROSECONDS);

@@ -255,6 +260,11 @@ DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) {
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, stream_receive_bytes_total);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, stream_load_rows_total);

INT_COUNTER_METRIC_REGISTER(_server_metric_entity, routine_load_get_msg_latency);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, routine_load_get_msg_count);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, routine_load_consume_bytes);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, routine_load_consume_rows);

INT_COUNTER_METRIC_REGISTER(_server_metric_entity, memtable_flush_total);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, memtable_flush_duration_us);

5 changes: 5 additions & 0 deletions be/src/util/doris_metrics.h
@@ -113,6 +113,11 @@ class DorisMetrics {
IntCounter* load_rows = nullptr;
IntCounter* load_bytes = nullptr;

IntCounter* routine_load_get_msg_latency = nullptr;
IntCounter* routine_load_get_msg_count = nullptr;
IntCounter* routine_load_consume_bytes = nullptr;
IntCounter* routine_load_consume_rows = nullptr;

IntCounter* memtable_flush_total = nullptr;
IntCounter* memtable_flush_duration_us = nullptr;

@@ -22,6 +22,7 @@
import org.apache.doris.common.LoadException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.proto.InternalService;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.system.Backend;
@@ -221,37 +222,46 @@ public static List<Pair<Integer, Long>> getRealOffsets(String brokerList, String

private static InternalService.PProxyResult getInfoRequest(InternalService.PProxyRequest request, int timeout)
throws LoadException {
long startTime = System.currentTimeMillis();
int retryTimes = 0;
TNetworkAddress address = null;
Future<InternalService.PProxyResult> future = null;
InternalService.PProxyResult result = null;
while (retryTimes < 3) {
List<Long> backendIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
if (backendIds.isEmpty()) {
throw new LoadException("Failed to get info. No alive backends");
}
Collections.shuffle(backendIds);
Backend be = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
try {
while (retryTimes < 3) {
List<Long> backendIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
if (backendIds.isEmpty()) {
MetricRepo.COUNTER_ROUTINE_LOAD_GET_META_FAIL_COUNT.increase(1L);
throw new LoadException("Failed to get info. No alive backends");
}
Collections.shuffle(backendIds);
Backend be = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
address = new TNetworkAddress(be.getHost(), be.getBrpcPort());

try {
future = BackendServiceProxy.getInstance().getInfo(address, request);
result = future.get(Config.max_get_kafka_meta_timeout_second, TimeUnit.SECONDS);
} catch (Exception e) {
LOG.warn("failed to get info request to " + address + " err " + e.getMessage());
retryTimes++;
continue;
}
TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
if (code != TStatusCode.OK) {
LOG.warn("failed to get info request to "
+ address + " err " + result.getStatus().getErrorMsgsList());
retryTimes++;
} else {
return result;
try {
future = BackendServiceProxy.getInstance().getInfo(address, request);
result = future.get(Config.max_get_kafka_meta_timeout_second, TimeUnit.SECONDS);
} catch (Exception e) {
LOG.warn("failed to get info request to " + address + " err " + e.getMessage());
retryTimes++;
continue;
}
TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
if (code != TStatusCode.OK) {
LOG.warn("failed to get info request to "
+ address + " err " + result.getStatus().getErrorMsgsList());
retryTimes++;
} else {
return result;
}
}
}

throw new LoadException("Failed to get info");
MetricRepo.COUNTER_ROUTINE_LOAD_GET_META_FAIL_COUNT.increase(1L);
throw new LoadException("Failed to get info");
} finally {
long endTime = System.currentTimeMillis();
MetricRepo.COUNTER_ROUTINE_LOAD_GET_META_LATENCY.increase(endTime - startTime);
MetricRepo.COUNTER_ROUTINE_LOAD_GET_META_COUNT.increase(1L);
}
}
}
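The reworked getInfoRequest above follows a measure-and-always-count pattern: the start time is taken before the retry loop, the fail counter is bumped on the terminal error paths, and the latency and call-count counters are bumped in a finally block so they are updated whether the call succeeds, throws, or exhausts its retries. A minimal, hypothetical sketch of that pattern outside Doris (the AtomicLong counters and fetchMeta() are stand-ins, not Doris APIs; only the retries-exhausted failure path is shown):

```java
import java.util.concurrent.atomic.AtomicLong;

public class MeteredCallSketch {
    // stand-ins for the LongCounterMetric fields used in the PR
    static final AtomicLong GET_META_LATENCY_MS = new AtomicLong();
    static final AtomicLong GET_META_COUNT = new AtomicLong();
    static final AtomicLong GET_META_FAIL_COUNT = new AtomicLong();

    static String getInfoWithMetrics() throws Exception {
        long start = System.currentTimeMillis();
        try {
            for (int retry = 0; retry < 3; retry++) {
                try {
                    return fetchMeta(); // placeholder for the brpc getInfo() round trip
                } catch (Exception e) {
                    // transient failure: fall through and retry
                }
            }
            GET_META_FAIL_COUNT.incrementAndGet();
            throw new Exception("Failed to get info");
        } finally {
            // always recorded, on success and on failure alike
            GET_META_LATENCY_MS.addAndGet(System.currentTimeMillis() - start);
            GET_META_COUNT.incrementAndGet();
        }
    }

    static String fetchMeta() throws Exception {
        return "meta"; // hypothetical stub that would normally call a backend
    }

    public static void main(String[] args) throws Exception {
        System.out.println(getInfoWithMetrics() + ", calls counted: " + GET_META_COUNT.get());
    }
}
```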
@@ -861,7 +861,7 @@ private void checkStateTransform(RoutineLoadJob.JobState desireState) throws Use
// if rate of error data is more than max_filter_ratio, pause job
protected void updateProgress(RLTaskTxnCommitAttachment attachment) throws UserException {
updateNumOfData(attachment.getTotalRows(), attachment.getFilteredRows(), attachment.getUnselectedRows(),
attachment.getReceivedBytes(), false /* not replay */);
attachment.getReceivedBytes(), attachment.getTaskExecutionTimeMs(), false /* not replay */);
}

protected void updateCloudProgress(RLTaskTxnCommitAttachment attachment) {
@@ -877,7 +877,7 @@ protected void updateCloudProgress(RLTaskTxnCommitAttachment attachment) {
}

private void updateNumOfData(long numOfTotalRows, long numOfErrorRows, long unselectedRows, long receivedBytes,
boolean isReplay) throws UserException {
long taskExecutionTime, boolean isReplay) throws UserException {
this.jobStatistic.totalRows += numOfTotalRows;
this.jobStatistic.errorRows += numOfErrorRows;
this.jobStatistic.unselectedRows += unselectedRows;
@@ -888,6 +888,8 @@ private void updateNumOfData(long numOfTotalRows, long numOfErrorRows, long unse
MetricRepo.COUNTER_ROUTINE_LOAD_ROWS.increase(numOfTotalRows);
MetricRepo.COUNTER_ROUTINE_LOAD_ERROR_ROWS.increase(numOfErrorRows);
MetricRepo.COUNTER_ROUTINE_LOAD_RECEIVED_BYTES.increase(receivedBytes);
MetricRepo.COUNTER_ROUTINE_LOAD_TASK_EXECUTE_TIME.increase(taskExecutionTime);
MetricRepo.COUNTER_ROUTINE_LOAD_TASK_EXECUTE_COUNT.increase(1L);
}

// check error rate
@@ -957,7 +959,7 @@ private void updateNumOfData(long numOfTotalRows, long numOfErrorRows, long unse
protected void replayUpdateProgress(RLTaskTxnCommitAttachment attachment) {
try {
updateNumOfData(attachment.getTotalRows(), attachment.getFilteredRows(), attachment.getUnselectedRows(),
attachment.getReceivedBytes(), true /* is replay */);
attachment.getReceivedBytes(), attachment.getTaskExecutionTimeMs(), true /* is replay */);
} catch (UserException e) {
LOG.error("should not happen", e);
}
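For context, updateNumOfData also drives the error-rate check mentioned in the comment above ("if rate of error data is more than max_filter_ratio, pause job"): error and total rows are accumulated per window and compared against the job's error tolerance. A rough, hypothetical sketch of that kind of check (field names, thresholds, and the windowing are illustrative, not the exact Doris logic):

```java
public class ErrorRateCheckSketch {
    // illustrative limits; Doris derives the real ones from job properties
    static final long MAX_BATCH_ROWS = 200_000;
    static final long MAX_ERROR_NUM = 100;

    private long currentTotalRows = 0;
    private long currentErrorRows = 0;

    /** Accounts for one task's rows and returns true if the job should be paused. */
    boolean shouldPause(long totalRows, long errorRows) {
        currentTotalRows += totalRows;
        currentErrorRows += errorRows;
        // too many error rows inside the current window -> pause the job
        if (currentErrorRows > MAX_ERROR_NUM) {
            return true;
        }
        // window is full and the error budget held -> start a new window
        if (currentTotalRows >= MAX_BATCH_ROWS) {
            currentTotalRows = 0;
            currentErrorRows = 0;
        }
        return false;
    }

    public static void main(String[] args) {
        ErrorRateCheckSketch sketch = new ErrorRateCheckSketch();
        System.out.println(sketch.shouldPause(1_000, 5));   // false: within the error budget
        System.out.println(sketch.shouldPause(1_000, 500)); // true: budget exceeded, pause
    }
}
```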
20 changes: 20 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
@@ -138,6 +138,11 @@ public final class MetricRepo {
public static LongCounterMetric COUNTER_ROUTINE_LOAD_ROWS;
public static LongCounterMetric COUNTER_ROUTINE_LOAD_RECEIVED_BYTES;
public static LongCounterMetric COUNTER_ROUTINE_LOAD_ERROR_ROWS;
public static LongCounterMetric COUNTER_ROUTINE_LOAD_GET_META_LATENCY;
public static LongCounterMetric COUNTER_ROUTINE_LOAD_GET_META_COUNT;
public static LongCounterMetric COUNTER_ROUTINE_LOAD_GET_META_FAIL_COUNT;
public static LongCounterMetric COUNTER_ROUTINE_LOAD_TASK_EXECUTE_TIME;
public static LongCounterMetric COUNTER_ROUTINE_LOAD_TASK_EXECUTE_COUNT;
public static LongCounterMetric COUNTER_HIT_SQL_BLOCK_RULE;

public static AutoMappedMetric<LongCounterMetric> THRIFT_COUNTER_RPC_ALL;
@@ -541,6 +546,21 @@ public Long getValue() {
COUNTER_ROUTINE_LOAD_ERROR_ROWS = new LongCounterMetric("routine_load_error_rows", MetricUnit.ROWS,
"total error rows of routine load");
DORIS_METRIC_REGISTER.addMetrics(COUNTER_ROUTINE_LOAD_ERROR_ROWS);
COUNTER_ROUTINE_LOAD_GET_META_LATENCY = new LongCounterMetric("routine_load_get_meta_latency",
MetricUnit.MILLISECONDS, "get meta latency of routine load");
DORIS_METRIC_REGISTER.addMetrics(COUNTER_ROUTINE_LOAD_GET_META_LATENCY);
COUNTER_ROUTINE_LOAD_GET_META_COUNT = new LongCounterMetric("routine_load_get_meta_count", MetricUnit.NOUNIT,
"get meta count of routine load");
DORIS_METRIC_REGISTER.addMetrics(COUNTER_ROUTINE_LOAD_GET_META_COUNT);
COUNTER_ROUTINE_LOAD_GET_META_FAIL_COUNT = new LongCounterMetric("routine_load_get_meta_fail_count",
MetricUnit.NOUNIT, "get meta fail count of routine load");
DORIS_METRIC_REGISTER.addMetrics(COUNTER_ROUTINE_LOAD_GET_META_FAIL_COUNT);
COUNTER_ROUTINE_LOAD_TASK_EXECUTE_TIME = new LongCounterMetric("routine_load_task_execute_time",
MetricUnit.MILLISECONDS, "task execute time of routine load");
DORIS_METRIC_REGISTER.addMetrics(COUNTER_ROUTINE_LOAD_TASK_EXECUTE_TIME);
COUNTER_ROUTINE_LOAD_TASK_EXECUTE_COUNT = new LongCounterMetric("routine_load_task_execute_count",
MetricUnit.NOUNIT, "task execute count of routine load");
DORIS_METRIC_REGISTER.addMetrics(COUNTER_ROUTINE_LOAD_TASK_EXECUTE_COUNT);

COUNTER_HIT_SQL_BLOCK_RULE = new LongCounterMetric("counter_hit_sql_block_rule", MetricUnit.ROWS,
"total hit sql block rule query");
@@ -301,7 +301,7 @@ public void testUpdateNumOfDataErrorRowMoreThanMax(@Mocked Env env) {
RoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob();
Deencapsulation.setField(routineLoadJob, "maxErrorNum", 0);
Deencapsulation.setField(routineLoadJob, "maxBatchRows", 0);
Deencapsulation.invoke(routineLoadJob, "updateNumOfData", 1L, 1L, 0L, 1L, false);
Deencapsulation.invoke(routineLoadJob, "updateNumOfData", 1L, 1L, 0L, 1L, 1L, false);

Assert.assertEquals(RoutineLoadJob.JobState.PAUSED, Deencapsulation.getField(routineLoadJob, "state"));

@@ -316,7 +316,7 @@ public void testUpdateTotalMoreThanBatch() {
RoutineLoadStatistic jobStatistic = Deencapsulation.getField(routineLoadJob, "jobStatistic");
Deencapsulation.setField(jobStatistic, "currentErrorRows", 1);
Deencapsulation.setField(jobStatistic, "currentTotalRows", 99);
Deencapsulation.invoke(routineLoadJob, "updateNumOfData", 2L, 0L, 0L, 1L, false);
Deencapsulation.invoke(routineLoadJob, "updateNumOfData", 2L, 0L, 0L, 1L, 1L, false);

Assert.assertEquals(RoutineLoadJob.JobState.RUNNING, Deencapsulation.getField(routineLoadJob, "state"));
Assert.assertEquals(new Long(0), Deencapsulation.getField(jobStatistic, "currentErrorRows"));