Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 100 additions & 40 deletions core/plugin/flusher/sls/DiskBufferWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,17 @@ void DiskBufferWriter::Stop() {
}

bool DiskBufferWriter::PushToDiskBuffer(SenderQueueItem* item, uint32_t retryTimes) {
auto slsItem = static_cast<SLSSenderQueueItem*>(item);
// 添加空指针检查
if (item == nullptr) {
LOG_ERROR(sLogger, ("PushToDiskBuffer failed", "item is null"));
return false;
}

auto* slsItem = static_cast<SLSSenderQueueItem*>(item);
if (slsItem == nullptr) {
LOG_ERROR(sLogger, ("PushToDiskBuffer failed", "slsItem is null"));
return false;
}

uint32_t retry = 0;
while (++retry < retryTimes) {
Expand All @@ -160,17 +170,28 @@ bool DiskBufferWriter::PushToDiskBuffer(SenderQueueItem* item, uint32_t retryTim
this_thread::sleep_for(chrono::milliseconds(50));
}

auto flusher = static_cast<const FlusherSLS*>(slsItem->mFlusher);
const auto* flusher = static_cast<const FlusherSLS*>(slsItem->mFlusher);
if (flusher == nullptr) {
LOG_ERROR(sLogger, ("PushToDiskBuffer failed", "flusher is null"));
return false;
}

// 提前提取字符串,避免后续指针被释放
string region = flusher->mRegion;
string project = flusher->mProject;
string logstore = slsItem->mLogstore;
string queueKeyName = QueueKeyManager::GetInstance()->GetName(item->mFlusher->GetQueueKey());

LOG_WARNING(sLogger,
("failed to add sender queue item to disk buffer writer", "queue is full")("action", "discard data")(
"config-flusher-dst", QueueKeyManager::GetInstance()->GetName(item->mFlusher->GetQueueKey())));
"config-flusher-dst", queueKeyName)("region", region)("project", project)("logstore", logstore));
AlarmManager::GetInstance()->SendAlarmCritical(
DISCARD_DATA_ALARM,
"failed to add sender queue item to disk buffer writer: queue is full\taction: discard data",
flusher->mRegion,
flusher->mProject,
region,
project,
"",
slsItem->mLogstore);
logstore);
return false;
}

Expand All @@ -191,7 +212,12 @@ void DiskBufferWriter::BufferWriterThread() {

if (!res.empty()) {
for (auto itr = res.begin(); itr != res.end(); ++itr) {
SendToBufferFile(*itr);
// 添加空指针检查,避免crash
if (*itr != nullptr) {
SendToBufferFile(*itr);
} else {
LOG_ERROR(sLogger, ("BufferWriterThread", "null item in queue"));
}
delete *itr;
}
res.clear();
Expand Down Expand Up @@ -746,8 +772,38 @@ string DiskBufferWriter::GetBufferFileHeader() {
}

bool DiskBufferWriter::SendToBufferFile(SenderQueueItem* dataPtr) {
auto data = static_cast<SLSSenderQueueItem*>(dataPtr);
auto flusher = static_cast<const FlusherSLS*>(data->mFlusher);
// 添加空指针检查
if (dataPtr == nullptr) {
LOG_ERROR(sLogger, ("SendToBufferFile failed", "dataPtr is null"));
return false;
}

auto* data = static_cast<SLSSenderQueueItem*>(dataPtr);
if (data == nullptr) {
LOG_ERROR(sLogger, ("SendToBufferFile failed", "data is null"));
return false;
}

const auto* flusher = static_cast<const FlusherSLS*>(data->mFlusher);
if (flusher == nullptr) {
LOG_ERROR(sLogger, ("SendToBufferFile failed", "flusher is null"));
return false;
}

// 提前提取所有需要的字符串,避免后续指针被释放
string projectName = flusher->mProject;
string region = flusher->mRegion;
string aliuid = flusher->mAliuid;
string endpoint = flusher->mEndpoint;
string logstore = data->mLogstore;
string subpath = flusher->GetSubpath();
string workspace = flusher->GetWorkspace();
auto compressType = ConvertCompressType(flusher->GetCompressType());
auto telemetryType = flusher->mTelemetryType;
#ifdef __ENTERPRISE__
int32_t endpointMode = GetEndpointMode(flusher->mEndpointMode);
#endif

string bufferFileName = GetBufferFileName();
if (bufferFileName.empty()) {
CreateNewFile();
Expand All @@ -760,11 +816,13 @@ bool DiskBufferWriter::SendToBufferFile(SenderQueueItem* dataPtr) {
AlarmManager::GetInstance()->SendAlarmCritical(SECONDARY_READ_WRITE_ALARM,
string("open file error:") + bufferFileName
+ ",error:" + errorStr,
flusher->mRegion,
flusher->mProject,
region,
projectName,
"",
data->mLogstore);
LOG_ERROR(sLogger, ("open buffer file error", bufferFileName));
logstore);
LOG_ERROR(sLogger,
("open buffer file error", bufferFileName)("region", region)("projectName", projectName)("logstore",
logstore));
return false;
}

Expand All @@ -776,46 +834,48 @@ bool DiskBufferWriter::SendToBufferFile(SenderQueueItem* dataPtr) {
AlarmManager::GetInstance()->SendAlarmCritical(SECONDARY_READ_WRITE_ALARM,
string("write file error:") + bufferFileName
+ ", error:" + errorStr + ", nbytes:" + ToString(nbytes),
flusher->mRegion,
flusher->mProject,
region,
projectName,
"",
data->mLogstore);
LOG_ERROR(sLogger, ("error write encryption header", bufferFileName)("error", errorStr)("nbytes", nbytes));
logstore);
LOG_ERROR(sLogger,
("error write encryption header", bufferFileName)("error", errorStr)("nbytes", nbytes)(
"region", region)("projectName", projectName)("logstore", logstore));
fclose(fout);
return false;
}
}

char* des;
int32_t desLength;
char* des = nullptr;
int32_t desLength = 0;
if (!FileEncryption::GetInstance()->Encrypt(data->mData.c_str(), data->mData.size(), des, desLength)) {
fclose(fout);
LOG_ERROR(sLogger, ("encrypt error, project_name", flusher->mProject));
LOG_ERROR(sLogger, ("encrypt error, project_name", projectName)("region", region)("logstore", logstore));
AlarmManager::GetInstance()->SendAlarmCritical(ENCRYPT_DECRYPT_FAIL_ALARM,
string("encrypt error, project_name:" + flusher->mProject),
flusher->mRegion,
flusher->mProject,
string("encrypt error, project_name:" + projectName),
region,
projectName,
"",
data->mLogstore);
logstore);
return false;
}

