Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(make_idempotent): support making incr request idempotent in pegasus_server_write and replication_app_base #2196

Merged
merged 14 commits into from
Feb 21, 2025
2 changes: 1 addition & 1 deletion .clang-tidy
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
CheckOptions: []
# Disable some checks that are not useful for us now.
# They are sorted by names, and should be consistent to build_tools/clang_tidy.py.
Checks: 'abseil-*,boost-*,bugprone-*,cert-*,clang-analyzer-*,concurrency-*,cppcoreguidelines-*,darwin-*,fuchsia-*,google-*,hicpp-*,linuxkernel-*,llvm-*,misc-*,modernize-*,performance-*,portability-*,readability-*,-bugprone-easily-swappable-parameters,-bugprone-lambda-function-name,-bugprone-macro-parentheses,-cert-err58-cpp,-concurrency-mt-unsafe,-cppcoreguidelines-avoid-c-arrays,-cppcoreguidelines-avoid-magic-numbers,-cppcoreguidelines-avoid-non-const-global-variables,-cppcoreguidelines-macro-usage,-cppcoreguidelines-non-private-member-variables-in-classes,-cppcoreguidelines-owning-memory,-cppcoreguidelines-pro-bounds-array-to-pointer-decay,-cppcoreguidelines-pro-bounds-pointer-arithmetic,-cppcoreguidelines-pro-type-const-cast,-cppcoreguidelines-pro-type-union-access,-fuchsia-default-arguments-calls,-fuchsia-overloaded-operator,-fuchsia-statically-constructed-objects,-google-readability-avoid-underscore-in-googletest-name,-hicpp-avoid-c-arrays,-hicpp-named-parameter,-hicpp-no-array-decay,-llvm-include-order,-misc-definitions-in-headers,-misc-non-private-member-variables-in-classes,-misc-unused-parameters,-modernize-avoid-bind,-modernize-avoid-c-arrays,-modernize-replace-disallow-copy-and-assign-macro,-modernize-use-trailing-return-type,-performance-unnecessary-value-param,-readability-function-cognitive-complexity,-readability-identifier-length,-readability-magic-numbers,-readability-named-parameter,-readability-suspicious-call-argument'
Checks: 'abseil-*,boost-*,bugprone-*,cert-*,clang-analyzer-*,concurrency-*,cppcoreguidelines-*,darwin-*,fuchsia-*,google-*,hicpp-*,linuxkernel-*,llvm-*,misc-*,modernize-*,performance-*,portability-*,readability-*,-bugprone-easily-swappable-parameters,-bugprone-lambda-function-name,-bugprone-macro-parentheses,-bugprone-sizeof-expression,-cert-err58-cpp,-concurrency-mt-unsafe,-cppcoreguidelines-avoid-c-arrays,-cppcoreguidelines-avoid-magic-numbers,-cppcoreguidelines-avoid-non-const-global-variables,-cppcoreguidelines-macro-usage,-cppcoreguidelines-non-private-member-variables-in-classes,-cppcoreguidelines-owning-memory,-cppcoreguidelines-pro-bounds-array-to-pointer-decay,-cppcoreguidelines-pro-bounds-pointer-arithmetic,-cppcoreguidelines-pro-type-const-cast,-cppcoreguidelines-pro-type-union-access,-fuchsia-default-arguments-calls,-fuchsia-multiple-inheritance,-fuchsia-overloaded-operator,-fuchsia-statically-constructed-objects,-google-readability-avoid-underscore-in-googletest-name,-hicpp-avoid-c-arrays,-hicpp-named-parameter,-hicpp-no-array-decay,-llvm-include-order,-misc-definitions-in-headers,-misc-non-private-member-variables-in-classes,-misc-unused-parameters,-modernize-avoid-bind,-modernize-avoid-c-arrays,-modernize-replace-disallow-copy-and-assign-macro,-modernize-use-trailing-return-type,-performance-unnecessary-value-param,-readability-function-cognitive-complexity,-readability-identifier-length,-readability-magic-numbers,-readability-named-parameter,-readability-suspicious-call-argument'
ExtraArgs:
ExtraArgsBefore: []
FormatStyle: none
Expand Down
2 changes: 2 additions & 0 deletions build_tools/clang_tidy.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ def tidy_on_path(path):
"-checks=-bugprone-easily-swappable-parameters,"
"-bugprone-lambda-function-name,"
"-bugprone-macro-parentheses,"
"-bugprone-sizeof-expression,"
"-cert-err58-cpp,"
"-concurrency-mt-unsafe,"
"-cppcoreguidelines-avoid-c-arrays,"
Expand All @@ -78,6 +79,7 @@ def tidy_on_path(path):
"-cppcoreguidelines-pro-type-const-cast,"
"-cppcoreguidelines-pro-type-union-access,"
"-fuchsia-default-arguments-calls,"
"-fuchsia-multiple-inheritance,"
"-fuchsia-overloaded-operator,"
"-fuchsia-statically-constructed-objects,"
"-google-readability-avoid-underscore-in-googletest-name,"
Expand Down
11 changes: 5 additions & 6 deletions src/redis_protocol/proxy_ut/redis_proxy_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,16 +112,15 @@ class redis_test_parser : public redis_parser
_reserved_entry[index]->request.sub_requests = std::move(msg.sub_requests);
}

