Skip to content

Commit 59b4115

Browse files
committed
feat: KVMeta: add proposed_at_ms to record the timestamp when a log is proposed
1 parent ac9af6a commit 59b4115

File tree

1 file changed

+99
-15
lines changed

1 file changed

+99
-15
lines changed

src/kv_meta.rs

Lines changed: 99 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,21 @@ pub struct KVMeta {
3333
///
3434
/// See [`flexible_timestamp_to_duration`]
3535
pub expire_at: Option<u64>,
36+
37+
/// The timestamp in milliseconds since Unix epoch (1970-01-01)
38+
/// when the raft-log that writes this record is proposed by the Raft Leader.
39+
#[serde(skip_serializing_if = "Option::is_none")]
40+
pub proposed_at_ms: Option<u64>,
3641
}
3742

3843
impl KVMeta {
3944
/// Create a new KVMeta.
4045
///
4146
/// `expire_at_sec_or_ms` can be either seconds or milliseconds.
42-
pub fn new(expire_at_sec_or_ms: Option<u64>) -> Self {
47+
pub fn new(expire_at_sec_or_ms: Option<u64>, proposed_at_ms: Option<u64>) -> Self {
4348
Self {
4449
expire_at: expire_at_sec_or_ms,
50+
proposed_at_ms,
4551
}
4652
}
4753

@@ -51,6 +57,7 @@ impl KVMeta {
5157
pub fn new_expires_at(expires_at_sec_or_ms: u64) -> Self {
5258
Self {
5359
expire_at: Some(expires_at_sec_or_ms),
60+
proposed_at_ms: None,
5461
}
5562
}
5663

@@ -68,18 +75,43 @@ impl KVMeta {
6875
pub fn expires_at_duration_opt(&self) -> Option<Duration> {
6976
self.expire_at.map(flexible_timestamp_to_duration)
7077
}
78+
79+
pub fn proposed_at(&self) -> Option<Duration> {
80+
self.proposed_at_ms.map(Duration::from_millis)
81+
}
7182
}
7283

7384
impl fmt::Display for KVMeta {
7485
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
75-
match self.expires_at_duration_opt() {
76-
Some(expire_at) => write!(
86+
write!(f, "(",)?;
87+
88+
let mut need_comma = false;
89+
90+
if let Some(expire_at) = self.expires_at_duration_opt() {
91+
need_comma = true;
92+
write!(
7793
f,
78-
"(expires_at: {})",
94+
"expires_at: {}",
7995
expire_at.display_unix_timestamp_short()
80-
),
81-
None => write!(f, "()"),
96+
)?
97+
}
98+
99+
if let Some(proposed_at) = self.proposed_at() {
100+
if need_comma {
101+
write!(f, ", ")?;
102+
}
103+
need_comma = true;
104+
write!(
105+
f,
106+
"proposed_at: {}",
107+
proposed_at.display_unix_timestamp_short()
108+
)?
82109
}
110+
111+
let _ = need_comma;
112+
113+
write!(f, ")",)?;
114+
Ok(())
83115
}
84116
}
85117

@@ -95,31 +127,31 @@ mod tests {
95127

96128
#[test]
97129
fn test_kv_meta_expirable_trait() {
98-
let kv_meta = KVMeta::new(Some(100_000_000_000));
130+
let kv_meta = KVMeta::new(Some(100_000_000_000), None);
99131
assert_eq!(kv_meta.expires_at_ms_opt(), Some(100_000_000_000_000));
100132

101-
let kv_meta = KVMeta::new(Some(100_000_000_001));
133+
let kv_meta = KVMeta::new(Some(100_000_000_001), None);
102134
assert_eq!(kv_meta.expires_at_ms_opt(), Some(100_000_000_001));
103135
}
104136

105137
#[test]
106138
fn test_kv_meta_method_get_expire_at_ms() {
107-
let kv_meta = KVMeta::new(Some(100_000_000_000));
139+
let kv_meta = KVMeta::new(Some(100_000_000_000), None);
108140
assert_eq!(kv_meta.get_expire_at_ms(), Some(100_000_000_000_000));
109141

110-
let kv_meta = KVMeta::new(Some(100_000_000_001));
142+
let kv_meta = KVMeta::new(Some(100_000_000_001), None);
111143
assert_eq!(kv_meta.get_expire_at_ms(), Some(100_000_000_001));
112144
}
113145

114146
#[test]
115147
fn test_kv_meta_method_expires_at_duration() {
116-
let kv_meta = KVMeta::new(Some(100_000_000_000));
148+
let kv_meta = KVMeta::new(Some(100_000_000_000), None);
117149
assert_eq!(
118150
kv_meta.expires_at_duration_opt(),
119151
Some(Duration::from_secs(100_000_000_000))
120152
);
121153

122-
let kv_meta = KVMeta::new(Some(100_000_000_001));
154+
let kv_meta = KVMeta::new(Some(100_000_000_001), None);
123155
assert_eq!(
124156
kv_meta.expires_at_duration_opt(),
125157
Some(Duration::from_millis(100_000_000_001))
@@ -128,13 +160,65 @@ mod tests {
128160

129161
#[test]
130162
fn test_kv_meta_display() {
131-
let kv_meta = KVMeta::new(Some(100_000_000_000));
163+
let kv_meta = KVMeta::new(Some(100_000_000_000), None);
132164
assert_eq!(kv_meta.to_string(), "(expires_at: 5138-11-16T09:46:40.000)");
133165

134-
let kv_meta = KVMeta::new(Some(100_000_000_001));
166+
let kv_meta = KVMeta::new(Some(100_000_000_001), None);
135167
assert_eq!(kv_meta.to_string(), "(expires_at: 1973-03-03T09:46:40.001)");
136168

137-
let kv_meta = KVMeta::new(None);
169+
let kv_meta = KVMeta::new(Some(100_000_000_001), Some(100_000_000_002));
170+
assert_eq!(
171+
kv_meta.to_string(),
172+
"(expires_at: 1973-03-03T09:46:40.001, proposed_at: 1973-03-03T09:46:40.002)"
173+
);
174+
175+
let kv_meta = KVMeta::new(None, Some(100_000_000_002));
176+
assert_eq!(
177+
kv_meta.to_string(),
178+
"(proposed_at: 1973-03-03T09:46:40.002)"
179+
);
180+
181+
let kv_meta = KVMeta::new(None, None);
138182
assert_eq!(kv_meta.to_string(), "()");
139183
}
184+
185+
#[test]
186+
fn test_kv_meta_serde() {
187+
let kv_meta = KVMeta::new(Some(100_000_000_001), Some(100_000_000_002));
188+
let serialized = serde_json::to_string(&kv_meta).unwrap();
189+
assert_eq!(
190+
serialized,
191+
r#"{"expire_at":100000000001,"proposed_at_ms":100000000002}"#
192+
);
193+
194+
let kv_meta = KVMeta::new(Some(100_000_000_001), None);
195+
let serialized = serde_json::to_string(&kv_meta).unwrap();
196+
assert_eq!(serialized, r#"{"expire_at":100000000001}"#);
197+
198+
let kv_meta = KVMeta::new(None, Some(100_000_000_002));
199+
let serialized = serde_json::to_string(&kv_meta).unwrap();
200+
assert_eq!(
201+
serialized,
202+
r#"{"expire_at":null,"proposed_at_ms":100000000002}"#
203+
);
204+
205+
let s = r#"{"expire_at":100000000001,"proposed_at_ms":100000000002}"#;
206+
let deserialized: KVMeta = serde_json::from_str(s).unwrap();
207+
assert_eq!(
208+
deserialized,
209+
KVMeta::new(Some(100_000_000_001), Some(100_000_000_002))
210+
);
211+
212+
let s = r#"{"expire_at":100000000001}"#;
213+
let deserialized: KVMeta = serde_json::from_str(s).unwrap();
214+
assert_eq!(deserialized, KVMeta::new(Some(100_000_000_001), None));
215+
216+
let s = r#"{"proposed_at_ms":100000000002}"#;
217+
let deserialized: KVMeta = serde_json::from_str(s).unwrap();
218+
assert_eq!(deserialized, KVMeta::new(None, Some(100_000_000_002)));
219+
220+
let s = r#"{}"#;
221+
let deserialized: KVMeta = serde_json::from_str(s).unwrap();
222+
assert_eq!(deserialized, KVMeta::new(None, None));
223+
}
140224
}

0 commit comments

Comments
 (0)