Skip to content

Commit b390253

Browse files
committed
Merge branch 'set-response-body' into hf/dont-set-response-body
2 parents 625636a + ad352be commit b390253

File tree

3 files changed

+100
-18
lines changed

3 files changed

+100
-18
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ repository = "https://github.com/hyperware-ai/process_lib"
99
license = "Apache-2.0"
1010

1111
[features]
12-
hyperapp = ["dep:futures-util", "dep:uuid", "logging"]
12+
hyperapp = ["dep:futures-util", "dep:futures-channel", "dep:uuid", "logging"]
1313
logging = ["dep:color-eyre", "dep:tracing", "dep:tracing-error", "dep:tracing-subscriber"]
1414
hyperwallet = ["dep:hex", "dep:sha3"]
1515
simulation-mode = []
@@ -42,6 +42,7 @@ url = "2.4.1"
4242
wit-bindgen = "0.42.1"
4343

4444
futures-util = { version = "0.3", optional = true }
45+
futures-channel = { version = "0.3", optional = true }
4546
uuid = { version = "1.0", features = ["v4"], optional = true }
4647

4748
color-eyre = { version = "0.6", features = ["capture-spantrace"], optional = true }

src/hyperapp.rs

Lines changed: 97 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
use std::cell::RefCell;
2-
use std::collections::HashMap;
2+
use std::collections::{HashMap, HashSet};
33
use std::future::Future;
44
use std::pin::Pin;
5+
use std::sync::{
6+
atomic::{AtomicBool, Ordering},
7+
Arc,
8+
};
59
use std::task::{Context, Poll};
610