static dsn::message_ex *create_message(const char *data)
static dsn::message_ex *create_message(const char *data, unsigned int length)
{
return dsn::message_ex::create_received_request(
RPC_CALL_RAW_MESSAGE, dsn::DSF_THRIFT_BINARY, (void *)data, strlen(data));
RPC_CALL_RAW_MESSAGE, dsn::DSF_THRIFT_BINARY, data, length);
}

static dsn::message_ex *create_message(const char *data, int length)
static dsn::message_ex *create_message(const char *data)
{
return dsn::message_ex::create_received_request(
RPC_CALL_RAW_MESSAGE, dsn::DSF_THRIFT_BINARY, (void *)data, length);
return create_message(data, strlen(data));
}

static dsn::message_ex *marshalling_array(const redis_request &request)
Expand Down Expand Up @@ -379,7 +378,7 @@ TEST_F(proxy_test, test_random_cases)
offsets.insert(total_body_size);

int last_offset = 0;
for (int offset : offsets) {
for (auto offset : offsets) {
dsn::message_ex *msg = redis_test_parser::create_message(msg_buffer_ptr + last_offset,
offset - last_offset);
ASSERT_TRUE(parse(msg));
Expand Down
5 changes: 5 additions & 0 deletions src/replica/mutation.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,11 @@ class mutation : public ref_counter
// user requests
std::vector<dsn::message_ex *> client_requests;

// The original request received from the client. While making an atomic request (incr,
// check_and_set and check_and_mutate) idempotent, an extra variable is needed to hold
// its original request for the purpose of replying to the client.
dsn::message_ptr original_request;

// used by pending mutation queue only
mutation *next;

Expand Down
52 changes: 29 additions & 23 deletions src/replica/replication_app_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
* THE SOFTWARE.
*/

#include <alloca.h>
#include <fmt/core.h>
#include <rocksdb/env.h>
#include <rocksdb/status.h>
Expand All @@ -47,6 +46,7 @@
#include "rpc/serialization.h"
#include "task/task_code.h"
#include "task/task_spec.h"
#include "utils/alloc.h"
#include "utils/autoref_ptr.h"
#include "utils/binary_reader.h"
#include "utils/binary_writer.h"
Expand Down Expand Up @@ -264,7 +264,8 @@ error_code replication_app_base::apply_checkpoint(chkpt_apply_mode mode, const l
int replication_app_base::on_batched_write_requests(int64_t decree,
uint64_t timestamp,
message_ex **requests,
int request_length)
int request_length,
message_ex *original_request)
{
int storage_error = rocksdb::Status::kOk;
for (int i = 0; i < request_length; ++i) {
Expand Down Expand Up @@ -292,37 +293,42 @@ error_code replication_app_base::apply_mutation(const mutation *mu)
}

bool has_ingestion_request = false;
int request_count = static_cast<int>(mu->client_requests.size());
message_ex **batched_requests = (message_ex **)alloca(sizeof(message_ex *) * request_count);
message_ex **faked_requests = (message_ex **)alloca(sizeof(message_ex *) * request_count);
const int request_count = static_cast<int>(mu->client_requests.size());
auto **batched_requests = ALLOC_STACK(message_ex *, request_count);
auto **faked_requests = ALLOC_STACK(message_ex *, request_count);
int batched_count = 0; // write-empties are not included.
int faked_count = 0;
for (int i = 0; i < request_count; i++) {
for (int i = 0; i < request_count; ++i) {
const mutation_update &update = mu->data.updates[i];
message_ex *req = mu->client_requests[i];
LOG_DEBUG_PREFIX("mutation {} #{}: dispatch rpc call {}", mu->name(), i, update.code);
if (update.code != RPC_REPLICATION_WRITE_EMPTY) {
if (req == nullptr) {
req = message_ex::create_received_request(
update.code,
(dsn_msg_serialize_format)update.serialization_type,
(void *)update.data.data(),
update.data.length());
faked_requests[faked_count++] = req;
}
if (update.code == RPC_REPLICATION_WRITE_EMPTY) {
continue;
}

batched_requests[batched_count++] = req;
if (update.code == apps::RPC_RRDB_RRDB_BULK_LOAD) {
has_ingestion_request = true;
}
message_ex *req = mu->client_requests[i];
if (req == nullptr) {
req = message_ex::create_received_request(
update.code,
static_cast<dsn_msg_serialize_format>(update.serialization_type),
update.data.data(),
update.data.length());
faked_requests[faked_count++] = req;
}

batched_requests[batched_count++] = req;
if (update.code == apps::RPC_RRDB_RRDB_BULK_LOAD) {
has_ingestion_request = true;
}
}

int storage_error = on_batched_write_requests(
mu->data.header.decree, mu->data.header.timestamp, batched_requests, batched_count);
const int storage_error = on_batched_write_requests(mu->data.header.decree,
mu->data.header.timestamp,
batched_requests,
batched_count,
mu->original_request);

// release faked requests
for (int i = 0; i < faked_count; i++) {
for (int i = 0; i < faked_count; ++i) {
faked_requests[i]->release_ref();
}

Expand Down
53 changes: 41 additions & 12 deletions src/replica/replication_app_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,16 +129,16 @@ class replication_app_base : public replica_base
static const std::string kDataDir;
static const std::string kRdbDir;

virtual ~replication_app_base() {}
virtual ~replication_app_base() = default;

bool is_primary() const;
[[nodiscard]] bool is_primary() const;

// Whether this replica is duplicating as master.
virtual bool is_duplication_master() const;
[[nodiscard]] virtual bool is_duplication_master() const;
// Whether this replica is duplicating as follower.
virtual bool is_duplication_follower() const;
[[nodiscard]] virtual bool is_duplication_follower() const;

const ballot &get_ballot() const;
[[nodiscard]] const ballot &get_ballot() const;

//
// Open the app.
Expand Down Expand Up @@ -248,21 +248,50 @@ class replication_app_base : public replica_base
// The return type is generated by storage engine, e.g. rocksdb::Status::Code, 0 always mean OK.
virtual int on_request(message_ex *request) WARN_UNUSED_RESULT = 0;

// Make an atomic request received from the client idempotent. Only called by primary replicas.
//
// Current implementation for atomic requests (incr, check_and_set and check_and_mutate) is
// not idempotent. This function is used to translate them into requests like single put
// which is naturally idempotent.
//
// For the other requests which must be idempotent such as single put/remove or non-batch
// writes, this function would do nothing.
//
// Parameters:
// - timestamp: an incremental timestamp generated for this batch of requests.
// - request: the original request received from a client.
// - new_request: as the output parameter pointing to the resulting idempotent request if the
// original request is atomic, otherwise keeping unchanged.
//
// The base class gives a naive implementation that just call on_request
// repeatedly. Storage engine may override this function to get better performance.
// Return:
// - for an idempotent requess always return rocksdb::Status::kOk .
// - for an atomic request, return rocksdb::Status::kOk if succeed in making it idempotent;
// otherwise, return error code (rocksdb::Status::Code).
virtual int make_idempotent(dsn::message_ex *request, dsn::message_ex **new_request) = 0;

// Apply batched write requests from a mutation. This is a virtual function, and base class
// provide a naive implementation that just call on_request for each request. Storage engine
// may override this function to get better performance.
//
// The return type is generated by storage engine, e.g. rocksdb::Status::Code, 0 always mean OK.
// Parameters:
// - decree: the decree of the mutation which these requests are batched into.
// - timestamp: an incremental timestamp generated for this batch of requests.
// - requests: the requests to be applied.
// - request_length: the number of the requests.
// - original_request: the original request received from the client. Must be an atomic
// request (i.e. incr, check_and_set and check_and_mutate) if non-null, and another
// parameter `requests` must hold the idempotent request translated from it. Used to
// reply to the client.
//
// Return rocksdb::Status::kOk or some error code (rocksdb::Status::Code) if these requests
// failed to be applied by storage engine.
virtual int on_batched_write_requests(int64_t decree,
uint64_t timestamp,
message_ex **requests,
int request_length);
int request_length,
message_ex *original_request);

// query compact state.
virtual std::string query_compact_state() const = 0;
// Query compact state.
[[nodiscard]] virtual std::string query_compact_state() const = 0;

// update app envs.
virtual void update_app_envs(const std::map<std::string, std::string> &envs) = 0;
Expand Down
17 changes: 14 additions & 3 deletions src/replica/storage/simple_kv/simple_kv.server.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,25 @@ namespace application {
class simple_kv_service : public replication_app_base, public storage_serverlet<simple_kv_service>
{
public:
simple_kv_service(replica *r) : replication_app_base(r) {}
virtual ~simple_kv_service() {}
explicit simple_kv_service(replica *r) : replication_app_base(r) {}
~simple_kv_service() override = default;

virtual int on_request(dsn::message_ex *request) override WARN_UNUSED_RESULT
simple_kv_service(const simple_kv_service &) = delete;
simple_kv_service &operator=(const simple_kv_service &) = delete;

simple_kv_service(simple_kv_service &&) = delete;
simple_kv_service &operator=(simple_kv_service &&) = delete;

int on_request(dsn::message_ex *request) override WARN_UNUSED_RESULT
{
return handle_request(request);
}

int make_idempotent(dsn::message_ex *request, dsn::message_ex **new_request) override
{
return rocksdb::Status::kOk;
}

protected:
// all service handlers to be implemented further
// RPC_SIMPLE_KV_SIMPLE_KV_READ
Expand Down
6 changes: 5 additions & 1 deletion src/replica/test/mock_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,11 @@ class mock_replication_app_base : public replication_app_base
return ERR_OK;
}
int on_request(message_ex *request) override WARN_UNUSED_RESULT { return 0; }
std::string query_compact_state() const { return ""; };
int make_idempotent(dsn::message_ex *request, dsn::message_ex **new_request) override
{
return rocksdb::Status::kOk;
}
[[nodiscard]] std::string query_compact_state() const override { return ""; };

// we mock the followings
void update_app_envs(const std::map<std::string, std::string> &envs) override { _envs = envs; }
Expand Down
6 changes: 5 additions & 1 deletion src/rpc/rpc_holder.h
Original file line number Diff line number Diff line change
Expand Up @@ -224,13 +224,17 @@ class rpc_holder
dsn_rpc_forward(dsn_request(), addr);
}

inline void set_auto_reply(bool auto_reply) { _i->auto_reply = auto_reply; }

inline void enable_auto_reply() { set_auto_reply(true); }

// Returns an rpc_holder that will reply the request after its lifetime ends.
// By default rpc_holder never replies.
// SEE: serverlet<T>::register_rpc_handler_with_rpc_holder
static inline rpc_holder auto_reply(message_ex *req)
{
rpc_holder rpc(req);
rpc._i->auto_reply = true;
rpc.enable_auto_reply();
return rpc;
}

Expand Down
8 changes: 4 additions & 4 deletions src/rpc/rpc_message.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,13 +142,13 @@ message_ex *message_ex::create_receive_message(const blob &data)

message_ex *message_ex::create_received_request(dsn::task_code code,
dsn_msg_serialize_format format,
void *buffer,
int size,
const char *buffer,
unsigned int size,
int thread_hash,
uint64_t partition_hash)
{
::dsn::blob bb((const char *)buffer, 0, size);
auto msg = ::dsn::message_ex::create_receive_message_with_standalone_header(bb);
dsn::blob bb(buffer, 0, size);
auto *msg = ::dsn::message_ex::create_receive_message_with_standalone_header(bb);
msg->local_rpc_code = code;
const char *name = code.to_string();
strncpy(msg->header->rpc_name, name, sizeof(msg->header->rpc_name) - 1);
Expand Down
16 changes: 12 additions & 4 deletions src/rpc/rpc_message.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,18 @@ class message_ex : public ref_counter, public extensible_object<message_ex, 4>

static message_ex *create_received_request(dsn::task_code rpc_code,
dsn_msg_serialize_format format,
void *buffer,
int size,
int thread_hash = 0,
uint64_t partition_hash = 0);
const char *buffer,
unsigned int size,
int thread_hash,
uint64_t partition_hash);

static message_ex *create_received_request(dsn::task_code rpc_code,
dsn_msg_serialize_format format,
const char *buffer,
unsigned int size)
{
return create_received_request(rpc_code, format, buffer, size, 0, 0);
}

/// This method is only used for receiving request.
/// The returned message:
Expand Down
24 changes: 14 additions & 10 deletions src/rpc/rpc_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,23 +54,27 @@ class rpc_read_stream : public binary_reader
void set_read_msg(message_ex *msg)
{
_msg = msg;
if (nullptr != _msg) {
::dsn::blob bb;
CHECK(((::dsn::message_ex *)_msg)->read_next(bb),
"read msg must have one segment of buffer ready");
init(std::move(bb));
if (_msg == nullptr) {
return;
}

dsn::blob bb;
CHECK(_msg->read_next(bb), "read msg must have one segment of buffer ready");

init(std::move(bb));
}

int read(char *buffer, int sz) { return inner_read(buffer, sz); }
int read(char *buffer, int sz) override { return inner_read(buffer, sz); }

int read(blob &blob, int len) { return inner_read(blob, len); }
int read(blob &blob, int len) override { return inner_read(blob, len); }

~rpc_read_stream()
~rpc_read_stream() override
{
if (_msg) {
_msg->read_commit((size_t)(total_size() - get_remaining_size()));
if (_msg == nullptr) {
return;
}

_msg->read_commit(static_cast<size_t>(total_size() - get_remaining_size()));
}

private:
Expand Down
Loading
Loading