Skip to content

Commit

Permalink
Fix comments
Browse files Browse the repository at this point in the history
Signed-off-by: PengFei Li <[email protected]>
  • Loading branch information
banmoy committed Nov 14, 2024
1 parent 75eadaf commit 73da818
Show file tree
Hide file tree
Showing 9 changed files with 64 additions and 68 deletions.
8 changes: 4 additions & 4 deletions be/src/http/action/stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -282,13 +282,13 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, StreamLoadContext* ct
return Status::InternalError("no valid Basic authorization");
}

if (!http_req->header(HTTP_ENABLE_BATCH_WRITE).empty()) {
auto value = http_req->header(HTTP_ENABLE_BATCH_WRITE);
if (!http_req->header(HTTP_ENABLE_MERGE_COMMIT).empty()) {
auto value = http_req->header(HTTP_ENABLE_MERGE_COMMIT);
StringParser::ParseResult parse_result = StringParser::PARSE_SUCCESS;
ctx->enable_batch_write = StringParser::string_to_bool(value.c_str(), value.length(), &parse_result);
if (UNLIKELY(parse_result != StringParser::PARSE_SUCCESS)) {
return Status::InvalidArgument(fmt::format(
"Invalid parameter {}. The value must be be bool type, but is {}", HTTP_ENABLE_BATCH_WRITE, value));
return Status::InvalidArgument(fmt::format("Invalid parameter {}. The value must be bool type, but is {}",
HTTP_ENABLE_MERGE_COMMIT, value));
}
}

Expand Down
8 changes: 4 additions & 4 deletions be/src/http/http_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,10 @@ static const std::string HTTP_CHANNEL_ID = "channel_id";
static const std::string HTTP_COMPRESSION = "compression";

// Headers for batch write ==========================
static const std::string HTTP_ENABLE_BATCH_WRITE = "enable_merge_commit";
static const std::string HTTP_BATCH_WRITE_ASYNC = "merge_commit_async";
static const std::string HTTP_BATCH_WRITE_INTERVAL_MS = "merge_commit_interval_ms";
static const std::string HTTP_BATCH_WRITE_PARALLEL = "merge_commit_parallel";
static const std::string HTTP_ENABLE_MERGE_COMMIT = "enable_merge_commit";
static const std::string HTTP_MERGE_COMMIT_ASYNC = "merge_commit_async";
static const std::string HTTP_MERGE_COMMIT_INTERVAL_MS = "merge_commit_interval_ms";
static const std::string HTTP_MERGE_COMMIT_PARALLEL = "merge_commit_parallel";

static const std::string HTTP_WAREHOUSE = "warehouse";

Expand Down
12 changes: 6 additions & 6 deletions be/src/runtime/batch_write/batch_write_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,24 +154,24 @@ void BatchWriteMgr::receive_stream_load_rpc(ExecEnv* exec_env, brpc::Controller*
ctx->db = request->db();
ctx->table = request->table();
std::map<std::string, std::string> parameters;
for (const PKeyValue& kv : request->parameters()) {
parameters.emplace(kv.key(), kv.value());
for (const PStringPair& pair : request->parameters()) {
parameters.emplace(pair.key(), pair.val());
}

{
auto value = GET_PARAMETER_OR_EMPTY(parameters, HTTP_ENABLE_BATCH_WRITE);
auto value = GET_PARAMETER_OR_EMPTY(parameters, HTTP_ENABLE_MERGE_COMMIT);
StringParser::ParseResult parse_result = StringParser::PARSE_SUCCESS;
ctx->enable_batch_write = StringParser::string_to_bool(value.c_str(), value.length(), &parse_result);
if (UNLIKELY(parse_result != StringParser::PARSE_SUCCESS)) {
ASSIGN_AND_RETURN(ctx->status, Status::InvalidArgument(fmt::format(
"Invalid parameter {}. The value must be be bool type, but is {}",
HTTP_ENABLE_BATCH_WRITE, value)));
"Invalid parameter {}. The value must be bool type, but is {}",
HTTP_ENABLE_MERGE_COMMIT, value)));
}
if (!ctx->enable_batch_write) {
ASSIGN_AND_RETURN(ctx->status,
Status::InvalidArgument(fmt::format(
"RPC interface only support batch write currently. Must set {} to true",
HTTP_ENABLE_BATCH_WRITE, value)));
HTTP_ENABLE_MERGE_COMMIT, value)));
}
}
ctx->label = GET_PARAMETER_OR_EMPTY(parameters, HTTP_LABEL_KEY);
Expand Down
8 changes: 4 additions & 4 deletions be/src/runtime/batch_write/batch_write_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ const std::vector<std::string> LOAD_PARAMETER_NAMES = {HTTP_FORMAT_KEY,
HTTP_LOG_REJECTED_RECORD_NUM,
HTTP_COMPRESSION,
HTTP_WAREHOUSE,
HTTP_ENABLE_BATCH_WRITE,
HTTP_BATCH_WRITE_ASYNC,
HTTP_BATCH_WRITE_INTERVAL_MS,
HTTP_BATCH_WRITE_PARALLEL,
HTTP_ENABLE_MERGE_COMMIT,
HTTP_MERGE_COMMIT_ASYNC,
HTTP_MERGE_COMMIT_INTERVAL_MS,
HTTP_MERGE_COMMIT_PARALLEL,
HTTP_COLUMN_SEPARATOR,
HTTP_ROW_DELIMITER,
HTTP_TRIM_SPACE,
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/batch_write/isomorphic_batch_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ IsomorphicBatchWrite::IsomorphicBatchWrite(BatchWriteId batch_write_id, bthreads

Status IsomorphicBatchWrite::init() {
TEST_ERROR_POINT("IsomorphicBatchWrite::init::error");
auto it = _batch_write_id.load_params.find(HTTP_BATCH_WRITE_ASYNC);
auto it = _batch_write_id.load_params.find(HTTP_MERGE_COMMIT_ASYNC);
if (it != _batch_write_id.load_params.end()) {
_batch_write_async = it->second == "true";
}
Expand Down
26 changes: 13 additions & 13 deletions be/test/http/stream_load_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -399,9 +399,9 @@ TEST_F(StreamLoadActionTest, batch_write_csv) {
request._params.emplace(HTTP_DB_KEY, "db");
request._params.emplace(HTTP_TABLE_KEY, "tbl");
request._headers.emplace(HTTP_LABEL_KEY, "batch_write_csv");
request._headers.emplace(HTTP_ENABLE_BATCH_WRITE, "true");
request._headers.emplace(HTTP_BATCH_WRITE_INTERVAL_MS, "1000");
request._headers.emplace(HTTP_BATCH_WRITE_ASYNC, "true");
request._headers.emplace(HTTP_ENABLE_MERGE_COMMIT, "true");
request._headers.emplace(HTTP_MERGE_COMMIT_INTERVAL_MS, "1000");
request._headers.emplace(HTTP_MERGE_COMMIT_ASYNC, "true");

std::string content = "a|b|c|d";
struct evhttp_request ev_req;
Expand Down Expand Up @@ -435,9 +435,9 @@ TEST_F(StreamLoadActionTest, batch_write_csv) {
[](void* arg) { *(Status*)arg = Status::OK(); });
action.handle(&request);
ASSERT_TRUE(ctx->status.ok());
std::map<std::string, std::string> load_params = {{HTTP_ENABLE_BATCH_WRITE, "true"},
{HTTP_BATCH_WRITE_INTERVAL_MS, "1000"},
{HTTP_BATCH_WRITE_ASYNC, "true"},
std::map<std::string, std::string> load_params = {{HTTP_ENABLE_MERGE_COMMIT, "true"},
{HTTP_MERGE_COMMIT_INTERVAL_MS, "1000"},
{HTTP_MERGE_COMMIT_ASYNC, "true"},
{HTTP_FORMAT_KEY, "csv"},
{HTTP_COLUMN_SEPARATOR, "|"}};
ASSERT_EQ(load_params, ctx->load_parameters);
Expand All @@ -463,9 +463,9 @@ TEST_F(StreamLoadActionTest, batch_write_json) {
request._params.emplace(HTTP_DB_KEY, "db");
request._params.emplace(HTTP_TABLE_KEY, "tbl");
request._headers.emplace(HTTP_LABEL_KEY, "batch_write_csv");
request._headers.emplace(HTTP_ENABLE_BATCH_WRITE, "true");
request._headers.emplace(HTTP_BATCH_WRITE_INTERVAL_MS, "1000");
request._headers.emplace(HTTP_BATCH_WRITE_ASYNC, "true");
request._headers.emplace(HTTP_ENABLE_MERGE_COMMIT, "true");
request._headers.emplace(HTTP_MERGE_COMMIT_INTERVAL_MS, "1000");
request._headers.emplace(HTTP_MERGE_COMMIT_ASYNC, "true");

std::string content = "{\"c0\":\"a\",\"c1\":\"b\"}";
struct evhttp_request ev_req;
Expand Down Expand Up @@ -498,9 +498,9 @@ TEST_F(StreamLoadActionTest, batch_write_json) {
[](void* arg) { *(Status*)arg = Status::OK(); });
action.handle(&request);
ASSERT_TRUE(ctx->status.ok());
std::map<std::string, std::string> load_params = {{HTTP_ENABLE_BATCH_WRITE, "true"},
{HTTP_BATCH_WRITE_INTERVAL_MS, "1000"},
{HTTP_BATCH_WRITE_ASYNC, "true"},
std::map<std::string, std::string> load_params = {{HTTP_ENABLE_MERGE_COMMIT, "true"},
{HTTP_MERGE_COMMIT_INTERVAL_MS, "1000"},
{HTTP_MERGE_COMMIT_ASYNC, "true"},
{HTTP_FORMAT_KEY, "json"}};
ASSERT_EQ(load_params, ctx->load_parameters);
ASSERT_EQ(content, std::string(ctx->buffer->ptr, ctx->buffer->limit));
Expand All @@ -522,7 +522,7 @@ TEST_F(StreamLoadActionTest, enable_batch_write_wrong_argument) {
request._params.emplace(HTTP_DB_KEY, "db");
request._params.emplace(HTTP_TABLE_KEY, "tbl");
request._headers.emplace(HttpHeaders::AUTHORIZATION, "Basic cm9vdDo=");
request._headers.emplace(HTTP_ENABLE_BATCH_WRITE, "abc");
request._headers.emplace(HTTP_ENABLE_MERGE_COMMIT, "abc");
request.set_handler(&action);
action.on_header(&request);
action.handle(&request);
Expand Down
46 changes: 23 additions & 23 deletions be/test/runtime/batch_write/batch_write_mgr_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ TEST_F(BatchWriteMgrTest, register_and_unregister_pipe) {

TEST_F(BatchWriteMgrTest, append_data) {
BatchWriteId batch_write_id = {
"db1", "table1", {{HTTP_ENABLE_BATCH_WRITE, "true"}, {HTTP_BATCH_WRITE_ASYNC, "true"}}};
"db1", "table1", {{HTTP_ENABLE_MERGE_COMMIT, "true"}, {HTTP_MERGE_COMMIT_ASYNC, "true"}}};
auto status_or_ctx = BatchWriteMgr::create_and_register_pipe(_exec_env, _batch_write_mgr.get(), batch_write_id.db,
batch_write_id.table, batch_write_id.load_params,
"label1", 1, generate_uuid(), 1000);
Expand Down Expand Up @@ -202,11 +202,11 @@ TEST_F(BatchWriteMgrTest, stop) {
ASSERT_TRUE(_batch_write_mgr->append_data(data_ctx).is_service_unavailable());
}

#define ADD_KEY_VALUE(request, key, value) \
{ \
PKeyValue* kv = request.add_parameters(); \
kv->set_key(key); \
kv->set_value(value); \
#define ADD_KEY_VALUE(request, key, value) \
{ \
PStringPair* pair = request.add_parameters(); \
pair->set_key(key); \
pair->set_val(value); \
}

TEST_F(BatchWriteMgrTest, stream_load_rpc_success) {
Expand All @@ -225,9 +225,9 @@ TEST_F(BatchWriteMgrTest, stream_load_rpc_success) {
request.set_user("root");
request.set_passwd("123456");
ADD_KEY_VALUE(request, "label", "test1");
ADD_KEY_VALUE(request, HTTP_ENABLE_BATCH_WRITE, "true");
ADD_KEY_VALUE(request, HTTP_BATCH_WRITE_INTERVAL_MS, "1000");
ADD_KEY_VALUE(request, HTTP_BATCH_WRITE_ASYNC, "true");
ADD_KEY_VALUE(request, HTTP_ENABLE_MERGE_COMMIT, "true");
ADD_KEY_VALUE(request, HTTP_MERGE_COMMIT_INTERVAL_MS, "1000");
ADD_KEY_VALUE(request, HTTP_MERGE_COMMIT_ASYNC, "true");
ADD_KEY_VALUE(request, HTTP_TIMEOUT, "60");
ADD_KEY_VALUE(request, HTTP_FORMAT_KEY, "json");
std::string data = "{\"c0\":\"a\",\"c1\":\"b\"}";
Expand All @@ -242,9 +242,9 @@ TEST_F(BatchWriteMgrTest, stream_load_rpc_success) {
EXPECT_EQ("123456", ctx->auth.passwd);
EXPECT_TRUE(ctx->enable_batch_write);
EXPECT_EQ(60, ctx->timeout_second);
std::map<std::string, std::string> load_params = {{HTTP_ENABLE_BATCH_WRITE, "true"},
{HTTP_BATCH_WRITE_INTERVAL_MS, "1000"},
{HTTP_BATCH_WRITE_ASYNC, "true"},
std::map<std::string, std::string> load_params = {{HTTP_ENABLE_MERGE_COMMIT, "true"},
{HTTP_MERGE_COMMIT_INTERVAL_MS, "1000"},
{HTTP_MERGE_COMMIT_ASYNC, "true"},
{HTTP_TIMEOUT, "60"},
{HTTP_FORMAT_KEY, "json"}};
EXPECT_EQ(load_params, ctx->load_parameters);
Expand All @@ -262,7 +262,7 @@ TEST_F(BatchWriteMgrTest, stream_load_rpc_success) {

TEST_F(BatchWriteMgrTest, stream_load_rpc_fail) {
std::string data = "{\"c0\":\"a\",\"c1\":\"b\"}";
// HTTP_ENABLE_BATCH_WRITE is invalid
// HTTP_ENABLE_MERGE_COMMIT is invalid
{
brpc::Controller cntl;
cntl.request_attachment().append(data);
Expand All @@ -272,7 +272,7 @@ TEST_F(BatchWriteMgrTest, stream_load_rpc_fail) {
request.set_user("root");
request.set_passwd("123456");
ADD_KEY_VALUE(request, "label", "test1");
ADD_KEY_VALUE(request, HTTP_ENABLE_BATCH_WRITE, "abc");
ADD_KEY_VALUE(request, HTTP_ENABLE_MERGE_COMMIT, "abc");
PStreamLoadResponse response;
_batch_write_mgr->receive_stream_load_rpc(_exec_env, &cntl, &request, &response);
rapidjson::Document doc;
Expand All @@ -281,7 +281,7 @@ TEST_F(BatchWriteMgrTest, stream_load_rpc_fail) {
ASSERT_NE(nullptr, std::strstr(doc["Message"].GetString(), "Invalid parameter enable_merge_commit"));
}

// HTTP_ENABLE_BATCH_WRITE is false
// HTTP_ENABLE_MERGE_COMMIT is false
{
brpc::Controller cntl;
cntl.request_attachment().append(data);
Expand All @@ -291,7 +291,7 @@ TEST_F(BatchWriteMgrTest, stream_load_rpc_fail) {
request.set_user("root");
request.set_passwd("123456");
ADD_KEY_VALUE(request, "label", "test1");
ADD_KEY_VALUE(request, HTTP_ENABLE_BATCH_WRITE, "false");
ADD_KEY_VALUE(request, HTTP_ENABLE_MERGE_COMMIT, "false");
PStreamLoadResponse response;
_batch_write_mgr->receive_stream_load_rpc(_exec_env, &cntl, &request, &response);
rapidjson::Document doc;
Expand All @@ -310,7 +310,7 @@ TEST_F(BatchWriteMgrTest, stream_load_rpc_fail) {
request.set_user("root");
request.set_passwd("123456");
ADD_KEY_VALUE(request, "label", "test1");
ADD_KEY_VALUE(request, HTTP_ENABLE_BATCH_WRITE, "true");
ADD_KEY_VALUE(request, HTTP_ENABLE_MERGE_COMMIT, "true");
ADD_KEY_VALUE(request, HTTP_TIMEOUT, "abc");
PStreamLoadResponse response;
_batch_write_mgr->receive_stream_load_rpc(_exec_env, &cntl, &request, &response);
Expand All @@ -330,9 +330,9 @@ TEST_F(BatchWriteMgrTest, stream_load_rpc_fail) {
request.set_user("root");
request.set_passwd("123456");
ADD_KEY_VALUE(request, "label", "test1");
ADD_KEY_VALUE(request, HTTP_ENABLE_BATCH_WRITE, "true");
ADD_KEY_VALUE(request, HTTP_BATCH_WRITE_INTERVAL_MS, "1000");
ADD_KEY_VALUE(request, HTTP_BATCH_WRITE_ASYNC, "true");
ADD_KEY_VALUE(request, HTTP_ENABLE_MERGE_COMMIT, "true");
ADD_KEY_VALUE(request, HTTP_MERGE_COMMIT_INTERVAL_MS, "1000");
ADD_KEY_VALUE(request, HTTP_MERGE_COMMIT_ASYNC, "true");
ADD_KEY_VALUE(request, HTTP_FORMAT_KEY, "json");
PStreamLoadResponse response;
_batch_write_mgr->receive_stream_load_rpc(_exec_env, &cntl, &request, &response);
Expand All @@ -357,9 +357,9 @@ TEST_F(BatchWriteMgrTest, stream_load_rpc_fail) {
request.set_user("root");
request.set_passwd("123456");
ADD_KEY_VALUE(request, "label", "test1");
ADD_KEY_VALUE(request, HTTP_ENABLE_BATCH_WRITE, "true");
ADD_KEY_VALUE(request, HTTP_BATCH_WRITE_INTERVAL_MS, "1000");
ADD_KEY_VALUE(request, HTTP_BATCH_WRITE_ASYNC, "true");
ADD_KEY_VALUE(request, HTTP_ENABLE_MERGE_COMMIT, "true");
ADD_KEY_VALUE(request, HTTP_MERGE_COMMIT_INTERVAL_MS, "1000");
ADD_KEY_VALUE(request, HTTP_MERGE_COMMIT_ASYNC, "true");
ADD_KEY_VALUE(request, HTTP_FORMAT_KEY, "json");
PStreamLoadResponse response;
SyncPoint::GetInstance()->SetCallBack("BatchWriteMgr::append_data::fail", [](void* arg) {
Expand Down
4 changes: 2 additions & 2 deletions be/test/runtime/batch_write/isomorphic_batch_write_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ TEST_F(IsomorphicBatchWriteTest, register_and_unregister_pipe) {
}

TEST_F(IsomorphicBatchWriteTest, append_data_async) {
BatchWriteId batch_write_id{.db = "db", .table = "table", .load_params = {{HTTP_BATCH_WRITE_ASYNC, "true"}}};
BatchWriteId batch_write_id{.db = "db", .table = "table", .load_params = {{HTTP_MERGE_COMMIT_ASYNC, "true"}}};
IsomorphicBatchWriteSharedPtr batch_write = std::make_shared<IsomorphicBatchWrite>(batch_write_id, _executor.get());
ASSERT_OK(batch_write->init());
DeferOp defer_writer([&] { batch_write->stop(); });
Expand Down Expand Up @@ -223,7 +223,7 @@ TEST_F(IsomorphicBatchWriteTest, append_data_sync) {
void IsomorphicBatchWriteTest::test_append_data_sync_base(const Status& rpc_status, TTransactionStatus::type txn_status,
const Status& expect_st) {
BatchWriteId batch_write_id{
.db = "db", .table = "table", .load_params = {{HTTP_BATCH_WRITE_ASYNC, "false"}, {HTTP_TIMEOUT, "1"}}};
.db = "db", .table = "table", .load_params = {{HTTP_MERGE_COMMIT_ASYNC, "false"}, {HTTP_TIMEOUT, "1"}}};
IsomorphicBatchWriteSharedPtr batch_write = std::make_shared<IsomorphicBatchWrite>(batch_write_id, _executor.get());
ASSERT_OK(batch_write->init());
DeferOp defer_writer([&] { batch_write->stop(); });
Expand Down
18 changes: 7 additions & 11 deletions gensrc/proto/internal_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -639,21 +639,17 @@ message PProcessDictionaryCacheResult {
optional int64 dictionary_memory_usage = 2;
};

message PKeyValue {
optional string key = 1;
optional string value = 2;
};

// Data will be sent as the attachment of the request
message PStreamLoadRequest {
optional string db = 1;
optional string table = 2;
optional string user = 3;
optional string passwd = 4;
repeated PKeyValue parameters = 5;
optional string db = 1;
optional string table = 2;
optional string user = 3;
optional string passwd = 4;
repeated PStringPair parameters = 5;
}

message PStreamLoadResponse {
optional string json_result = 1;
optional string json_result = 1;
}

// NOTE(zc): If you want to add new method here,
Expand Down

0 comments on commit 73da818

Please sign in to comment.