Skip to content

Commit fd548b2

Browse files
committed
--binary-prefix --text-prefix --base64 --base64-text
Partially addresses #78, #47. Closes #20.
1 parent 0a77077 commit fd548b2

File tree

6 files changed

+151
-13
lines changed

6 files changed

+151
-13
lines changed

Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ tokio-timer = "0.2.0"
5353
tempfile = "3.0.8"
5454
net2 = "0.2.33"
5555
anymap = "0.12.1"
56+
base64 = "0.10"
5657
#anymap = { path = "/mnt/src/git/anymap"}
5758

5859
# Rust 1.30.1 compat:

README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,7 @@ Pre-built binaries for Linux (usual and musl), Windows, OS X and Android are ava
227227
Limitations
228228
---
229229

230-
* Websocat can't mix sending text and binary WebSocket messages. This also affects `mirror:` specifier, making it a bit different from ws://echo.websocket.org.
230+
* It is not convenient when text and binary WebSocket messages are mixed. This affects `mirror:` specifier, making it a bit different from ws://echo.websocket.org. There are `--binary-prefix`, `--text-prefix` and `--base64` options to handle mixture of binary and text.
231231
* Current version of Websocat don't receive notification about closed sockets. This makes serving without `-E` or `-u` options or in backpressure scenarios prone to socket leak.
232232

233233
Building from source code

src/main.rs

+28
Original file line numberDiff line numberDiff line change
@@ -451,6 +451,30 @@ struct Opt {
451451
/// [A] Delay before reconnect attempt for `autoreconnect:` overlay.
452452
#[structopt(long = "--autoreconnect-delay-millis", default_value="20")]
453453
autoreconnect_delay_millis: u64,
454+
455+
456+
/// [A] Prepend specified text to each received WebSocket text message.
457+
/// Also strip this prefix from outgoing messages, explicitly marking
458+
/// them as text even if `--binary` is specified
459+
#[structopt(long = "--text-prefix")]
460+
pub ws_text_prefix: Option<String>,
461+
462+
/// [A] Prepend specified text to each received WebSocket binary message.
463+
/// Also strip this prefix from outgoing messages, explicitly marking
464+
/// them as binary even if `--text` is specified
465+
#[structopt(long = "--binary-prefix")]
466+
pub ws_binary_prefix: Option<String>,
467+
468+
/// Encode incoming binary WebSocket messages in one-line Base64
469+
/// If `--binary-prefix` (see `--help=full`) is set, outgoing WebSocket messages
470+
/// that start with the prefix are decoded from base64 prior to sending.
471+
#[structopt(long = "--base64")]
472+
pub ws_binary_base64: bool,
473+
474+
/// [A] Encode incoming text WebSocket messages in one-line Base64.
475+
/// I don't know whether it can be ever useful, but it's for symmetry with `--base64`.
476+
#[structopt(long = "--base64-text")]
477+
pub ws_text_base64: bool,
454478
}
455479

456480
// TODO: make it byte-oriented/OsStr?
@@ -693,6 +717,10 @@ fn run() -> Result<()> {
693717
max_messages
694718
max_messages_rev
695719
autoreconnect_delay_millis
720+
ws_text_prefix
721+
ws_binary_prefix
722+
ws_binary_base64
723+
ws_text_base64
696724
);
697725
#[cfg(feature = "ssl")]
698726
{

src/options.rs

+5
Original file line numberDiff line numberDiff line change
@@ -83,4 +83,9 @@ pub struct Options {
8383
pub request_headers: Vec<(http::header::HeaderName, http::header::HeaderValue)>,
8484

8585
pub autoreconnect_delay_millis: u64,
86+
87+
pub ws_text_prefix: Option<String>,
88+
pub ws_binary_prefix: Option<String>,
89+
pub ws_binary_base64: bool,
90+
pub ws_text_base64: bool,
8691
}

src/ws_peer.rs

+115-12
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
extern crate tokio_codec;
22
extern crate websocket;
3+
extern crate base64;
34

45
use self::websocket::stream::r#async::Stream as WsStream;
56
use self::websocket::OwnedMessage;
@@ -37,6 +38,11 @@ pub struct WsReadWrapper<T: WsStream + 'static> {
3738
pub debt: ReadDebt,
3839
pub pong_timeout: Option<(::tokio_timer::Delay, ::std::time::Duration)>,
3940
pub ping_aborter: Option<::futures::unsync::oneshot::Sender<()>>,
41+
42+
pub text_prefix: Option<String>,
43+
pub binary_prefix: Option<String>,
44+
pub binary_base64: bool,
45+
pub text_base64: bool,
4046
}
4147

