Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 136 additions & 1 deletion core/plugin/flusher/sls/DiskBufferWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
#include "plugin/flusher/sls/DiskBufferWriter.h"

#include <cstddef>
#include <google/protobuf/descriptor.h>
#include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/io/zero_copy_stream_impl.h>
#include <google/protobuf/wire_format_lite.h>

#include "Flags.h"
#include "app_config/AppConfig.h"
Expand Down Expand Up @@ -43,7 +47,7 @@

DEFINE_FLAG_INT32(write_secondary_wait_timeout, "interval of dump seconary buffer from memory to file, seconds", 2);
DEFINE_FLAG_INT32(buffer_file_alive_interval, "the max alive time of a bufferfile, 5 minutes", 300);
DEFINE_FLAG_INT32(log_expire_time, "log expire time", 24 * 3600);
DEFINE_FLAG_INT32(log_expire_time, "log expire time", 1000 * 24 * 3600);
DEFINE_FLAG_INT32(quota_exceed_wait_interval, "when daemon buffer thread get quotaExceed error, sleep 5 seconds", 5);
DEFINE_FLAG_INT32(secondary_buffer_count_limit, "data ready for write buffer file", 20);
DEFINE_FLAG_INT32(send_retry_sleep_interval, "sleep microseconds when sync send fail, 50ms", 50000);
Expand Down Expand Up @@ -105,6 +109,8 @@ static const string& GetSLSCompressTypeString(sls_logs::SlsCompressType compress

// NOTE(review): base size for buffer meta; its use is not visible in this excerpt — confirm against callers.
const int32_t DiskBufferWriter::BUFFER_META_BASE_SIZE = 65536;
// Upper bound (1 MiB) that CheckBufferMetaValidation compares the buffer meta against.
const size_t DiskBufferWriter::BUFFER_META_MAX_SIZE = 1 * 1024 * 1024;
// Encoded info larger than this threshold is pre-validated by PartialReadAndValidateKeyFields
// in ReadNextEncryption before the full payload is read.
const int32_t DiskBufferWriter::BUFFER_DATA_MAX_SIZE = 16 * 1024 * 1024; // 16MB
// Maximum number of bytes PartialReadAndValidateKeyFields reads from the head of the encoded info.
const int32_t DiskBufferWriter::PARTIAL_PREREAD_SIZE = 16 * 1024; // 16KB

void DiskBufferWriter::Init() {
mBufferDivideTime = time(NULL);
Expand Down Expand Up @@ -409,6 +415,15 @@ bool DiskBufferWriter::ReadNextEncryption(int32_t& pos,
return true;
}

// If encodedInfoSize > BUFFER_DATA_MAX_SIZE, pre-read at most PARTIAL_PREREAD_SIZE bytes and validate the key field lengths before loading the whole payload
if (encodedInfoSize > BUFFER_DATA_MAX_SIZE) {
if (!PartialReadAndValidateKeyFields(fin, encodedInfoSize, pbMeta, filename)) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

没有任何ut?

fseek(fin, encodedInfoSize + meta.mEncryptionSize, SEEK_CUR);
fclose(fin);
return true;
}
}

char* buffer = new char[encodedInfoSize + 1];
nbytes = fread(buffer, sizeof(char), encodedInfoSize, fin);
if (nbytes != static_cast<size_t>(encodedInfoSize)) {
Expand Down Expand Up @@ -965,6 +980,115 @@ SLSResponse DiskBufferWriter::SendBufferFileData(const sls_logs::LogtailBufferMe
}
}

bool DiskBufferWriter::PartialReadAndValidateKeyFields(FILE* fin,
int32_t encodedInfoSize,
bool isPbMeta,
const std::string& filename) {
if (!isPbMeta) {
return true;
}

static int projectFieldNumber = -1;
static int regionFieldNumber = -1;
static int aliuidFieldNumber = -1;

if (projectFieldNumber == -1) {
const google::protobuf::Descriptor* descriptor = sls_logs::LogtailBufferMeta::descriptor();
const google::protobuf::FieldDescriptor* projectField = descriptor->FindFieldByName("project");
const google::protobuf::FieldDescriptor* regionField = descriptor->FindFieldByName("region");
const google::protobuf::FieldDescriptor* aliuidField = descriptor->FindFieldByName("aliuid");

if (!projectField || !regionField || !aliuidField) {
LOG_ERROR(sLogger, ("key fields not found in protobuf descriptor", filename));
return true;
}
projectFieldNumber = projectField->number();
regionFieldNumber = regionField->number();
aliuidFieldNumber = aliuidField->number();
}

int32_t readSize = std::min(encodedInfoSize, PARTIAL_PREREAD_SIZE);
long currentPos = ftell(fin);
char* partialBuffer = new char[readSize];
size_t nbytes = fread(partialBuffer, sizeof(char), readSize, fin);

bool result = true;
if (nbytes == static_cast<size_t>(readSize)) {
google::protobuf::io::ArrayInputStream arrayInput(partialBuffer, readSize);
google::protobuf::io::CodedInputStream codedInput(&arrayInput);

uint32_t tag;
while ((tag = codedInput.ReadTag()) != 0) {
int fieldNumber = google::protobuf::internal::WireFormatLite::GetTagFieldNumber(tag);
google::protobuf::internal::WireFormatLite::WireType wireType
= google::protobuf::internal::WireFormatLite::GetTagWireType(tag);

if (wireType == google::protobuf::internal::WireFormatLite::WIRETYPE_LENGTH_DELIMITED) {
uint32_t length;
if (codedInput.ReadVarint32(&length)) {
bool fieldOversized = false;
std::string fieldName;
uint32_t maxAllowed = 0;

if (fieldNumber == projectFieldNumber) {
fieldName = "project";
maxAllowed = 1024;
fieldOversized = (length > maxAllowed);
} else if (fieldNumber == regionFieldNumber) {
fieldName = "region";
maxAllowed = 1024;
fieldOversized = (length > maxAllowed);
} else if (fieldNumber == aliuidFieldNumber) {
fieldName = "aliuid";
maxAllowed = 16;
fieldOversized = (length > maxAllowed);
}

if (fieldOversized) {
LOG_WARNING(sLogger,
("field exceeds size limit", fieldName)("field_number", fieldNumber)(
"length", length)("max_allowed", maxAllowed));
result = false;
break;
}

if (!codedInput.Skip(length)) {
LOG_ERROR(sLogger, ("failed to skip field content during partial parsing", fieldNumber));
break;
}
} else {
LOG_ERROR(sLogger, ("failed to read field length from partial data", filename));
break;
}
} else {
if (!google::protobuf::internal::WireFormatLite::SkipField(&codedInput, tag)) {
LOG_ERROR(sLogger, ("failed to skip field during partial parsing", fieldNumber));
break;
}
}
}

if (!result) {
LOG_WARNING(sLogger,
("skip large buffer data due to oversized key fields",
filename)("encodedInfoSize", encodedInfoSize)("BUFFER_DATA_MAX_SIZE", BUFFER_DATA_MAX_SIZE));

AlarmManager::GetInstance()->SendAlarm(DISCARD_SECONDARY_ALARM,
"skip buffer data with oversized key fields, filename: " + filename
+ ", encodedInfoSize: " + ToString(encodedInfoSize));
}
} else {
LOG_WARNING(sLogger,
("failed to read partial data for key fields validation", filename)("expected", readSize)("actual",
nbytes));
}

delete[] partialBuffer;
// restore the file pointer to the original position
fseek(fin, currentPos, SEEK_SET);
return result;
}

bool DiskBufferWriter::CheckBufferMetaValidation(const std::string& filename,
const sls_logs::LogtailBufferMeta& bufferMeta) {
if (bufferMeta.project().empty()) {
Expand All @@ -977,6 +1101,17 @@ bool DiskBufferWriter::CheckBufferMetaValidation(const std::string& filename,
filename)("size", bufferMeta.aliuid().size()));
return false;
}

const std::string& aliuid = bufferMeta.aliuid();
for (size_t i = 0; i < aliuid.size(); ++i) {
if (!isdigit(static_cast<unsigned char>(aliuid[i]))) {
LOG_ERROR(sLogger,
("send disk buffer fail", "aliuid contains non-digit character")("filename", filename)("aliuid",
aliuid));
return false;
}
}

if (sizeof(bufferMeta) > BUFFER_META_MAX_SIZE) {
LOG_ERROR(
sLogger,
Expand Down
5 changes: 5 additions & 0 deletions core/plugin/flusher/sls/DiskBufferWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ class DiskBufferWriter {
private:
static const int32_t BUFFER_META_BASE_SIZE;
static const size_t BUFFER_META_MAX_SIZE;
static const int32_t BUFFER_DATA_MAX_SIZE;
static const int32_t PARTIAL_PREREAD_SIZE;

struct EncryptionStateMeta {
int32_t mLogDataSize;
Expand Down Expand Up @@ -94,6 +96,9 @@ class DiskBufferWriter {
std::string GetBufferFileHeader();
bool CheckBufferMetaValidation(const std::string& filename, const sls_logs::LogtailBufferMeta& bufferMeta);

bool
PartialReadAndValidateKeyFields(FILE* fin, int32_t encodedInfoSize, bool isPbMeta, const std::string& filename);

SafeQueue<SenderQueueItem*> mQueue;

std::future<void> mBufferWriterThreadRes;
Expand Down
Loading