-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathblocking_receive.rs
58 lines (48 loc) · 1.93 KB
/
blocking_receive.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
#[macro_use]
extern crate log;
extern crate fern;
extern crate time;
extern crate mqtt;
use mqtt::async::{PersistenceType, Qos, MqttError, AsyncClient, AsyncConnectOptions, AsyncDisconnectOptions};
use std::error::Error;
fn conf_logger() {
let logger_config = fern::DispatchConfig {
format: Box::new(|msg: &str, level: &log::LogLevel, _location: &log::LogLocation| {
let t = time::now();
let ms = t.tm_nsec/1000_000;
format!("{}.{:3} [{}] {}", t.strftime("%Y-%m-%dT%H:%M:%S").unwrap(), ms, level, msg)
}),
output: vec![fern::OutputConfig::stderr()],
level: log::LogLevelFilter::Trace,
};
if let Err(e) = fern::init_global_logger(logger_config, log::LogLevelFilter::Trace) {
panic!("Failed to initialize global logger: {}", e);
}
}
fn setup_mqtt(server_address: &str, topic: &str, client_id: &str) -> Result<AsyncClient, MqttError> {
let connect_options = AsyncConnectOptions::new();
let mut client = try!(AsyncClient::new(server_address, client_id, PersistenceType::Nothing, None));
try!(client.connect(&connect_options));
try!(client.subscribe(topic, Qos::FireAndForget));
Ok(client)
}
fn main() {
// setup fern logger
conf_logger();
// start processing
info!("blocking receive test started");
info!("run: mosquitto_pub -t TestTopic -m somedata to send some messages to the test");
let topic = "TestTopic";
match setup_mqtt("tcp://localhost:1883", &topic, "TestClientId") {
Ok(mut client) => {
// thread blocks here until message is received
for message in client.messages(None) {
info!("{:?}", message);
}
let disconnect_options = AsyncDisconnectOptions::new();
client.disconnect(&disconnect_options).unwrap();
},
Err(e) => error!("{}; raw error: {}", e.description(), e)
}
info!("blocking receive test ended");
}