@@ -5,15 +5,16 @@ use std::error::Error;
55use common:: rpc:: new_rpc_system;
66use capnp_rpc:: rpc_twoparty_capnp;
77use futures:: Future ;
8- use std:: cell:: Ref ;
98
109use super :: task:: Task ;
1110use common:: id:: { DataObjectId , TaskId } ;
1211use common:: convert:: { FromCapnp , ToCapnp } ;
1312use client:: dataobject:: DataObject ;
13+ use std:: cell:: RefCell ;
14+ use std:: cell:: RefMut ;
1415
1516pub struct Communicator {
16- core : Core ,
17+ core : RefCell < Core > ,
1718 service : :: client_capnp:: client_service:: Client ,
1819}
1920
@@ -36,19 +37,22 @@ impl Communicator {
3637
3738 let service = core. run ( request. send ( ) . promise ) ?. get ( ) ?. get_service ( ) ?;
3839
39- Ok ( Self { core, service } )
40+ Ok ( Self {
41+ core : RefCell :: new ( core) ,
42+ service,
43+ } )
4044 }
4145
42- pub fn new_session ( & mut self ) -> Result < i32 , Box < Error > > {
43- let id: i32 = self . core
46+ pub fn new_session ( & self ) -> Result < i32 , Box < Error > > {
47+ let id: i32 = self . comm ( )
4448 . run ( self . service . new_session_request ( ) . send ( ) . promise ) ?
4549 . get ( ) ?
4650 . get_session_id ( ) ;
4751
4852 Ok ( id)
4953 }
50- pub fn close_session ( & mut self , id : i32 ) -> Result < ( ) , Box < Error > > {
51- self . core . run ( {
54+ pub fn close_session ( & self , id : i32 ) -> Result < ( ) , Box < Error > > {
55+ self . comm ( ) . run ( {
5256 let mut req = self . service . close_session_request ( ) ;
5357 req. get ( ) . set_session_id ( id) ;
5458 req. send ( ) . promise
@@ -57,13 +61,18 @@ impl Communicator {
5761 Ok ( ( ) )
5862 }
5963
60- pub fn submit < D > ( & mut self , tasks : & [ Ref < Task > ] , data_objects : & [ D ] ) -> Result < ( ) , Box < Error > >
64+ pub fn submit < T , D > ( & self , tasks : & [ T ] , data_objects : & [ D ] ) -> Result < ( ) , Box < Error > >
6165 where
66+ T : AsRef < Task > ,
6267 D : AsRef < DataObject > ,
6368 {
6469 let mut req = self . service . submit_request ( ) ;
6570
66- to_capnp_list ! ( req. get( ) , tasks, init_tasks) ;
71+ to_capnp_list ! (
72+ req. get( ) ,
73+ tasks. iter( ) . map( |t| t. as_ref( ) ) . collect:: <Vec <& Task >>( ) ,
74+ init_tasks
75+ ) ;
6776 to_capnp_list ! (
6877 req. get( ) ,
6978 data_objects
@@ -72,47 +81,47 @@ impl Communicator {
7281 . collect:: <Vec <& DataObject >>( ) ,
7382 init_objects
7483 ) ;
75- self . core . run ( req. send ( ) . promise ) ?;
84+ self . comm ( ) . run ( req. send ( ) . promise ) ?;
7685
7786 Ok ( ( ) )
7887 }
7988
80- pub fn unkeep ( & mut self , objects : & [ DataObjectId ] ) -> Result < ( ) , Box < Error > > {
89+ pub fn unkeep ( & self , objects : & [ DataObjectId ] ) -> Result < ( ) , Box < Error > > {
8190 let mut req = self . service . unkeep_request ( ) ;
8291 to_capnp_list ! ( req. get( ) , objects, init_object_ids) ;
83- self . core . run ( req. send ( ) . promise ) ?;
92+ self . comm ( ) . run ( req. send ( ) . promise ) ?;
8493 Ok ( ( ) )
8594 }
8695
87- pub fn wait ( & mut self , tasks : & [ TaskId ] , objects : & [ DataObjectId ] ) -> Result < ( ) , Box < Error > > {
96+ pub fn wait ( & self , tasks : & [ TaskId ] , objects : & [ DataObjectId ] ) -> Result < ( ) , Box < Error > > {
8897 let mut req = self . service . wait_request ( ) ;
8998 to_capnp_list ! ( req. get( ) , tasks, init_task_ids) ;
9099 to_capnp_list ! ( req. get( ) , objects, init_object_ids) ;
91- self . core . run ( req. send ( ) . promise ) ?;
100+ self . comm ( ) . run ( req. send ( ) . promise ) ?;
92101 Ok ( ( ) )
93102 }
94103 pub fn wait_some (
95- & mut self ,
104+ & self ,
96105 tasks : & [ TaskId ] ,
97106 objects : & [ DataObjectId ] ,
98107 ) -> Result < ( Vec < TaskId > , Vec < DataObjectId > ) , Box < Error > > {
99108 let mut req = self . service . wait_some_request ( ) ;
100109 to_capnp_list ! ( req. get( ) , tasks, init_task_ids) ;
101110 to_capnp_list ! ( req. get( ) , objects, init_object_ids) ;
102- let res = self . core . run ( req. send ( ) . promise ) ?;
111+ let res = self . comm ( ) . run ( req. send ( ) . promise ) ?;
103112
104113 Ok ( (
105114 from_capnp_list ! ( res. get( ) ?, get_finished_tasks, TaskId ) ,
106115 from_capnp_list ! ( res. get( ) ?, get_finished_objects, DataObjectId ) ,
107116 ) )
108117 }
109118
110- pub fn fetch ( & mut self , object_id : DataObjectId ) -> Result < Vec < u8 > , Box < Error > > {
119+ pub fn fetch ( & self , object_id : DataObjectId ) -> Result < Vec < u8 > , Box < Error > > {
111120 let mut req = self . service . fetch_request ( ) ;
112121 object_id. to_capnp ( & mut req. get ( ) . get_id ( ) . unwrap ( ) ) ;
113122 req. get ( ) . set_size ( 1024 ) ;
114123
115- let response = self . core . run ( req. send ( ) . promise ) ?;
124+ let response = self . comm ( ) . run ( req. send ( ) . promise ) ?;
116125
117126 let reader = response. get ( ) ?;
118127 match reader. get_status ( ) . which ( ) ? {
@@ -124,9 +133,13 @@ impl Communicator {
124133 }
125134 }
126135
127- pub fn terminate_server ( & mut self ) -> Result < ( ) , Box < Error > > {
128- self . core
136+ pub fn terminate_server ( & self ) -> Result < ( ) , Box < Error > > {
137+ self . comm ( )
129138 . run ( self . service . terminate_server_request ( ) . send ( ) . promise ) ?;
130139 Ok ( ( ) )
131140 }
141+
142+ fn comm ( & self ) -> RefMut < Core > {
143+ self . core . borrow_mut ( )
144+ }
132145}
0 commit comments