diff --git a/src/net_protocol.rs b/src/net_protocol.rs index 83d4c72ab..5fc5e3ea2 100644 --- a/src/net_protocol.rs +++ b/src/net_protocol.rs @@ -55,6 +55,7 @@ impl Default for GcState { #[derive(Debug)] pub struct Blobs { rt: LocalPoolHandle, + pub(crate) tokio_rt: tokio::runtime::Handle, store: S, events: EventSender, downloader: Downloader, @@ -125,6 +126,7 @@ pub struct Builder { store: S, events: Option, gc_config: Option, + tokio_rt: Option, } impl Builder { @@ -139,6 +141,12 @@ impl Builder { self } + /// Set the tokio runtime handle to use for the rpc handler + pub fn tokio_rt(mut self, value: tokio::runtime::Handle) -> Self { + self.tokio_rt = Some(value); + self + } + /// Build the Blobs protocol handler. /// You need to provide a local pool handle and an endpoint. pub fn build(self, rt: &LocalPoolHandle, endpoint: &Endpoint) -> Arc> { @@ -149,6 +157,8 @@ impl Builder { self.events.unwrap_or_default(), downloader, endpoint.clone(), + self.tokio_rt + .unwrap_or_else(|| tokio::runtime::Handle::current()), )) } } @@ -160,6 +170,7 @@ impl Blobs { store, events: None, gc_config: None, + tokio_rt: None, } } } @@ -187,9 +198,11 @@ impl Blobs { events: EventSender, downloader: Downloader, endpoint: Endpoint, + tokio_rt: tokio::runtime::Handle, ) -> Self { Self { rt, + tokio_rt, store, events, downloader, diff --git a/src/rpc.rs b/src/rpc.rs index 04f9d00bc..6b467bc5f 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -901,8 +901,11 @@ impl RpcHandler { let (listener, connector) = quic_rpc::transport::flume::channel(1); let listener = RpcServer::new(listener); let client = RpcClient::new(connector); - let _handler = listener - .spawn_accept_loop(move |req, chan| blobs.clone().handle_rpc_request(req, chan)); + let tokio_rt = blobs.tokio_rt.clone(); + let handler = tokio_rt.spawn( + listener.accept_loop(move |req, chan| blobs.clone().handle_rpc_request(req, chan)), + ); + let _handler = AbortOnDropHandle::new(handler); Self { client, _handler } } } diff --git a/src/rpc/client/blobs.rs b/src/rpc/client/blobs.rs index 3d8e1e182..8ba30382f 100644 --- a/src/rpc/client/blobs.rs +++ b/src/rpc/client/blobs.rs @@ -1074,6 +1074,7 @@ mod tests { events, downloader, endpoint.clone(), + tokio::runtime::Handle::current(), )); router = router.accept(crate::ALPN, blobs.clone());