-
Notifications
You must be signed in to change notification settings - Fork 62
Expand file tree
/
Copy pathmain.rs
More file actions
125 lines (113 loc) · 4.64 KB
/
main.rs
File metadata and controls
125 lines (113 loc) · 4.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use chitchat::transport::UdpTransport;
use chitchat::{spawn_chitchat, Chitchat, ChitchatConfig, ChitchatId, FailureDetectorConfig};
use chitchat_test::{ApiResponse, SetKeyValueResponse};
use cool_id_generator::Size;
use poem::listener::TcpListener;
use poem::{Route, Server};
use poem_openapi::param::Query;
use poem_openapi::payload::{Json, PlainText};
use poem_openapi::{OpenApi, OpenApiService};
use structopt::StructOpt;
use tokio::sync::Mutex;
struct Api {
chitchat: Arc<Mutex<Chitchat>>,
}
#[OpenApi]
impl Api {
/// Chitchat state
#[oai(path = "/", method = "get")]
async fn index(&self) -> PlainText<String> {
let chitchat_guard = self.chitchat.lock().await;
let response = ApiResponse {
cluster_id: chitchat_guard.cluster_id().to_string(),
cluster_state: chitchat_guard.state_snapshot(),
live_nodes: chitchat_guard.live_nodes().cloned().collect::<Vec<_>>(),
dead_nodes: chitchat_guard.dead_nodes().cloned().collect::<Vec<_>>(),
};
PlainText(serde_json::to_string_pretty(&response).unwrap())
}
/// Sets a key-value pair on this node (without validation).
#[oai(path = "/set_kv/", method = "get")]
async fn set_kv(&self, key: Query<String>, value: Query<String>) -> Json<serde_json::Value> {
let mut chitchat_guard = self.chitchat.lock().await;
let mut cc_state = chitchat_guard.self_node_state();
cc_state.set(key.as_str(), value.as_str());
Json(serde_json::to_value(&SetKeyValueResponse { status: true }).unwrap())
}
/// Marks a key for deletion on this node (without validation).
#[oai(path = "/mark_for_deletion/", method = "get")]
async fn mark_for_deletion(&self, key: Query<String>) -> Json<serde_json::Value> {
let mut chitchat_guard = self.chitchat.lock().await;
let mut cc_state = chitchat_guard.self_node_state();
cc_state.delete(key.as_str());
Json(serde_json::to_value(&SetKeyValueResponse { status: true }).unwrap())
}
}
#[derive(Debug, StructOpt)]
#[structopt(name = "chitchat", about = "Chitchat test server.")]
struct Opt {
/// Defines the socket addr on which we should listen to.
#[structopt(long = "listen_addr", default_value = "127.0.0.1:10000")]
listen_addr: SocketAddr,
/// Defines the socket address (host:port) other servers should use to
/// reach this server.
///
/// It defaults to the listen address, but this is only valid
/// when all server are running on the same server.
#[structopt(long = "public_addr")]
public_addr: Option<SocketAddr>,
/// Node ID. Must be unique. If None, the node ID will be generated from
/// the public_addr and a random suffix.
#[structopt(long = "node_id")]
node_id: Option<String>,
#[structopt(long = "seed")]
seeds: Vec<String>,
#[structopt(long = "interval_ms", default_value = "500")]
interval: u64,
}
fn generate_server_id(public_addr: SocketAddr) -> String {
let cool_id = cool_id_generator::get_id(Size::Medium);
format!("server:{public_addr}-{cool_id}")
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let opt = Opt::from_args();
let public_addr = opt.public_addr.unwrap_or(opt.listen_addr);
let node_id = opt
.node_id
.unwrap_or_else(|| generate_server_id(public_addr));
let generation = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs();
let chitchat_id = ChitchatId::new(node_id, generation, public_addr);
let config = ChitchatConfig {
cluster_id: "testing".to_string(),
chitchat_id,
gossip_interval: Duration::from_millis(opt.interval),
listen_addr: opt.listen_addr,
seed_nodes: opt.seeds.clone(),
failure_detector_config: FailureDetectorConfig {
dead_node_grace_period: Duration::from_secs(10),
..FailureDetectorConfig::default()
},
marked_for_deletion_grace_period: Duration::from_secs(60),
catchup_callback: None,
extra_liveness_predicate: None,
};
let chitchat_handler = spawn_chitchat(config, Vec::new(), &UdpTransport).await?;
let chitchat = chitchat_handler.chitchat();
let api = Api { chitchat };
let api_service = OpenApiService::new(api, "Hello World", "1.0")
.server(format!("http://{}/", opt.listen_addr));
let docs = api_service.swagger_ui();
let app = Route::new().nest("/", api_service).nest("/docs", docs);
Server::new(TcpListener::bind(&opt.listen_addr))
.run(app)
.await?;
Ok(())
}