Skip to content

Commit

Permalink
[improve](routine load) add more metrics to observe the routine load …
Browse files Browse the repository at this point in the history
…job (#48209)

### What problem does this PR solve?

related #48511

Add more metrics to observe the routine load job:

| Metrics | Module | Description |
| ---------------------------------- | ------ | ------------------------------- |
| routine_load_get_msg_latency | BE | Time to pull a Kafka message |
| routine_load_get_msg_count | BE | Number of times pulling Kafka messages |
| routine_load_consume_bytes | BE | Total data volume consumed from Kafka |
| routine_load_consume_rows | BE | Total number of rows consumed from Kafka |
| routine_load_task_execute_time | FE | Task execution time |
| routine_load_task_execute_count | FE | Task execution count |
| routine_load_get_meta_latency | FE | Delay in obtaining Kafka metadata |
| routine_load_get_meta_count | FE | Number of times obtaining Kafka metadata |
| routine_load_get_meta_fail_count | FE | Number of failures in obtaining metadata |
| routine_load_received_bytes | FE | Total data volume consumed |
| routine_load_received_rows | FE | Total number of rows consumed |
  • Loading branch information
sollhui authored and Your Name committed Mar 6, 2025
1 parent 5989c2b commit de1c514
Show file tree
Hide file tree
Showing 9 changed files with 357 additions and 30 deletions.
6 changes: 6 additions & 0 deletions be/src/runtime/routine_load/data_consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "util/blocking_queue.hpp"
#include "util/debug_points.h"
#include "util/defer_op.h"
#include "util/doris_metrics.h"
#include "util/stopwatch.hpp"
#include "util/string_util.h"
#include "util/uid_util.h"
Expand Down Expand Up @@ -219,6 +220,9 @@ Status KafkaDataConsumer::group_consume(BlockingQueue<RdKafka::Message*>* queue,
consumer_watch.start();
std::unique_ptr<RdKafka::Message> msg(_k_consumer->consume(1000 /* timeout, ms */));
consumer_watch.stop();
// Record one Kafka poll: bump the poll counter and accumulate the measured
// consume() wall time. The /1000/1000 converts to ms to match the metric's
// MILLISECONDS unit — assumes elapsed_time() returns nanoseconds (TODO confirm).
DorisMetrics::instance()->routine_load_get_msg_count->increment(1);
DorisMetrics::instance()->routine_load_get_msg_latency->increment(
consumer_watch.elapsed_time() / 1000 / 1000);
DBUG_EXECUTE_IF("KafkaDataConsumer.group_consume.out_of_range", {
done = true;
std::stringstream ss;
Expand All @@ -234,6 +238,7 @@ Status KafkaDataConsumer::group_consume(BlockingQueue<RdKafka::Message*>* queue,
if (_consuming_partition_ids.count(msg->partition()) <= 0) {
_consuming_partition_ids.insert(msg->partition());
}
DorisMetrics::instance()->routine_load_consume_bytes->increment(msg->len());
if (msg->len() == 0) {
// ignore msg with length 0.
// put empty msg into queue will cause the load process shutting down.
Expand All @@ -246,6 +251,7 @@ Status KafkaDataConsumer::group_consume(BlockingQueue<RdKafka::Message*>* queue,
msg.release(); // release the ownership, msg will be deleted after being processed
}
++received_rows;
DorisMetrics::instance()->routine_load_consume_rows->increment(1);
break;
case RdKafka::ERR__TIMED_OUT:
// leave the status as OK, because this may happened
Expand Down
10 changes: 10 additions & 0 deletions be/src/util/doris_metrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,11 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(stream_load_rows_total, MetricUnit::ROWS, "
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(load_rows, MetricUnit::ROWS);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(load_bytes, MetricUnit::BYTES);

// Routine load (Kafka) consumer metrics: cumulative poll latency/count and
// total bytes/rows pulled from Kafka on this BE. Updated in
// KafkaDataConsumer::group_consume (data_consumer.cpp).
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(routine_load_get_msg_latency, MetricUnit::MILLISECONDS);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(routine_load_get_msg_count, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(routine_load_consume_rows, MetricUnit::ROWS);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(routine_load_consume_bytes, MetricUnit::BYTES);

DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(memtable_flush_total, MetricUnit::OPERATIONS);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(memtable_flush_duration_us, MetricUnit::MICROSECONDS);

Expand Down Expand Up @@ -255,6 +260,11 @@ DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) {
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, stream_receive_bytes_total);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, stream_load_rows_total);

INT_COUNTER_METRIC_REGISTER(_server_metric_entity, routine_load_get_msg_latency);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, routine_load_get_msg_count);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, routine_load_consume_bytes);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, routine_load_consume_rows);

INT_COUNTER_METRIC_REGISTER(_server_metric_entity, memtable_flush_total);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, memtable_flush_duration_us);

Expand Down
5 changes: 5 additions & 0 deletions be/src/util/doris_metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ class DorisMetrics {
IntCounter* load_rows = nullptr;
IntCounter* load_bytes = nullptr;

// Routine load (Kafka) consumer metrics; registered in DorisMetrics()
// (doris_metrics.cpp) and incremented from the Kafka data consumer.
IntCounter* routine_load_get_msg_latency = nullptr;
IntCounter* routine_load_get_msg_count = nullptr;
IntCounter* routine_load_consume_bytes = nullptr;
IntCounter* routine_load_consume_rows = nullptr;

IntCounter* memtable_flush_total = nullptr;
IntCounter* memtable_flush_duration_us = nullptr;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.doris.common.LoadException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.proto.InternalService;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.system.Backend;
Expand Down Expand Up @@ -221,37 +222,46 @@ public static List<Pair<Integer, Long>> getRealOffsets(String brokerList, String

private static InternalService.PProxyResult getInfoRequest(InternalService.PProxyRequest request, int timeout)
throws LoadException {
long startTime = System.currentTimeMillis();
int retryTimes = 0;
TNetworkAddress address = null;
Future<InternalService.PProxyResult> future = null;
InternalService.PProxyResult result = null;
while (retryTimes < 3) {
List<Long> backendIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
if (backendIds.isEmpty()) {
throw new LoadException("Failed to get info. No alive backends");
}
Collections.shuffle(backendIds);
Backend be = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
try {
while (retryTimes < 3) {
List<Long> backendIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
if (backendIds.isEmpty()) {
MetricRepo.COUNTER_ROUTINE_LOAD_GET_META_FAIL_COUNT.increase(1L);
throw new LoadException("Failed to get info. No alive backends");
}
Collections.shuffle(backendIds);
Backend be = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
address = new TNetworkAddress(be.getHost(), be.getBrpcPort());

try {
future = BackendServiceProxy.getInstance().getInfo(address, request);
result = future.get(Config.max_get_kafka_meta_timeout_second, TimeUnit.SECONDS);
} catch (Exception e) {
LOG.warn("failed to get info request to " + address + " err " + e.getMessage());
retryTimes++;
continue;
}
TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
if (code != TStatusCode.OK) {
LOG.warn("failed to get info request to "
+ address + " err " + result.getStatus().getErrorMsgsList());
retryTimes++;
} else {
return result;
try {
future = BackendServiceProxy.getInstance().getInfo(address, request);
result = future.get(Config.max_get_kafka_meta_timeout_second, TimeUnit.SECONDS);
} catch (Exception e) {
LOG.warn("failed to get info request to " + address + " err " + e.getMessage());
retryTimes++;
continue;
}
TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
if (code != TStatusCode.OK) {
LOG.warn("failed to get info request to "
+ address + " err " + result.getStatus().getErrorMsgsList());
retryTimes++;
} else {
return result;
}
}
}

throw new LoadException("Failed to get info");
MetricRepo.COUNTER_ROUTINE_LOAD_GET_META_FAIL_COUNT.increase(1L);
throw new LoadException("Failed to get info");
} finally {
long endTime = System.currentTimeMillis();
MetricRepo.COUNTER_ROUTINE_LOAD_GET_META_LANTENCY.increase(endTime - startTime);
MetricRepo.COUNTER_ROUTINE_LOAD_GET_META_COUNT.increase(1L);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -861,7 +861,7 @@ private void checkStateTransform(RoutineLoadJob.JobState desireState) throws Use
// if rate of error data is more than max_filter_ratio, pause job
protected void updateProgress(RLTaskTxnCommitAttachment attachment) throws UserException {
updateNumOfData(attachment.getTotalRows(), attachment.getFilteredRows(), attachment.getUnselectedRows(),
attachment.getReceivedBytes(), false /* not replay */);
attachment.getReceivedBytes(), attachment.getTaskExecutionTimeMs(), false /* not replay */);
}

protected void updateCloudProgress(RLTaskTxnCommitAttachment attachment) {
Expand All @@ -877,7 +877,7 @@ protected void updateCloudProgress(RLTaskTxnCommitAttachment attachment) {
}

private void updateNumOfData(long numOfTotalRows, long numOfErrorRows, long unselectedRows, long receivedBytes,
boolean isReplay) throws UserException {
long taskExecutionTime, boolean isReplay) throws UserException {
this.jobStatistic.totalRows += numOfTotalRows;
this.jobStatistic.errorRows += numOfErrorRows;
this.jobStatistic.unselectedRows += unselectedRows;
Expand All @@ -888,6 +888,8 @@ private void updateNumOfData(long numOfTotalRows, long numOfErrorRows, long unse
// Accumulate job-level routine load statistics into FE-wide metrics
// (only on the non-replay path, per the surrounding isReplay guard).
MetricRepo.COUNTER_ROUTINE_LOAD_ROWS.increase(numOfTotalRows);
MetricRepo.COUNTER_ROUTINE_LOAD_ERROR_ROWS.increase(numOfErrorRows);
MetricRepo.COUNTER_ROUTINE_LOAD_RECEIVED_BYTES.increase(receivedBytes);
MetricRepo.COUNTER_ROUTINE_LOAD_TASK_EXECUTE_TIME.increase(taskExecutionTime);
// NOTE(review): the line below adds 1 to COUNTER_ROUTINE_LOAD_TASK_EXECUTE_TIME
// again — this looks like a copy-paste slip for
// COUNTER_ROUTINE_LOAD_TASK_EXECUTE_COUNT (declared in MetricRepo but otherwise
// never incremented, and routine_load_task_execute_count would stay 0 while the
// TIME metric is skewed by +1 per task). Confirm and fix.
MetricRepo.COUNTER_ROUTINE_LOAD_TASK_EXECUTE_TIME.increase(1L);
}

// check error rate
Expand Down Expand Up @@ -957,7 +959,7 @@ private void updateNumOfData(long numOfTotalRows, long numOfErrorRows, long unse
protected void replayUpdateProgress(RLTaskTxnCommitAttachment attachment) {
try {
updateNumOfData(attachment.getTotalRows(), attachment.getFilteredRows(), attachment.getUnselectedRows(),
attachment.getReceivedBytes(), true /* is replay */);
attachment.getReceivedBytes(), attachment.getTaskExecutionTimeMs(), true /* is replay */);
} catch (UserException e) {
LOG.error("should not happen", e);
}
Expand Down
20 changes: 20 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,11 @@ public final class MetricRepo {
public static LongCounterMetric COUNTER_ROUTINE_LOAD_ROWS;
public static LongCounterMetric COUNTER_ROUTINE_LOAD_RECEIVED_BYTES;
public static LongCounterMetric COUNTER_ROUTINE_LOAD_ERROR_ROWS;
// Routine load observability counters added by this change.
// NOTE(review): "LANTENCY" is a typo for "LATENCY" (it also leaks into the
// metric description string at init time); renaming requires touching every
// use site (e.g. KafkaUtil.getInfoRequest), so it is flagged rather than fixed here.
public static LongCounterMetric COUNTER_ROUTINE_LOAD_GET_META_LANTENCY;
public static LongCounterMetric COUNTER_ROUTINE_LOAD_GET_META_COUNT;
public static LongCounterMetric COUNTER_ROUTINE_LOAD_GET_META_FAIL_COUNT;
public static LongCounterMetric COUNTER_ROUTINE_LOAD_TASK_EXECUTE_TIME;
public static LongCounterMetric COUNTER_ROUTINE_LOAD_TASK_EXECUTE_COUNT;
public static LongCounterMetric COUNTER_HIT_SQL_BLOCK_RULE;

public static AutoMappedMetric<LongCounterMetric> THRIFT_COUNTER_RPC_ALL;
Expand Down Expand Up @@ -535,6 +540,21 @@ public Long getValue() {
COUNTER_ROUTINE_LOAD_ERROR_ROWS = new LongCounterMetric("routine_load_error_rows", MetricUnit.ROWS,
"total error rows of routine load");
DORIS_METRIC_REGISTER.addMetrics(COUNTER_ROUTINE_LOAD_ERROR_ROWS);
// Construct and register the new routine load metrics.
// NOTE(review): "lantency" in the description string below is a typo for
// "latency" (user-visible in /metrics output) — worth fixing together with
// the LANTENCY field name.
COUNTER_ROUTINE_LOAD_GET_META_LANTENCY = new LongCounterMetric("routine_load_get_meta_latency",
MetricUnit.MILLISECONDS, "get meta lantency of routine load");
DORIS_METRIC_REGISTER.addMetrics(COUNTER_ROUTINE_LOAD_GET_META_LANTENCY);
COUNTER_ROUTINE_LOAD_GET_META_COUNT = new LongCounterMetric("routine_load_get_meta_count", MetricUnit.NOUNIT,
"get meta count of routine load");
DORIS_METRIC_REGISTER.addMetrics(COUNTER_ROUTINE_LOAD_GET_META_COUNT);
COUNTER_ROUTINE_LOAD_GET_META_FAIL_COUNT = new LongCounterMetric("routine_load_get_meta_fail_count",
MetricUnit.NOUNIT, "get meta fail count of routine load");
DORIS_METRIC_REGISTER.addMetrics(COUNTER_ROUTINE_LOAD_GET_META_FAIL_COUNT);
COUNTER_ROUTINE_LOAD_TASK_EXECUTE_TIME = new LongCounterMetric("routine_load_task_execute_time",
MetricUnit.MILLISECONDS, "task execute time of routine load");
DORIS_METRIC_REGISTER.addMetrics(COUNTER_ROUTINE_LOAD_TASK_EXECUTE_TIME);
// NOTE(review): a pure count metric declared with MetricUnit.MILLISECONDS —
// should be MetricUnit.NOUNIT (compare routine_load_get_meta_count above).
// Confirm and fix.
COUNTER_ROUTINE_LOAD_TASK_EXECUTE_COUNT = new LongCounterMetric("routine_load_task_execute_count",
MetricUnit.MILLISECONDS, "task execute count of routine load");
DORIS_METRIC_REGISTER.addMetrics(COUNTER_ROUTINE_LOAD_TASK_EXECUTE_COUNT);

COUNTER_HIT_SQL_BLOCK_RULE = new LongCounterMetric("counter_hit_sql_block_rule", MetricUnit.ROWS,
"total hit sql block rule query");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ public void testUpdateNumOfDataErrorRowMoreThanMax(@Mocked Env env) {
RoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob();
Deencapsulation.setField(routineLoadJob, "maxErrorNum", 0);
Deencapsulation.setField(routineLoadJob, "maxBatchRows", 0);
Deencapsulation.invoke(routineLoadJob, "updateNumOfData", 1L, 1L, 0L, 1L, false);
Deencapsulation.invoke(routineLoadJob, "updateNumOfData", 1L, 1L, 0L, 1L, 1L, false);

Assert.assertEquals(RoutineLoadJob.JobState.PAUSED, Deencapsulation.getField(routineLoadJob, "state"));

Expand All @@ -316,7 +316,7 @@ public void testUpdateTotalMoreThanBatch() {
RoutineLoadStatistic jobStatistic = Deencapsulation.getField(routineLoadJob, "jobStatistic");
Deencapsulation.setField(jobStatistic, "currentErrorRows", 1);
Deencapsulation.setField(jobStatistic, "currentTotalRows", 99);
Deencapsulation.invoke(routineLoadJob, "updateNumOfData", 2L, 0L, 0L, 1L, false);
Deencapsulation.invoke(routineLoadJob, "updateNumOfData", 2L, 0L, 0L, 1L, 1L, false);

Assert.assertEquals(RoutineLoadJob.JobState.RUNNING, Deencapsulation.getField(routineLoadJob, "state"));
Assert.assertEquals(new Long(0), Deencapsulation.getField(jobStatistic, "currentErrorRows"));
Expand Down
Loading

0 comments on commit de1c514

Please sign in to comment.