Skip to content

Commit c8276fe

Browse files
committed
waitfordata: overlay
1 parent dc0e921 commit c8276fe

File tree

2 files changed

+82
-2
lines changed

2 files changed

+82
-2
lines changed

src/all_peers.rs

+1
Original file line numberDiff line numberDiff line change
@@ -133,5 +133,6 @@ macro_rules! list_of_all_specifier_classes {
133133
#[cfg(feature = "wasm_plugins")]
134134
$your_macro!($crate::wasm_transform_peer::WasmTransformDClass);
135135

136+
$your_macro!($crate::reconnect_peer::WaitForDataClass);
136137
};
137138
}

src/reconnect_peer.rs

+81-2
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,9 @@ struct State {
6161
aux: State2,
6262
reconnect_delay: std::time::Duration,
6363
ratelimiter: Option<tokio_timer::Delay>,
64+
reconnect_count_limit: Option<usize>,
65+
/// Do not initiate connection now, return not ready outcome instead
66+
pegged_until_write: bool,
6467
}
6568

6669
/// This implementation's poll is to be reused many times, both after returning item and error
@@ -85,17 +88,30 @@ impl State {
8588
Ok(Async::NotReady) => return Ok(Async::NotReady),
8689
}
8790
}
88-
let cp = self.cp.clone();
8991
if let Some(ref mut p) = *pp {
9092
return Ok(Async::Ready(p));
9193
}
94+
let cp = self.cp.clone();
9295

9396
// Peer is not present: trying to create a new one
9497

98+
if self.pegged_until_write {
99+
return Ok(Async::NotReady);
100+
}
101+
if self.reconnect_count_limit == Some(0) {
102+
info!("autoreconnector reconnect limit reached. Failing connection.");
103+
return Err(Box::new(simple_err("No more connections allowed".to_owned())));
104+
}
105+
95106
if let Some(mut bnpf) = nn.take() {
96107
match bnpf.poll() {
97108
Ok(Async::Ready(p)) => {
98109
*pp = Some(p);
110+
111+
if let Some(ref mut cl) = self.reconnect_count_limit {
112+
*cl -= 1;
113+
}
114+
99115
continue;
100116
}
101117
Ok(Async::NotReady) => {
@@ -106,11 +122,15 @@ impl State {
106122
// Stop on error:
107123
//return Err(_x);
108124

125+
if let Some(ref mut cl) = self.reconnect_count_limit {
126+
*cl -= 1;
127+
}
128+
109129
// Just reconnect again on error
110130

111131
if !aux.already_warned {
112132
aux.already_warned = true;
113-
warn!("Reconnecting failed. Further failed reconnects announcements will have lower severity.");
133+
warn!("Reconnecting failed. Further failed reconnects announcements will have lower log severity.");
114134
} else {
115135
info!("Reconnecting failed.");
116136
}
@@ -204,6 +224,7 @@ impl AsyncRead for PeerHandle {}
204224
impl Write for PeerHandle {
205225
fn write(&mut self, b: &[u8]) -> Result<usize, IoError> {
206226
let mut state = self.0.borrow_mut();
227+
state.pegged_until_write = false;
207228
main_loop!(state, p, bytes p.1.write(b));
208229
}
209230
fn flush(&mut self) -> Result<(), IoError> {
@@ -229,9 +250,67 @@ pub fn autoreconnector(s: Rc<dyn Specifier>, cp: ConstructParams) -> BoxedNewPee
229250
aux: Default::default(),
230251
reconnect_delay,
231252
ratelimiter: None,
253+
reconnect_count_limit: None,
254+
pegged_until_write: false,
232255
}));
233256
let ph1 = PeerHandle(s.clone());
234257
let ph2 = PeerHandle(s);
235258
let peer = Peer::new(ph1, ph2, None /* we handle hups ourselves */);
236259
Box::new(ok(peer)) as BoxedNewPeerFuture
237260
}
261+
262+
263+
pub fn waitfordata(s: Rc<dyn Specifier>, cp: ConstructParams) -> BoxedNewPeerFuture {
264+
let reconnect_delay = std::time::Duration::from_millis(cp.program_options.autoreconnect_delay_millis);
265+
let s = Rc::new(RefCell::new(State {
266+
cp,
267+
s,
268+
p: None,
269+
n: None,
270+
aux: Default::default(),
271+
reconnect_delay, // unused
272+
ratelimiter: None,
273+
reconnect_count_limit: Some(1),
274+
pegged_until_write: true,
275+
}));
276+
let ph1 = PeerHandle(s.clone());
277+
let ph2 = PeerHandle(s);
278+
let peer = Peer::new(ph1, ph2, None /* we handle hups ourselves, though shouldn't probably */);
279+
Box::new(ok(peer)) as BoxedNewPeerFuture
280+
}
281+
282+
283+
#[derive(Debug)]
284+
pub struct WaitForData(pub Rc<dyn Specifier>);
285+
impl Specifier for WaitForData {
286+
fn construct(&self, cp: ConstructParams) -> PeerConstructor {
287+
once(waitfordata(self.0.clone(), cp))
288+
}
289+
specifier_boilerplate!(singleconnect has_subspec globalstate);
290+
self_0_is_subspecifier!(...);
291+
}
292+
293+
specifier_class!(
294+
name = WaitForDataClass,
295+
target = WaitForData,
296+
prefixes = ["waitfordata:", "wait-for-data:"],
297+
arg_handling = subspec,
298+
overlay = true,
299+
MessageBoundaryStatusDependsOnInnerType,
300+
SingleConnect,
301+
help = r#"
302+
Wait for some data to pending being written before starting connecting. [A]
303+
304+
Example: Connect to the TCP server on the left side immediately, but connect to
305+
the TCP server on the right side only after some data gets written by the first connection
306+
307+
308+
websocat -b tcp:127.0.0.1:1234 waitfordata:tcp:127.0.0.1:1235
309+
310+
Example: Connect to first WebSocket server, wait for some incoming WebSocket message, then
311+
connect to the second WebSocket server and start exchanging text and binary WebSocket messages
312+
between them.
313+
314+
websocat -b --binary-prefix=b --text-prefix=t ws://127.0.0.1:1234 waitfordata:ws://127.0.0.1:1235/
315+
"#
316+
);

0 commit comments

Comments
 (0)