Skip to content

Commit

Permalink
feat: 完成支持通过nacos open api接口把nacos数据导出到中间数据文件功能 #138
Browse files Browse the repository at this point in the history
  • Loading branch information
heqingpan committed Oct 27, 2024
1 parent 39e5afd commit e08b0bb
Show file tree
Hide file tree
Showing 6 changed files with 214 additions and 34 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ mime_guess = { version = "2" }
rusqlite = { version = "0.25", features = ["bundled"] }
rsql_builder = "0.1.5"
inner-mem-cache = "0.1.7"
rnacos-web-dist-wrap = "=0.5.1"
nacos_rust_client = "0.2"
rnacos-web-dist-wrap = "=0.5.2"
nacos_rust_client = "0.3.2"
zip = "0.6"
tempfile = "3"

Expand Down
13 changes: 13 additions & 0 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,17 @@ pub enum Commands {
/// out to transfer middle data file
out: String,
},
#[command(arg_required_else_help = true)]
OpenapiToData {
/// nacos auth username,default is empty
#[arg(short, long, default_value = "")]
username: String,
/// nacos auth password,default is empty
#[arg(short, long, default_value = "")]
password: String,
/// nacos host ip:port; example: 127.0.0.1:8848
host: String,
/// out to transfer middle data file
out: String,
},
}
18 changes: 15 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use rnacos::common::appdata::AppShareData;
use rnacos::openapi::middle::auth_middle::ApiCheckAuth;
use rnacos::raft::NacosRaft;
use rnacos::transfer::data_to_sqlite::data_to_sqlite;
use rnacos::transfer::openapi_to_data::openapi_to_data;
use rnacos::transfer::sqlite_to_data::sqlite_to_data;
use rnacos::web_config::{app_config, console_config};
//#[global_allocator]
Expand All @@ -63,8 +64,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
if let Some(cmd) = cli_opt.command {
return run_subcommand(cmd).await;
}
log::info!("version:{}, RUST_LOG:{}", get_app_version(), &rust_log);
log::info!("data dir:{}", sys_config.local_db_dir);
// 这里不使用log:info避免日志等级高于info时不打印
println!("version:{}, RUST_LOG:{}", get_app_version(), &rust_log);
println!("data dir:{}", sys_config.local_db_dir);
let factory_data = config_factory(sys_config.clone()).await?;
let app_data = build_share_data(factory_data.clone())?;
let http_addr = sys_config.get_http_addr();
Expand Down Expand Up @@ -125,7 +127,8 @@ async fn main() -> Result<(), Box<dyn Error>> {
if let Some(num) = sys_config.http_workers {
server = server.workers(num);
}
log::info!("rnacos started");
// 这里不使用log:info避免日志等级高于info时不打印
println!("rnacos started");
server.bind(http_addr)?.run().await?;
Ok(())
}
Expand All @@ -149,6 +152,15 @@ async fn run_subcommand(commands: Commands) -> Result<(), Box<dyn Error>> {
log::info!("sqlite to middle data, from:{file} to:{out}");
sqlite_to_data(&file, &out).await?;
}
Commands::OpenapiToData {
host,
username,
password,
out,
} => {
log::info!("openapi to middle data, from:{host} to:{out}");
openapi_to_data(&host, &username, &password, &out).await?;
}
}
Ok(())
}
Expand Down
30 changes: 30 additions & 0 deletions src/transfer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,36 @@
use crate::common::actor_utils::create_actor_at_thread;
use crate::common::constant::{
CACHE_TREE_NAME, CONFIG_TREE_NAME, NAMESPACE_TREE_NAME, SEQUENCE_TREE_NAME, USER_TREE_NAME,
};
use crate::transfer::model::TransferWriterRequest;
use crate::transfer::writer::TransferWriterActor;
use actix::Addr;

pub mod data_to_sqlite;
pub mod model;
pub mod openapi_to_data;
pub mod reader;
pub mod sqlite;
pub mod sqlite_to_data;
pub mod writer;

