Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/host_monitor/LinuxSystemInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1044,7 +1044,7 @@ bool LinuxSystemInterface::GetProcessCredNameOnce(pid_t pid, ProcessCredName& pr
passwd pwbuffer;
char buffer[2048];
if (getpwuid_r(cred.uid, &pwbuffer, buffer, sizeof(buffer), &pw) != 0 || pw == nullptr || pw->pw_name == nullptr) {
return false;
return true;
}

processCredName.user = pw->pw_name;
Expand All @@ -1053,7 +1053,7 @@ bool LinuxSystemInterface::GetProcessCredNameOnce(pid_t pid, ProcessCredName& pr
group grpbuffer{};
char groupBuffer[2048];
if (getgrgid_r(cred.gid, &grpbuffer, groupBuffer, sizeof(groupBuffer), &grp) != 0) {
return false;
return true;
}

if (grp != nullptr && grp->gr_name != nullptr) {
Expand Down
9 changes: 5 additions & 4 deletions core/host_monitor/SystemInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ SystemInterface* SystemInterface::GetInstance() {
}

bool SystemInterface::GetSystemInformation(SystemInformation& systemInfo) {
// SystemInformation is static and will not be changed. So cache will never be expired.
// SystemInformation is static and will not be changed. So cache will never be
// expired.
if (mSystemInformationCache.collectTime.time_since_epoch().count() > 0) {
systemInfo = mSystemInformationCache;
return true;
Expand Down Expand Up @@ -288,7 +289,8 @@ bool SystemInterface::SystemInformationCache<InfoT, Args...>::GetWithTimeout(Inf
return true;
}
if (!it->second.second) {
// the cache is stale and no thread is updating, will update by this thread
// the cache is stale and no thread is updating, will update by this
// thread
it->second.second.store(true);
return false;
}
Expand Down Expand Up @@ -382,9 +384,8 @@ bool SystemInterface::SystemInformationCache<InfoT>::GC() {

// Formats a 6-byte hardware (MAC) address as upper-case, colon-separated hex
// octets, e.g. "00:1A:2B:3C:4D:5E".
//
// mac: pointer to the address bytes. Exactly mac[0]..mac[5] are read, so the
//      caller must supply at least six readable bytes. A raw pointer carries
//      no length (sizeof(mac) is only the size of the pointer itself), so the
//      buffer length cannot be validated here.
// Returns the formatted string, or an empty string when mac is nullptr.
std::string MacString(const unsigned char* mac) {
    if (mac == nullptr) {
        return {};
    }
    return fmt::format("{:02X}:{:02X}:{:02X}:{:02X}:{:02X}:{:02X}", mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]);
}
Expand Down
10 changes: 5 additions & 5 deletions core/host_monitor/SystemInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ struct ProcessCred {
struct ProcessAllStat {
pid_t pid;
ProcessStat processState;
ProcessInfo processInfo;
// ProcessInfo processInfo;
ProcessCpuInformation processCpu;
ProcessMemoryInformation processMemory;
double memPercent = 0.0;
Expand All @@ -181,10 +181,10 @@ struct ProcessAllStat {

struct ProcessPushMertic {
pid_t pid;
std::string name;
std::string user;
std::string path;
std::string args;
// std::string name;
// std::string user;
// std::string path;
// std::string args;
double cpuPercent = 0.0;
double memPercent = 0.0;
double fdNum = 0.0;
Expand Down
32 changes: 31 additions & 1 deletion core/host_monitor/collector/NetCollector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,14 @@ bool NetCollector::Collect(const HostMonitorTimerEvent::CollectConfig& collectCo
return false;
}

std::vector<std::string> curDevNames; // 本次采集到的所有设备名

// 更新记录ip
for (auto& netInterface : netInterfaces.configs) {
if (netInterface.name.empty()) {
continue;
}

curDevNames.push_back(netInterface.name);
mDevIp[netInterface.name] = netInterface.address.str();
if (mDevIp[netInterface.name].empty()) {
mDevIp[netInterface.name] = netInterface.address6.str();
Expand Down Expand Up @@ -315,6 +317,34 @@ bool NetCollector::Collect(const HostMonitorTimerEvent::CollectConfig& collectCo

mCount = 0;
mLastTime = start;

// 清理掉mLastInterfaceMetrics中,curDevNames中不存在的设备名
for (auto it = mLastInterfaceMetrics.begin(); it != mLastInterfaceMetrics.end();) {
if (std::find(curDevNames.begin(), curDevNames.end(), it->first) == curDevNames.end()) {
it = mLastInterfaceMetrics.erase(it);
} else {
++it;
}
}

// 清理掉mRatePerSecCalMap中,curDevNames中不存在的设备名
for (auto it = mRatePerSecCalMap.begin(); it != mRatePerSecCalMap.end();) {
if (std::find(curDevNames.begin(), curDevNames.end(), it->first) == curDevNames.end()) {
it = mRatePerSecCalMap.erase(it);
} else {
++it;
}
}

// 清理掉mDevIp中,curDevNames中不存在的设备名
for (auto it = mDevIp.begin(); it != mDevIp.end();) {
if (std::find(curDevNames.begin(), curDevNames.end(), it->first) == curDevNames.end()) {
it = mDevIp.erase(it);
} else {
++it;
}
}

return true;
}

Expand Down
2 changes: 0 additions & 2 deletions core/host_monitor/collector/NetCollector.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,11 @@ class NetCollector : public BaseCollector {


private:
std::map<std::string, NetInterfaceMetric> mLastInterfaceStatMap;
std::chrono::steady_clock::time_point mLastTime;
std::map<std::string, NetInterfaceMetric> mLastInterfaceMetrics;
int mCountPerReport = 0;
int mCount = 0;
MetricCalculate<ResTCPStat, uint64_t> mTCPCal;
// std::map<std::string, MetricCalculate<ResNetPackRate>> mPackRateCalMap;
std::map<std::string, MetricCalculate<ResNetRatePerSec>> mRatePerSecCalMap;
std::map<std::string, std::string> mDevIp;
};
Expand Down
115 changes: 67 additions & 48 deletions core/host_monitor/collector/ProcessCollector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
#include "host_monitor/SystemInterface.h"
#include "logger/Logger.h"


namespace logtail {

DEFINE_FLAG_INT32(process_report_top_N, "number of process reported with Top N cpu percent", 5);
Expand Down Expand Up @@ -93,7 +92,6 @@ int ProcessCollector::Init(int processTotalCount, int processReportTopN) {
return 0;
}


bool ProcessCollector::Collect(const HostMonitorTimerEvent::CollectConfig& collectConfig, PipelineEventGroup* group) {
if (group == nullptr) {
return false;
Expand All @@ -119,7 +117,7 @@ bool ProcessCollector::Collect(const HostMonitorTimerEvent::CollectConfig& colle
allPidStats.push_back(stat);
}

GetProcessCpuSorted(allPidStats);
// GetProcessCpuSorted(allPidStats);

int processNum = allPidStats.size();

Expand All @@ -136,18 +134,19 @@ bool ProcessCollector::Collect(const HostMonitorTimerEvent::CollectConfig& colle
pushMertic.numThreads = stat.processState.numThreads;
pushMertic.memPercent = stat.memPercent;
pushMertic.cpuPercent = stat.processCpu.percent;
pushMertic.name = stat.processInfo.name;
pushMertic.user = stat.processInfo.user;
pushMertic.args = stat.processInfo.args;
pushMertic.path = stat.processInfo.path;
// pushMertic.name = stat.processInfo.name;
// pushMertic.user = stat.processInfo.user;
// pushMertic.args = stat.processInfo.args;
// pushMertic.path = stat.processInfo.path;
pushMerticList.push_back(pushMertic);
}

// set calculation
// 为vmState添加多值计算
mVMProcessNumStat.AddValue(processNumStat);
// 给每个pid推送对象设定其多值体系
// mProcessPushMertic是一个字典,key 为pid,对应的value为多值vector,里面存储了每一个pid的多值体系
// mProcessPushMertic是一个字典,key
// 为pid,对应的value为多值vector,里面存储了每一个pid的多值体系
for (auto& metric : pushMerticList) {
uint64_t thisPid = metric.pid;
// auto met = mProcessPushMertic.find(thisPid);
Expand All @@ -169,11 +168,15 @@ bool ProcessCollector::Collect(const HostMonitorTimerEvent::CollectConfig& colle
return true;
}

GetProcessCpuSorted(allPidStats);//排序

// 记录count满足条件以后,计算并推送多值指标;如果没有到达条件,只需要往多值体系内添加统计对象即可
VMProcessNumStat minVMProcessNum, maxVMProcessNum, avgVMProcessNum, lastVMProcessNum;
mVMProcessNumStat.Stat(minVMProcessNum, maxVMProcessNum, avgVMProcessNum, &lastVMProcessNum);

/*
for (auto& metric : pushMerticList) {
//只需要处理前五
// 处理每一个pid的推送数据
uint64_t thisPid = metric.pid;

Expand All @@ -189,7 +192,7 @@ bool ProcessCollector::Collect(const HostMonitorTimerEvent::CollectConfig& colle
mAvgProcessNumThreads.insert(std::make_pair(thisPid, avgMetric.numThreads));
// 每个pid下的多值体系添加完毕
}

*/
// 指标推送
MetricEvent* metricEvent = group->AddMetricEvent(true);
if (!metricEvent) {
Expand Down Expand Up @@ -226,34 +229,45 @@ bool ProcessCollector::Collect(const HostMonitorTimerEvent::CollectConfig& colle
// 上传每一个pid对应的值
double value = 0.0;
pid_t pid = pushMerticList[i].pid;

// 计算pid的多值信息
ProcessPushMertic minMetric, maxMetric, avgMetric;
mProcessPushMertic[pid].Stat(minMetric, maxMetric, avgMetric);

// 获取pid对应的processinfo
ProcessInfo processInfo;
if (!GetProcessInfo(pid, processInfo)) {
continue;
}

// cpu percent
value = static_cast<double>(mAvgProcessCpuPercent.find(pid)->second);
value = static_cast<double>(avgMetric.cpuPercent);
multiDoubleValuesEachPid->SetValue(std::string("process_cpu_avg"),
UntypedMultiDoubleValue{UntypedValueMetricType::MetricTypeGauge, value});
// mem percent
value = static_cast<double>(mAvgProcessMemPercent.find(pid)->second);
value = static_cast<double>(avgMetric.memPercent);
multiDoubleValuesEachPid->SetValue(std::string("process_memory_avg"),
UntypedMultiDoubleValue{UntypedValueMetricType::MetricTypeGauge, value});
// open file number
value = static_cast<double>(mAvgProcessFd.find(pid)->second);
value = static_cast<double>(avgMetric.fdNum);
multiDoubleValuesEachPid->SetValue(std::string("process_openfile_avg"),
UntypedMultiDoubleValue{UntypedValueMetricType::MetricTypeGauge, value});
// process number
value = static_cast<double>(mAvgProcessNumThreads.find(pid)->second);
value = static_cast<double>(avgMetric.numThreads);
multiDoubleValuesEachPid->SetValue(std::string("process_number_avg"),
UntypedMultiDoubleValue{UntypedValueMetricType::MetricTypeGauge, value});

value = static_cast<double>(mMaxProcessNumThreads.find(pid)->second);
value = static_cast<double>(maxMetric.numThreads);
multiDoubleValuesEachPid->SetValue(std::string("process_number_max"),
UntypedMultiDoubleValue{UntypedValueMetricType::MetricTypeGauge, value});

value = static_cast<double>(mMinProcessNumThreads.find(pid)->second);
value = static_cast<double>(minMetric.numThreads);
multiDoubleValuesEachPid->SetValue(std::string("process_number_min"),
UntypedMultiDoubleValue{UntypedValueMetricType::MetricTypeGauge, value});

metricEventEachPid->SetTag("pid", std::to_string(pid));
metricEventEachPid->SetTag("name", pushMerticList[i].name);
metricEventEachPid->SetTag("user", pushMerticList[i].user);
metricEventEachPid->SetTag("name", processInfo.name);
metricEventEachPid->SetTag("user", processInfo.user);
metricEventEachPid->SetTag(std::string("m"), std::string("system.process"));
}

Expand All @@ -269,16 +283,16 @@ bool ProcessCollector::Collect(const HostMonitorTimerEvent::CollectConfig& colle
mAvgProcessNumThreads.clear();
pidNameMap.clear();
pushMerticList.clear();
ClearProcessCpuTimeCache();
return true;
}


// 获取某个pid的信息
bool ProcessCollector::GetProcessAllStat(pid_t pid, ProcessAllStat& processStat) {
// 获取这个pid的cpu信息
processStat.pid = pid;

if (!GetProcessCpuInformation(pid, processStat.processCpu, false)) {
if (!GetProcessCpuInformation(pid, processStat.processCpu)) {
return false;
}

Expand All @@ -297,9 +311,9 @@ bool ProcessCollector::GetProcessAllStat(pid_t pid, ProcessAllStat& processStat)
processStat.fdNum = procFd.total;
processStat.fdNumExact = procFd.exact;

if (!GetProcessInfo(pid, processStat.processInfo)) {
return false;
}
// if (!GetProcessInfo(pid, processStat.processInfo)) {
// return false;
// }

processStat.memPercent = mTotalMemory == 0 ? 0 : 100.0 * processStat.processMemory.resident / mTotalMemory;
return true;
Expand Down Expand Up @@ -417,47 +431,28 @@ bool ProcessCollector::GetProcessState(pid_t pid, ProcessStat& processState) {
return true;
}

// 获取每个Pid的CPU信息
bool ProcessCollector::GetPidsCpu(const std::vector<pid_t>& pids, std::map<pid_t, uint64_t>& pidMap) {
int readCount = 0;
for (pid_t pid : pids) {
if (++readCount > mProcessSilentCount) { // 每读一段时间就要停下,防止进程过多占用太多时间
readCount = 0;
std::this_thread::sleep_for(milliseconds{100});
}
// 获取每个Pid的CPU信息
ProcessCpuInformation procCpu;
if (0 == GetProcessCpuInformation(pid, procCpu, false)) {
pidMap[pid] = procCpu.total;
}
}
return true;
}


// 给pid做cache
bool ProcessCollector::GetProcessCpuInCache(pid_t pid, bool includeCTime) {
bool ProcessCollector::GetProcessCpuInCache(pid_t pid) {
if (cpuTimeCache.find(pid) != cpuTimeCache.end()) {
return true;
} else {
return false;
}
}


bool ProcessCollector::GetProcessCpuInformation(pid_t pid, ProcessCpuInformation& information, bool includeCTime) {
bool ProcessCollector::GetProcessCpuInformation(pid_t pid, ProcessCpuInformation& information) {
const auto now = std::chrono::steady_clock::now();
bool findCache = false;
ProcessCpuInformation* prev = nullptr;

// 由于计算CPU时间需要获取一个时间间隔
// 但是我们这里不应该睡眠,因此只能做一个cache,保存上一次获取的数据
findCache = GetProcessCpuInCache(pid, includeCTime);
findCache = GetProcessCpuInCache(pid);

information.lastTime = now;
ProcessTime processTime{};

if (!GetProcessTime(pid, processTime, includeCTime)) {
if (!GetProcessTime(pid, processTime)) {
return false;
}

Expand Down Expand Up @@ -494,7 +489,7 @@ bool ProcessCollector::GetProcessCpuInformation(pid_t pid, ProcessCpuInformation
return true;
}

bool ProcessCollector::GetProcessTime(pid_t pid, ProcessTime& output, bool includeCTime) {
bool ProcessCollector::GetProcessTime(pid_t pid, ProcessTime& output) {
ProcessInformation processInfo;

if (!ReadProcessStat(pid, processInfo.stat)) {
Expand All @@ -514,8 +509,9 @@ bool ProcessCollector::GetProcessTime(pid_t pid, ProcessTime& output, bool inclu
}

// 数据样例: /proc/1/stat, 解析/proc/pid/stat
// 1 (cat) R 0 1 1 34816 1 4194560 1110 0 0 0 1 1 0 0 20 0 1 0 18938584 4505600 171 18446744073709551615 4194304 4238788
// 140727020025920 0 0 0 0 0 0 0 0 0 17 3 0 0 0 0 0 6336016 6337300 21442560 140727020027760 140727020027777
// 1 (cat) R 0 1 1 34816 1 4194560 1110 0 0 0 1 1 0 0 20 0 1 0 18938584 4505600
// 171 18446744073709551615 4194304 4238788 140727020025920 0 0 0 0 0 0 0 0 0 17
// 3 0 0 0 0 0 6336016 6337300 21442560 140727020027760 140727020027777
// 140727020027777 140727020027887 0
bool ProcessCollector::ReadProcessStat(pid_t pid, ProcessStat& processStat) {
processStat.pid = pid;
Expand Down Expand Up @@ -549,4 +545,27 @@ bool ProcessCollector::ReadProcessStat(pid_t pid, ProcessStat& processStat) {
return true;
}

// Sweeps cpuTimeCache and evicts every entry whose recorded lastTime lies more
// than ProcessSortInterval before the current steady_clock reading. Entries
// that are still within the interval are kept untouched.
//
// Any std::exception raised during the sweep is logged and swallowed, so the
// function never propagates one to its caller.
void ProcessCollector::ClearProcessCpuTimeCache() {
    try {
        const auto sweepTime = std::chrono::steady_clock::now();

        // erase() hands back the iterator following the removed element, so
        // the loop advances manually only when nothing was erased.
        for (auto entry = cpuTimeCache.begin(); entry != cpuTimeCache.end();) {
            const bool expired = (sweepTime - entry->second.lastTime) > ProcessSortInterval;
            if (!expired) {
                ++entry;
                continue;
            }
            entry = cpuTimeCache.erase(entry);
        }
    } catch (const std::exception& e) {
        LOG_ERROR(sLogger, ("ClearProcessCpuTimeCache error", e.what()));
    }
}

} // namespace logtail
Loading
Loading