Skip to content

Commit f4edfbd

Browse files
committedMar 26, 2016
Finish ORSWOT implementation.
1 parent 3062969 commit f4edfbd

File tree

7 files changed

+340
-36
lines changed

7 files changed

+340
-36
lines changed
 

‎.gitignore

+2
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,4 @@
11
*.swp
22
*.swo
3+
Cargo.lock
4+
target

‎Cargo.toml

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
[package]
2-
name = "crdt"
2+
name = "crdts"
33
description = "Practical serializable CRDTs (counters, LWW, ORSWOT) implemented in Rust"
4-
version = "0.0.1"
4+
version = "0.1.0"
55
authors = ["Tyler Neely <t@jujit.su>"]
66
license = "Apache-2.0"
77
homepage = "https://github.com/spacejam/rust-crdt"
8-
keywords = ["crdt", "data structures", "distributed-systems", "vector clock", "orswot"]
8+
keywords = ["crdt", "data-structures", "distributed-systems", "vector-clock", "orswot"]
99

1010
[[test]]
1111

@@ -17,4 +17,4 @@ rustc-serialize = "0.3.18"
1717
bincode = "0.5.1"
1818

1919
[dev-dependencies]
20-
quickcheck = "*"
20+
quickcheck = "0.2.27"

‎README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ Afterwards, either check out the [riak dt](https://github.com/basho/riak_dt) sou
66

77

88
- [x] Vector Clock
9+
- [x] ORSWOT
910
- [ ] LWW Register
10-
- [ ] ORSWOT
1111
- [ ] Map
1212
- [ ] G-Set
1313
- [ ] OR-Set

‎src/lib.rs

-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
#![crate_id = "crdt"]
21
#![crate_type = "lib"]
32

43
pub use vclock::VClock;

‎src/orswot.rs

+202-12
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,15 @@
33
//! # Examples
44
//!
55
6-
use std::collections::BTreeMap;
6+
use std::collections::{BTreeMap, BTreeSet};
77

88
use super::VClock;
99

1010
#[derive(Debug, Clone, PartialEq, Eq, Hash, Ord, PartialOrd, RustcEncodable, RustcDecodable)]
1111
pub struct Orswot<Member: Ord + Clone, Actor: Ord + Clone> {
1212
clock: VClock<Actor>,
1313
entries: BTreeMap<Member, VClock<Actor>>,
14-
deferred: BTreeMap<VClock<Actor>, Vec<Member>>,
14+
deferred: BTreeMap<VClock<Actor>, BTreeSet<Member>>,
1515
}
1616

1717
impl<Member: Ord + Clone, Actor: Ord + Clone> Orswot<Member, Actor> {
@@ -33,31 +33,221 @@ impl<Member: Ord + Clone, Actor: Ord + Clone> Orswot<Member, Actor> {
3333
}
3434

3535
pub fn add_all(&mut self, members: Vec<(Member, Actor)>) {
36-
members.into_iter().map(|(member, actor)| self.add(member, actor));
36+
for (member, actor) in members.into_iter() {
37+
self.add(member, actor);
38+
}
3739
}
3840

3941
// drop the element out, ignoring the vclock for the element and orswot
4042
pub fn remove(&mut self, member: Member) -> Option<VClock<Actor>> {
4143
self.entries.remove(&member)
4244
}
4345

46+
pub fn remove_with_context(&mut self, member: Member,
47+
context: &VClock<Actor>)
48+
-> Option<VClock<Actor>> {
49+
if context.dominating_vclock(self.clock.clone()).is_empty() {
50+
self.entries.remove(&member)
51+
} else {
52+
let mut deferred_drops = self.deferred.remove(context).unwrap_or(BTreeSet::new());
53+
deferred_drops.insert(member);
54+
self.deferred.insert(context.clone(), deferred_drops);
55+
None
56+
}
57+
}
58+
4459
pub fn remove_all(&mut self, members: Vec<Member>) -> Vec<Option<VClock<Actor>>> {
4560
members.into_iter().map(|member| self.remove(member)).collect()
4661
}
4762

63+
pub fn remove_all_with_context(&mut self,
64+
members: Vec<Member>,
65+
context: &VClock<Actor>)
66+
-> Vec<Option<VClock<Actor>>> {
67+
members.into_iter().map(|member| self.remove_with_context(member, context)).collect()
68+
}
69+
4870
pub fn value(&self) -> Vec<Member> {
4971
self.entries.keys().cloned().collect()
5072
}
5173

52-
pub fn merge(&mut self, other: Orswot<Member, Actor>) {
53-
self.clock.merge(other.clock);
74+
/// Merge combines another `Orswot` with this one.
75+
///
76+
pub fn merge(&mut self, mut other: Orswot<Member, Actor>) {
77+
let mut keep = BTreeMap::new();
78+
for (entry, clock) in self.entries.clone().into_iter() {
79+
if !other.entries.contains_key(&entry) {
80+
// other doesn't contain this entry because it:
81+
// 1. has witnessed it and dropped it
82+
// 2. hasn't witnessed it
83+
if clock.dominating_vclock(other.clock.clone()).is_empty() {
84+
// the other orswot has witnessed the entry's clock, and dropped this entry
85+
} else {
86+
// the other orswot has not witnessed this add, so add it
87+
keep.insert(entry, clock);
88+
}
89+
} else {
90+
// SUBTLE: this entry is present in both orswots, BUT that doesn't mean we
91+
// shouldn't drop it!
92+
let common = clock.intersection(other.clone().clock);
93+
let luniq = clock.dominating_vclock(common.clone());
94+
let runiq = other.clone().clock.dominating_vclock(common.clone());
95+
let lkeep = luniq.dominating_vclock(other.clone().clock);
96+
let rkeep = runiq.dominating_vclock(self.clone().clock);
97+
// Perfectly possible that an item in both sets should be dropped
98+
let mut common = common;
99+
common.merge(lkeep);
100+
common.merge(rkeep);
101+
if !common.is_empty() {
102+
// we should not drop, as there are common clocks
103+
} else {
104+
keep.insert(entry.clone(), clock);
105+
}
106+
// don't want to consider this again below
107+
other.entries.remove(&entry).unwrap();
108+
}
109+
}
110+
111+
for (entry, clock) in other.entries.clone().into_iter() {
112+
let dom_clock = clock.dominating_vclock(self.clock.clone());
113+
if !dom_clock.is_empty() {
114+
// other has witnessed a novel addition, so add it
115+
keep.insert(entry, dom_clock);
116+
}
117+
}
118+
119+
// merge deferred removals
120+
for (clock, mut deferred) in self.deferred.iter_mut() {
121+
let other_deferred = other.deferred.remove(clock).unwrap_or(BTreeSet::new());
122+
for e in other_deferred.into_iter() {
123+
deferred.insert(e);
124+
}
125+
}
126+
127+
self.entries = keep;
128+
129+
// merge vclocks
130+
self.clock.merge(other.clone().clock);
131+
132+
self.apply_deferred();
133+
}
134+
135+
fn apply_deferred(&mut self) {
136+
let deferred = self.deferred.clone();
137+
self.deferred = BTreeMap::new();
138+
for (clock, entries) in deferred.into_iter() {
139+
self.remove_all_with_context(entries.into_iter().collect(), &clock);
140+
}
141+
}
142+
}
143+
144+
#[cfg(test)]
145+
mod tests {
146+
use super::*;
147+
148+
#[test]
149+
fn test_disjoint_merge() {
150+
let (mut a, mut b) = (Orswot::new(), Orswot::new());
151+
a.add("bar", "A");
152+
assert_eq!(a.value(), vec!["bar"]);
153+
b.add("baz", "B");
154+
assert_eq!(b.value(), vec!["baz"]);
155+
let mut c = a.clone();
156+
assert_eq!(c.value(), vec!["bar"]);
157+
c.merge(b);
158+
assert_eq!(c.value(), vec!["bar", "baz"]);
159+
a.remove("bar");
160+
let mut d = a.clone();
161+
d.merge(c);
162+
assert_eq!(d.value(), vec!["baz"]);
163+
}
164+
165+
#[test]
166+
fn test_present_but_removed() {
167+
54168
}
169+
/*
170+
171+
%% Bug found by EQC, not dropping dots in merge when an element is
172+
%% present in both Sets leads to removed items remaining after merge.
173+
present_but_removed_test() ->
174+
%% Add Z to A
175+
{ok, A} = update({add, 'Z'}, a, new()),
176+
%% Replicate it to C so A has 'Z'->{e, 1}
177+
C = A,
178+
%% Remove Z from A
179+
{ok, A2} = update({remove, 'Z'}, a, A),
180+
%% Add Z to B, a new replica
181+
{ok, B} = update({add, 'Z'}, b, new()),
182+
%% Replicate B to A, so now A has a Z, the one with a Dot of
183+
%% {b,1} and clock of [{a, 1}, {b, 1}]
184+
A3 = merge(B, A2),
185+
%% Remove the 'Z' from B replica
186+
{ok, B2} = update({remove, 'Z'}, b, B),
187+
%% Both C and A have a 'Z', but when they merge, there should be
188+
%% no 'Z' as C's has been removed by A and A's has been removed by
189+
%% C.
190+
Merged = lists:foldl(fun(Set, Acc) ->
191+
merge(Set, Acc) end,
192+
%% the order matters, the two replicas that
193+
%% have 'Z' need to merge first to provoke
194+
%% the bug. You end up with 'Z' with two
195+
%% dots, when really it should be removed.
196+
A3,
197+
[C, B2]),
198+
?assertEqual([], value(Merged)).
199+
200+
%% A bug EQC found where dropping the dots in merge was not enough if
201+
%% you then store the value with an empty clock (derp).
202+
no_dots_left_test() ->
203+
{ok, A} = update({add, 'Z'}, a, new()),
204+
{ok, B} = update({add, 'Z'}, b, new()),
205+
C = A, %% replicate A to empty C
206+
{ok, A2} = riak_dt_orswot:update({remove, 'Z'}, a, A),
207+
%% replicate B to A, now A has B's 'Z'
208+
A3 = riak_dt_orswot:merge(A2, B),
209+
%% Remove B's 'Z'
210+
{ok, B2} = riak_dt_orswot:update({remove, 'Z'}, b, B),
211+
%% Replicate C to B, now B has A's old 'Z'
212+
B3 = riak_dt_orswot:merge(B2, C),
213+
%% Merge everytyhing, without the fix You end up with 'Z' present,
214+
%% with no dots
215+
Merged = lists:foldl(fun(Set, Acc) ->
216+
merge(Set, Acc) end,
217+
A3,
218+
[B3, C]),
219+
?assertEqual([], value(Merged)).
220+
221+
%% A test I thought up
222+
%% - existing replica of ['A'] at a and b,
223+
%% - add ['B'] at b, but not communicated to any other nodes, context returned to client
224+
%% - b goes down forever
225+
%% - remove ['A'] at a, using the context the client got from b
226+
%% - will that remove happen?
227+
%% case for shouldn't: the context at b will always be bigger than that at a
228+
%% case for should: we have the information in dots that may allow us to realise it can be removed
229+
%% without us caring.
230+
%%
231+
%% as the code stands, 'A' *is* removed, which is almost certainly correct. This behaviour should
232+
%% always happen, but may not. (ie, the test needs expanding)
233+
dead_node_update_test() ->
234+
{ok, A} = update({add, 'A'}, a, new()),
235+
{ok, B} = update({add, 'B'}, b, A),
236+
BCtx = precondition_context(B),
237+
{ok, A2} = update({remove, 'A'}, a, A, BCtx),
238+
?assertEqual([], value(A2)).
239+
240+
%% Batching should not re-order ops
241+
batch_order_test() ->
242+
{ok, Set} = update({add_all, [<<"bar">>, <<"baz">>]}, a, new()),
243+
Context = precondition_context(Set),
244+
{ok, Set2} = update({update, [{remove, <<"baz">>}, {add, <<"baz">>}]}, a, Set, Context),
245+
?assertEqual([<<"bar">>, <<"baz">>], value(Set2)),
246+
{ok, Set3} = update({update, [{remove, <<"baz">>}, {add, <<"baz">>}]}, a, Set),
247+
?assertEqual([<<"bar">>, <<"baz">>], value(Set3)),
248+
{ok, Set4} = update({remove, <<"baz">>}, a, Set),
249+
{ok, Set5} = update({add, <<"baz">>}, a, Set4),
250+
?assertEqual([<<"bar">>, <<"baz">>], value(Set5)).
55251
56-
fn merge_deferred(&mut self, deferred: BTreeMap<VClock<Actor>, Vec<Member>>) {}
57-
pub fn equal() {}
58-
pub fn precondition_context() {}
59-
pub fn stats() {}
60-
pub fn stat() {}
61-
pub fn parent_clock() {}
62-
pub fn to_version() {}
252+
*/
63253
}

0 commit comments

Comments
 (0)