Skip to content

Commit ad352be

Browse files
committed
Merge branch 'develop' into set-response-body
2 parents ac4cfd3 + d3bb304 commit ad352be

File tree

3 files changed

+100
-18
lines changed

3 files changed

+100
-18
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ repository = "https://github.com/hyperware-ai/process_lib"
99
license = "Apache-2.0"
1010

1111
[features]
12-
hyperapp = ["dep:futures-util", "dep:uuid", "logging"]
12+
hyperapp = ["dep:futures-util", "dep:futures-channel", "dep:uuid", "logging"]
1313
logging = ["dep:color-eyre", "dep:tracing", "dep:tracing-error", "dep:tracing-subscriber"]
1414
hyperwallet = ["dep:hex", "dep:sha3"]
1515
simulation-mode = []
@@ -42,6 +42,7 @@ url = "2.4.1"
4242
wit-bindgen = "0.42.1"
4343

4444
futures-util = { version = "0.3", optional = true }
45+
futures-channel = { version = "0.3", optional = true }
4546
uuid = { version = "1.0", features = ["v4"], optional = true }
4647

4748
color-eyre = { version = "0.6", features = ["capture-spantrace"], optional = true }

src/hyperapp.rs

Lines changed: 97 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
use std::cell::RefCell;
2-
use std::collections::HashMap;
2+
use std::collections::{HashMap, HashSet};
33
use std::future::Future;
44
use std::pin::Pin;
5+
use std::sync::{
6+
atomic::{AtomicBool, Ordering},
7+
Arc,
8+
};
59
use std::task::{Context, Poll};
610