pub(crate) fn init_writer_actor(data_file: &str) -> Addr<TransferWriterActor> {
let writer_actor = create_actor_at_thread(TransferWriterActor::new(data_file.into(), 0));
writer_actor.do_send(TransferWriterRequest::AddTableNameMap(
CONFIG_TREE_NAME.clone(),
));
writer_actor.do_send(TransferWriterRequest::AddTableNameMap(
SEQUENCE_TREE_NAME.clone(),
));
writer_actor.do_send(TransferWriterRequest::AddTableNameMap(
NAMESPACE_TREE_NAME.clone(),
));
writer_actor.do_send(TransferWriterRequest::AddTableNameMap(
USER_TREE_NAME.clone(),
));
writer_actor.do_send(TransferWriterRequest::AddTableNameMap(
CACHE_TREE_NAME.clone(),
));
writer_actor.do_send(TransferWriterRequest::InitHeader);
writer_actor
}
147 changes: 147 additions & 0 deletions src/transfer/openapi_to_data.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
use crate::common::constant::{CONFIG_TREE_NAME, EMPTY_STR, NAMESPACE_TREE_NAME};
use crate::config::core::{ConfigKey, ConfigValue};
use crate::config::model::ConfigValueDO;
use crate::namespace::model::{NamespaceDO, FROM_USER_VALUE};
use crate::now_millis_i64;
use crate::transfer::init_writer_actor;
use crate::transfer::model::{
TransferRecordDto, TransferWriterAsyncRequest, TransferWriterRequest,
};
use crate::transfer::sqlite::TableSeq;
use crate::transfer::writer::TransferWriterActor;
use actix::Addr;
use nacos_rust_client::client::api_model::NamespaceInfo;
use nacos_rust_client::client::config_client::api_model::{ConfigInfoDto, ConfigQueryParams};
use nacos_rust_client::client::{AuthInfo, ClientBuilder, ConfigClient};
use std::sync::Arc;

pub async fn openapi_to_data(
host: &str,
username: &str,
password: &str,
data_file: &str,
) -> anyhow::Result<()> {
let auth_info = if username.is_empty() || password.is_empty() {
None
} else {
Some(AuthInfo::new(username, password))
};
let config_client = ClientBuilder::new()
.set_endpoint_addrs(host)
.set_auth_info(auth_info)
.set_use_grpc(false)
.build_config_client();

let writer_actor = init_writer_actor(data_file);
let mut table_seq = TableSeq::default();

let result = config_client.get_namespace_list().await?;
let namespaces = result.data.unwrap_or_default();
apply_config(&mut table_seq, &namespaces, &config_client, &writer_actor).await?;
apply_tenant(&namespaces, &writer_actor)?;
writer_actor
.send(TransferWriterAsyncRequest::Flush)
.await
.ok();
Ok(())
}

fn apply_tenant(
namespace_list: &[NamespaceInfo],
writer_actor: &Addr<TransferWriterActor>,
) -> anyhow::Result<()> {
let mut count = 0;
for item in namespace_list {
let key = if let Some(v) = &item.namespace {
v.as_bytes().to_vec()
} else {
EMPTY_STR.as_bytes().to_vec()
};
let value_do = NamespaceDO {
namespace_id: item.namespace.clone(),
namespace_name: item.namespace_show_name.clone(),
r#type: Some(FROM_USER_VALUE.to_string()),
};
let record = TransferRecordDto {
table_name: Some(NAMESPACE_TREE_NAME.clone()),
key,
value: value_do.to_bytes()?,
table_id: 0,
};
writer_actor.do_send(TransferWriterRequest::AddRecord(record));
count += 1;
}
log::info!("transfer tenant count:{count}");
Ok(())
}

async fn apply_config(
table_seq: &mut TableSeq,
namespace_list: &[NamespaceInfo],
config_client: &ConfigClient,
writer_actor: &Addr<TransferWriterActor>,
) -> anyhow::Result<()> {
let mut count = 0;
for namespace_info in namespace_list {
let namespace_id = if let Some(namespace_id) = namespace_info.namespace.as_ref() {
namespace_id
} else {
return Err(anyhow::anyhow!("namespace_id is none"));
};
let mut current_page = 0;
let mut total_page = 1;
let mut total_count = 0;
let mut params = ConfigQueryParams {
tenant: Some(namespace_id.to_owned()),
page_no: Some(1),
page_size: Some(100),
..Default::default()
};
while current_page < total_page {
current_page += 1;
params.page_no = Some(current_page);
let res = config_client
.query_accurate_config_page(params.clone())
.await?;
total_page = res.pages_available.unwrap_or_default();
total_count = res.total_count.unwrap_or_default();
let configs = res.page_items.unwrap_or_default();
for config in &configs {
let key = ConfigKey::new(&config.data_id, &config.group, namespace_id);
let record = build_config_record(table_seq, key, config)?;
count += 1;
writer_actor.do_send(TransferWriterRequest::AddRecord(record));
}
if configs.is_empty() {
break;
}
}
log::info!("[namespace {}],config count:{}", namespace_id, total_count);
}
log::info!("transfer config total count:{count}");
Ok(())
}

