From 9f867361d200d4230822cfb409b58fb92c44655e Mon Sep 17 00:00:00 2001 From: Divy Srivastava Date: Fri, 24 Nov 2023 10:19:23 +0530 Subject: [PATCH 1/3] Faster wasm streaming in Rust --- ext/fetch/lib.rs | 1 + ext/fetch/wasm_streaming.rs | 160 ++++++++++++++++++++++++++++++++++++ 2 files changed, 161 insertions(+) create mode 100644 ext/fetch/wasm_streaming.rs diff --git a/ext/fetch/lib.rs b/ext/fetch/lib.rs index 7cde5584f6bb7b..cf96a38cc094cf 100644 --- a/ext/fetch/lib.rs +++ b/ext/fetch/lib.rs @@ -1,6 +1,7 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. mod fs_fetch_handler; +mod wasm_streaming; use std::borrow::Cow; use std::cell::RefCell; diff --git a/ext/fetch/wasm_streaming.rs b/ext/fetch/wasm_streaming.rs new file mode 100644 index 00000000000000..3846b55c869af8 --- /dev/null +++ b/ext/fetch/wasm_streaming.rs @@ -0,0 +1,160 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. + +use deno_core::error::type_error; +use deno_core::error::AnyError; +use deno_core::v8; +use deno_core::OpState; +use deno_core::ResourceId; +use std::cell::RefCell; +use std::rc::Rc; + +pub fn handle_wasm_streaming( + state: Rc>, + scope: &mut v8::HandleScope, + value: v8::Local, + mut wasm_streaming: v8::WasmStreaming, +) { + let (url, rid) = match compile_response(scope, value) { + Ok(Some((url, rid))) => (url, rid), + Ok(None) => { + // 2.7 + wasm_streaming.finish(); + return; + } + Err(e) => { + // 2.8 + wasm_streaming.abort(None); + return; + } + }; + + wasm_streaming.set_url(&url); + + tokio::task::spawn_local(async move { + loop { + let resource = state.borrow().resource_table.get_any(rid); + let resource = match resource { + Ok(r) => r, + Err(_) => { + wasm_streaming.abort(None); + return; + } + }; + + let bytes = match resource.read(65536).await { + Ok(bytes) => bytes, + Err(e) => { + wasm_streaming.abort(None); + return; + } + }; + if bytes.is_empty() { + break; + } + + wasm_streaming.on_bytes_received(&bytes); + } + + wasm_streaming.finish(); + }); +} + +// Partially implements https://webassembly.github.io/spec/web-api/#compile-a-potential-webassembly-response +pub fn compile_response( + scope: &mut v8::HandleScope, + value: v8::Local, +) -> Result, AnyError> { + let object = value + .to_object(scope) + .ok_or_else(|| type_error("Response is not an object."))?; + let url = get_string(scope, object, "url")?; + + // 2.3. + // The spec is ambiguous here, see + // https://github.com/WebAssembly/spec/issues/1138. The WPT tests expect + // the raw value of the Content-Type attribute lowercased. We ignore this + // for file:// because file fetches don't have a Content-Type. + if !url.starts_with("file://") { + let headers = get_value(scope, object, "headers")?; + let content_type = call_method(scope, headers, "get", "Content-Type")?; + + if content_type.to_lowercase() != "application/wasm" { + return Err(type_error("Response is not a wasm file.")); + } + } + + // 2.5 + let ok = get_value(scope, object, "ok")?; + if !ok.is_true() { + return Err(type_error("Response is not ok.")); + } + + let body = get_value(scope, object, "body")?; + + if body.is_null() { + return Ok(None); + } + let body = body + .to_object(scope) + .ok_or_else(|| type_error("Failed to get body object."))?; + let rid = get_value(scope, body, "rid")? + .to_uint32(scope) + .ok_or_else(|| type_error("Failed to get rid."))? + .value() as ResourceId; + + Ok(Some((url, rid))) +} + +fn get_value<'a, 'b>( + scope: &'b mut v8::HandleScope<'a>, + obj: v8::Local<'a, v8::Object>, + key: &'static str, +) -> Result, AnyError> { + let key = v8::String::new(scope, key) + .ok_or_else(|| type_error("Failed to create key."))?; + Ok( + obj + .get(scope, key.into()) + .ok_or_else(|| type_error("Failed to get value."))?, + ) +} + +fn get_string( + scope: &mut v8::HandleScope, + obj: v8::Local, + key: &'static str, +) -> Result { + let key = v8::String::new(scope, key) + .ok_or_else(|| type_error("Failed to create key."))?; + let value = obj + .get(scope, key.into()) + .ok_or_else(|| type_error("Failed to get value."))?; + + Ok(value.to_rust_string_lossy(scope)) +} + +fn call_method<'a>( + scope: &mut v8::HandleScope<'a>, + obj: v8::Local<'a, v8::Value>, + method: &'static str, + arg: &'static str, +) -> Result { + let key = v8::String::new(scope, method) + .ok_or_else(|| type_error("Failed to create key."))?; + let function = obj + .to_object(scope) + .ok_or_else(|| type_error("Failed to create object."))?; + let function = function + .get(scope, key.into()) + .ok_or_else(|| type_error("Failed to get value."))?; + let function: v8::Local = function.try_into()?; + let arg = v8::String::new(scope, arg) + .ok_or_else(|| type_error("Failed to create arg."))?; + let this = v8::undefined(scope).into(); + Ok( + function + .call(scope, this, &[arg.into()]) + .ok_or_else(|| type_error("Failed to call."))? + .to_rust_string_lossy(scope), + ) +} From ceb5dc6bdbe2981eb964bb0204916fb871d53f6a Mon Sep 17 00:00:00 2001 From: Divy Srivastava Date: Fri, 24 Nov 2023 13:02:07 +0530 Subject: [PATCH 2/3] Update --- ext/fetch/26_fetch.js | 2 ++ ext/fetch/lib.rs | 1 + ext/fetch/wasm_streaming.rs | 20 +++++++++++++------- runtime/js/99_main.js | 1 - runtime/worker.rs | 3 +++ 5 files changed, 19 insertions(+), 8 deletions(-) diff --git a/ext/fetch/26_fetch.js b/ext/fetch/26_fetch.js index e586d9a3a26612..c3ad69322c4f75 100644 --- a/ext/fetch/26_fetch.js +++ b/ext/fetch/26_fetch.js @@ -96,6 +96,8 @@ function opFetchSend(rid) { */ function createResponseBodyStream(responseBodyRid, terminator) { const readable = readableStreamForRid(responseBodyRid); + // internal, used by wasm streaming + readable.rid = responseBodyRid; function onAbort() { errorReadableStream(readable, terminator.reason); diff --git a/ext/fetch/lib.rs b/ext/fetch/lib.rs index cf96a38cc094cf..8e301a72045e27 100644 --- a/ext/fetch/lib.rs +++ b/ext/fetch/lib.rs @@ -70,6 +70,7 @@ pub use data_url; pub use reqwest; pub use fs_fetch_handler::FsFetchHandler; +pub use wasm_streaming::handle_wasm_streaming; #[derive(Clone)] pub struct Options { diff --git a/ext/fetch/wasm_streaming.rs b/ext/fetch/wasm_streaming.rs index 3846b55c869af8..4ecc12ced38481 100644 --- a/ext/fetch/wasm_streaming.rs +++ b/ext/fetch/wasm_streaming.rs @@ -8,6 +8,7 @@ use deno_core::ResourceId; use std::cell::RefCell; use std::rc::Rc; +// The Wasm streaming compilation pipeline. pub fn handle_wasm_streaming( state: Rc>, scope: &mut v8::HandleScope, @@ -23,38 +24,44 @@ pub fn handle_wasm_streaming( } Err(e) => { // 2.8 - wasm_streaming.abort(None); + let err = v8::String::new(scope, &e.to_string()).unwrap(); + wasm_streaming.abort(Some(err.into())); return; } }; wasm_streaming.set_url(&url); - tokio::task::spawn_local(async move { + deno_core::unsync::spawn(async move { + let view = deno_core::BufMutView::new(65536); loop { let resource = state.borrow().resource_table.get_any(rid); let resource = match resource { Ok(r) => r, Err(_) => { + /* TODO(littledivy): propgate err */ wasm_streaming.abort(None); return; } }; - let bytes = match resource.read(65536).await { + let (bytes, view) = match resource.read_byob(view).await { Ok(bytes) => bytes, Err(e) => { + println!("error reading wasm resource: {}", e); + /* TODO(littledivy): propgate err */ wasm_streaming.abort(None); return; } }; - if bytes.is_empty() { + if bytes == 0 { break; } - wasm_streaming.on_bytes_received(&bytes); + wasm_streaming.on_bytes_received(&view); } + /* XXX: crashes here if module compilation fails */ wasm_streaming.finish(); }); } @@ -150,10 +157,9 @@ fn call_method<'a>( let function: v8::Local = function.try_into()?; let arg = v8::String::new(scope, arg) .ok_or_else(|| type_error("Failed to create arg."))?; - let this = v8::undefined(scope).into(); Ok( function - .call(scope, this, &[arg.into()]) + .call(scope, obj, &[arg.into()]) .ok_or_else(|| type_error("Failed to call."))? .to_rust_string_lossy(scope), ) diff --git a/runtime/js/99_main.js b/runtime/js/99_main.js index 89296d632f6174..da24ddd375bb90 100644 --- a/runtime/js/99_main.js +++ b/runtime/js/99_main.js @@ -332,7 +332,6 @@ function runtimeStart( ) { core.setMacrotaskCallback(timers.handleTimerMacrotask); core.setMacrotaskCallback(promiseRejectMacrotaskCallback); - core.setWasmStreamingCallback(fetch.handleWasmStreaming); core.setReportExceptionCallback(event.reportException); ops.op_set_format_exception_callback(formatException); version.setVersions( diff --git a/runtime/worker.rs b/runtime/worker.rs index c60e189f78fe1b..e9328d29e296e7 100644 --- a/runtime/worker.rs +++ b/runtime/worker.rs @@ -501,6 +501,9 @@ impl MainWorker { let bootstrap_fn = v8::Local::new(scope, bootstrap_fn); let undefined = v8::undefined(scope); bootstrap_fn.call(scope, undefined.into(), &[args]).unwrap(); + + // Set Wasm streaming callback + deno_core::set_wasm_streaming_callback(scope, deno_fetch::handle_wasm_streaming); } /// See [JsRuntime::execute_script](deno_core::JsRuntime::execute_script) From 07b6baaf94ea25413fec9f3fac8cc8b86faccfbb Mon Sep 17 00:00:00 2001 From: Divy Srivastava Date: Tue, 28 Nov 2023 09:21:08 +0530 Subject: [PATCH 3/3] use v8 task spawner --- ext/fetch/wasm_streaming.rs | 41 ++++++++++++++++++++++++++++--------- runtime/worker.rs | 5 ++++- 2 files changed, 35 insertions(+), 11 deletions(-) diff --git a/ext/fetch/wasm_streaming.rs b/ext/fetch/wasm_streaming.rs index 4ecc12ced38481..4b2d9fb8d8da00 100644 --- a/ext/fetch/wasm_streaming.rs +++ b/ext/fetch/wasm_streaming.rs @@ -8,7 +8,7 @@ use deno_core::ResourceId; use std::cell::RefCell; use std::rc::Rc; -// The Wasm streaming compilation pipeline. +/// The Wasm streaming compilation pipeline. pub fn handle_wasm_streaming( state: Rc>, scope: &mut v8::HandleScope, @@ -33,36 +33,57 @@ pub fn handle_wasm_streaming( wasm_streaming.set_url(&url); deno_core::unsync::spawn(async move { - let view = deno_core::BufMutView::new(65536); loop { let resource = state.borrow().resource_table.get_any(rid); let resource = match resource { Ok(r) => r, Err(_) => { - /* TODO(littledivy): propgate err */ - wasm_streaming.abort(None); + state.borrow().borrow::().spawn( + move |scope| { + wasm_streaming.abort(Some( + v8::String::new(scope, "Failed to get resource.") + .unwrap() + .into(), + )) + }, + ); return; } }; + let view = deno_core::BufMutView::new(65536); let (bytes, view) = match resource.read_byob(view).await { Ok(bytes) => bytes, Err(e) => { - println!("error reading wasm resource: {}", e); - /* TODO(littledivy): propgate err */ - wasm_streaming.abort(None); + state.borrow().borrow::().spawn( + move |scope| { + wasm_streaming.abort(Some( + v8::String::new( + scope, + &format!("Error reading wasm resource: {}", e), + ) + .unwrap() + .into(), + )) + }, + ); + return; } }; + /* EOF */ if bytes == 0 { break; } - wasm_streaming.on_bytes_received(&view); + wasm_streaming.on_bytes_received(&view[..bytes]); } - /* XXX: crashes here if module compilation fails */ - wasm_streaming.finish(); + /* Spawn a task on JS loop to finish the streaming compilation */ + state + .borrow() + .borrow::() + .spawn(move |_| wasm_streaming.finish()); }); } diff --git a/runtime/worker.rs b/runtime/worker.rs index e9328d29e296e7..8ce050e50cdaa7 100644 --- a/runtime/worker.rs +++ b/runtime/worker.rs @@ -503,7 +503,10 @@ impl MainWorker { bootstrap_fn.call(scope, undefined.into(), &[args]).unwrap(); // Set Wasm streaming callback - deno_core::set_wasm_streaming_callback(scope, deno_fetch::handle_wasm_streaming); + deno_core::set_wasm_streaming_callback( + scope, + deno_fetch::handle_wasm_streaming, + ); } /// See [JsRuntime::execute_script](deno_core::JsRuntime::execute_script)