4248
impl<T: WsStream + 'static> AsyncRead for WsReadWrapper<T> {}
@@ -54,6 +60,35 @@ impl<T: WsStream + 'static> Read for WsReadWrapper<T> {
5460
brokenpipe()
5561
}};
5662
}
63+
fn process_prefixes_and_base64<'a>(qbuf :&'a mut Vec<u8>, q: &mut &'a [u8], prefix: &Option<String>, base64: bool) {
64+
match (prefix, base64) {
65+
(None, false) => (),
66+
(Some(pr), false) => {
67+
debug!("prepending prefix");
68+
qbuf.reserve_exact(pr.len() + q.len());
69+
qbuf.extend_from_slice(pr.as_bytes());
70+
qbuf.extend_from_slice(q);
71+
*q = &mut qbuf[..];
72+
}
73+
(None, true) => {
74+
debug!("encoding to base64");
75+
qbuf.resize(q.len() * 3 / 2 + 3, 0);
76+
let r = base64::encode_config_slice(q, base64::STANDARD, &mut qbuf[..]);
77+
qbuf.resize(r, 0);
78+
qbuf.push(b'\n');
79+
*q = &mut qbuf[..];
80+
},
81+
(Some(pr), true) => {
82+
debug!("prepending prefix and encoding to base64");
83+
qbuf.extend_from_slice(pr.as_bytes());
84+
qbuf.resize(pr.len() + q.len() * 3 / 2 + 3, 0);
85+
let r = base64::encode_config_slice(q, base64::STANDARD, &mut qbuf[pr.len()..]);
86+
qbuf.resize(pr.len()+r, 0);
87+
qbuf.push(b'\n');
88+
*q = &mut qbuf[..];
89+
},
90+
}
91+
}
5792
loop {
5893
return match self.s.poll().map_err(io_other_error)? {
5994
Ready(Some(OwnedMessage::Close(_))) => {
@@ -97,14 +132,20 @@ impl<T: WsStream + 'static> Read for WsReadWrapper<T> {
97132
}
98133
Ready(Some(OwnedMessage::Text(x))) => {
99134
debug!("incoming text");
100-
match self.debt.process_message(buf, x.as_str().as_bytes()) {
135+
let mut qbuf : Vec<u8> = vec![];
136+
let mut q : &[u8] = x.as_str().as_bytes();
137+
process_prefixes_and_base64(&mut qbuf, &mut q, &self.text_prefix, self.text_base64);
138+
match self.debt.process_message(buf, q) {
101139
ProcessMessageResult::Return(x) => x,
102140
ProcessMessageResult::Recurse => continue,
103141
}
104142
}
105143
Ready(Some(OwnedMessage::Binary(x))) => {
106144
debug!("incoming binary");
107-
match self.debt.process_message(buf, x.as_slice()) {
145+
let mut qbuf : Vec<u8> = vec![];
146+
let mut q : &[u8] = x.as_slice();
147+
process_prefixes_and_base64(&mut qbuf, &mut q, &self.binary_prefix, self.binary_base64);
148+
match self.debt.process_message(buf, q) {
108149
ProcessMessageResult::Return(x) => x,
109150
ProcessMessageResult::Recurse => continue,
110151
}
@@ -135,14 +176,23 @@ pub enum Mode1 {
135176
Binary,
136177
}
137178

138-
pub struct WsWriteWrapper<T: WsStream + 'static>(pub MultiProducerWsSink<T>, pub Mode1, pub bool);
179+
pub struct WsWriteWrapper<T: WsStream + 'static> {
180+
pub sink: MultiProducerWsSink<T>,
181+
pub mode: Mode1,
182+
pub close_on_shutdown: bool,
183+
184+
pub text_prefix: Option<String>,
185+
pub binary_prefix: Option<String>,
186+
pub binary_base64: bool,
187+
pub text_base64: bool,
188+
}
139189

140190
impl<T: WsStream + 'static> AsyncWrite for WsWriteWrapper<T> {
141191
fn shutdown(&mut self) -> futures::Poll<(), std::io::Error> {
142-
if !self.2 {
192+
if !self.close_on_shutdown {
143193
return Ok(Ready(()));
144194
}
145-
let mut sink = self.0.borrow_mut();
195+
let mut sink = self.sink.borrow_mut();
146196
match sink
147197
.start_send(OwnedMessage::Close(None))
148198
.map_err(io_other_error)?
@@ -160,16 +210,56 @@ impl<T: WsStream + 'static> AsyncWrite for WsWriteWrapper<T> {
160210
}
161211

162212
impl<T: WsStream + 'static> Write for WsWriteWrapper<T> {
163-
fn write(&mut self, buf: &[u8]) -> IoResult<usize> {
164-
let om = match self.1 {
213+
fn write(&mut self, buf_: &[u8]) -> IoResult<usize> {
214+
let bufv;
215+
let mut effective_mode = self.mode;
216+
217+
let mut buf : &[u8] = buf_;
218+
219+
let origlen = buf.len();
220+
221+
if let Some(pr) = &self.text_prefix {
222+
if buf.starts_with(pr.as_bytes()) {
223+
effective_mode = Mode1::Text;
224+
buf = &buf[pr.len()..];
225+
}
226+
}
227+
if let Some(pr) = &self.binary_prefix {
228+
if buf.starts_with(pr.as_bytes()) {
229+
effective_mode = Mode1::Binary;
230+
buf = &buf[pr.len()..];
231+
}
232+
}
233+
234+
let decode_base64 = match effective_mode {
235+
Mode1::Binary => self.binary_base64,
236+
Mode1::Text => self.text_base64,
237+
};
238+
239+
if decode_base64 {
240+
if buf.last() == Some(&b'\n') {
241+
buf = &buf[..(buf.len()-1)];
242+
}
243+
if buf.last() == Some(&b'\r') {
244+
buf = &buf[..(buf.len()-1)];
245+
}
246+
if let Ok(v) = base64::decode(buf) {
247+
bufv = v;
248+
buf = &bufv[..];
249+
} else {
250+
error!("Failed to decode user-supplised base64 buffer. Sending message as is.");
251+
}
252+
}
253+
254+
let om = match effective_mode {
165255
Mode1::Binary => OwnedMessage::Binary(buf.to_vec()),
166256
Mode1::Text => {
167257
let text_tmp;
168258
let text = match ::std::str::from_utf8(buf) {
169259
Ok(x) => x,
170260
Err(_) => {
171261
error!(
172-
"Invalid UTF-8 in --text mode. Sending lossy data. May be \
262+
"Invalid UTF-8 in a text WebSocket message. Sending lossy data. May be \
173263
caused by unlucky buffer splits."
174264
);
175265
text_tmp = String::from_utf8_lossy(buf);
@@ -179,14 +269,14 @@ impl<T: WsStream + 'static> Write for WsWriteWrapper<T> {
179269
OwnedMessage::Text(text.to_string())
180270
}
181271
};
182-
match self.0.borrow_mut().start_send(om).map_err(io_other_error)? {
272+
match self.sink.borrow_mut().start_send(om).map_err(io_other_error)? {
183273
futures::AsyncSink::NotReady(_) => wouldblock(),
184-
futures::AsyncSink::Ready => Ok(buf.len()),
274+
futures::AsyncSink::Ready => Ok(origlen),
185275
}
186276
}
187277
fn flush(&mut self) -> IoResult<()> {
188278
match self
189-
.0
279+
.sink
190280
.borrow_mut()
191281
.poll_complete()
192282
.map_err(io_other_error)?
@@ -362,8 +452,21 @@ pub fn finish_building_ws_peer<S>(opts: &super::Options, duplex: Duplex<S>, clos
362452
debt: super::readdebt::ReadDebt(Default::default(), opts.read_debt_handling, zmsgh),
363453
pong_timeout,
364454
ping_aborter,
455+
text_prefix: opts.ws_text_prefix.clone(),
456+
binary_prefix: opts.ws_binary_prefix.clone(),
457+
binary_base64: opts.ws_binary_base64,
458+
text_base64: opts.ws_text_base64,
459+
};
460+
let ws_sin = WsWriteWrapper{
461+
sink: mpsink,
462+
mode: mode1,
463+
close_on_shutdown,
464+
465+
text_prefix: opts.ws_text_prefix.clone(),
466+
binary_prefix: opts.ws_binary_prefix.clone(),
467+
binary_base64: opts.ws_binary_base64,
468+
text_base64: opts.ws_text_base64,
365469
};
366-
let ws_sin = WsWriteWrapper(mpsink, mode1, close_on_shutdown);
367470

368471
Peer::new(ws_str, ws_sin, hup)
369472
}

0 commit comments

Comments
 (0)