Skip to content

Commit

Permalink
rrhttp: parse verb,path
Browse files Browse the repository at this point in the history
  • Loading branch information
slivingston committed Oct 7, 2024
1 parent 707c01a commit 01e7816
Showing 1 changed file with 93 additions and 1 deletion.
94 changes: 93 additions & 1 deletion src/bin/rrhttp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,66 @@ use tokio::net::{TcpListener, TcpStream};
use tokio::runtime::Builder;
use tokio::{signal, time};

#[derive(Debug, PartialEq)]
enum HttpVerb {
Get,
Post,
}

#[derive(Debug)]
struct Request {
verb: HttpVerb,
path: String,
body: Option<serde_json::Value>,
}

impl Request {
fn new(blob: &[u8]) -> Result<Self, Box<dyn std::error::Error>> {
let mut verb = None;
let mut path = None;
let mut protocol_match = false;
let mut first_line = true;
for line in String::from_utf8_lossy(blob).lines() {
for word in line.split_whitespace() {
if first_line {
if verb.is_none() {
if word == "GET" {
verb = Some(HttpVerb::Get);
} else if word == "POST" {
verb = Some(HttpVerb::Post);
} else {
return Err(format!("unsupported verb {}", word).into());
}
} else if path.is_none() {
path = Some(String::from(word));
} else if protocol_match {
return Err("too many words on first line".into());
} else if word == "HTTP/1.1" {
protocol_match = true;
} else {
return Err(format!("unexpected protocol specifier {}", word).into());
}
}
}
first_line = false;
}
if verb.is_none() {
return Err("no request verb".into());
}
if path.is_none() {
return Err("no request path".into());
}
if !protocol_match {
return Err("no valid protocol string".into());
}
Ok(Request {
verb: verb.unwrap(),
path: path.unwrap(),
body: None,
})
}
}

async fn x_to_y_nofilter(
prefix: String,
mut x: tokio::net::tcp::OwnedReadHalf,
Expand Down Expand Up @@ -58,6 +118,38 @@ async fn x_to_y_nofilter(
}
}

async fn x_to_y_filter(
prefix: String,
mut x: tokio::net::tcp::OwnedReadHalf,
mut y: tokio::net::tcp::OwnedWriteHalf,
) {
let mut buf = [0; 1024];
loop {
let n = x.read(&mut buf).await.unwrap();
if n == 0 {
warn!("{}: read 0 bytes; exiting...", prefix);
return;
}
debug!("{}: read {} bytes", prefix, n);
let req = match Request::new(&buf[..]) {
Ok(r) => r,
Err(err) => {
warn!("{}", err);
continue;
}
};
match y.write(&buf[..n]).await {
Ok(n) => {
debug!("{}: wrote {} bytes", prefix, n);
}
Err(err) => {
error!("{}: error on write: {}", prefix, err);
return;
}
}
}
}

async fn main_per(ingress: TcpStream, egress: TcpStream) {
let ingress_peer_addr = ingress.peer_addr().unwrap();
let egress_peer_addr = egress.peer_addr().unwrap();
Expand All @@ -67,7 +159,7 @@ async fn main_per(ingress: TcpStream, egress: TcpStream) {
);
let (ingress_read, ingress_write) = ingress.into_split();
let (egress_read, egress_write) = egress.into_split();
let in_to_e = tokio::spawn(x_to_y_nofilter(
let in_to_e = tokio::spawn(x_to_y_filter(
format!("{} to {}", ingress_peer_addr, egress_peer_addr),
ingress_read,
egress_write,
Expand Down

0 comments on commit 01e7816

Please sign in to comment.