Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions core/application/Application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,21 @@ void Application::Start() { // GCOVR_EXCL_START

OnetimeConfigInfoManager::GetInstance()->LoadCheckpointFile();

#ifdef __ENTERPRISE__
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

开源版调用会有什么影响?尽量少引入特殊的控制。

// Wait for all runners to be started, then load built-in pipelines first, followed by sending start metrics.
auto configDiff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(true);
if (!configDiff.first.IsEmpty()) {
CollectionPipelineManager::GetInstance()->UpdatePipelines(configDiff.first);
}
if (!configDiff.second.IsEmpty()) {
TaskPipelineManager::GetInstance()->UpdatePipelines(configDiff.second);
}
if (!configDiff.first.IsEmpty() || !configDiff.second.IsEmpty()) {
OnetimeConfigInfoManager::GetInstance()->DumpCheckpointFile();
}
LoongCollectorMonitor::GetInstance()->SendStartMetric();
#endif

time_t curTime = 0, lastOnetimeConfigTimeoutCheckTime = 0, lastConfigCheckTime = 0, lastUpdateMetricTime = 0,
lastCheckTagsTime = 0, lastQueueGCTime = 0, lastCheckUnusedCheckpointsTime = 0;
while (true) {
Expand Down
16 changes: 13 additions & 3 deletions core/config/watcher/PipelineConfigWatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,18 @@ PipelineConfigWatcher::PipelineConfigWatcher()
mTaskPipelineManager(TaskPipelineManager::GetInstance()) {
}

pair<CollectionConfigDiff, TaskConfigDiff> PipelineConfigWatcher::CheckConfigDiff() {
pair<CollectionConfigDiff, TaskConfigDiff> PipelineConfigWatcher::CheckConfigDiff(bool builtinOnly) {
CollectionConfigDiff pDiff;
TaskConfigDiff tDiff;
unordered_set<string> configSet;
SingletonConfigCache singletonCache;
// builtin pipeline configs
InsertBuiltInPipelines(pDiff, tDiff, configSet, singletonCache);
// file pipeline configs
InsertPipelines(pDiff, tDiff, configSet, singletonCache);

if (!builtinOnly) {
// file pipeline configs
InsertPipelines(pDiff, tDiff, configSet, singletonCache);
}

CheckSingletonInput(pDiff, singletonCache);
for (const auto& name : mCollectionPipelineManager->GetAllConfigNames()) {
Expand Down Expand Up @@ -100,6 +103,13 @@ void PipelineConfigWatcher::InsertBuiltInPipelines(CollectionConfigDiff& pDiff,
for (const auto& pipeline : builtInPipelines) {
const string& pipelineName = pipeline.first;
const string& pipleineDetail = pipeline.second;
if (configSet.find(pipelineName) != configSet.end()) {
LOG_WARNING(sLogger,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

什么时候会出现?

("more than 1 config with the same name is found", "skip current config")("inner pipeline",
pipelineName));
continue;
}
configSet.insert(pipelineName);

string errorMsg;
auto iter = mInnerConfigMap.find(pipelineName);
Expand Down
2 changes: 1 addition & 1 deletion core/config/watcher/PipelineConfigWatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class PipelineConfigWatcher : public ConfigWatcher {
return &instance;
}

std::pair<CollectionConfigDiff, TaskConfigDiff> CheckConfigDiff();
std::pair<CollectionConfigDiff, TaskConfigDiff> CheckConfigDiff(bool builtinOnly = false);

#ifdef APSARA_UNIT_TEST_MAIN
void SetPipelineManager(const CollectionPipelineManager* pm) { mCollectionPipelineManager = pm; }
Expand Down
14 changes: 14 additions & 0 deletions core/monitor/Monitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,10 @@ void LoongCollectorMonitor::Stop() {
LOG_INFO(sLogger, ("LoongCollector monitor", "stopped successfully"));
}

// Delegates the start-metric request to the SelfMonitorServer singleton.
void LoongCollectorMonitor::SendStartMetric() {
    auto* const selfMonitorServer = SelfMonitorServer::GetInstance();
    selfMonitorServer->SendStartMetric();
}

bool LoongCollectorMonitor::GetAgentMetric(SelfMonitorMetricEvent& event) {
lock_guard<mutex> lock(mGlobalMetricsMux);
event = mGlobalMetrics.mAgentMetric;
Expand Down Expand Up @@ -675,4 +679,14 @@ void LoongCollectorMonitor::SetRunnerMetric(const std::string& runnerName, const
mGlobalMetrics.mRunnerMetrics[runnerName] = event;
}

void LogtailMonitor::UpdateCpuMem() {
CpuStat curCpuStat;
if (GetCpuStat(curCpuStat)) {
GetMemStat();
LoongCollectorMonitor::GetInstance()->SetAgentMemory(mMemStat.mRss);
CalCpuStat(curCpuStat, mCpuStat);
LoongCollectorMonitor::GetInstance()->SetAgentCpu(mCpuStat.mCpuUsage);
}
}

} // namespace logtail
5 changes: 5 additions & 0 deletions core/monitor/Monitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ class LogtailMonitor {
// LogInput use it to do flow control.
float GetRealtimeCpuLevel() { return mRealtimeCpuStat.mCpuUsage / mScaledCpuUsageUpLimit; }


// UpdateCpuMem updates CPU and Memory statistics and sets them to LoongCollectorMonitor
void UpdateCpuMem();

private:
LogtailMonitor();
~LogtailMonitor() = default;
Expand Down Expand Up @@ -192,6 +196,7 @@ class LoongCollectorMonitor {

void Init();
void Stop();
void SendStartMetric();

bool GetAgentMetric(SelfMonitorMetricEvent& event);
void SetAgentMetric(const SelfMonitorMetricEvent& event);
Expand Down
87 changes: 86 additions & 1 deletion core/monitor/SelfMonitorServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,14 @@

#include "monitor/SelfMonitorServer.h"

#include <json/json.h>

#include <fstream>

#include "MetricConstants.h"
#include "Monitor.h"
#include "app_config/AppConfig.h"
#include "common/FileSystemUtil.h"
#include "runner/ProcessorRunner.h"

using namespace std;
Expand Down Expand Up @@ -100,7 +106,7 @@ void SelfMonitorServer::RemoveMetricPipeline() {
LOG_INFO(sLogger, ("self-monitor metrics pipeline", "removed"));
}

void SelfMonitorServer::SendMetrics() {
void SelfMonitorServer::SendMetrics(bool updateAppInfo) {
ReadMetrics::GetInstance()->UpdateMetrics();

ReadLock lock(mMetricPipelineLock);
Expand All @@ -117,6 +123,10 @@ void SelfMonitorServer::SendMetrics() {
pipelineEventGroup.SetMetadata(EventGroupMetaKey::INTERNAL_DATA_TYPE, INTERNAL_DATA_TYPE_METRIC);
ReadAsPipelineEventGroup(pipelineEventGroup);

if (updateAppInfo) {
UpdateAppInfoJson(pipelineEventGroup);
}

if (pipelineEventGroup.GetEvents().size() > 0) {
ProcessorRunner::GetInstance()->PushQueue(
mMetricPipelineCtx->GetProcessQueueKey(), mMetricInputIndex, std::move(pipelineEventGroup));
Expand Down Expand Up @@ -213,4 +223,79 @@ void SelfMonitorServer::SendAlarms() {
}
}

// Sends one round of self-monitor metrics right after startup.
//
// Polls every 10 ms until both mMetricPipelineCtx and mSelfMonitorMetricRules
// are non-null. If they are still missing once at least 1000 ms have elapsed,
// a warning is logged and the function returns without sending anything.
// Otherwise it refreshes CPU/memory via LogtailMonitor::UpdateCpuMem() and
// calls SendMetrics().
void SelfMonitorServer::SendStartMetric() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

尽量时序上去保证先后顺序

// Wait for mMetricPipelineCtx to exist, for at most 1 second.
auto startTime = std::chrono::steady_clock::now();
while (true) {
ReadLock lock(mMetricPipelineLock);
if (mMetricPipelineCtx != nullptr && mSelfMonitorMetricRules != nullptr) {
auto endTime = std::chrono::steady_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(endTime - startTime);
LOG_INFO(sLogger,
("send start metric", "wait metric pipeline ready")("cost milliseconds", duration.count()));
break;
}
// Release the read lock before the timeout check and the sleep below.
lock.unlock();

// Check whether the wait has timed out.
auto currentTime = std::chrono::steady_clock::now();
auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(currentTime - startTime);
if (elapsed.count() >= 1000) { // 1-second timeout; in measurements a normal startup completes within 1 ms
LOG_WARNING(sLogger, ("send start metric", "wait metric pipeline ready timeout, skip"));
return;
}

std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
// Refresh the CPU and memory metrics immediately at startup, because after a
// normal start they are otherwise only updated once every 30 s.
LogtailMonitor::GetInstance()->UpdateCpuMem();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

都没加载配置,cpu、内存此时没有什么意义


// Send the metrics. UpdateAppInfo is not enabled yet: SendMetrics() is called
// with its default argument (see the TODO below).
// TODO: SendMetrics(true);
SendMetrics();
LOG_INFO(sLogger, ("send start metric", "sent successfully"));
}

// Re-reads app_info.json (path from GetAgentAppInfoFile()), walks the metric
// events in pipelineEventGroup, and writes the JSON back with OverwriteFile().
//
// NOTE(review): both metric branches below are still TODO stubs, so no field
// is added to the JSON yet. The file is overwritten with whatever appInfoJson
// holds even when the existing file could not be opened or parsed — confirm
// that this is intended.
void SelfMonitorServer::UpdateAppInfoJson(const PipelineEventGroup& pipelineEventGroup) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

没有什么有意义的信息,就不要李留这些代码

// Load the existing app_info.json.
string appInfoFile = GetAgentAppInfoFile();
Json::Value appInfoJson;

// Try to read the existing file.
ifstream file(appInfoFile);
if (file.is_open()) {
Json::CharReaderBuilder builder;
string errors;
if (Json::parseFromStream(builder, file, &appInfoJson, &errors)) {
file.close();
} else {
file.close();
LOG_WARNING(sLogger, ("failed to parse existing app_info.json", errors));
}
}

const auto& events = pipelineEventGroup.GetEvents();
for (const auto& eventPtr : events) {
// Only metric events are of interest here.
if (!eventPtr.Is<MetricEvent>()) {
continue;
}

const auto& metricEvent = eventPtr.Cast<MetricEvent>();
const std::string metricName = metricEvent.GetName().to_string();

// Check whether this is an agent metric.
if (metricName == MetricCategory::METRIC_CATEGORY_AGENT) {
// TODO: extract the agent metrics from metricEvent and set them on appInfoJson
}
// Check whether this is a runner metric.
else if (metricName == MetricCategory::METRIC_CATEGORY_RUNNER) {
// TODO: extract the runner metrics from metricEvent, e.g. the names of all runners that have been started
}
}
// Write the updated app_info.json back.
string appInfo = appInfoJson.toStyledString();
OverwriteFile(appInfoFile, appInfo);
LOG_INFO(sLogger, ("updated app_info.json with additional metric info", appInfo));
}

} // namespace logtail
7 changes: 6 additions & 1 deletion core/monitor/SelfMonitorServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ class SelfMonitorServer {
void UpdateAlarmPipeline(CollectionPipelineContext* ctx, size_t inputIndex);
void RemoveAlarmPipeline();

// send start metric (agent/runner)
void SendStartMetric();
// update app_info.json
void UpdateAppInfoJson(const PipelineEventGroup& pipelineEventGroup);

static const std::string INTERNAL_DATA_TYPE_ALARM;
static const std::string INTERNAL_DATA_TYPE_METRIC;

Expand All @@ -51,7 +56,7 @@ class SelfMonitorServer {
std::condition_variable mStopCV;

// metrics
void SendMetrics();
void SendMetrics(bool updateAppInfo = false);
bool ProcessSelfMonitorMetricEvent(SelfMonitorMetricEvent& event, const SelfMonitorMetricRule& rule);
void PushSelfMonitorMetricEvents(std::vector<SelfMonitorMetricEvent>& events);
void ReadAsPipelineEventGroup(PipelineEventGroup& pipelineEventGroup);
Expand Down
Loading