fn build_config_record(
table_seq: &mut TableSeq,
key: ConfigKey,
config_info: &ConfigInfoDto,
) -> anyhow::Result<TransferRecordDto> {
let content = Arc::new(config_info.content.clone().unwrap_or_default());
let mut config_value = ConfigValue::new(content.clone());
config_value.update_value(
content,
table_seq.next_config_id() as u64,
now_millis_i64(),
None,
None,
);
let value_do: ConfigValueDO = config_value.into();
let record = TransferRecordDto {
table_name: Some(CONFIG_TREE_NAME.clone()),
key: key.build_key().as_bytes().to_vec(),
value: value_do.to_bytes()?,
table_id: 0,
};
Ok(record)
}
36 changes: 7 additions & 29 deletions src/transfer/sqlite_to_data.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
use crate::common::actor_utils::create_actor_at_thread;
use crate::common::constant::{
CACHE_TREE_NAME, CONFIG_TREE_NAME, EMPTY_ARC_STRING, EMPTY_STR, NAMESPACE_TREE_NAME,
SEQUENCE_TREE_NAME, USER_TREE_NAME,
CONFIG_TREE_NAME, EMPTY_ARC_STRING, EMPTY_STR, NAMESPACE_TREE_NAME, USER_TREE_NAME,
};
use crate::config::core::{ConfigKey, ConfigValue};
use crate::config::model::ConfigValueDO;
use crate::config::ConfigUtils;
use crate::namespace::model::{NamespaceDO, FROM_USER_VALUE};
use crate::now_millis_i64;
use crate::transfer::init_writer_actor;
use crate::transfer::model::{
TransferRecordDto, TransferWriterAsyncRequest, TransferWriterRequest,
};
Expand Down Expand Up @@ -37,27 +36,6 @@ pub async fn sqlite_to_data(db_path: &str, data_file: &str) -> anyhow::Result<()
Ok(())
}

fn init_writer_actor(data_file: &str) -> Addr<TransferWriterActor> {
let writer_actor = create_actor_at_thread(TransferWriterActor::new(data_file.into(), 0));
writer_actor.do_send(TransferWriterRequest::AddTableNameMap(
CONFIG_TREE_NAME.clone(),
));
writer_actor.do_send(TransferWriterRequest::AddTableNameMap(
SEQUENCE_TREE_NAME.clone(),
));
writer_actor.do_send(TransferWriterRequest::AddTableNameMap(
NAMESPACE_TREE_NAME.clone(),
));
writer_actor.do_send(TransferWriterRequest::AddTableNameMap(
USER_TREE_NAME.clone(),
));
writer_actor.do_send(TransferWriterRequest::AddTableNameMap(
CACHE_TREE_NAME.clone(),
));
writer_actor.do_send(TransferWriterRequest::InitHeader);
writer_actor
}

fn apply_config(
conn: &Connection,
table_seq: &mut TableSeq,
Expand Down Expand Up @@ -93,7 +71,7 @@ fn apply_config(
count += 1;
writer_actor.do_send(TransferWriterRequest::AddRecord(record));
}
log::info!("transfer config count:{count}");
log::info!("transfer config total count:{count}");
Ok(())
}

Expand All @@ -103,8 +81,8 @@ fn build_config_record(
config_do: ConfigDO,
histories: Vec<ConfigHistoryDO>,
) -> anyhow::Result<TransferRecordDto> {
let current_current = config_do.content.unwrap_or_default();
let mut config_value = ConfigValue::new(current_current.clone());
let current_content = config_do.content.unwrap_or_default();
let mut config_value = ConfigValue::new(current_content.clone());
let mut last_content = None;
for item in histories {
if let Some(content) = item.content {
Expand All @@ -120,14 +98,14 @@ fn build_config_record(
}
}
let need_pull_current = if let Some(last_content) = &last_content {
last_content != &current_current
last_content != &current_content
} else {
true
};
if need_pull_current {
let op_time = config_do.last_time.unwrap_or(now_millis_i64());
config_value.update_value(
current_current,
current_content,
table_seq.next_config_id() as u64,
op_time,
None,
Expand Down

0 comments on commit e08b0bb

Please sign in to comment.