Skip to content

Commit

Permalink
fix: correct request validation
Browse files Browse the repository at this point in the history
Signed-off-by: feathercyc <[email protected]>
  • Loading branch information
GG2002 committed Aug 11, 2024
1 parent 044260a commit 6f69055
Showing 1 changed file with 163 additions and 39 deletions.
202 changes: 163 additions & 39 deletions crates/xlineapi/src/request_validation.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use std::collections::HashSet;
use std::collections::{hash_map::Entry, HashMap};

use serde::{Deserialize, Serialize};
use thiserror::Error;
use utils::interval_map::{Interval, IntervalMap};

use crate::{
command::KeyRange, AuthRoleAddRequest, AuthRoleGrantPermissionRequest, AuthUserAddRequest,
interval::BytesAffine, AuthRoleAddRequest, AuthRoleGrantPermissionRequest, AuthUserAddRequest,
DeleteRangeRequest, PutRequest, RangeRequest, Request, RequestOp, SortOrder, SortTarget,
TxnRequest,
};
Expand Down Expand Up @@ -85,61 +86,187 @@ impl RequestValidator for TxnRequest {
}
}

let _ignore_success = check_intervals(&self.success)?;
let _ignore_failure = check_intervals(&self.failure)?;
check_intervals(&self.success)?;
check_intervals(&self.failure)?;

Ok(())
}
}

