Skip to content
4 changes: 4 additions & 0 deletions src/conf/kiwi.conf
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,10 @@ memory 10M
# Log directory for Kiwi logs
# log-dir /data/kiwi_rs/logs

# Directory where RocksDB data files are stored.
# When db-instance-num > 1, each instance creates a subdirectory (e.g., ./db/0, ./db/1).
# db-dir ./db
Comment thread
coderabbitai[bot] marked this conversation as resolved.

# Enable Redis compatibility mode
# redis-compatible-mode yes

Expand Down
13 changes: 13 additions & 0 deletions src/conf/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ pub struct Config {
pub binding: String,
pub timeout: u32,
pub log_dir: String,
pub db_dir: String,
Comment thread
coderabbitai[bot] marked this conversation as resolved.
pub redis_compatible_mode: bool,
pub db_instance_num: usize,

Expand All @@ -242,6 +243,7 @@ impl Default for Config {
timeout: 50,
memory: 1024 * 1024 * 1024, // 1GB
log_dir: "/data/kiwi_rs/logs".to_string(),
db_dir: "./db".to_string(),
redis_compatible_mode: false,

rocksdb_max_subcompactions: 0,
Expand Down Expand Up @@ -347,6 +349,17 @@ impl Config {
"log-dir" => {
config.log_dir = value;
}
"db-dir" => {
let trimmed = value.trim();
if trimmed.is_empty() {
return Err(Error::InvalidConfig {
source: serde_ini::de::Error::Custom(
"db-dir must not be empty".to_string(),
),
});
}
config.db_dir = trimmed.to_string();
}
"redis-compatible-mode" => {
config.redis_compatible_mode =
parse_bool_from_string(&value).map_err(|e| Error::InvalidConfig {
Expand Down
26 changes: 26 additions & 0 deletions src/conf/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ mod tests {

assert_eq!(50, config.timeout);
assert_eq!("/data/kiwi_rs/logs", config.log_dir);
assert_eq!("./db", config.db_dir);
assert!(!config.redis_compatible_mode);
assert_eq!(3, config.db_instance_num);

Expand All @@ -78,6 +79,7 @@ mod tests {
timeout: 100,
redis_compatible_mode: false,
log_dir: "".to_string(),
db_dir: "./db".to_string(),
memory: 1024,
rocksdb_max_subcompactions: 0,
rocksdb_max_background_jobs: 4,
Expand All @@ -104,4 +106,28 @@ mod tests {
invalid_config.port = 8080;
assert!(invalid_config.validate().is_ok());
}

#[test]
fn test_db_dir_default() {
let config = Config::default();
assert_eq!("./db", config.db_dir);
}

#[test]
fn test_db_dir_from_config_file() {
use std::io::Write;

let filename = format!("kiwi_test_db_dir_{}.conf", std::process::id());
let tmp = std::env::temp_dir().join(filename);
let config_path = tmp.to_str().unwrap();
let mut f = std::fs::File::create(config_path).unwrap();
writeln!(f, "port 7379").unwrap();
writeln!(f, "db-dir /data/kiwi/db").unwrap();
drop(f);

let config = Config::load(config_path).unwrap();
assert_eq!("/data/kiwi/db", config.db_dir);

let _ = std::fs::remove_file(config_path);
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}
}
14 changes: 10 additions & 4 deletions src/net/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,9 @@ impl ServerFactory {
}
}
#[cfg(unix)]
"unix" => Some(Box::new(unix::UnixServer::new(addr))),
"unix" => unix::UnixServer::new(addr, None)
.ok()
.map(|s| Box::new(s) as Box<dyn ServerTrait>),
#[cfg(not(unix))]
Comment on lines 68 to 72
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. The create_server_with_mode path uses the dual-runtime NetworkServer for TCP (storage is initialized separately in initialize_storage_server). The Unix fallback here is a legacy path — I've left it as None (uses default ./db) since the dual-runtime architecture doesn't use this code path. Added a comment to document this.

"unix" => None,
_ => None,
Expand All @@ -110,13 +112,16 @@ impl ServerFactory {
pub fn create_legacy_server(
protocol: &str,
addr: Option<String>,
db_dir: Option<&str>,
) -> Option<Box<dyn ServerTrait>> {
match protocol.to_lowercase().as_str() {
"tcp" => TcpServer::new(addr)
"tcp" => TcpServer::new(addr, db_dir)
.ok()
.map(|s| Box::new(s) as Box<dyn ServerTrait>),
#[cfg(unix)]
"unix" => Some(Box::new(unix::UnixServer::new(addr))),
"unix" => unix::UnixServer::new(addr, db_dir)
.ok()
.map(|s| Box::new(s) as Box<dyn ServerTrait>),
#[cfg(not(unix))]
"unix" => None,
_ => None,
Expand Down Expand Up @@ -172,9 +177,10 @@ impl ServerFactory {
protocol: &str,
addr: Option<String>,
raft_node: Arc<dyn Send + Sync>,
db_dir: Option<&str>,
) -> Option<Box<dyn ServerTrait>> {
match protocol.to_lowercase().as_str() {
"tcp" => ClusterTcpServer::new(addr, raft_node)
"tcp" => ClusterTcpServer::new(addr, raft_node, db_dir)
.ok()
.map(|s| Box::new(s) as Box<dyn ServerTrait>),
_ => None,
Expand Down
9 changes: 4 additions & 5 deletions src/net/src/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,9 @@ pub struct TcpServer {
}

impl TcpServer {
pub fn new(addr: Option<String>) -> Result<Self, Box<dyn Error>> {
// TODO: Get storage options from config
pub fn new(addr: Option<String>, db_dir: Option<&str>) -> Result<Self, Box<dyn Error>> {
let storage_options = Arc::new(StorageOptions::default());
let db_path = PathBuf::from("./db");
let db_path = PathBuf::from(db_dir.unwrap_or("./db"));
let mut storage = Storage::new(1, 0);
let executor = Arc::new(CmdExecutorBuilder::new().build());

Expand Down Expand Up @@ -165,10 +164,10 @@ impl ClusterTcpServer {
pub fn new(
addr: Option<String>,
raft_node: Arc<dyn Send + Sync>,
db_dir: Option<&str>,
) -> Result<Self, Box<dyn Error>> {
// TODO: Get storage options from config
let storage_options = Arc::new(StorageOptions::default());
let db_path = PathBuf::from("./db");
let db_path = PathBuf::from(db_dir.unwrap_or("./db"));
let mut storage = Storage::new(1, 0);
let executor = Arc::new(CmdExecutorBuilder::new().build());

Expand Down
15 changes: 10 additions & 5 deletions src/net/src/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,28 @@
storage: Arc<Storage>,
cmd_table: Arc<CmdTable>,
executor: Arc<CmdExecutor>,
}

Check warning on line 33 in src/net/src/unix.rs

View workflow job for this annotation

GitHub Actions / cargo fmt (ubuntu-latest)

Diff in /home/runner/work/kiwi/kiwi/src/net/src/unix.rs

impl UnixServer {
pub fn new(path: Option<String>) -> Self {
pub fn new(
path: Option<String>,
db_dir: Option<&str>,
) -> Result<Self, Box<dyn Error>> {
let path = path.unwrap_or_else(|| "/tmp/kiwidb.sock".to_string());
let storage_options = Arc::new(StorageOptions::default());
let db_path = PathBuf::from("./db");
let db_path = PathBuf::from(db_dir.unwrap_or("./db"));
let mut storage = Storage::new(1, 0);
storage.open(storage_options, db_path).unwrap();
storage
.open(storage_options, db_path)
.map_err(|e| Box::new(e) as Box<dyn Error>)?;
let executor = Arc::new(CmdExecutorBuilder::new().build());
Comment on lines 38 to 44
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in the same change — new() returns Result and all callers in ServerFactory use .ok().


Self {
Ok(Self {
path,
storage: Arc::new(storage),
cmd_table: Arc::new(create_command_table()),
executor,
}
})
}
}

Expand Down
7 changes: 4 additions & 3 deletions src/server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,10 @@ fn main() -> std::io::Result<()> {
info!("Storage components initialized, starting storage server...");

// Initialize storage server in storage runtime (runs in background)

let db_dir = config.db_dir.clone();
storage_handle.spawn(async move {
info!("Initializing storage server...");
match initialize_storage_server(storage_receiver).await {
match initialize_storage_server(storage_receiver, &db_dir).await {
Ok(_) => {
error!("Storage server exited unexpectedly - this should never happen!");
}
Expand Down Expand Up @@ -260,12 +260,13 @@ fn main() -> std::io::Result<()> {
/// Initialize the storage server in the storage runtime
async fn initialize_storage_server(
request_receiver: tokio::sync::mpsc::Receiver<runtime::StorageRequest>,
db_dir: &str,
) -> Result<(), DualRuntimeError> {
info!("Initializing storage server...");

// Create storage options and path
let storage_options = Arc::new(StorageOptions::default());
let db_path = PathBuf::from("./db");
let db_path = PathBuf::from(db_dir);

// Create storage instance (not yet opened)
let mut storage = Storage::new(1, 0); // Single instance, db_id 0
Expand Down
Loading