Skip to content

Commit

Permalink
Change: Refine Log Entry Traits
Browse files Browse the repository at this point in the history
This commit refines the `RaftEntry` and related traits, to better
support application defined log `Entry` type.

Key Changes:

1. Remove `FromAppData` trait:
   - The `FromAppData` trait, which was used to create log `Entry` from
     application data, is removed.
   - Applications should now implement the new `RaftEntry::new()` method
     to create log entries directly.

2. Remove `RaftLogId` trait, it becomes an internal trait.

3. Update `RaftEntry` trait:

   - The `RaftEntry` trait no longer requires `RaftLogId` (due to its
     redefinition) and now mandates the implementation of:

     - `new()`: For creating a log `Entry`.
     - `log_id_parts()`: To return references to the log ID's committed
       leader ID(term) and index.
     - `set_log_id()`: To update the log entry's ID.

   - Default methods are provided:

     - `new_blank()`, `new_normal()`, `new_membership()` for creating
       different types of log entries.
     - `log_id()` returns an owned `LogId` instance.
     - `index()` returns the index of the log entry.

4. Introduce `RefLogId`:
   - `RefLogId` is a reference-based representation of a log ID,
     complementing the existing `LogIdOf<C>` (a storage-based
     implementation).
   - `RefLogId` adds system-defined properties (e.g., `Ord`
     implementation) while referencing an existing `LogIdOf<C>`.
   - Internal components now use `RefLogId` where possible, improving
     flexibility and consistency.

5. Update example `raft-kv-memstore-grpc`:
   - Updated to implement log `Entry` and related types using protobuf,
     including state machine and RPC message types.
   - Added snapshot streaming transmission implementation.
   - Removed `serde` dependency from the example.

- Part of #1278.

---

Upgrade tips:

1. For Applications with Custom `RaftEntry` Implementations:
   If you've declared a custom `RaftEntry` (e.g., `declare_raft_types!(MyTypes: Entry = MyEntry)`):
   - Remove the implementation of `FromAppData`.
   - Implement the following new methods:
     - `new()`
     - `log_id_parts()`
     - `set_log_id()`

2. For Applications Using OpenRaft's Default `Entry`:
   - No changes are required.
  • Loading branch information
drmingdrmer committed Feb 3, 2025
1 parent 791dc3e commit 7bebecb
Show file tree
Hide file tree
Showing 50 changed files with 961 additions and 462 deletions.
1 change: 0 additions & 1 deletion cluster_benchmark/tests/benchmark/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use openraft::storage::Snapshot;
use openraft::Entry;
use openraft::EntryPayload;
use openraft::OptionalSend;
use openraft::RaftLogId;
use openraft::SnapshotMeta;
use openraft::StorageError;
use openraft::StoredMembership;
Expand Down
4 changes: 3 additions & 1 deletion examples/memstore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ license = "MIT OR Apache-2.0"
repository = "https://github.com/databendlabs/openraft"

[dependencies]
openraft = { path = "../../openraft", features = ["serde", "type-alias"] }
openraft = { path = "../../openraft", features = ["type-alias"] }

tokio = { version = "1.0", default-features = false, features = ["sync"] }

[features]

serde = ["openraft/serde"]

[package.metadata.docs.rs]
all-features = true
3 changes: 1 addition & 2 deletions examples/raft-kv-memstore-grpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ path = "src/bin/main.rs"

[dependencies]
memstore = { path = "../memstore", features = [] }
openraft = { path = "../../openraft", features = ["serde", "type-alias"] }
openraft = { path = "../../openraft", features = ["type-alias"] }

clap = { version = "4.1.11", features = ["derive", "env"] }
serde = { version = "1.0.114", features = ["derive"] }
Expand All @@ -30,7 +30,6 @@ tracing = "0.1.29"
tracing-subscriber = { version = "0.3.0", features = ["env-filter"] }
tonic = "0.12.3"
tonic-build = "0.12.3"
bincode = "1.3.3"
dashmap = "6.1.0"
prost = "0.13.4"
futures = "0.3.31"
Expand Down
24 changes: 10 additions & 14 deletions examples/raft-kv-memstore-grpc/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,16 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
// TODO: remove serde