/// Check if puts and deletes overlap
fn check_intervals(ops: &[RequestOp]) -> Result<(HashSet<&[u8]>, Vec<KeyRange>), ValidationError> {
// TODO: use interval tree is better?
struct LCATree {
nodes: Vec<ReqNode>,
}

let mut dels = Vec::new();
struct ReqNode {
parent: usize,
depth: usize,
}

for op in ops {
if let Some(Request::RequestDeleteRange(ref req)) = op.request {
// collect dels
let del = KeyRange::new(req.key.as_slice(), req.range_end.as_slice());
dels.push(del);
impl LCATree {
fn new() -> Self {
Self {
nodes: vec![ReqNode {
parent: 0,
depth: 0,
}],
}
}
fn get_node(&self, i: usize) -> &ReqNode {
&self.nodes[i]
}
fn insert_req(&mut self, parent: usize, depth: usize) -> usize {
self.nodes.push(ReqNode { parent, depth });
self.nodes.len() - 1
}
fn find_lca(&self, node_a: usize, node_b: usize) -> usize {
let (mut x, mut y) = if self.get_node(node_a).depth < self.get_node(node_b).depth {
(node_a, node_b)
} else {
(node_b, node_a)
};
while self.get_node(x).depth < self.get_node(y).depth {
y = self.get_node(y).parent;
}
while x != y {
x = self.get_node(x).parent;
y = self.get_node(y).parent;
}
x
}
}
type DelsIntervalMap<'a> = IntervalMap<BytesAffine, Vec<usize>>;

fn new_bytes_affine_interval(start: &[u8], key_end: &[u8]) -> Interval<BytesAffine> {
let high = match key_end {
&[] => {
let mut end = start.to_vec();
end.push(0);
BytesAffine::Bytes(end)
}
&[0] => BytesAffine::Unbounded,
bytes => BytesAffine::Bytes(bytes.to_vec()),
};
Interval::new(BytesAffine::new_key(start), high)
}

let mut puts: HashSet<&[u8]> = HashSet::new();
/// Check if puts and deletes overlap
fn check_intervals(ops: &[RequestOp]) -> Result<(), ValidationError> {
let mut lca_tree = LCATree::new();
// Because `dels` stores Vec<node_idx> corresponding to the interval, merging two dels is slightly cumbersome.
// Here, dels are directly passed into the build function
let mut dels = DelsIntervalMap::new();
// This function will traverse all RequestOp and collect all the parent nodes corresponding to `put` and `del` operations.
// During this process, the atomicity of the put operation can be guaranteed.
let puts = build_interval_tree(ops, &mut dels, &mut lca_tree, 0, 0)?;

// Now we have `dels` and `puts` which contain all node index corresponding to `del` and `put` ops,
// we only need to iterate through the puts to find out whether each put overlaps with the del operation in the dels,
// and even if it overlaps, whether it satisfies lca.depth % 2 == 0.
for (put_key, put_vec) in puts {
let put_interval = new_bytes_affine_interval(put_key, &[]);
let overlaps = dels.find_all_overlap(&put_interval);
for put_node_idx in put_vec {
for (_, del_vec) in overlaps.iter() {
for del_node_idx in del_vec.iter() {
let lca_node_idx = lca_tree.find_lca(put_node_idx, *del_node_idx);
// lca.depth % 2 == 0 means this lca is on a success or failure branch,
// and two nodes on the same branch are prohibited from overlapping.
if lca_tree.get_node(lca_node_idx).depth % 2 == 0 {
return Err(ValidationError::DuplicateKey);
}
}
}
}
}

for op in ops {
if let Some(Request::RequestTxn(ref req)) = op.request {
// handle child txn request
let (success_puts, mut success_dels) = check_intervals(&req.success)?;
let (failure_puts, mut failure_dels) = check_intervals(&req.failure)?;
Ok(())
}

for k in success_puts.union(&failure_puts) {
if !puts.insert(k) {
return Err(ValidationError::DuplicateKey);
fn build_interval_tree<'a>(
ops: &'a [RequestOp],
dels_map: &mut DelsIntervalMap<'a>,
lca_tree: &mut LCATree,
parent: usize,
depth: usize,
) -> Result<HashMap<&'a [u8], Vec<usize>>, ValidationError> {
let mut puts_map: HashMap<&[u8], Vec<usize>> = HashMap::new();
for op in ops {
match op.request {
Some(Request::RequestDeleteRange(ref req)) => {
// collect dels
let cur_node_idx = lca_tree.insert_req(parent, depth);
let del = new_bytes_affine_interval(req.key.as_slice(), req.range_end.as_slice());
dels_map.entry(del).or_insert(vec![]).push(cur_node_idx);
}
Some(Request::RequestTxn(ref req)) => {
// RequestTxn is absolutely a node
let cur_node_idx = lca_tree.insert_req(parent, depth);
let success_puts_map = if !req.success.is_empty() {
// success branch is also a node
let success_node_idx = lca_tree.insert_req(cur_node_idx, depth + 1);
build_interval_tree(
&req.success,
dels_map,
lca_tree,
success_node_idx,
depth + 2,
)?
} else {
HashMap::new()
};
let failure_puts_map = if !req.failure.is_empty() {
// failure branch is also a node
let failure_node_idx = lca_tree.insert_req(cur_node_idx, depth + 1);
build_interval_tree(
&req.failure,
dels_map,
lca_tree,
failure_node_idx,
depth + 2,
)?
} else {
HashMap::new()
};
// success_puts_map and failure_puts_map cannot overlap with other op's puts_map.
for (sub_put_key, sub_put_node_idx) in success_puts_map.iter() {
if puts_map.contains_key(sub_put_key) {
return Err(ValidationError::DuplicateKey);
}
puts_map.insert(&sub_put_key, sub_put_node_idx.to_vec());
}
if dels.iter().any(|del| del.contains_key(k)) {
return Err(ValidationError::DuplicateKey);
// but they can overlap with each other
for (sub_put_key, mut sub_put_node_idx) in failure_puts_map.into_iter() {
match puts_map.entry(&sub_put_key) {
Entry::Vacant(_) => {
puts_map.insert(&sub_put_key, sub_put_node_idx);
}
Entry::Occupied(mut put_entry) => {
if !success_puts_map.contains_key(sub_put_key) {
return Err(ValidationError::DuplicateKey);
}
let put_vec = put_entry.get_mut();
put_vec.append(&mut sub_put_node_idx);
}
};
}
}

dels.append(&mut success_dels);
dels.append(&mut failure_dels);
_ => {}
}
}

// put in RequestPut cannot overlap with all puts in RequestTxn
for op in ops {
if let Some(Request::RequestPut(ref req)) = op.request {
// check puts in this level
if !puts.insert(&req.key) {
return Err(ValidationError::DuplicateKey);
}
if dels.iter().any(|del| del.contains_key(&req.key)) {
return Err(ValidationError::DuplicateKey);
match op.request {
Some(Request::RequestPut(ref req)) => {
if puts_map.contains_key(&req.key.as_slice()) {
return Err(ValidationError::DuplicateKey);
}
let cur_node_idx = lca_tree.insert_req(parent, depth);
puts_map.insert(&req.key, vec![cur_node_idx]);
}
_ => {}
}
}
Ok((puts, dels))
Ok(puts_map)
}

impl RequestValidator for AuthUserAddRequest {
Expand Down Expand Up @@ -583,9 +710,6 @@ mod test {
run_test(testcases);
}

// FIXME: This test will fail in the current implementation.
// See https://github.com/xline-kv/Xline/issues/410 for more details
#[ignore]
#[test]
fn check_intervals_txn_nested_overlap_should_return_error() {
let put_op = RequestOp {
Expand Down

0 comments on commit 6f69055

Please sign in to comment.