Skip to content

Commit c5b83e5

Browse files
committed
[api] use shared communicator for Rust client, add session close
1 parent 4a12e7c commit c5b83e5

File tree

5 files changed

+110
-48
lines changed

5 files changed

+110
-48
lines changed

src/client/client.rs

Lines changed: 13 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,62 +1,30 @@
1-
use capnp::capability::Promise;
2-
use capnp_rpc::rpc_twoparty_capnp;
3-
use common::rpc::new_rpc_system;
4-
use futures::Future;
51
use std::error::Error;
62
use std::net::SocketAddr;
7-
use tokio_core::net::TcpStream;
8-
use tokio_core::reactor::Core;
93

104
use CLIENT_PROTOCOL_VERSION;
5+
use common::wrapped::WrappedRcRefCell;
116
use super::session::Session;
7+
use super::communicator::Communicator;
128

139
pub struct Client {
14-
core: Core,
15-
service: ::client_capnp::client_service::Client,
10+
comm: WrappedRcRefCell<Communicator>,
1611
}
1712

1813
impl Client {
19-
pub fn new(scheduler: &SocketAddr) -> Result<Self, Box<Error>> {
20-
let mut core = Core::new()?;
21-
let handle = core.handle();
22-
let stream = core.run(TcpStream::connect(&scheduler, &handle))?;
23-
stream.set_nodelay(true)?;
14+
pub fn new(scheduler: SocketAddr) -> Result<Self, Box<Error>> {
15+
let comm = WrappedRcRefCell::wrap(Communicator::new(scheduler, CLIENT_PROTOCOL_VERSION)?);
2416

25-
debug!("Connection to server {} established", scheduler);
26-
27-
let mut rpc = Box::new(new_rpc_system(stream, None));
28-
let bootstrap: ::server_capnp::server_bootstrap::Client =
29-
rpc.bootstrap(rpc_twoparty_capnp::Side::Server);
30-
handle.spawn(rpc.map_err(|err| panic!("RPC error: {}", err)));
31-
32-
let mut request = bootstrap.register_as_client_request();
33-
request.get().set_version(CLIENT_PROTOCOL_VERSION);
34-
35-
let service = core.run(
36-
request
37-
.send()
38-
.promise
39-
.and_then(|response| Promise::ok(pry!(response.get()).get_service())),
40-
)??;
41-
42-
Ok(Client { core, service })
17+
Ok(Client {
18+
comm
19+
})
4320
}
4421

45-
pub fn new_session(&mut self) -> Result<Session, Box<Error>> {
46-
let id: i32 = self.core.run(
47-
self.service
48-
.new_session_request()
49-
.send()
50-
.promise
51-
.and_then(|response| Promise::ok(pry!(response.get()).get_session_id())),
52-
)?;
53-
54-
Ok(Session { id })
22+
pub fn new_session(&self) -> Result<Session, Box<Error>> {
23+
let session_id = self.comm.get_mut().new_session()?;
24+
Ok(Session::new(session_id, self.comm.clone()))
5525
}
5626

57-
pub fn terminate_server(&mut self) -> Result<(), Box<Error>> {
58-
self.core
59-
.run(self.service.terminate_server_request().send().promise)?;
60-
Ok(())
27+
pub fn terminate_server(&self) -> Result<(), Box<Error>> {
28+
self.comm.get_mut().terminate_server()
6129
}
6230
}

src/client/communicator.rs

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
use tokio_core::reactor::Core;
2+
use std::net::SocketAddr;
3+
use tokio_core::net::TcpStream;
4+
use std::error::Error;
5+
use common::rpc::new_rpc_system;
6+
use capnp::capability::Promise;
7+
use capnp_rpc::rpc_twoparty_capnp;
8+
use futures::Future;
9+
10+
pub struct Communicator {
11+
core: Core,
12+
service: ::client_capnp::client_service::Client,
13+
}
14+
15+
impl Communicator {
16+
pub fn new(scheduler: SocketAddr, version: i32) -> Result<Self, Box<Error>> {
17+
let mut core = Core::new()?;
18+
let handle = core.handle();
19+
let stream = core.run(TcpStream::connect(&scheduler, &handle))?;
20+
stream.set_nodelay(true)?;
21+
22+
debug!("Connection to server {} established", scheduler);
23+
24+
let mut rpc = Box::new(new_rpc_system(stream, None));
25+
let bootstrap: ::server_capnp::server_bootstrap::Client =
26+
rpc.bootstrap(rpc_twoparty_capnp::Side::Server);
27+
handle.spawn(rpc.map_err(|err| panic!("RPC error: {}", err)));
28+
29+
let mut request = bootstrap.register_as_client_request();
30+
request.get().set_version(version);
31+
32+
let service = core.run(
33+
request
34+
.send()
35+
.promise
36+
.and_then(|response| Promise::ok(pry!(response.get()).get_service())),
37+
)??;
38+
39+
Ok(Self { core, service })
40+
}
41+
42+
pub fn new_session(&mut self) -> Result<i32, Box<Error>> {
43+
let id: i32 = self.core.run(
44+
self.service
45+
.new_session_request()
46+
.send()
47+
.promise
48+
.and_then(|response| Promise::ok(pry!(response.get()).get_session_id())),
49+
)?;
50+
51+
Ok(id)
52+
}
53+
54+
pub fn close_session(&mut self, id: i32) -> Result<bool, Box<Error>> {
55+
self.core.run({
56+
let mut req = self.service.close_session_request();
57+
req.get().set_session_id(id);
58+
req.send().promise
59+
})?;
60+
61+
Ok(true)
62+
}
63+
64+
pub fn terminate_server(&mut self) -> Result<(), Box<Error>> {
65+
self.core
66+
.run(self.service.terminate_server_request().send().promise)?;
67+
Ok(())
68+
}
69+
}

src/client/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,4 @@
11
pub mod client;
22
pub mod session;
3+
4+
mod communicator;

src/client/session.rs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,25 @@
1+
use common::wrapped::WrappedRcRefCell;
2+
3+
use super::communicator::Communicator;
4+
15
pub struct Session {
2-
pub id: i32,
6+
id: i32,
7+
comm: WrappedRcRefCell<Communicator>
8+
}
9+
10+
impl Session {
11+
pub fn new(id: i32, comm: WrappedRcRefCell<Communicator>) -> Self {
12+
debug!("Session {} created", id);
13+
14+
Session {
15+
id, comm
16+
}
17+
}
18+
}
19+
20+
impl Drop for Session {
21+
fn drop(&mut self) {
22+
self.comm.get_mut().close_session(self.id).unwrap();
23+
debug!("Session {} destroyed", self.id);
24+
}
325
}

src/server/rpc/client.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -521,10 +521,11 @@ impl client_service::Server for ClientServiceImpl {
521521
Promise::ok(())
522522
}
523523

524+
#[allow(unreachable_code)]
524525
fn terminate_server(
525526
&mut self,
526-
params: client_service::TerminateServerParams,
527-
results: client_service::TerminateServerResults,
527+
_params: client_service::TerminateServerParams,
528+
_results: client_service::TerminateServerResults,
528529
) -> Promise<(), ::capnp::Error> {
529530
exit(0);
530531
Promise::ok(())

0 commit comments

Comments
 (0)