711
use crate::{
@@ -10,21 +14,22 @@ use crate::{
1014
logging::{error, info},
1115
set_state, timer, Address, BuildError, LazyLoadBlob, Message, Request, SendError,
1216
};
13-
use futures_util::task::noop_waker_ref;
17+
use futures_channel::{mpsc, oneshot};
18+
use futures_util::task::{waker_ref, ArcWake};
1419
use serde::{Deserialize, Serialize};
1520
use thiserror::Error;
1621
use uuid::Uuid;
1722

1823
thread_local! {
1924
static SPAWN_QUEUE: RefCell<Vec<Pin<Box<dyn Future<Output = ()>>>>> = RefCell::new(Vec::new());
2025

21-
2226
pub static APP_CONTEXT: RefCell<AppContext> = RefCell::new(AppContext {
2327
hidden_state: None,
2428
executor: Executor::new(),
2529
});
2630

2731
pub static RESPONSE_REGISTRY: RefCell<HashMap<String, Vec<u8>>> = RefCell::new(HashMap::new());
32+
pub static CANCELLED_RESPONSES: RefCell<HashSet<String>> = RefCell::new(HashSet::new());
2833

2934
pub static APP_HELPERS: RefCell<AppHelpers> = RefCell::new(AppHelpers {
3035
current_server: None,
@@ -173,10 +178,53 @@ pub struct Executor {
173178
tasks: Vec<Pin<Box<dyn Future<Output = ()>>>>,
174179
}
175180

176-
pub fn spawn(fut: impl Future<Output = ()> + 'static) {
181+
struct ExecutorWakeFlag {
182+
triggered: AtomicBool,
183+
}
184+
185+
impl ExecutorWakeFlag {
186+
fn new() -> Self {
187+
Self {
188+
triggered: AtomicBool::new(false),
189+
}
190+
}
191+
192+
fn take(&self) -> bool {
193+
self.triggered.swap(false, Ordering::SeqCst)
194+
}
195+
}
196+
197+
impl ArcWake for ExecutorWakeFlag {
198+
fn wake_by_ref(arc_self: &Arc<Self>) {
199+
arc_self.triggered.store(true, Ordering::SeqCst);
200+
}
201+
}
202+
203+
pub struct JoinHandle<T> {
204+
receiver: oneshot::Receiver<T>,
205+
}
206+
207+
impl<T> Future for JoinHandle<T> {
208+
type Output = Result<T, oneshot::Canceled>;
209+
210+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
211+
let receiver = &mut self.get_mut().receiver;
212+
Pin::new(receiver).poll(cx)
213+
}
214+
}
215+
216+
pub fn spawn<T>(fut: impl Future<Output = T> + 'static) -> JoinHandle<T>
217+
where
218+
T: 'static,
219+
{
220+
let (sender, receiver) = oneshot::channel();
177221
SPAWN_QUEUE.with(|queue| {
178-
queue.borrow_mut().push(Box::pin(fut));
179-
})
222+
queue.borrow_mut().push(Box::pin(async move {
223+
let result = fut.await;
224+
let _ = sender.send(result);
225+
}));
226+
});
227+
JoinHandle { receiver }
180228
}
181229

182230
impl Executor {
@@ -185,19 +233,24 @@ impl Executor {
185233
}
186234

187235
pub fn poll_all_tasks(&mut self) {
236+
let wake_flag = Arc::new(ExecutorWakeFlag::new());
188237
loop {
189238
// Drain any newly spawned tasks into our task list
190239
SPAWN_QUEUE.with(|queue| {
191240
self.tasks.append(&mut queue.borrow_mut());
192241
});
193242

194-
// Poll all tasks, collecting completed ones
243+
// Poll all tasks, collecting completed ones.
244+
// Put waker into context so tasks can wake the executor if needed.
195245
let mut completed = Vec::new();
196-
let mut ctx = Context::from_waker(noop_waker_ref());
246+
{
247+
let waker = waker_ref(&wake_flag);
248+
let mut ctx = Context::from_waker(&waker);
197249

198-
for i in 0..self.tasks.len() {
199-
if let Poll::Ready(()) = self.tasks[i].as_mut().poll(&mut ctx) {
200-
completed.push(i);
250+
for i in 0..self.tasks.len() {
251+
if let Poll::Ready(()) = self.tasks[i].as_mut().poll(&mut ctx) {
252+
completed.push(i);
253+
}
201254
}
202255
}
203256

@@ -208,9 +261,10 @@ impl Executor {
208261

209262
// Check if there are new tasks spawned during polling
210263
let has_new_tasks = SPAWN_QUEUE.with(|queue| !queue.borrow().is_empty());
264+
// Check if any task woke the executor that needs to be re-polled
265+
let was_woken = wake_flag.take();
211266

212-
// Continue if new tasks were spawned, otherwise we're done
213-
if !has_new_tasks {
267+
if !has_new_tasks && !was_woken {
214268
break;
215269
}
216270
}
@@ -220,6 +274,7 @@ struct ResponseFuture {
220274
correlation_id: String,
221275
// Capture HTTP context at creation time
222276
http_context: Option<HttpRequestContext>,
277+
resolved: bool,
223278
}
224279

225280
impl ResponseFuture {
@@ -231,6 +286,7 @@ impl ResponseFuture {
231286
Self {
232287
correlation_id,
233288
http_context,
289+
resolved: false,
234290
}
235291
}
236292
}
@@ -239,16 +295,18 @@ impl Future for ResponseFuture {
239295
type Output = Vec<u8>;
240296

241297
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
242-
let correlation_id = &self.correlation_id;
298+
let this = self.get_mut();
243299

244300
let maybe_bytes = RESPONSE_REGISTRY.with(|registry| {
245301
let mut registry_mut = registry.borrow_mut();
246-
registry_mut.remove(correlation_id)
302+
registry_mut.remove(&this.correlation_id)
247303
});
248304

249305
if let Some(bytes) = maybe_bytes {
306+
this.resolved = true;
307+
250308
// Restore this future's captured context
251-
if let Some(ref context) = self.http_context {
309+
if let Some(ref context) = this.http_context {
252310
APP_HELPERS.with(|helpers| {
253311
helpers.borrow_mut().current_http_context = Some(context.clone());
254312
});
@@ -261,6 +319,23 @@ impl Future for ResponseFuture {
261319
}
262320
}
263321

322+
impl Drop for ResponseFuture {
323+
fn drop(&mut self) {
324+
// We want to avoid cleaning up after successful responses
325+
if self.resolved {
326+
return;
327+
}
328+
329+
RESPONSE_REGISTRY.with(|registry| {
330+
registry.borrow_mut().remove(&self.correlation_id);
331+
});
332+
333+
CANCELLED_RESPONSES.with(|set| {
334+
set.borrow_mut().insert(self.correlation_id.clone());
335+
});
336+
}
337+
}
338+
264339
#[derive(Debug, Clone, Serialize, Deserialize, Error)]
265340
pub enum AppSendError {
266341
#[error("SendError: {0}")]
@@ -447,12 +522,17 @@ where
447522

448523
pub fn setup_server(
449524
ui_config: Option<&HttpBindingConfig>,
525+
ui_path: Option<String>,
450526
endpoints: &[Binding],
451527
) -> http::server::HttpServer {
452528
let mut server = http::server::HttpServer::new(5);
453529

454530
if let Some(ui) = ui_config {
455-
if let Err(e) = server.serve_ui("ui", vec!["/"], ui.clone()) {
531+
if let Err(e) = server.serve_ui(
532+
&ui_path.unwrap_or_else(|| "ui".to_string()),
533+
vec!["/"],
534+
ui.clone(),
535+
) {
456536
panic!("failed to serve UI: {e}. Make sure that a ui folder is in /pkg");
457537
}
458538
}

0 commit comments

Comments
 (0)