tonic_build::configure()
.type_attribute("openraftpb.Node", "#[derive(Eq, serde::Serialize, serde::Deserialize)]")
.type_attribute(
"openraftpb.SetRequest",
"#[derive(Eq, serde::Serialize, serde::Deserialize)]",
)
.type_attribute(
"openraftpb.Response",
"#[derive(Eq, serde::Serialize, serde::Deserialize)]",
)
.type_attribute(
"openraftpb.LeaderId",
"#[derive(Eq, serde::Serialize, serde::Deserialize)]",
)
.type_attribute("openraftpb.Vote", "#[derive(Eq, serde::Serialize, serde::Deserialize)]")
.btree_map(["."])
.type_attribute("openraftpb.Node", "#[derive(Eq)]")
.type_attribute("openraftpb.SetRequest", "#[derive(Eq)]")
.type_attribute("openraftpb.Response", "#[derive(Eq)]")
.type_attribute("openraftpb.LeaderId", "#[derive(Eq)]")
.type_attribute("openraftpb.Vote", "#[derive(Eq)]")
.type_attribute("openraftpb.NodeIdSet", "#[derive(Eq)]")
.type_attribute("openraftpb.Membership", "#[derive(Eq)]")
.type_attribute("openraftpb.Entry", "#[derive(Eq)]")
.type_attribute("google.protobuf.Empty", "#[derive(Eq)]")
.compile_protos_with_config(config, &proto_files, &["proto"])?;
Ok(())
}
6 changes: 1 addition & 5 deletions examples/raft-kv-memstore-grpc/proto/api_service.proto
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
syntax = "proto3";
import "internal_service.proto";
package openraftpb;

// ApiService provides the key-value store API operations
Expand All @@ -20,8 +21,3 @@ message Response {
optional string value = 1; // Retrieved value
}

// SetRequest represents a key-value pair to be stored
message SetRequest {
string key = 1; // Key to store
string value = 2; // Value to associate with the key
}
130 changes: 109 additions & 21 deletions examples/raft-kv-memstore-grpc/proto/internal_service.proto
Original file line number Diff line number Diff line change
@@ -1,6 +1,19 @@
syntax = "proto3";
import "google/protobuf/empty.proto";
package openraftpb;

// SetRequest represents a key-value pair to be stored
message SetRequest {
string key = 1; // Key to store
string value = 2; // Value to associate with the key
}

// Node represents a single node in the Raft cluster
message Node {
string rpc_addr = 1; // RPC address for node communication
uint64 node_id = 2; // Unique identifier for the node
}

