Skip to content

Commit 271a94a

Browse files
authored
feat: proactively detect when stdout goes away on Unix / fixes for streaming command output (#53)
1 parent a6b4aab commit 271a94a

File tree

5 files changed

+153
-25
lines changed

5 files changed

+153
-25
lines changed

Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+3
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,9 @@ webpki-roots = "0.26.6"
5858
base64 = "0.22.1"
5959
tracing-subscriber = "0.3.19"
6060

61+
[target.'cfg(unix)'.dependencies]
62+
nix = { version = "0.29", default-features = false, features = ["poll"] }
63+
6164
[dev-dependencies]
6265
assert_cmd = "2.0.14"
6366
duct = "0.13.7"

src/commands/serve.rs

+80-12
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,27 @@ use scru128::Scru128Id;
22
use std::collections::HashMap;
33
use tracing::instrument;
44

5+
use serde::{Deserialize, Serialize};
6+
57
use crate::error::Error;
68
use crate::nu;
79
use crate::nu::commands;
810
use crate::nu::util::value_to_json;
9-
use crate::store::{FollowOption, Frame, ReadOptions, Store};
11+
use crate::store::{FollowOption, Frame, ReadOptions, Store, TTL};
12+
13+
// TODO: DRY with handlers
14+
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
15+
pub struct ReturnOptions {
16+
pub suffix: Option<String>,
17+
pub ttl: Option<TTL>,
18+
}
1019

1120
#[derive(Clone)]
1221
struct Command {
1322
id: Scru128Id,
1423
engine: nu::Engine,
1524
definition: String,
25+
return_options: Option<ReturnOptions>,
1626
}
1727

1828
async fn handle_define(
@@ -59,7 +69,6 @@ pub async fn serve(
5969
if frame.topic == "xs.threshold" {
6070
break;
6171
}
62-
6372
if let Some(name) = frame.topic.strip_suffix(".define") {
6473
handle_define(&frame, name, &base_engine, &store, &mut commands).await;
6574
}
@@ -70,8 +79,23 @@ pub async fn serve(
7079
if let Some(name) = frame.topic.strip_suffix(".define") {
7180
handle_define(&frame, name, &base_engine, &store, &mut commands).await;
7281
} else if let Some(name) = frame.topic.strip_suffix(".call") {
73-
if let Some(command) = commands.get(name) {
74-
execute_command(command.clone(), frame, &store).await?;
82+
let name = name.to_owned();
83+
if let Some(command) = commands.get(&name) {
84+
let store = store.clone();
85+
let frame = frame.clone();
86+
let command = command.clone();
87+
tokio::spawn(async move {
88+
if let Err(e) = execute_command(command, &frame, &store).await {
89+
tracing::error!("Failed to execute command '{}': {:?}", name, e);
90+
let _ = store.append(
91+
Frame::builder(format!("{}.error", name), frame.context_id)
92+
.meta(serde_json::json!({
93+
"error": e.to_string(),
94+
}))
95+
.build(),
96+
);
97+
}
98+
});
7599
}
76100
}
77101
}
@@ -86,12 +110,12 @@ async fn register_command(
86110
) -> Result<Command, Error> {
87111
// Get definition from CAS
88112
let hash = frame.hash.as_ref().ok_or("Missing hash field")?;
89-
let definition = store.cas_read(hash).await?;
90-
let definition = String::from_utf8(definition)?;
113+
let definition_bytes = store.cas_read(hash).await?;
114+
let definition = String::from_utf8(definition_bytes)?;
91115

92116
let mut engine = base_engine.clone();
93117

94-
// Add addtional commands, scoped to this command's context
118+
// Add additional commands, scoped to this command's context
95119
engine.add_commands(vec![
96120
Box::new(commands::cat_command::CatCommand::new(
97121
store.clone(),
@@ -103,10 +127,14 @@ async fn register_command(
103127
)),
104128
])?;
105129

130+
// Parse the command configuration to extract return_options (ignore the process closure here)
131+
let (_closure, return_options) = parse_command_definition(&mut engine, &definition)?;
132+
106133
Ok(Command {
107134
id: frame.id,
108135
engine,
109136
definition,
137+
return_options,
110138
})
111139
}
112140

@@ -120,8 +148,9 @@ async fn register_command(
120148
)
121149
)
122150
)]
123-
async fn execute_command(command: Command, frame: Frame, store: &Store) -> Result<(), Error> {
151+
async fn execute_command(command: Command, frame: &Frame, store: &Store) -> Result<(), Error> {
124152
let store = store.clone();
153+
let frame = frame.clone();
125154

126155
tokio::task::spawn_blocking(move || {
127156
let base_meta = serde_json::json!({
@@ -139,19 +168,34 @@ async fn execute_command(command: Command, frame: Frame, store: &Store) -> Resul
139168
),
140169
)])?;
141170

142-
let closure = parse_command_definition(&mut engine, &command.definition)?;
171+
let (closure, _) = parse_command_definition(&mut engine, &command.definition)?;
143172

144173
// Run command and process pipeline
145174
match run_command(&engine, closure, &frame) {
146175
Ok(pipeline_data) => {
176+
let recv_suffix = command
177+
.return_options
178+
.as_ref()
179+
.and_then(|opts| opts.suffix.as_deref())
180+
.unwrap_or(".recv");
181+
let ttl = command
182+
.return_options
183+
.as_ref()
184+
.and_then(|opts| opts.ttl.clone());
185+
147186
// Process each value as a .recv event
148187
for value in pipeline_data {
149188
let hash = store.cas_insert_sync(value_to_json(&value).to_string())?;
150189
let _ = store.append(
151190
Frame::builder(
152-
format!("{}.recv", frame.topic.strip_suffix(".call").unwrap()),
191+
format!(
192+
"{}{}",
193+
frame.topic.strip_suffix(".call").unwrap(),
194+
recv_suffix
195+
),
153196
frame.context_id,
154197
)
198+
.maybe_ttl(ttl.clone())
155199
.hash(hash)
156200
.meta(serde_json::json!({
157201
"command_id": command.id.to_string(),
@@ -224,7 +268,7 @@ fn run_command(
224268
fn parse_command_definition(
225269
engine: &mut nu::Engine,
226270
script: &str,
227-
) -> Result<nu_protocol::engine::Closure, Error> {
271+
) -> Result<(nu_protocol::engine::Closure, Option<ReturnOptions>), Error> {
228272
let mut working_set = nu_protocol::engine::StateWorkingSet::new(&engine.state);
229273
let block = nu_parser::parse(&mut working_set, None, script.as_bytes(), false);
230274

@@ -240,12 +284,36 @@ fn parse_command_definition(
240284

241285
let config = result.into_value(nu_protocol::Span::unknown())?;
242286

287+
// Get the process closure (required)
243288
let process = config
244289
.get_data_by_key("process")
245290
.ok_or("No 'process' field found in command configuration")?
246291
.into_closure()?;
247292

293+
// Optionally parse return_options (using the same approach as in handlers)
294+
let return_options = if let Some(return_config) = config.get_data_by_key("return_options") {
295+
let record = return_config
296+
.as_record()
297+
.map_err(|_| "return must be a record")?;
298+
299+
let suffix = record
300+
.get("suffix")
301+
.map(|v| v.as_str().map_err(|_| "suffix must be a string"))
302+
.transpose()?
303+
.map(String::from);
304+
305+
let ttl = record
306+
.get("ttl")
307+
.map(|v| serde_json::from_str(&value_to_json(v).to_string()))
308+
.transpose()
309+
.map_err(|e| format!("invalid TTL: {}", e))?;
310+
311+
Some(ReturnOptions { suffix, ttl })
312+
} else {
313+
None
314+
};
315+
248316
engine.state.merge_env(&mut stack)?;
249317

250-
Ok(process)
318+
Ok((process, return_options))
251319
}

src/main.rs

+67-11
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,6 @@ async fn cat(args: CommandCat) -> Result<(), Box<dyn std::error::Error + Send +
247247
.as_deref()
248248
.and_then(|context| scru128::Scru128Id::from_str(context).ok())
249249
.or_else(|| (!args.all).then_some(ZERO_CONTEXT));
250-
251250
let last_id = if let Some(last_id) = &args.last_id {
252251
match scru128::Scru128Id::from_str(last_id) {
253252
Ok(id) => Some(id),
@@ -256,7 +255,6 @@ async fn cat(args: CommandCat) -> Result<(), Box<dyn std::error::Error + Send +
256255
} else {
257256
None
258257
};
259-
260258
// Build options in one chain
261259
let options = ReadOptions::builder()
262260
.tail(args.tail)
@@ -271,19 +269,77 @@ async fn cat(args: CommandCat) -> Result<(), Box<dyn std::error::Error + Send +
271269
.maybe_limit(args.limit.map(|l| l as usize))
272270
.maybe_context_id(context_id)
273271
.build();
274-
275272
let mut receiver = xs::client::cat(&args.addr, options, args.sse).await?;
276273
let mut stdout = tokio::io::stdout();
277274

278-
match async {
279-
while let Some(bytes) = receiver.recv().await {
280-
stdout.write_all(&bytes).await?;
281-
stdout.flush().await?;
275+
#[cfg(unix)]
276+
let result = {
277+
use nix::unistd::dup;
278+
use std::io::Write;
279+
use std::os::unix::io::{AsRawFd, FromRawFd};
280+
use tokio::io::unix::AsyncFd;
281+
282+
let stdout_fd = std::io::stdout().as_raw_fd();
283+
// Create a duplicate of the file descriptor so we can check it separately
284+
let dup_fd = dup(stdout_fd)?;
285+
let stdout_file = unsafe { std::fs::File::from_raw_fd(dup_fd) };
286+
let async_fd = AsyncFd::new(stdout_file)?;
287+
288+
async {
289+
loop {
290+
tokio::select! {
291+
maybe_bytes = receiver.recv() => {
292+
match maybe_bytes {
293+
Some(bytes) => {
294+
if let Err(e) = stdout.write_all(&bytes).await {
295+
if e.kind() == std::io::ErrorKind::BrokenPipe {
296+
break;
297+
}
298+
return Err(e);
299+
}
300+
stdout.flush().await?;
301+
}
302+
None => break,
303+
}
304+
},
305+
Ok(mut guard) = async_fd.writable() => {
306+
// Try a zero-byte write to check if stdout is closed
307+
match guard.try_io(|inner| {
308+
// We're just doing a test write to check for EPIPE
309+
match inner.get_ref().write(&[]) {
310+
Ok(_) => Ok(false), // Stdout is still open
311+
Err(e) if e.kind() == std::io::ErrorKind::BrokenPipe => Ok(true), // Stdout closed
312+
Err(e) => Err(e), // Other error
313+
}
314+
}) {
315+
Ok(Ok(true)) => break, // Stdout is closed
316+
Ok(Ok(false)) => { /* Stdout still open */ },
317+
Ok(Err(e)) => return Err(e), // Error occurred
318+
Err(_) => { /* Would block, try again later */ }
319+
}
320+
321+
guard.clear_ready();
322+
}
323+
}
324+
}
325+
Ok::<_, std::io::Error>(())
282326
}
283-
Ok::<_, std::io::Error>(())
284-
}
285-
.await
286-
{
327+
.await
328+
};
329+
330+
#[cfg(not(unix))]
331+
let result = {
332+
async {
333+
while let Some(bytes) = receiver.recv().await {
334+
stdout.write_all(&bytes).await?;
335+
stdout.flush().await?;
336+
}
337+
Ok::<_, std::io::Error>(())
338+
}
339+
.await
340+
};
341+
342+
match result {
287343
Ok(_) => Ok(()),
288344
Err(e) if e.kind() == std::io::ErrorKind::BrokenPipe => Ok(()),
289345
Err(e) => Err(e.into()),

xs.nu

+2-2
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ def conditional-pipe [
1717
condition: bool
1818
action: closure
1919
] {
20-
if $condition { do $action } else { $in }
20+
if $condition { do $action } else { }
2121
}
2222

2323
export def xs-addr [] {
@@ -94,7 +94,7 @@ export def .cat [
9494
limit: $limit
9595
context: (if not $all { (xs-context $context (metadata $context).span) })
9696
all: $all
97-
} | conditional-pipe (not ($detail or $all)) { reject context_id ttl }
97+
} | conditional-pipe (not ($detail or $all)) { each { reject context_id ttl } }
9898
}
9999

100100
def read_hash [hash?: any] {

0 commit comments

Comments
 (0)