Skip to content

Commit

Permalink
feat(base): make able to trigger early drop with other resources (#465)
Browse files Browse the repository at this point in the history
* fix(ci): adjust log fileter

* chore: update `types/global.d.ts`

* feat(base): make able to trigger early drop with other resources

* chore: add integration tests
  • Loading branch information
nyannyacha authored Dec 25, 2024
1 parent 05fdb79 commit 667db65
Show file tree
Hide file tree
Showing 8 changed files with 283 additions and 81 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ env:
CARGO_TERM_COLOR: always
RUSTUP_MAX_RETRIES: 10
ORT_DYLIB_PATH: /tmp/onnxruntime/lib/libonnxruntime.so
RUST_LOG: event_worker=trace
RUST_LOG: sb_event_worker=trace

jobs:
cargo-fmt:
Expand Down
9 changes: 9 additions & 0 deletions crates/base/src/deno_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ pub struct RuntimeState {
pub event_loop_completed: Arc<AtomicFlag>,
pub terminated: Arc<AtomicFlag>,
pub found_inspector_session: Arc<AtomicFlag>,
pub mem_reached_half: Arc<AtomicFlag>,
}

impl RuntimeState {
Expand Down Expand Up @@ -1324,6 +1325,14 @@ where
}
}

if let Some(limit) = mem_state.limit {
if total_malloced_bytes >= limit / 2 {
state.mem_reached_half.raise();
} else {
state.mem_reached_half.lower();
}
}

if let Some(threshold_bytes) = beforeunload_mem_threshold.load().as_deref().copied()
{
let total_malloced_bytes = total_malloced_bytes as u64;
Expand Down
14 changes: 3 additions & 11 deletions crates/base/src/worker/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -460,8 +460,6 @@ impl WorkerPool {
if tx.send(Ok(CreateUserWorkerResult { key: uuid })).is_err() {
error!("main worker receiver dropped")
};

status.demand.fetch_add(1, Ordering::Release);
}
Err(err) => {
error!("{err:#}");
Expand Down Expand Up @@ -502,6 +500,8 @@ impl WorkerPool {
let cancel = worker.cancel.clone();
let (req_start_tx, req_end_tx) = profile.timing_tx_pair.clone();

profile.status.demand.fetch_add(1, Ordering::Release);

// Create a closure to handle the request and send the response
let request_handler = async move {
if !policy.is_per_worker() {
Expand Down Expand Up @@ -648,15 +648,7 @@ impl WorkerPool {
.get(&worker_uuid)
.map(|it| it.status.is_retired.clone())
{
Some(is_retired) if !is_retired.is_raised() => {
self.user_workers
.get(&worker_uuid)
.map(|it| it.status.demand.as_ref())
.unwrap()
.fetch_add(1, Ordering::Release);

Some(worker_uuid)
}
Some(is_retired) if !is_retired.is_raised() => Some(worker_uuid),

_ => {
self.retire(&worker_uuid);
Expand Down
Loading

0 comments on commit 667db65

Please sign in to comment.