// LeaderId represents the leader identifier in Raft
message LeaderId {
uint64 term = 1;
Expand All @@ -13,12 +26,54 @@ message Vote {
bool committed = 2;
}

message Entry {
uint64 term = 1;
uint64 index = 2;

// Optional Application data
SetRequest app_data = 12;

// Optional Membership config
Membership membership = 13;
}


// NodeIds is a set of NodeIds
message NodeIdSet {
map<uint64, google.protobuf.Empty> node_ids = 1;
}

// Membership config
message Membership {
// Joint(includes more than one NodeIdSet) or uniform(one NodeIdSet) config.
repeated NodeIdSet configs = 1;

// All of the nodes in the cluster, including voters and learners.
// A node id that is included in `configs` is a voter, otherwise it is a learner.
map<uint64, Node> nodes = 2;
}

// LogId represents the log identifier in Raft
message LogId {
uint64 term = 1;
uint64 index = 2;
}

// All the data in a state machine, including user defined data and membership data.
message StateMachineData {
// The last log id that has been applied to the state machine
LogId last_applied = 1;

// User data in a map
map<string, string> data = 2;

// The id of the last membership config log entry that is applied.
LogId last_membership_log_id = 3;

// The last membership config that is applied.
Membership last_membership = 4;
}

// VoteRequest represents a request for votes during leader election
message VoteRequest {
Vote vote = 1;
Expand All @@ -32,26 +87,47 @@ message VoteResponse {
LogId last_log_id = 3;
}

// InternalService handles internal Raft cluster communication
service InternalService {
// Vote handles vote requests between Raft nodes during leader election
rpc Vote(VoteRequest) returns (VoteResponse) {}
message AppendEntriesRequest {
// The leader's vote, used to identify the leader, and must be committed
Vote vote = 1;

// AppendEntries handles call related to append entries RPC
rpc AppendEntries(RaftRequestBytes) returns (RaftReplyBytes) {}
// The previous log id the leader has sent to the follower
LogId prev_log_id = 2;

// Snapshot handles install snapshot RPC
rpc Snapshot(stream SnapshotRequest) returns (RaftReplyBytes) {}
// The entries to be appended to the follower's log
repeated Entry entries = 3;

// The leader's last committed log id
LogId leader_commit = 4;
}

// RaftRequestBytes encapsulates binary Raft request data
message RaftRequestBytes {
bytes value = 1; // Serialized Raft request data
message AppendEntriesResponse {
// If not None, the follower rejected the AppendEntries request due to having a higher vote.
// All other fields are valid only when this field is None
Vote rejected_by = 1;

// The follower accepts this AppendEntries request's vote, but the prev_log_id conflicts with
// the follower's log. The leader should retry with a smaller prev_log_id that matches the
// follower's log. All subsequent fields are valid only when this field is false
bool conflict = 2;

// The last log id the follower accepted from this request.
// If None, all input entries were accepted and persisted.
// Otherwise, only entries up to and including this id were accepted
LogId last_log_id = 3;
}

// RaftReplyBytes encapsulates binary Raft response data
message RaftReplyBytes {
bytes value = 1; // Serialized Raft response data
// The first chunk of snapshot transmission, which contains the snapshot meta.
message SnapshotRequestMeta {
Vote vote = 1;

LogId last_log_id = 2;

LogId last_membership_log_id = 3;

Membership last_membership = 4;

string snapshot_id = 5;
}

// The item of snapshot chunk stream.
Expand All @@ -63,13 +139,25 @@ message RaftReplyBytes {
// Since the second item, the `rpc_meta` should be empty and will be ignored by
// the receiving end.
message SnapshotRequest {
oneof payload {
SnapshotRequestMeta meta = 1;
bytes chunk = 2;
}
}

message SnapshotResponse {
Vote vote = 1;
}

// InternalService handles internal Raft cluster communication
service InternalService {
// Vote handles vote requests between Raft nodes during leader election
rpc Vote(VoteRequest) returns (VoteResponse) {}

// bytes serialized meta data, including vote and snapshot_meta.
// ```text
// (SnapshotFormat, Vote, SnapshotMeta)
// ```
bytes rpc_meta = 1;
// AppendEntries handles call related to append entries RPC
rpc AppendEntries(AppendEntriesRequest) returns (AppendEntriesResponse) {}

// Snapshot data chunk
bytes chunk = 2;
// Snapshot handles install snapshot RPC
rpc Snapshot(stream SnapshotRequest) returns (SnapshotResponse) {}
}

40 changes: 19 additions & 21 deletions examples/raft-kv-memstore-grpc/proto/management_service.proto
Original file line number Diff line number Diff line change
@@ -1,50 +1,48 @@
syntax = "proto3";
import "google/protobuf/empty.proto";
import "google/protobuf/wrappers.proto";
import "internal_service.proto";
import "api_service.proto";
package openraftpb;

// ManagementService handles Raft cluster management operations
service ManagementService {
// Init initializes a new Raft cluster with the given nodes
rpc Init(InitRequest) returns (RaftReplyString) {}
rpc Init(InitRequest) returns (google.protobuf.Empty) {}

// AddLearner adds a new learner node to the Raft cluster
rpc AddLearner(AddLearnerRequest) returns (RaftReplyString) {}
rpc AddLearner(AddLearnerRequest) returns (ClientWriteResponse) {}

// ChangeMembership modifies the cluster membership configuration
rpc ChangeMembership(ChangeMembershipRequest) returns (RaftReplyString) {}
rpc ChangeMembership(ChangeMembershipRequest) returns (ClientWriteResponse) {}

// Metrics retrieves cluster metrics and status information
rpc Metrics(RaftRequestString) returns (RaftReplyString) {}
rpc Metrics(google.protobuf.Empty) returns (google.protobuf.StringValue) {}
}

// InitRequest contains the initial set of nodes for cluster initialization
message InitRequest {
repeated Node nodes = 1; // List of initial cluster nodes
}

// Node represents a single node in the Raft cluster
message Node {
string rpc_addr = 1; // RPC address for node communication
uint64 node_id = 2; // Unique identifier for the node
}

// AddLearnerRequest specifies parameters for adding a learner node
message AddLearnerRequest {
Node node = 1; // Node to be added as a learner
}

// RaftRequestString represents a string-based Raft request
message RaftRequestString {
string data = 1; // Request data in string format
}

// RaftReplyString represents a string-based Raft response
message RaftReplyString {
string data = 1; // Response data
string error = 2; // Error message, if any
}

// ChangeMembershipRequest specifies parameters for modifying cluster membership
message ChangeMembershipRequest {
repeated uint64 members = 1; // New set of member node IDs
bool retain = 2; // Whether to retain existing configuration
}

message ClientWriteResponse {
// The log id of the committed log entry.
LogId log_id = 1;

// If the committed log entry is a normal one.
Response data = 2;

// If the committed log entry is a change-membership entry.
Membership membership = 3;
}
Loading

0 comments on commit 7bebecb

Please sign in to comment.