diff --git a/Cargo.toml b/Cargo.toml index aecd4eb3..b2703ee7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -71,8 +71,8 @@ mime_guess = { version = "2" } rusqlite = { version = "0.25", features = ["bundled"] } rsql_builder = "0.1.5" inner-mem-cache = "0.1.7" -rnacos-web-dist-wrap = "=0.5.1" -nacos_rust_client = "0.2" +rnacos-web-dist-wrap = "=0.5.2" +nacos_rust_client = "0.3.2" zip = "0.6" tempfile = "3" diff --git a/src/cli.rs b/src/cli.rs index fd0177aa..af9a5f25 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -30,4 +30,17 @@ pub enum Commands { /// out to transfer middle data file out: String, }, + #[command(arg_required_else_help = true)] + OpenapiToData { + /// nacos auth username,default is empty + #[arg(short, long, default_value = "")] + username: String, + /// nacos auth password,default is empty + #[arg(short, long, default_value = "")] + password: String, + /// nacos host ip:port; example: 127.0.0.1:8848 + host: String, + /// out to transfer middle data file + out: String, + }, } diff --git a/src/main.rs b/src/main.rs index 5f640fbf..1c8e2643 100644 --- a/src/main.rs +++ b/src/main.rs @@ -41,6 +41,7 @@ use rnacos::common::appdata::AppShareData; use rnacos::openapi::middle::auth_middle::ApiCheckAuth; use rnacos::raft::NacosRaft; use rnacos::transfer::data_to_sqlite::data_to_sqlite; +use rnacos::transfer::openapi_to_data::openapi_to_data; use rnacos::transfer::sqlite_to_data::sqlite_to_data; use rnacos::web_config::{app_config, console_config}; //#[global_allocator] @@ -63,8 +64,9 @@ async fn main() -> Result<(), Box> { if let Some(cmd) = cli_opt.command { return run_subcommand(cmd).await; } - log::info!("version:{}, RUST_LOG:{}", get_app_version(), &rust_log); - log::info!("data dir:{}", sys_config.local_db_dir); + // 这里不使用log:info避免日志等级高于info时不打印 + println!("version:{}, RUST_LOG:{}", get_app_version(), &rust_log); + println!("data dir:{}", sys_config.local_db_dir); let factory_data = config_factory(sys_config.clone()).await?; let app_data = build_share_data(factory_data.clone())?; let http_addr = sys_config.get_http_addr(); @@ -125,7 +127,8 @@ async fn main() -> Result<(), Box> { if let Some(num) = sys_config.http_workers { server = server.workers(num); } - log::info!("rnacos started"); + // 这里不使用log:info避免日志等级高于info时不打印 + println!("rnacos started"); server.bind(http_addr)?.run().await?; Ok(()) } @@ -149,6 +152,15 @@ async fn run_subcommand(commands: Commands) -> Result<(), Box> { log::info!("sqlite to middle data, from:{file} to:{out}"); sqlite_to_data(&file, &out).await?; } + Commands::OpenapiToData { + host, + username, + password, + out, + } => { + log::info!("openapi to middle data, from:{host} to:{out}"); + openapi_to_data(&host, &username, &password, &out).await?; + } } Ok(()) } diff --git a/src/transfer/mod.rs b/src/transfer/mod.rs index f9aab756..87336e55 100644 --- a/src/transfer/mod.rs +++ b/src/transfer/mod.rs @@ -1,6 +1,36 @@ +use crate::common::actor_utils::create_actor_at_thread; +use crate::common::constant::{ + CACHE_TREE_NAME, CONFIG_TREE_NAME, NAMESPACE_TREE_NAME, SEQUENCE_TREE_NAME, USER_TREE_NAME, +}; +use crate::transfer::model::TransferWriterRequest; +use crate::transfer::writer::TransferWriterActor; +use actix::Addr; + pub mod data_to_sqlite; pub mod model; +pub mod openapi_to_data; pub mod reader; pub mod sqlite; pub mod sqlite_to_data; pub mod writer; + +pub(crate) fn init_writer_actor(data_file: &str) -> Addr { + let writer_actor = create_actor_at_thread(TransferWriterActor::new(data_file.into(), 0)); + writer_actor.do_send(TransferWriterRequest::AddTableNameMap( + CONFIG_TREE_NAME.clone(), + )); + writer_actor.do_send(TransferWriterRequest::AddTableNameMap( + SEQUENCE_TREE_NAME.clone(), + )); + writer_actor.do_send(TransferWriterRequest::AddTableNameMap( + NAMESPACE_TREE_NAME.clone(), + )); + writer_actor.do_send(TransferWriterRequest::AddTableNameMap( + USER_TREE_NAME.clone(), + )); + writer_actor.do_send(TransferWriterRequest::AddTableNameMap( + CACHE_TREE_NAME.clone(), + )); + writer_actor.do_send(TransferWriterRequest::InitHeader); + writer_actor +} diff --git a/src/transfer/openapi_to_data.rs b/src/transfer/openapi_to_data.rs new file mode 100644 index 00000000..77c1cf09 --- /dev/null +++ b/src/transfer/openapi_to_data.rs @@ -0,0 +1,147 @@ +use crate::common::constant::{CONFIG_TREE_NAME, EMPTY_STR, NAMESPACE_TREE_NAME}; +use crate::config::core::{ConfigKey, ConfigValue}; +use crate::config::model::ConfigValueDO; +use crate::namespace::model::{NamespaceDO, FROM_USER_VALUE}; +use crate::now_millis_i64; +use crate::transfer::init_writer_actor; +use crate::transfer::model::{ + TransferRecordDto, TransferWriterAsyncRequest, TransferWriterRequest, +}; +use crate::transfer::sqlite::TableSeq; +use crate::transfer::writer::TransferWriterActor; +use actix::Addr; +use nacos_rust_client::client::api_model::NamespaceInfo; +use nacos_rust_client::client::config_client::api_model::{ConfigInfoDto, ConfigQueryParams}; +use nacos_rust_client::client::{AuthInfo, ClientBuilder, ConfigClient}; +use std::sync::Arc; + +pub async fn openapi_to_data( + host: &str, + username: &str, + password: &str, + data_file: &str, +) -> anyhow::Result<()> { + let auth_info = if username.is_empty() || password.is_empty() { + None + } else { + Some(AuthInfo::new(username, password)) + }; + let config_client = ClientBuilder::new() + .set_endpoint_addrs(host) + .set_auth_info(auth_info) + .set_use_grpc(false) + .build_config_client(); + + let writer_actor = init_writer_actor(data_file); + let mut table_seq = TableSeq::default(); + + let result = config_client.get_namespace_list().await?; + let namespaces = result.data.unwrap_or_default(); + apply_config(&mut table_seq, &namespaces, &config_client, &writer_actor).await?; + apply_tenant(&namespaces, &writer_actor)?; + writer_actor + .send(TransferWriterAsyncRequest::Flush) + .await + .ok(); + Ok(()) +} + +fn apply_tenant( + namespace_list: &[NamespaceInfo], + writer_actor: &Addr, +) -> anyhow::Result<()> { + let mut count = 0; + for item in namespace_list { + let key = if let Some(v) = &item.namespace { + v.as_bytes().to_vec() + } else { + EMPTY_STR.as_bytes().to_vec() + }; + let value_do = NamespaceDO { + namespace_id: item.namespace.clone(), + namespace_name: item.namespace_show_name.clone(), + r#type: Some(FROM_USER_VALUE.to_string()), + }; + let record = TransferRecordDto { + table_name: Some(NAMESPACE_TREE_NAME.clone()), + key, + value: value_do.to_bytes()?, + table_id: 0, + }; + writer_actor.do_send(TransferWriterRequest::AddRecord(record)); + count += 1; + } + log::info!("transfer tenant count:{count}"); + Ok(()) +} + +async fn apply_config( + table_seq: &mut TableSeq, + namespace_list: &[NamespaceInfo], + config_client: &ConfigClient, + writer_actor: &Addr, +) -> anyhow::Result<()> { + let mut count = 0; + for namespace_info in namespace_list { + let namespace_id = if let Some(namespace_id) = namespace_info.namespace.as_ref() { + namespace_id + } else { + return Err(anyhow::anyhow!("namespace_id is none")); + }; + let mut current_page = 0; + let mut total_page = 1; + let mut total_count = 0; + let mut params = ConfigQueryParams { + tenant: Some(namespace_id.to_owned()), + page_no: Some(1), + page_size: Some(100), + ..Default::default() + }; + while current_page < total_page { + current_page += 1; + params.page_no = Some(current_page); + let res = config_client + .query_accurate_config_page(params.clone()) + .await?; + total_page = res.pages_available.unwrap_or_default(); + total_count = res.total_count.unwrap_or_default(); + let configs = res.page_items.unwrap_or_default(); + for config in &configs { + let key = ConfigKey::new(&config.data_id, &config.group, namespace_id); + let record = build_config_record(table_seq, key, config)?; + count += 1; + writer_actor.do_send(TransferWriterRequest::AddRecord(record)); + } + if configs.is_empty() { + break; + } + } + log::info!("[namespace {}],config count:{}", namespace_id, total_count); + } + log::info!("transfer config total count:{count}"); + Ok(()) +} + +fn build_config_record( + table_seq: &mut TableSeq, + key: ConfigKey, + config_info: &ConfigInfoDto, +) -> anyhow::Result { + let content = Arc::new(config_info.content.clone().unwrap_or_default()); + let mut config_value = ConfigValue::new(content.clone()); + config_value.update_value( + content, + table_seq.next_config_id() as u64, + now_millis_i64(), + None, + None, + ); + let value_do: ConfigValueDO = config_value.into(); + let record = TransferRecordDto { + table_name: Some(CONFIG_TREE_NAME.clone()), + key: key.build_key().as_bytes().to_vec(), + value: value_do.to_bytes()?, + table_id: 0, + }; + Ok(record) +} diff --git a/src/transfer/sqlite_to_data.rs b/src/transfer/sqlite_to_data.rs index c8dbb941..ef7c6a98 100644 --- a/src/transfer/sqlite_to_data.rs +++ b/src/transfer/sqlite_to_data.rs @@ -1,13 +1,12 @@ -use crate::common::actor_utils::create_actor_at_thread; use crate::common::constant::{ - CACHE_TREE_NAME, CONFIG_TREE_NAME, EMPTY_ARC_STRING, EMPTY_STR, NAMESPACE_TREE_NAME, - SEQUENCE_TREE_NAME, USER_TREE_NAME, + CONFIG_TREE_NAME, EMPTY_ARC_STRING, EMPTY_STR, NAMESPACE_TREE_NAME, USER_TREE_NAME, }; use crate::config::core::{ConfigKey, ConfigValue}; use crate::config::model::ConfigValueDO; use crate::config::ConfigUtils; use crate::namespace::model::{NamespaceDO, FROM_USER_VALUE}; use crate::now_millis_i64; +use crate::transfer::init_writer_actor; use crate::transfer::model::{ TransferRecordDto, TransferWriterAsyncRequest, TransferWriterRequest, }; @@ -37,27 +36,6 @@ pub async fn sqlite_to_data(db_path: &str, data_file: &str) -> anyhow::Result<() Ok(()) } -fn init_writer_actor(data_file: &str) -> Addr { - let writer_actor = create_actor_at_thread(TransferWriterActor::new(data_file.into(), 0)); - writer_actor.do_send(TransferWriterRequest::AddTableNameMap( - CONFIG_TREE_NAME.clone(), - )); - writer_actor.do_send(TransferWriterRequest::AddTableNameMap( - SEQUENCE_TREE_NAME.clone(), - )); - writer_actor.do_send(TransferWriterRequest::AddTableNameMap( - NAMESPACE_TREE_NAME.clone(), - )); - writer_actor.do_send(TransferWriterRequest::AddTableNameMap( - USER_TREE_NAME.clone(), - )); - writer_actor.do_send(TransferWriterRequest::AddTableNameMap( - CACHE_TREE_NAME.clone(), - )); - writer_actor.do_send(TransferWriterRequest::InitHeader); - writer_actor -} - fn apply_config( conn: &Connection, table_seq: &mut TableSeq, @@ -93,7 +71,7 @@ fn apply_config( count += 1; writer_actor.do_send(TransferWriterRequest::AddRecord(record)); } - log::info!("transfer config count:{count}"); + log::info!("transfer config total count:{count}"); Ok(()) } @@ -103,8 +81,8 @@ fn build_config_record( config_do: ConfigDO, histories: Vec, ) -> anyhow::Result { - let current_current = config_do.content.unwrap_or_default(); - let mut config_value = ConfigValue::new(current_current.clone()); + let current_content = config_do.content.unwrap_or_default(); + let mut config_value = ConfigValue::new(current_content.clone()); let mut last_content = None; for item in histories { if let Some(content) = item.content { @@ -120,14 +98,14 @@ fn build_config_record( } } let need_pull_current = if let Some(last_content) = &last_content { - last_content != ¤t_current + last_content != ¤t_content } else { true }; if need_pull_current { let op_time = config_do.last_time.unwrap_or(now_millis_i64()); config_value.update_value( - current_current, + current_content, table_seq.next_config_id() as u64, op_time, None,