From dd35ecbb08460fcfac0c9588e3ad1255df1ea75f Mon Sep 17 00:00:00 2001 From: Morgan279 Date: Tue, 27 Sep 2022 22:15:56 +0800 Subject: [PATCH 1/6] Add RDB dump Signed-off-by: Morgan279 --- src/cmd/fake.rs | 15 +- src/cmd/mod.rs | 7 + src/lib.rs | 4 + src/rdb.rs | 577 +++++++++++++++++++++++++++++++++++++++++++++ src/tikv/errors.rs | 28 +++ 5 files changed, 628 insertions(+), 3 deletions(-) create mode 100644 src/rdb.rs diff --git a/src/cmd/fake.rs b/src/cmd/fake.rs index 7b8a149..620a186 100644 --- a/src/cmd/fake.rs +++ b/src/cmd/fake.rs @@ -6,14 +6,14 @@ use tokio::sync::Mutex; use crate::client::Client; use crate::cmd::Invalid; use crate::tikv::errors::{ - REDIS_INVALID_CLIENT_ID_ERR, REDIS_NOT_SUPPORTED_ERR, REDIS_NO_SUCH_CLIENT_ERR, - REDIS_VALUE_IS_NOT_INTEGER_ERR, + REDIS_DUMPING_ERR, REDIS_INVALID_CLIENT_ID_ERR, REDIS_NOT_SUPPORTED_ERR, + REDIS_NO_SUCH_CLIENT_ERR, REDIS_VALUE_IS_NOT_INTEGER_ERR, }; use crate::{ config::LOGGER, tikv::errors::REDIS_UNKNOWN_SUBCOMMAND, utils::{resp_bulk, resp_err, resp_int, resp_invalid_arguments, resp_nil, resp_ok}, - Connection, Frame, Parse, + Connection, Frame, Parse, RDB, }; #[derive(Debug, Clone)] @@ -232,6 +232,15 @@ impl Fake { _ => resp_err(REDIS_UNKNOWN_SUBCOMMAND), } } + "SAVE" => match RDB::dump().await { + Ok(()) => resp_ok(), + Err(e) => { + if e != REDIS_DUMPING_ERR { + RDB::reset_dumping(); + } + resp_err(e) + } + }, // can not reached here _ => resp_nil(), }; diff --git a/src/cmd/mod.rs b/src/cmd/mod.rs index 8f79608..e8b544c 100644 --- a/src/cmd/mod.rs +++ b/src/cmd/mod.rs @@ -325,6 +325,7 @@ pub enum Command { ReadOnly(Fake), Client(Fake), Info(Fake), + Save(Fake), // multi/exec/abort Multi(Multi), @@ -572,6 +573,10 @@ impl Command { Fake::parse_frames(&mut parse, "info"), &mut parse, )), + "save" => Command::Save(transform_parse( + Fake::parse_frames(&mut parse, "save"), + &mut parse, + )), "multi" => Command::Multi(Multi::new()), "exec" => Command::Exec(Multi::new()), "discard" => Command::Discard(Multi::new()), @@ -779,6 +784,7 @@ impl Command { ReadOnly(cmd) => cmd.apply("readonly", dst, cur_client, clients).await, Client(cmd) => cmd.apply("client", dst, cur_client, clients).await, Info(cmd) => cmd.apply("info", dst, cur_client, clients).await, + Save(cmd) => cmd.apply("save", dst, cur_client, clients).await, Scan(cmd) => cmd.apply(dst).await, Xscan(cmd) => cmd.apply(dst).await, @@ -877,6 +883,7 @@ impl Command { Command::ReadOnly(_) => "readonly", Command::Client(_) => "client", Command::Info(_) => "info", + Command::Save(_) => "save", Command::Multi(_) => "multi", Command::Exec(_) => "exec", Command::Discard(_) => "discard", diff --git a/src/lib.rs b/src/lib.rs index 3e3f843..e9b567e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,4 @@ +#![feature(concat_bytes)] #[macro_use] extern crate lazy_static; #[macro_use] @@ -43,6 +44,9 @@ pub mod cluster; pub mod client; +pub mod rdb; +use rdb::RDB; + pub mod utils; pub mod config; diff --git a/src/rdb.rs b/src/rdb.rs new file mode 100644 index 0000000..e69925a --- /dev/null +++ b/src/rdb.rs @@ -0,0 +1,577 @@ +use crate::config::LOGGER; +use crate::tikv::encoding::{DataType, KeyDecoder}; +use crate::tikv::errors::{ + AsyncResult, REDIS_DUMPING_ERR, REDIS_LIST_TOO_LARGE_ERR, REDIS_VALUE_IS_NOT_INTEGER_ERR, +}; +use crate::tikv::{get_txn_client, KEY_ENCODER}; +use crc::{Crc, Digest, CRC_64_GO_ISO}; +use futures::FutureExt; +use slog::debug; +use std::collections::HashMap; +use std::convert::TryInto; +use std::ops::RangeFrom; +use std::pin::Pin; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::time::{SystemTime, UNIX_EPOCH}; +use tikv_client::{BoundRange, Key, Transaction, Value}; +use tokio::fs::File; +use tokio::io::{AsyncWrite, AsyncWriteExt}; + +const RDB_VERSION: u32 = 9; +const REDIS_VERSION: &str = "6.0.16"; + +const RDB_6BITLEN: u8 = 0; +const RDB_14BITLEN: u8 = 1; +const RDB_32BITLEN: u8 = 0x80; +const RDB_64BITLEN: u8 = 0x81; + +const ZIP_LIST_HEADER_SIZE: usize = 10; +const ZIP_BIG_PREV_LEN: usize = 254; +const ZIP_END: u8 = 255; +const ZIP_INT_16B: u8 = 0xc0; +const ZIP_INT_32B: u8 = 0xc0 | 1 << 4; +const ZIP_INT_64B: u8 = 0xc0 | 2 << 4; +const ZIP_INT_24B: u8 = 0xc0 | 3 << 4; +const ZIP_INT_8B: u8 = 0xfe; + +const LEN_SPECIAL: u8 = 3; +const RDB_ENC_INT8: u8 = 0; +const RDB_ENC_INT16: u8 = 1; +const RDB_ENC_INT32: u8 = 2; +const ENCODE_INT8_PREFIX: u8 = (LEN_SPECIAL << 6) | RDB_ENC_INT8; +const ENCODE_INT16_PREFIX: u8 = (LEN_SPECIAL << 6) | RDB_ENC_INT16; +const ENCODE_INT32_PREFIX: u8 = (LEN_SPECIAL << 6) | RDB_ENC_INT32; + +const RDB_OPCODE_AUX: u8 = 250; +const RDB_OPCODE_EXPIRETIME_MS: u8 = 252; +const RDB_OPCODE_SELECTDB: u8 = 254; +const RDB_OPCODE_EOF: u8 = 255; + +const RDB_TYPE_STRING: u8 = 0; +const RDB_TYPE_SET: u8 = 2; +const RDB_TYPE_HASH: u8 = 4; +const RDB_TYPE_LIST_ZIPLIST: u8 = 10; +const RDB_TYPE_ZSET_ZIPLIST: u8 = 12; + +static DUMPING: AtomicBool = AtomicBool::new(false); + +pub struct RDB; + +struct RDBEncoder { + writer: Pin>, + buf: [u8; 16], +} + +impl RDBEncoder { + pub fn new(dump_file: File) -> Self { + RDBEncoder { + writer: Box::pin(dump_file), + buf: [0; 16], + } + } + + pub async fn save_header<'a>(&mut self, digest: &mut Digest<'a, u64>) -> AsyncResult<()> { + self.write(format!("REDIS{:04}", RDB_VERSION).as_bytes(), digest) + .await?; + self.save_aux_field("redis-ver", REDIS_VERSION, digest) + .await?; + self.save_aux_field("redis-bits", &(usize::BITS * 8).to_string(), digest) + .await?; + self.save_aux_field( + "ctime", + &SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs() + .to_string(), + digest, + ) + .await?; + + self.write(&RDB_OPCODE_SELECTDB.to_ne_bytes(), digest) + .await?; + // all data saved in db 0 by default + self.save_length(0, digest).await?; + + Ok(()) + } + + pub async fn save_kv_pair<'a>( + &mut self, + user_key: &str, + val: Value, + readonly_txn: &mut Transaction, + digest: &mut Digest<'a, u64>, + ) -> AsyncResult<()> { + let ttl = KeyDecoder::decode_key_ttl(&val); + if ttl > 0 { + self.write(&RDB_OPCODE_EXPIRETIME_MS.to_le_bytes(), digest) + .await?; + self.write(&ttl.to_le_bytes(), digest).await?; + } + + match KeyDecoder::decode_key_type(&val) { + DataType::String => self.save_string_obj(user_key, val, digest).await, + DataType::Hash => { + self.save_hash_obj(user_key, val, readonly_txn, digest) + .await + } + DataType::List => { + self.save_list_obj(user_key, val, readonly_txn, digest) + .await + } + DataType::Set => self.save_set_obj(user_key, val, readonly_txn, digest).await, + DataType::Zset => { + self.save_zset_obj(user_key, val, readonly_txn, digest) + .await + } + _ => panic!("RDB decode unknown key type"), + } + } + + pub async fn save_footer<'a>(&mut self, mut digest: Digest<'a, u64>) -> AsyncResult<()> { + self.write(&RDB_OPCODE_EOF.to_ne_bytes(), &mut digest) + .await?; + let checksum = digest.finalize(); + self.writer.write_all(&checksum.to_le_bytes()).await?; + self.writer.flush().await?; + Ok(()) + } + + async fn save_string_obj<'a>( + &mut self, + user_key: &str, + val: Value, + digest: &mut Digest<'a, u64>, + ) -> AsyncResult<()> { + let data_val = KeyDecoder::decode_key_string_value(&val); + let data_val_str = String::from_utf8_lossy(data_val.as_slice()); + debug!( + LOGGER, + "[saving string obj] key: {}, value: {}", user_key, data_val_str + ); + self.write(&RDB_TYPE_STRING.to_ne_bytes(), digest).await?; + self.save_raw_string(user_key, digest).await?; + self.save_raw_string(&data_val_str, digest).await?; + Ok(()) + } + + async fn save_hash_obj<'a>( + &mut self, + user_key: &str, + val: Value, + readonly_txn: &mut Transaction, + digest: &mut Digest<'a, u64>, + ) -> AsyncResult<()> { + self.write(&RDB_TYPE_HASH.to_ne_bytes(), digest).await?; + self.save_raw_string(user_key, digest).await?; + + let key = user_key.to_owned(); + let (_, version, _) = KeyDecoder::decode_key_meta(&val); + let bound_range: BoundRange = (KEY_ENCODER.encode_txnkv_hash_data_key_start(&key, version) + ..KEY_ENCODER.encode_txnkv_hash_data_key_end(&key, version)) + .into(); + let iter = readonly_txn.scan(bound_range, u32::MAX).await?; + let hash_map: HashMap = iter + .map(|kv| { + let field = + String::from_utf8(KeyDecoder::decode_key_hash_userkey_from_datakey(&key, kv.0)) + .unwrap(); + let field_val = String::from_utf8_lossy(kv.1.as_slice()).to_string(); + (field, field_val) + }) + .collect(); + let hash_map_vec: Vec = hash_map + .iter() + .map(|(field, value)| field.to_string() + ":" + value) + .collect(); + debug!( + LOGGER, + "[saving hash obj] key: {}, fields: [{}]", + user_key, + hash_map_vec.join(", ") + ); + + self.save_length(hash_map.len(), digest).await?; + for kv in hash_map { + self.save_raw_string(&kv.0, digest).await?; + self.save_raw_string(&kv.1, digest).await?; + } + + Ok(()) + } + + async fn save_list_obj<'a>( + &mut self, + user_key: &str, + val: Value, + readonly_txn: &mut Transaction, + digest: &mut Digest<'a, u64>, + ) -> AsyncResult<()> { + self.write(&RDB_TYPE_LIST_ZIPLIST.to_ne_bytes(), digest) + .await?; + self.save_raw_string(user_key, digest).await?; + + let key = user_key.to_owned(); + let (_, version, left, right) = KeyDecoder::decode_key_list_meta(&val); + let data_key_start = KEY_ENCODER.encode_txnkv_list_data_key(&key, left, version); + let range: RangeFrom = data_key_start..; + let from_range: BoundRange = range.into(); + let iter = readonly_txn + .scan(from_range, (right - left).try_into().unwrap()) + .await?; + let data_vals: Vec = iter + .map(|kv| String::from_utf8_lossy(kv.1.as_slice()).to_string()) + .collect(); + debug!( + LOGGER, + "[saving list obj] key: {}, members: [{}]", + user_key, + data_vals.join(", ") + ); + + self.save_ziplist(data_vals, digest).await?; + + Ok(()) + } + + async fn save_set_obj<'a>( + &mut self, + user_key: &str, + val: Value, + readonly_txn: &mut Transaction, + digest: &mut Digest<'a, u64>, + ) -> AsyncResult<()> { + self.write(&RDB_TYPE_SET.to_ne_bytes(), digest).await?; + self.save_raw_string(user_key, digest).await?; + + let key = user_key.to_owned(); + let (_, version, _) = KeyDecoder::decode_key_meta(&val); + let bound_range = KEY_ENCODER.encode_txnkv_set_data_key_range(&key, version); + let iter = readonly_txn.scan_keys(bound_range, u32::MAX).await?; + let members: Vec = iter + .map(|k| { + String::from_utf8(KeyDecoder::decode_key_set_member_from_datakey(&key, k)).unwrap() + }) + .collect(); + debug!( + LOGGER, + "[saving set obj] key: {}, members: [{}]", + user_key, + members.join(", ") + ); + + self.save_length(members.len(), digest).await?; + for member in members { + self.save_raw_string(&member, digest).await?; + } + + Ok(()) + } + + async fn save_zset_obj<'a>( + &mut self, + user_key: &str, + val: Value, + readonly_txn: &mut Transaction, + digest: &mut Digest<'a, u64>, + ) -> AsyncResult<()> { + self.write(&RDB_TYPE_ZSET_ZIPLIST.to_ne_bytes(), digest) + .await?; + self.save_raw_string(user_key, digest).await?; + + let key = user_key.to_owned(); + let (_, version, _) = KeyDecoder::decode_key_meta(&val); + let sub_meta_key_range = KEY_ENCODER.encode_txnkv_sub_meta_key_range(&key, version); + let size: i64 = readonly_txn + .scan(sub_meta_key_range, u32::MAX) + .await? + .map(|kv| i64::from_be_bytes(kv.1.try_into().unwrap())) + .sum(); + let bound_range = KEY_ENCODER.encode_txnkv_zset_score_key_range(&key, version); + let iter = readonly_txn + .scan(bound_range, size.try_into().unwrap()) + .await?; + let member_score_map: HashMap = iter + .map(|kv| { + let member = String::from_utf8_lossy(kv.1.as_slice()).to_string(); + let score = KeyDecoder::decode_key_zset_score_from_scorekey(&key, kv.0); + (member, score) + }) + .collect(); + let member_score_vec: Vec = member_score_map + .iter() + .map(|(member, score)| member.to_string() + ":" + score.to_string().as_str()) + .collect(); + debug!( + LOGGER, + "[save zset obj] key: {}, members: [{}]", + user_key, + member_score_vec.join(", ") + ); + + let mut zl_elements: Vec = Vec::with_capacity(member_score_map.len() * 2); + for (member, score) in member_score_map { + zl_elements.push(member); + zl_elements.push(score.to_string()); + } + self.save_ziplist(zl_elements, digest).await?; + + Ok(()) + } + + async fn save_aux_field<'a>( + &mut self, + key: &str, + val: &str, + digest: &mut Digest<'a, u64>, + ) -> AsyncResult<()> { + self.write(&RDB_OPCODE_AUX.to_ne_bytes(), digest).await?; + self.save_raw_string(key, digest).await?; + self.save_raw_string(val, digest).await?; + Ok(()) + } + + async fn save_ziplist<'a>( + &mut self, + values: Vec, + digest: &mut Digest<'a, u64>, + ) -> AsyncResult<()> { + if values.len() > u16::MAX as usize { + return Err(REDIS_LIST_TOO_LARGE_ERR); + } + + // zip list header + EOF + let mut zl_bytes = ZIP_LIST_HEADER_SIZE + 1; + let mut zl_tail_offset = ZIP_LIST_HEADER_SIZE; + let mut zl_buf = vec![0; ZIP_LIST_HEADER_SIZE]; + + let mut prev_len = 0; + for i in 0..values.len() { + let mut entry = self.encode_ziplist_entry(prev_len, values.get(i).unwrap())?; + prev_len = entry.len(); + zl_buf.append(&mut entry); + zl_bytes += prev_len; + if i < values.len() - 1 { + zl_tail_offset += prev_len; + } + } + zl_buf.push(ZIP_END); + + zl_buf[0..4].copy_from_slice(&(zl_bytes as u32).to_le_bytes()); + zl_buf[4..8].copy_from_slice(&(zl_tail_offset as u32).to_le_bytes()); + zl_buf[8..10].copy_from_slice(&(values.len() as u16).to_le_bytes()); + unsafe { + self.save_raw_string(&String::from_utf8_unchecked(zl_buf), digest) + .await?; + } + Ok(()) + } + + fn encode_ziplist_entry(&mut self, prev_len: usize, val: &str) -> AsyncResult> { + let mut entry_buf = vec![]; + + if prev_len < ZIP_BIG_PREV_LEN { + entry_buf.push(prev_len as u8); + } else { + entry_buf.push(ZIP_BIG_PREV_LEN as u8); + entry_buf.append(&mut (prev_len as u32).to_le_bytes().to_vec()); + } + + match val.parse::() { + Ok(int_val) => { + if (0..13).contains(&int_val) { + entry_buf.push(0xF0 | ((int_val + 1) as u8)); + } else if ((-1 << 7)..(1 << 7)).contains(&int_val) { + entry_buf.push(ZIP_INT_8B); + entry_buf.append(&mut (int_val as i8).to_le_bytes().to_vec()); + } else if ((-1 << 15)..(1 << 15)).contains(&int_val) { + entry_buf.push(ZIP_INT_16B); + entry_buf.append(&mut (int_val as i16).to_le_bytes().to_vec()); + } else if ((-1 << 23)..(1 << 23)).contains(&int_val) { + entry_buf.push(ZIP_INT_24B); + entry_buf.append(&mut (int_val as i32).to_le_bytes()[0..3].to_vec()); + } else if ((-1 << 31)..(1 << 31)).contains(&int_val) { + entry_buf.push(ZIP_INT_32B); + entry_buf.append(&mut (int_val as i32).to_le_bytes().to_vec()); + } else { + entry_buf.push(ZIP_INT_64B); + entry_buf.append(&mut int_val.to_le_bytes().to_vec()); + } + } + Err(_) => { + let len = val.len(); + if len < (1 << 6) { + entry_buf.push(len as u8); + } else if len < (1 << 14) { + entry_buf.push(((len >> 8) as u8) | (RDB_14BITLEN << 6)); + entry_buf.push(len as u8); + } else if len <= u32::MAX as usize { + entry_buf.push(RDB_32BITLEN); + entry_buf.append(&mut (len as u32).to_ne_bytes().to_vec()); + } else { + entry_buf.push(RDB_64BITLEN); + entry_buf.append(&mut (len as u64).to_ne_bytes().to_vec()); + } + entry_buf.append(&mut val.as_bytes().to_vec()); + } + } + + Ok(entry_buf) + } + + async fn save_raw_string<'a>( + &mut self, + s: &str, + digest: &mut Digest<'a, u64>, + ) -> AsyncResult<()> { + match self.try_integer_encoding(s, digest).await { + Ok(_) => Ok(()), + Err(_) => { + self.save_length(s.len(), digest).await?; + self.write(s.as_bytes(), digest).await?; + Ok(()) + } + } + } + + async fn try_integer_encoding<'a>( + &mut self, + s: &str, + digest: &mut Digest<'a, u64>, + ) -> AsyncResult<()> { + let int_val = s.parse::()?; + + if int_val >= i8::MIN as i64 && int_val <= i8::MAX as i64 { + self.buf[0] = ENCODE_INT8_PREFIX; + self.buf[1..2].copy_from_slice(&(int_val as i8).to_le_bytes()); + self.write_buf(2, digest).await?; + } else if int_val >= i16::MIN as i64 && int_val <= i16::MAX as i64 { + self.buf[0] = ENCODE_INT16_PREFIX; + self.buf[1..3].copy_from_slice(&(int_val as i16).to_le_bytes()); + self.write_buf(3, digest).await?; + } else if int_val >= i32::MIN as i64 && int_val <= i32::MAX as i64 { + self.buf[0] = ENCODE_INT32_PREFIX; + self.buf[1..5].copy_from_slice(&(int_val as i32).to_le_bytes()); + self.write_buf(5, digest).await?; + } else { + // out of i32 range, waive integer encoding + return Err(REDIS_VALUE_IS_NOT_INTEGER_ERR); + } + + Ok(()) + } + + async fn save_length<'a>( + &mut self, + len: usize, + digest: &mut Digest<'a, u64>, + ) -> AsyncResult<()> { + if len < (1 << 6) { + self.buf[0] = (len as u8) | (RDB_6BITLEN << 6); + self.write_buf(1, digest).await?; + } else if len < (1 << 14) { + self.buf[0] = ((len >> 8) as u8) | (RDB_14BITLEN << 6); + self.buf[1] = len as u8; + self.write_buf(2, digest).await?; + } else if len <= u32::MAX as usize { + self.buf[0] = RDB_32BITLEN; + self.buf[1..5].copy_from_slice(&(len as u32).to_ne_bytes()); + self.write_buf(5, digest).await?; + } else { + self.buf[0] = RDB_64BITLEN; + self.buf[1..9].copy_from_slice(&(len as u64).to_ne_bytes()); + self.write_buf(9, digest).await?; + } + + Ok(()) + } + + async fn write<'a>(&mut self, bytes: &[u8], digest: &mut Digest<'a, u64>) -> AsyncResult<()> { + self.writer.write_all(bytes).await?; + digest.update(bytes); + Ok(()) + } + + async fn write_buf<'a>(&mut self, end: usize, digest: &mut Digest<'a, u64>) -> AsyncResult<()> { + self.writer.write_all(&self.buf[..end]).await?; + self.buf.fill(0); + digest.update(&self.buf[..end]); + Ok(()) + } +} + +impl RDB { + pub async fn dump() -> AsyncResult<()> { + if DUMPING.load(Ordering::Relaxed) { + return Err(REDIS_DUMPING_ERR); + } + + DUMPING.store(true, Ordering::Relaxed); + get_txn_client()? + .exec_in_txn(None, |txn_rc| { + async move { + let rdb_file = RDB::create_rdb(); + let crc64 = Crc::::new(&CRC_64_GO_ISO); + let mut digest = crc64.digest(); + let mut encoder = RDBEncoder::new(rdb_file); + encoder.save_header(&mut digest).await?; + + //TODO call update_service_gc_safepoint with current timestamp (need client-rust support) + let mut readonly_txn = txn_rc.lock().await; + let mut left_bound = KEY_ENCODER.encode_txnkv_string(""); + let mut last_round_iter_count = 1; + while last_round_iter_count > 0 { + let range = left_bound.clone()..KEY_ENCODER.encode_txnkv_keyspace_end(); + let bound_range: BoundRange = range.into(); + //TODO configurable scan limit or stream scan + let iter = readonly_txn.scan(bound_range, 100).await?; + last_round_iter_count = 0; + for kv in iter { + if kv.0 == left_bound { + continue; + } + last_round_iter_count += 1; + left_bound = kv.0.clone(); + + let (user_key, is_meta_key) = + KeyDecoder::decode_key_userkey_from_metakey(&kv.0); + if is_meta_key { + let user_key_str = String::from_utf8(user_key).unwrap(); + encoder + .save_kv_pair( + &user_key_str, + kv.1, + &mut readonly_txn, + &mut digest, + ) + .await?; + } + } + } + //TODO call update_service_gc_safepoint with nonpositive ttl to remove the safepoint + + encoder.save_footer(digest).await?; + DUMPING.store(false, Ordering::Relaxed); + Ok(()) + } + .boxed() + }) + .await + } + + pub fn reset_dumping() { + DUMPING.store(false, Ordering::Relaxed); + } + + fn create_rdb() -> File { + File::from_std( + std::fs::File::options() + .create(true) + .truncate(true) + .write(true) + .open("dump.rdb") + .unwrap(), + ) + } +} diff --git a/src/tikv/errors.rs b/src/tikv/errors.rs index e7c0471..ad9472a 100644 --- a/src/tikv/errors.rs +++ b/src/tikv/errors.rs @@ -2,6 +2,7 @@ use mlua::prelude::LuaError; use std::num::{ParseFloatError, ParseIntError}; use thiserror::Error; use tikv_client::Error as TiKVError; +use toml::ser; #[derive(Error, Debug)] pub enum RTError { @@ -56,6 +57,30 @@ impl From for RTError { } } +impl From for RTError { + fn from(_: std::io::Error) -> Self { + REDIS_VALUE_IS_NOT_VALID_FLOAT_ERR + } +} + +impl From for RTError { + fn from(_: ser::Error) -> Self { + REDIS_VALUE_IS_NOT_VALID_FLOAT_ERR + } +} + +impl PartialEq for RTError { + fn eq(&self, other: &Self) -> bool { + match (self, other) { + (Self::String(s1), Self::String(s2)) => s1 == s2, + (Self::Owned(s1), Self::Owned(s2)) => s1 == s2, + // note that the comparison of RTError::TikvClient ignore the internal value for now + (Self::TikvClient(_), Self::TikvClient(_)) => true, + _ => false, + } + } +} + pub type AsyncResult = std::result::Result; pub const REDIS_WRONG_TYPE_ERR: RTError = @@ -94,3 +119,6 @@ pub const REDIS_EXEC_ERR: RTError = pub const REDIS_INVALID_CLIENT_ID_ERR: RTError = RTError::String("ERR Invalid client ID"); pub const REDIS_NO_SUCH_CLIENT_ERR: RTError = RTError::String("ERR No such client"); + +pub const REDIS_DUMPING_ERR: RTError = + RTError::String("Another dumping process is active, can't SAVE/BGSAVE right now."); From 65b44854d722384e04738a18acda4f4e7911ab5e Mon Sep 17 00:00:00 2001 From: Morgan279 Date: Wed, 28 Sep 2022 08:40:47 +0800 Subject: [PATCH 2/6] Clean up redundant code Signed-off-by: Morgan279 --- src/lib.rs | 1 - src/tikv/errors.rs | 10 ++-------- 2 files changed, 2 insertions(+), 9 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index e9b567e..3efee0a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,3 @@ -#![feature(concat_bytes)] #[macro_use] extern crate lazy_static; #[macro_use] diff --git a/src/tikv/errors.rs b/src/tikv/errors.rs index ad9472a..62f5b8c 100644 --- a/src/tikv/errors.rs +++ b/src/tikv/errors.rs @@ -2,7 +2,6 @@ use mlua::prelude::LuaError; use std::num::{ParseFloatError, ParseIntError}; use thiserror::Error; use tikv_client::Error as TiKVError; -use toml::ser; #[derive(Error, Debug)] pub enum RTError { @@ -59,13 +58,7 @@ impl From for RTError { impl From for RTError { fn from(_: std::io::Error) -> Self { - REDIS_VALUE_IS_NOT_VALID_FLOAT_ERR - } -} - -impl From for RTError { - fn from(_: ser::Error) -> Self { - REDIS_VALUE_IS_NOT_VALID_FLOAT_ERR + RDB_DUMP_IO_ERR } } @@ -122,3 +115,4 @@ pub const REDIS_NO_SUCH_CLIENT_ERR: RTError = RTError::String("ERR No such clien pub const REDIS_DUMPING_ERR: RTError = RTError::String("Another dumping process is active, can't SAVE/BGSAVE right now."); +pub const RDB_DUMP_IO_ERR: RTError = RTError::String("RDB dump failed because of the I/O error"); From b1ce5c301445d87c9546a6fc2fce4a5222e3ac58 Mon Sep 17 00:00:00 2001 From: yongman Date: Wed, 7 Sep 2022 13:55:40 +0800 Subject: [PATCH 3/6] Use stream scan for wide range scan Signed-off-by: yongman --- Cargo.lock | 21 +++++++++++++---- Cargo.toml | 2 +- src/gc.rs | 28 +++++++++++----------- src/tikv/hash.rs | 23 +++++++++++------- src/tikv/list.rs | 59 ++++++++++++++++++++++++++-------------------- src/tikv/set.rs | 17 ++++++++----- src/tikv/string.rs | 5 ++-- src/tikv/zset.rs | 31 ++++++++++++++---------- 8 files changed, 112 insertions(+), 74 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4c4ad0e..18b14ed 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1511,6 +1511,16 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "ordered-stream" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "034ce384018b245e8d8424bbe90577fbd91a533be74107e465e3474eb2285eef" +dependencies = [ + "futures-core", + "pin-project-lite", +] + [[package]] name = "parking" version = "2.0.0" @@ -2446,7 +2456,7 @@ dependencies = [ [[package]] name = "tikv-client" version = "0.1.0" -source = "git+https://github.com/yongman/client-rust.git?branch=dev#2648fe695ca7d67852ec0c108a2de8e261ef6d1b" +source = "git+https://github.com/yongman/client-rust.git?branch=dev-scanner#a773980806e3ca0540f15e063323b706b51b8e8f" dependencies = [ "async-recursion", "async-trait", @@ -2458,6 +2468,7 @@ dependencies = [ "grpcio", "lazy_static", "log", + "ordered-stream", "prometheus 0.12.0", "rand 0.8.5", "regex", @@ -2477,7 +2488,7 @@ dependencies = [ [[package]] name = "tikv-client-common" version = "0.1.0" -source = "git+https://github.com/yongman/client-rust.git?branch=dev#2648fe695ca7d67852ec0c108a2de8e261ef6d1b" +source = "git+https://github.com/yongman/client-rust.git?branch=dev-scanner#a773980806e3ca0540f15e063323b706b51b8e8f" dependencies = [ "futures 0.3.23", "grpcio", @@ -2493,7 +2504,7 @@ dependencies = [ [[package]] name = "tikv-client-pd" version = "0.1.0" -source = "git+https://github.com/yongman/client-rust.git?branch=dev#2648fe695ca7d67852ec0c108a2de8e261ef6d1b" +source = "git+https://github.com/yongman/client-rust.git?branch=dev-scanner#a773980806e3ca0540f15e063323b706b51b8e8f" dependencies = [ "async-trait", "futures 0.3.23", @@ -2506,7 +2517,7 @@ dependencies = [ [[package]] name = "tikv-client-proto" version = "0.1.0" -source = "git+https://github.com/yongman/client-rust.git?branch=dev#2648fe695ca7d67852ec0c108a2de8e261ef6d1b" +source = "git+https://github.com/yongman/client-rust.git?branch=dev-scanner#a773980806e3ca0540f15e063323b706b51b8e8f" dependencies = [ "futures 0.3.23", "grpcio", @@ -2520,7 +2531,7 @@ dependencies = [ [[package]] name = "tikv-client-store" version = "0.1.0" -source = "git+https://github.com/yongman/client-rust.git?branch=dev#2648fe695ca7d67852ec0c108a2de8e261ef6d1b" +source = "git+https://github.com/yongman/client-rust.git?branch=dev-scanner#a773980806e3ca0540f15e063323b706b51b8e8f" dependencies = [ "async-trait", "derive-new", diff --git a/Cargo.toml b/Cargo.toml index af73607..d51ab0c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,7 +26,7 @@ slog-term = { version = "2.4" } tokio = { version = "1", features = ["full"] } tokio-stream = "0.1" tokio-util = { version = "0.7.1", features = ["rt"] } -tikv-client = { git = "https://github.com/yongman/client-rust.git", branch = "dev" } +tikv-client = { git = "https://github.com/yongman/client-rust.git", branch = "dev-scanner" } #tikv-client = { path = "../client-rust" } lazy_static = "1.4.0" thiserror = "1" diff --git a/src/gc.rs b/src/gc.rs index de4b52c..b2427a1 100644 --- a/src/gc.rs +++ b/src/gc.rs @@ -1,4 +1,4 @@ -use futures::FutureExt; +use futures::{FutureExt, StreamExt}; use slog::{debug, error, info}; use std::collections::HashSet; use std::convert::TryInto; @@ -107,7 +107,7 @@ impl GcMaster { let bound_range = KEY_ENCODER.encode_txnkv_gc_version_key_range(); // TODO scan speed throttling - let iter_res = txn.scan(bound_range, u32::MAX).await; + let iter_res = txn.scan_stream(bound_range, u32::MAX).await; if iter_res.is_err() { error!( LOGGER, @@ -119,8 +119,8 @@ impl GcMaster { continue; } - let iter = iter_res.unwrap(); - for kv in iter { + let mut iter = iter_res.unwrap(); + while let Some(kv) = iter.next().await { let (user_key, version) = KeyDecoder::decode_key_gc_userkey_version(kv.0); let (slot_range_left, slot_range_right) = self.topo.myself_owned_slots(); @@ -248,8 +248,8 @@ impl GcWorker { // delete all data key of this key and version let bound_range = KEY_ENCODER.encode_txnkv_hash_data_key_range(&user_key, version); - let iter = txn.scan_keys(bound_range, u32::MAX).await?; - for k in iter { + let mut iter = txn.scan_keys_stream(bound_range, u32::MAX).await?; + while let Some(k) = iter.next().await { txn.delete(k).await?; } } @@ -261,8 +261,8 @@ impl GcWorker { // delete all data key of this key and version let bound_range = KEY_ENCODER.encode_txnkv_list_data_key_range(&user_key, version); - let iter = txn.scan_keys(bound_range, u32::MAX).await?; - for k in iter { + let mut iter = txn.scan_keys_stream(bound_range, u32::MAX).await?; + while let Some(k) = iter.next().await { txn.delete(k).await?; } } @@ -281,8 +281,8 @@ impl GcWorker { // delete all data key of this key and version let bound_range = KEY_ENCODER.encode_txnkv_set_data_key_range(&user_key, version); - let iter = txn.scan_keys(bound_range, u32::MAX).await?; - for k in iter { + let mut iter = txn.scan_keys_stream(bound_range, u32::MAX).await?; + while let Some(k) = iter.next().await { txn.delete(k).await?; } } @@ -304,16 +304,16 @@ impl GcWorker { &String::from_utf8_lossy(&task.user_key), task.version, ); - let iter = txn.scan_keys(bound_range, u32::MAX).await?; - for k in iter { + let mut iter = txn.scan_keys_stream(bound_range, u32::MAX).await?; + while let Some(k) = iter.next().await { txn.delete(k).await?; } // delete all data key of this key and version let bound_range = KEY_ENCODER.encode_txnkv_zset_data_key_range(&user_key, version); - let iter = txn.scan_keys(bound_range, u32::MAX).await?; - for k in iter { + let mut iter = txn.scan_keys_stream(bound_range, u32::MAX).await?; + while let Some(k) = iter.next().await { txn.delete(k).await?; } } diff --git a/src/tikv/hash.rs b/src/tikv/hash.rs index 36b7368..b60826e 100644 --- a/src/tikv/hash.rs +++ b/src/tikv/hash.rs @@ -13,7 +13,7 @@ use crate::{ Frame, }; -use futures::future::FutureExt; +use futures::{future::FutureExt, stream, StreamExt}; use slog::debug; use std::{collections::HashMap, convert::TryInto, ops::Range, sync::Arc}; use tikv_client::{BoundRange, Key, KvPair, Transaction, Value}; @@ -540,7 +540,7 @@ impl<'a> HashCommandCtx { ..KEY_ENCODER.encode_txnkv_hash_data_key_end(&key, version); let bound_range: BoundRange = range.into(); // scan return iterator - let iter = txn.scan(bound_range, u32::MAX).await?; + let iter = txn.scan_stream(bound_range, u32::MAX).await?; let resp: Vec; if with_field && with_value { @@ -550,9 +550,10 @@ impl<'a> HashCommandCtx { KeyDecoder::decode_key_hash_userkey_from_datakey( &key, kv.0, ); - [resp_bulk(field), resp_bulk(kv.1)] + stream::iter([resp_bulk(field), resp_bulk(kv.1)]) }) - .collect(); + .collect() + .await; } else if with_field { resp = iter .flat_map(|kv| { @@ -560,11 +561,15 @@ impl<'a> HashCommandCtx { KeyDecoder::decode_key_hash_userkey_from_datakey( &key, kv.0, ); - [resp_bulk(field)] + stream::iter([resp_bulk(field)]) }) - .collect(); + .collect() + .await; } else { - resp = iter.flat_map(|kv| [resp_bulk(kv.1)]).collect(); + resp = iter + .flat_map(|kv| stream::iter([resp_bulk(kv.1)])) + .collect() + .await; } Ok(resp_array(resp)) @@ -891,9 +896,9 @@ impl<'a> HashCommandCtx { let bound_range = KEY_ENCODER.encode_txnkv_hash_data_key_range(&key, version); // scan return iterator - let iter = txn.scan_keys(bound_range, u32::MAX).await?; + let mut iter = txn.scan_keys_stream(bound_range, u32::MAX).await?; - for k in iter { + while let Some(k) = iter.next().await { txn.delete(k).await?; } diff --git a/src/tikv/list.rs b/src/tikv/list.rs index f6cc306..0836ac2 100644 --- a/src/tikv/list.rs +++ b/src/tikv/list.rs @@ -14,7 +14,9 @@ use crate::utils::{resp_array, resp_bulk, resp_err, resp_int, resp_nil, resp_ok} use crate::{utils::key_is_expired, Frame}; use bytes::Bytes; use core::ops::RangeFrom; +use futures::future; use futures::future::FutureExt; +use futures::StreamExt; use std::convert::TryInto; use std::sync::Arc; use tikv_client::{BoundRange, Key, Transaction}; @@ -655,15 +657,16 @@ impl<'a> ListCommandCtx { KEY_ENCODER.encode_txnkv_list_data_key_range(&key, version); // iter will only return the matched kvpair - let mut iter = txn.scan(bound_range, u32::MAX).await?.filter(|kv| { - if kv.1 == pivot.to_vec() { - return true; - } - false - }); + let mut iter = + txn.scan_stream(bound_range, u32::MAX).await?.filter(|kv| { + if kv.1 == pivot.to_vec() { + return future::ready(true); + } + future::ready(false) + }); // yeild the first matched kvpair - if let Some(kv) = iter.next() { + if let Some(kv) = iter.next().await { // decode the idx from data key let idx = KeyDecoder::decode_key_list_idx_from_datakey(&key, kv.0); @@ -680,9 +683,10 @@ impl<'a> ListCommandCtx { .encode_txnkv_list_data_key_idx_range( &key, left, idx_op, version, ); - let iter = txn.scan(left_range, u32::MAX).await?; + let mut iter = + txn.scan_stream(left_range, u32::MAX).await?; - for kv in iter { + while let Some(kv) = iter.next().await { let key_idx = KeyDecoder::decode_key_list_idx_from_datakey( &key, kv.0, @@ -711,9 +715,10 @@ impl<'a> ListCommandCtx { right - 1, version, ); - let iter = txn.scan(right_range, u32::MAX).await?; + let mut iter = + txn.scan_stream(right_range, u32::MAX).await?; - for kv in iter { + while let Some(kv) = iter.next().await { let key_idx = KeyDecoder::decode_key_list_idx_from_datakey( &key, kv.0, @@ -813,18 +818,21 @@ impl<'a> ListCommandCtx { KEY_ENCODER.encode_txnkv_list_data_key_range(&key, version); // iter will only return the matched kvpair - let iter = - txn.scan(bound_range.clone(), u32::MAX).await?.filter(|kv| { + let iter = txn + .scan_stream(bound_range.clone(), u32::MAX) + .await? + .filter(|kv| { if kv.1 == ele.to_vec() { - return true; + return future::ready(true); } - false + future::ready(false) }); // hole saves the elements to be removed in order let mut hole: Vec = iter .map(|kv| KeyDecoder::decode_key_list_idx_from_datakey(&key, kv.0)) - .collect(); + .collect() + .await; // no matched element, return 0 if hole.is_empty() { @@ -838,9 +846,9 @@ impl<'a> ListCommandCtx { let mut removed_count = 0; if from_head { - let iter = txn.scan(bound_range, u32::MAX).await?; + let mut iter = txn.scan_stream(bound_range, u32::MAX).await?; - for kv in iter { + while let Some(kv) = iter.next().await { let key_idx = KeyDecoder::decode_key_list_idx_from_datakey( &key, kv.0.clone(), @@ -883,9 +891,10 @@ impl<'a> ListCommandCtx { txn.put(meta_key, new_meta_value).await?; } } else { - let iter = txn.scan_reverse(bound_range, u32::MAX).await?; + let mut iter = + txn.scan_reverse_stream(bound_range, u32::MAX).await?; - for kv in iter { + while let Some(kv) = iter.next().await { let key_idx = KeyDecoder::decode_key_list_idx_from_datakey( &key, kv.0.clone(), @@ -982,9 +991,9 @@ impl<'a> ListCommandCtx { } else { let bound_range = KEY_ENCODER.encode_txnkv_list_data_key_range(&key, version); - let iter = txn.scan_keys(bound_range, u32::MAX).await?; + let mut iter = txn.scan_keys_stream(bound_range, u32::MAX).await?; - for k in iter { + while let Some(k) = iter.next().await { txn.delete(k).await?; } txn.delete(meta_key).await?; @@ -1038,10 +1047,10 @@ impl<'a> ListCommandCtx { } else { let bound_range = KEY_ENCODER.encode_txnkv_list_data_key_range(&key, version); - let iter = txn.scan_keys(bound_range, u32::MAX).await?; + let mut iter = txn.scan_keys_stream(bound_range, u32::MAX).await?; - for k in iter { - txn.delete(k).await?; + while let Some(v) = iter.next().await { + txn.delete(v).await?; } txn.delete(meta_key).await?; } diff --git a/src/tikv/set.rs b/src/tikv/set.rs index 182c4f8..e158ba2 100644 --- a/src/tikv/set.rs +++ b/src/tikv/set.rs @@ -13,6 +13,7 @@ use crate::utils::count_unique_keys; use crate::utils::{key_is_expired, resp_array, resp_bulk, resp_err, resp_int, resp_nil}; use crate::Frame; use ::futures::future::FutureExt; +use futures::StreamExt; use rand::prelude::SliceRandom; use std::collections::HashMap; use std::convert::TryInto; @@ -469,7 +470,7 @@ impl SetCommandCtx { let bound_range = KEY_ENCODER.encode_txnkv_set_data_key_range(&key, version); - let iter = txn.scan_keys(bound_range, u32::MAX).await?; + let iter = txn.scan_keys_stream(bound_range, u32::MAX).await?; let resp = iter .map(|k| { @@ -478,7 +479,8 @@ impl SetCommandCtx { KeyDecoder::decode_key_set_member_from_datakey(&key, k); resp_bulk(user_key) }) - .collect(); + .collect() + .await; Ok(resp_array(resp)) } @@ -738,8 +740,9 @@ impl SetCommandCtx { let data_bound_range = KEY_ENCODER.encode_txnkv_set_data_key_range(&key, version); - let iter = txn.scan_keys(data_bound_range, u32::MAX).await?; - for k in iter { + let mut iter = + txn.scan_keys_stream(data_bound_range, u32::MAX).await?; + while let Some(k) = iter.next().await { txn.delete(k).await?; } @@ -803,8 +806,10 @@ impl SetCommandCtx { let data_bound_range = KEY_ENCODER.encode_txnkv_set_data_key_range(&key, version); - let iter = txn.scan_keys(data_bound_range, u32::MAX).await?; - for k in iter { + let mut iter = + txn.scan_keys_stream(data_bound_range, u32::MAX).await?; + + while let Some(k) = iter.next().await { txn.delete(k).await?; } diff --git a/src/tikv/string.rs b/src/tikv/string.rs index dc7660f..619b563 100644 --- a/src/tikv/string.rs +++ b/src/tikv/string.rs @@ -9,6 +9,7 @@ use crate::{ Frame, }; use ::futures::future::FutureExt; +use futures::StreamExt; use regex::bytes::Regex; use std::collections::HashMap; use std::str; @@ -875,11 +876,11 @@ impl StringCommandCtx { let bound_range: BoundRange = range.into(); // the iterator will scan all keyspace include sub metakey and datakey - let iter = txn.scan(bound_range, 100).await?; + let mut iter = txn.scan_stream(bound_range, 256).await?; // reset count to zero last_round_iter_count = 0; - for kv in iter { + while let Some(kv) = iter.next().await { // skip the left bound key, this should be exclusive if kv.0 == left_bound { continue; diff --git a/src/tikv/zset.rs b/src/tikv/zset.rs index 03c746e..87b1eb3 100644 --- a/src/tikv/zset.rs +++ b/src/tikv/zset.rs @@ -12,6 +12,7 @@ use crate::async_expire_zset_threshold_or_default; use crate::utils::{key_is_expired, resp_array, resp_bulk, resp_err, resp_int, resp_nil}; use crate::Frame; use ::futures::future::FutureExt; +use futures::StreamExt; use std::collections::HashMap; use std::convert::TryInto; use std::sync::Arc; @@ -467,9 +468,9 @@ impl ZsetCommandCtx { ); let range = start_key..=end_key; let bound_range: BoundRange = range.into(); - let iter = txn.scan(bound_range, u32::MAX).await?; + let iter = txn.scan_stream(bound_range, u32::MAX).await?; - Ok(resp_int(iter.count() as i64)) + Ok(resp_int(iter.count().await as i64)) } None => Ok(resp_int(0)), } @@ -535,10 +536,12 @@ impl ZsetCommandCtx { let bound_range = KEY_ENCODER.encode_txnkv_zset_score_key_range(&key, version); txn = txn_rc.lock().await; - let iter = txn.scan(bound_range, size.try_into().unwrap()).await?; + let mut iter = txn + .scan_stream(bound_range, size.try_into().unwrap()) + .await?; let mut idx = 0; - for kv in iter { + while let Some(kv) = iter.next().await { if idx < min { idx += 1; continue; @@ -646,9 +649,11 @@ impl ZsetCommandCtx { let range = start_key..end_key; let bound_range: BoundRange = range.into(); txn = txn_rc.lock().await; - let iter = txn.scan(bound_range, size.try_into().unwrap()).await?; + let mut iter = txn + .scan_stream(bound_range, size.try_into().unwrap()) + .await?; - for kv in iter { + while let Some(kv) = iter.next().await { let member = kv.1; if reverse { resp.insert(0, resp_bulk(member)); @@ -1176,10 +1181,12 @@ impl ZsetCommandCtx { let bound_range = KEY_ENCODER.encode_txnkv_zset_score_key_range(&key, version); txn = txn_rc.lock().await; - let iter = txn.scan(bound_range, size.try_into().unwrap()).await?; + let mut iter = txn + .scan_stream(bound_range, size.try_into().unwrap()) + .await?; let mut idx = 0; - for kv in iter { + while let Some(kv) = iter.next().await { if idx < min { idx += 1; continue; @@ -1392,8 +1399,8 @@ impl ZsetCommandCtx { } else { let bound_range = KEY_ENCODER.encode_txnkv_zset_data_key_range(&key, version); - let iter = txn.scan(bound_range, u32::MAX).await?; - for kv in iter { + let mut iter = txn.scan_stream(bound_range, u32::MAX).await?; + while let Some(kv) = iter.next().await { // kv.0 is member key // kv.1 is score // decode the score vec to i64 @@ -1476,8 +1483,8 @@ impl ZsetCommandCtx { } else { let bound_range = KEY_ENCODER.encode_txnkv_zset_data_key_range(&key, version); - let iter = txn.scan(bound_range, u32::MAX).await?; - for kv in iter { + let mut iter = txn.scan_stream(bound_range, u32::MAX).await?; + while let Some(kv) = iter.next().await { // kv.0 is member key // kv.1 is score // decode the score vec to i64 From 3c1a047099f549bb38a7df27aa67bbaa0c3ab0b0 Mon Sep 17 00:00:00 2001 From: Morgan279 Date: Wed, 28 Sep 2022 20:31:09 +0800 Subject: [PATCH 4/6] Change to stream scan and fix some defects Signed-off-by: Morgan279 --- src/rdb.rs | 49 ++++++++++++++++++++++++++++++++----------------- 1 file changed, 32 insertions(+), 17 deletions(-) diff --git a/src/rdb.rs b/src/rdb.rs index e69925a..04d59f8 100644 --- a/src/rdb.rs +++ b/src/rdb.rs @@ -4,8 +4,9 @@ use crate::tikv::errors::{ AsyncResult, REDIS_DUMPING_ERR, REDIS_LIST_TOO_LARGE_ERR, REDIS_VALUE_IS_NOT_INTEGER_ERR, }; use crate::tikv::{get_txn_client, KEY_ENCODER}; -use crc::{Crc, Digest, CRC_64_GO_ISO}; -use futures::FutureExt; +use ::futures::future::FutureExt; +use crc::{Algorithm, Crc, Digest}; +use futures::StreamExt; use slog::debug; use std::collections::HashMap; use std::convert::TryInto; @@ -53,6 +54,16 @@ const RDB_TYPE_HASH: u8 = 4; const RDB_TYPE_LIST_ZIPLIST: u8 = 10; const RDB_TYPE_ZSET_ZIPLIST: u8 = 12; +const REDIS_ALG: Algorithm = Algorithm { + poly: 0xad93d23594c935a9, + init: 0x0000000000000000, + refin: true, + refout: true, + xorout: 0x0000000000000000, + check: 0xe9c6d914c4b8d9ca, + residue: 0x0000000000000000, +}; + static DUMPING: AtomicBool = AtomicBool::new(false); pub struct RDB; @@ -75,7 +86,7 @@ impl RDBEncoder { .await?; self.save_aux_field("redis-ver", REDIS_VERSION, digest) .await?; - self.save_aux_field("redis-bits", &(usize::BITS * 8).to_string(), digest) + self.save_aux_field("redis-bits", &usize::BITS.to_string(), digest) .await?; self.save_aux_field( "ctime", @@ -171,7 +182,7 @@ impl RDBEncoder { let bound_range: BoundRange = (KEY_ENCODER.encode_txnkv_hash_data_key_start(&key, version) ..KEY_ENCODER.encode_txnkv_hash_data_key_end(&key, version)) .into(); - let iter = readonly_txn.scan(bound_range, u32::MAX).await?; + let iter = readonly_txn.scan_stream(bound_range, u32::MAX).await?; let hash_map: HashMap = iter .map(|kv| { let field = @@ -180,7 +191,8 @@ impl RDBEncoder { let field_val = String::from_utf8_lossy(kv.1.as_slice()).to_string(); (field, field_val) }) - .collect(); + .collect() + .await; let hash_map_vec: Vec = hash_map .iter() .map(|(field, value)| field.to_string() + ":" + value) @@ -218,11 +230,12 @@ impl RDBEncoder { let range: RangeFrom = data_key_start..; let from_range: BoundRange = range.into(); let iter = readonly_txn - .scan(from_range, (right - left).try_into().unwrap()) + .scan_stream(from_range, (right - left).try_into().unwrap()) .await?; let data_vals: Vec = iter .map(|kv| String::from_utf8_lossy(kv.1.as_slice()).to_string()) - .collect(); + .collect() + .await; debug!( LOGGER, "[saving list obj] key: {}, members: [{}]", @@ -248,12 +261,13 @@ impl RDBEncoder { let key = user_key.to_owned(); let (_, version, _) = KeyDecoder::decode_key_meta(&val); let bound_range = KEY_ENCODER.encode_txnkv_set_data_key_range(&key, version); - let iter = readonly_txn.scan_keys(bound_range, u32::MAX).await?; + let iter = readonly_txn.scan_keys_stream(bound_range, u32::MAX).await?; let members: Vec = iter .map(|k| { String::from_utf8(KeyDecoder::decode_key_set_member_from_datakey(&key, k)).unwrap() }) - .collect(); + .collect() + .await; debug!( LOGGER, "[saving set obj] key: {}, members: [{}]", @@ -290,7 +304,7 @@ impl RDBEncoder { .sum(); let bound_range = KEY_ENCODER.encode_txnkv_zset_score_key_range(&key, version); let iter = readonly_txn - .scan(bound_range, size.try_into().unwrap()) + .scan_stream(bound_range, size.try_into().unwrap()) .await?; let member_score_map: HashMap = iter .map(|kv| { @@ -298,14 +312,15 @@ impl RDBEncoder { let score = KeyDecoder::decode_key_zset_score_from_scorekey(&key, kv.0); (member, score) }) - .collect(); + .collect() + .await; let member_score_vec: Vec = member_score_map .iter() .map(|(member, score)| member.to_string() + ":" + score.to_string().as_str()) .collect(); debug!( LOGGER, - "[save zset obj] key: {}, members: [{}]", + "[saving zset obj] key: {}, members: [{}]", user_key, member_score_vec.join(", ") ); @@ -495,8 +510,8 @@ impl RDBEncoder { async fn write_buf<'a>(&mut self, end: usize, digest: &mut Digest<'a, u64>) -> AsyncResult<()> { self.writer.write_all(&self.buf[..end]).await?; - self.buf.fill(0); digest.update(&self.buf[..end]); + self.buf.fill(0); Ok(()) } } @@ -512,7 +527,7 @@ impl RDB { .exec_in_txn(None, |txn_rc| { async move { let rdb_file = RDB::create_rdb(); - let crc64 = Crc::::new(&CRC_64_GO_ISO); + let crc64 = Crc::::new(&REDIS_ALG); let mut digest = crc64.digest(); let mut encoder = RDBEncoder::new(rdb_file); encoder.save_header(&mut digest).await?; @@ -524,10 +539,10 @@ impl RDB { while last_round_iter_count > 0 { let range = left_bound.clone()..KEY_ENCODER.encode_txnkv_keyspace_end(); let bound_range: BoundRange = range.into(); - //TODO configurable scan limit or stream scan - let iter = readonly_txn.scan(bound_range, 100).await?; + //TODO configurable scan limit + let mut iter = readonly_txn.scan_stream(bound_range, 256).await?; last_round_iter_count = 0; - for kv in iter { + while let Some(kv) = iter.next().await { if kv.0 == left_bound { continue; } From 39a5c06e76199e49e890c1479af1da1ef36c132a Mon Sep 17 00:00:00 2001 From: Morgan279 Date: Wed, 28 Sep 2022 20:41:57 +0800 Subject: [PATCH 5/6] Add configurable rdb file name Signed-off-by: Morgan279 --- src/config.rs | 13 +++++++++++++ src/rdb.rs | 4 ++-- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/src/config.rs b/src/config.rs index ed01e46..8f3af84 100644 --- a/src/config.rs +++ b/src/config.rs @@ -51,6 +51,7 @@ struct Server { cluster_topology_interval: Option, cluster_topology_expire: Option, meta_key_number: Option, + rdb_file_name: Option, } #[derive(Debug, Deserialize, Clone)] @@ -347,6 +348,18 @@ pub fn config_meta_key_number_or_default() -> u16 { 100 } +pub fn config_rdb_file_name_or_default() -> String { + unsafe { + if let Some(c) = &SERVER_CONFIG { + if let Some(f) = c.server.rdb_file_name.clone() { + return f; + } + } + } + + "dump.rdb".to_owned() +} + fn log_level_str() -> String { unsafe { if let Some(c) = &SERVER_CONFIG { diff --git a/src/rdb.rs b/src/rdb.rs index 04d59f8..c6c35d8 100644 --- a/src/rdb.rs +++ b/src/rdb.rs @@ -1,4 +1,4 @@ -use crate::config::LOGGER; +use crate::config::{config_rdb_file_name_or_default, LOGGER}; use crate::tikv::encoding::{DataType, KeyDecoder}; use crate::tikv::errors::{ AsyncResult, REDIS_DUMPING_ERR, REDIS_LIST_TOO_LARGE_ERR, REDIS_VALUE_IS_NOT_INTEGER_ERR, @@ -585,7 +585,7 @@ impl RDB { .create(true) .truncate(true) .write(true) - .open("dump.rdb") + .open(config_rdb_file_name_or_default()) .unwrap(), ) } From 9923d92009a0647aed9d6980bbed965c74fe0d00 Mon Sep 17 00:00:00 2001 From: Morgan279 Date: Thu, 29 Sep 2022 13:43:45 +0800 Subject: [PATCH 6/6] Add some TODOs Signed-off-by: Morgan279 --- src/cmd/fake.rs | 1 + src/rdb.rs | 1 + 2 files changed, 2 insertions(+) diff --git a/src/cmd/fake.rs b/src/cmd/fake.rs index 620a186..d8c646a 100644 --- a/src/cmd/fake.rs +++ b/src/cmd/fake.rs @@ -233,6 +233,7 @@ impl Fake { } } "SAVE" => match RDB::dump().await { + // TODO make it possible to disable this command from configuration file Ok(()) => resp_ok(), Err(e) => { if e != REDIS_DUMPING_ERR { diff --git a/src/rdb.rs b/src/rdb.rs index c6c35d8..2332836 100644 --- a/src/rdb.rs +++ b/src/rdb.rs @@ -353,6 +353,7 @@ impl RDBEncoder { digest: &mut Digest<'a, u64>, ) -> AsyncResult<()> { if values.len() > u16::MAX as usize { + // TODO add quicklist and zset2 encodings to support big key dump return Err(REDIS_LIST_TOO_LARGE_ERR); }