711
use crate::{
@@ -10,21 +14,22 @@ use crate::{
1014
logging::{error, info},
1115
set_state, timer, Address, BuildError, LazyLoadBlob, Message, Request, SendError,
1216
};
13-
use futures_util::task::noop_waker_ref;
17+
use futures_channel::{mpsc, oneshot};
18+
use futures_util::task::{waker_ref, ArcWake};
1419
use serde::{Deserialize, Serialize};
1520
use thiserror::Error;
1621
use uuid::Uuid;
1722

1823
thread_local! {
1924
static SPAWN_QUEUE: RefCell<Vec<Pin<Box<dyn Future<Output = ()>>>>> = RefCell::new(Vec::new());
2025

21-
2226
pub static APP_CONTEXT: RefCell<AppContext> = RefCell::new(AppContext {
2327
hidden_state: None,
2428
executor: Executor::new(),
2529
});
2630

2731
pub static RESPONSE_REGISTRY: RefCell<HashMap<String, Vec<u8>>> = RefCell::new(HashMap::new());
32+
pub static CANCELLED_RESPONSES: RefCell<HashSet<String>> = RefCell::new(HashSet::new());
2833

2934
pub static APP_HELPERS: RefCell<AppHelpers> = RefCell::new(AppHelpers {
3035
current_server: None,
@@ -203,10 +208,53 @@ pub struct Executor {
203208
tasks: Vec<Pin<Box<dyn Future<Output = ()>>>>,
204209
}
205210

206-
pub fn spawn(fut: impl Future<Output = ()> + 'static) {
211+
struct ExecutorWakeFlag {
212+
triggered: AtomicBool,
213+
}
214+
215+
impl ExecutorWakeFlag {
216+
fn new() -> Self {
217+
Self {
218+
triggered: AtomicBool::new(false),
219+
}
220+
}
221+
222+
fn take(&self) -> bool {
223+
self.triggered.swap(false, Ordering::SeqCst)
224+
}
225+
}
226+
227+
impl ArcWake for ExecutorWakeFlag {
228+
fn wake_by_ref(arc_self: &Arc<Self>) {
229+
arc_self.triggered.store(true, Ordering::SeqCst);
230+
}
231+
}
232+
233+
pub struct JoinHandle<T> {
234+
receiver: oneshot::Receiver<T>,
235+
}
236+
237+
impl<T> Future for JoinHandle<T> {
238+
type Output = Result<T, oneshot::Canceled>;
239+
240+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
241+
let receiver = &mut self.get_mut().receiver;
242+
Pin::new(receiver).poll(cx)
243+
}
244+
}
245+
246+
pub fn spawn<T>(fut: impl Future<Output = T> + 'static) -> JoinHandle<T>
247+
where
248+
T: 'static,
249+
{
250+
let (sender, receiver) = oneshot::channel();
207251
SPAWN_QUEUE.with(|queue| {
208-
queue.borrow_mut().push(Box::pin(fut));
209-
})
252+
queue.borrow_mut().push(Box::pin(async move {
253+
let result = fut.await;
254+
let _ = sender.send(result);
255+
}));
256+
});
257+
JoinHandle { receiver }
210258
}
211259

212260
impl Executor {
@@ -215,19 +263,24 @@ impl Executor {
215263
}
216264

217265
pub fn poll_all_tasks(&mut self) {
266+
let wake_flag = Arc::new(ExecutorWakeFlag::new());
218267
loop {
219268
// Drain any newly spawned tasks into our task list
220269
SPAWN_QUEUE.with(|queue| {
221270
self.tasks.append(&mut queue.borrow_mut());
222271
});
223272

224-
// Poll all tasks, collecting completed ones
273+
// Poll all tasks, collecting completed ones.
274+
// Put waker into context so tasks can wake the executor if needed.
225275
let mut completed = Vec::new();
226-
let mut ctx = Context::from_waker(noop_waker_ref());
276+
{
277+
let waker = waker_ref(&wake_flag);
278+
let mut ctx = Context::from_waker(&waker);
227279

228-
for i in 0..self.tasks.len() {
229-
if let Poll::Ready(()) = self.tasks[i].as_mut().poll(&mut ctx) {
230-
completed.push(i);
280+
for i in 0..self.tasks.len() {
281+
if let Poll::Ready(()) = self.tasks[i].as_mut().poll(&mut ctx) {
282+
completed.push(i);
283+
}
231284
}
232285
}
233286

@@ -238,9 +291,10 @@ impl Executor {
238291

239292
// Check if there are new tasks spawned during polling
240293
let has_new_tasks = SPAWN_QUEUE.with(|queue| !queue.borrow().is_empty());
294+
// Check if any task woke the executor that needs to be re-polled
295+
let was_woken = wake_flag.take();
241296

242-
// Continue if new tasks were spawned, otherwise we're done
243-
if !has_new_tasks {
297+
if !has_new_tasks && !was_woken {
244298
break;
245299
}
246300
}
@@ -250,6 +304,7 @@ struct ResponseFuture {
250304
correlation_id: String,
251305
// Capture HTTP context at creation time
252306
http_context: Option<HttpRequestContext>,
307+
resolved: bool,
253308
}
254309

255310
impl ResponseFuture {
@@ -261,6 +316,7 @@ impl ResponseFuture {
261316
Self {
262317
correlation_id,
263318
http_context,
319+
resolved: false,
264320
}
265321
}
266322
}
@@ -269,16 +325,18 @@ impl Future for ResponseFuture {
269325
type Output = Vec<u8>;
270326

271327
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
272-
let correlation_id = &self.correlation_id;
328+
let this = self.get_mut();
273329

274330
let maybe_bytes = RESPONSE_REGISTRY.with(|registry| {
275331
let mut registry_mut = registry.borrow_mut();
276-
registry_mut.remove(correlation_id)
332+
registry_mut.remove(&this.correlation_id)
277333
});
278334

279335
if let Some(bytes) = maybe_bytes {
336+
this.resolved = true;
337+
280338
// Restore this future's captured context
281-
if let Some(ref context) = self.http_context {
339+
if let Some(ref context) = this.http_context {
282340
APP_HELPERS.with(|helpers| {
283341
helpers.borrow_mut().current_http_context = Some(context.clone());
284342
});
@@ -291,6 +349,23 @@ impl Future for ResponseFuture {
291349
}
292350
}
293351

352+
impl Drop for ResponseFuture {
353+
fn drop(&mut self) {
354+
// We want to avoid cleaning up after successful responses
355+
if self.resolved {
356+
return;
357+
}
358+
359+
RESPONSE_REGISTRY.with(|registry| {
360+
registry.borrow_mut().remove(&self.correlation_id);
361+
});
362+
363+
CANCELLED_RESPONSES.with(|set| {
364+
set.borrow_mut().insert(self.correlation_id.clone());
365+
});
366+
}
367+
}
368+
294369
#[derive(Debug, Clone, Serialize, Deserialize, Error)]
295370
pub enum AppSendError {
296371
#[error("SendError: {0}")]
@@ -477,12 +552,17 @@ where
477552

478553
pub fn setup_server(
479554
ui_config: Option<&HttpBindingConfig>,
555+
ui_path: Option<String>,
480556
endpoints: &[Binding],
481557
) -> http::server::HttpServer {
482558
let mut server = http::server::HttpServer::new(5);
483559

484560
if let Some(ui) = ui_config {
485-
if let Err(e) = server.serve_ui("ui", vec!["/"], ui.clone()) {
561+
if let Err(e) = server.serve_ui(
562+
&ui_path.unwrap_or_else(|| "ui".to_string()),
563+
vec!["/"],
564+
ui.clone(),
565+
) {
486566
panic!("failed to serve UI: {e}. Make sure that a ui folder is in /pkg");
487567
}
488568
}

0 commit comments

Comments
 (0)