sls_logs::LogtailBufferMeta bufferMeta;
bufferMeta.set_project(flusher->mProject);
bufferMeta.set_region(flusher->mRegion);
bufferMeta.set_aliuid(flusher->mAliuid);
bufferMeta.set_logstore(data->mLogstore);
bufferMeta.set_project(projectName);
bufferMeta.set_region(region);
bufferMeta.set_aliuid(aliuid);
bufferMeta.set_logstore(logstore);
bufferMeta.set_datatype(int32_t(data->mType));
bufferMeta.set_rawsize(data->mRawSize);
bufferMeta.set_shardhashkey(data->mShardHashKey);
bufferMeta.set_compresstype(ConvertCompressType(flusher->GetCompressType()));
bufferMeta.set_telemetrytype(flusher->mTelemetryType);
bufferMeta.set_subpath(flusher->GetSubpath());
bufferMeta.set_workspace(flusher->GetWorkspace());
bufferMeta.set_compresstype(compressType);
bufferMeta.set_telemetrytype(telemetryType);
bufferMeta.set_subpath(subpath);
bufferMeta.set_workspace(workspace);
#ifdef __ENTERPRISE__
bufferMeta.set_endpointmode(GetEndpointMode(flusher->mEndpointMode));
bufferMeta.set_endpointmode(endpointMode);
#endif
bufferMeta.set_endpoint(flusher->mEndpoint);
bufferMeta.set_endpoint(endpoint);
string encodedInfo;
bufferMeta.SerializeToString(&encodedInfo);

Expand All @@ -839,13 +899,13 @@ bool DiskBufferWriter::SendToBufferFile(SenderQueueItem* dataPtr) {
AlarmManager::GetInstance()->SendAlarmCritical(SECONDARY_READ_WRITE_ALARM,
string("write file error:") + bufferFileName
+ ", error:" + errorStr + ", nbytes:" + ToString(nbytes),
flusher->mRegion,
flusher->mProject,
region,
projectName,
"",
data->mLogstore);
LOG_ERROR(
sLogger,
("write meta of buffer file", "fail")("filename", bufferFileName)("errorStr", errorStr)("nbytes", nbytes));
logstore);
LOG_ERROR(sLogger,
("write meta of buffer file", "fail")("filename", bufferFileName)("errorStr", errorStr)(
"nbytes", nbytes)("region", region)("projectName", projectName)("logstore", logstore));
delete[] buffer;
fclose(fout);
return false;
Expand Down
Loading