diff --git a/openhcl/diag_server/src/diag_service.rs b/openhcl/diag_server/src/diag_service.rs index cc64452c5b..62302870c2 100644 --- a/openhcl/diag_server/src/diag_service.rs +++ b/openhcl/diag_server/src/diag_service.rs @@ -522,9 +522,7 @@ impl DiagServiceHandler { let mut inspection = InspectionBuilder::new(&request.path) .depth(Some(request.depth as usize)) .sensitivity(self.inspect_sensitivity_level) - .inspect(inspect::adhoc(|req| { - self.request_send.send(DiagRequest::Inspect(req.defer())); - })); + .inspect(inspect::send(&self.request_send, DiagRequest::Inspect)); // Don't return early on cancel, as we want to return the partial // inspection results. @@ -544,9 +542,7 @@ impl DiagServiceHandler { .sensitivity(self.inspect_sensitivity_level) .update( &request.value, - inspect::adhoc(|req| { - self.request_send.send(DiagRequest::Inspect(req.defer())); - }), + inspect::send(&self.request_send, DiagRequest::Inspect), ) .await?; Ok(UpdateResponse2 { new_value }) diff --git a/openhcl/underhill_core/src/emuplat/netvsp.rs b/openhcl/underhill_core/src/emuplat/netvsp.rs index 3d015e5b8f..47ec89027d 100644 --- a/openhcl/underhill_core/src/emuplat/netvsp.rs +++ b/openhcl/underhill_core/src/emuplat/netvsp.rs @@ -825,7 +825,9 @@ pub struct HclNetworkVFManagerEndpointInfo { pub endpoint: Box, } +#[derive(Inspect)] struct HclNetworkVFManagerSharedState { + #[inspect(flatten, send = "HclNetworkVfManagerMessage::Inspect")] worker_channel: mesh::Sender, } @@ -833,19 +835,14 @@ enum HclNetworkVFUpdateNotification { Update(Rpc<(), ()>), } +#[derive(Inspect)] pub struct HclNetworkVFManager { + #[inspect(flatten)] shared_state: Arc, + #[inspect(skip)] _task: Task<()>, } -impl Inspect for HclNetworkVFManager { - fn inspect(&self, req: inspect::Request<'_>) { - self.shared_state - .worker_channel - .send(HclNetworkVfManagerMessage::Inspect(req.defer())) - } -} - impl HclNetworkVFManager { pub async fn new( vtl2_vf_instance_id: Guid, diff --git a/openhcl/underhill_core/src/inspect_internal.rs b/openhcl/underhill_core/src/inspect_internal.rs index ba1da255f5..e73292714d 100644 --- a/openhcl/underhill_core/src/inspect_internal.rs +++ b/openhcl/underhill_core/src/inspect_internal.rs @@ -43,17 +43,17 @@ use pal_async::task::Spawn; pub(crate) fn inspect_internal_diagnostics( req: Request<'_>, - reinspect: Sender, - driver: DefaultDriver, + reinspect: &Sender, + driver: &DefaultDriver, ) { req.respond() .sensitivity_field("build_info", SensitivityLevel::Safe, build_info::get()) .sensitivity_child("net", SensitivityLevel::Safe, |req| { - net(req, reinspect, driver) + net(req, reinspect.clone(), driver) }); } -fn net(req: Request<'_>, reinspect: Sender, driver: DefaultDriver) { +fn net(req: Request<'_>, reinspect: Sender, driver: &DefaultDriver) { let defer = req.defer(); let driver2 = driver.clone(); driver @@ -64,7 +64,7 @@ fn net(req: Request<'_>, reinspect: Sender, driver: DefaultDriver) { let mut vm_inspection = InspectionBuilder::new("vm") .depth(Some(0)) .sensitivity(Some(SensitivityLevel::Sensitive)) - .inspect(inspect::adhoc(|req| reinspect.send(req.defer()))); + .inspect(inspect::send(&reinspect, |x| x)); vm_inspection.resolve().await; let Node::Dir(nodes) = vm_inspection.results() else { @@ -83,7 +83,7 @@ fn net(req: Request<'_>, reinspect: Sender, driver: DefaultDriver) { // The existence of a mac address is always known to the host, so this can always be Safe. resp.sensitivity_child(&mac_name, SensitivityLevel::Safe, |req| { - net_nic(req, nic_entry.name, reinspect.clone(), driver2.clone()); + net_nic(req, nic_entry.name, reinspect.clone(), &driver2); }); } }) @@ -93,7 +93,7 @@ fn net(req: Request<'_>, reinspect: Sender, driver: DefaultDriver) { // net/mac_address // Format for mac address is no separators, lowercase letters, e.g. 00155d121212. -fn net_nic(req: Request<'_>, name: String, reinspect: Sender, driver: DefaultDriver) { +fn net_nic(req: Request<'_>, name: String, reinspect: Sender, driver: &DefaultDriver) { let defer = req.defer(); driver .spawn("inspect-diagnostics-net-nic", async move { @@ -103,7 +103,7 @@ fn net_nic(req: Request<'_>, name: String, reinspect: Sender, driver: let mut vm_inspection = InspectionBuilder::new(&format!("vm/{name}")) .depth(Some(5)) .sensitivity(Some(SensitivityLevel::Sensitive)) - .inspect(inspect::adhoc(|req| reinspect.send(req.defer()))); + .inspect(inspect::send(&reinspect, |req| req)); vm_inspection.resolve().await; if let Node::Dir(nodes) = vm_inspection.results() { diff --git a/openhcl/underhill_core/src/lib.rs b/openhcl/underhill_core/src/lib.rs index 280748b40c..19a71169a3 100644 --- a/openhcl/underhill_core/src/lib.rs +++ b/openhcl/underhill_core/src/lib.rs @@ -545,8 +545,8 @@ async fn run_control( .sensitivity_child("uhdiag", SensitivityLevel::Safe, |req| { inspect_internal::inspect_internal_diagnostics( req, - diag_reinspect_send.clone(), - driver.clone(), + &diag_reinspect_send, + &driver, ) }); diff --git a/openhcl/underhill_core/src/nvme_manager/device.rs b/openhcl/underhill_core/src/nvme_manager/device.rs index e4894abf39..02313c5f79 100644 --- a/openhcl/underhill_core/src/nvme_manager/device.rs +++ b/openhcl/underhill_core/src/nvme_manager/device.rs @@ -334,16 +334,13 @@ impl NvmeDriverManager { #[derive(Inspect, Debug, Clone)] pub struct NvmeDriverManagerClient { - pci_id: String, #[inspect(skip)] + pci_id: String, + #[inspect(flatten, send = "NvmeDriverRequest::Inspect")] sender: mesh::Sender, } impl NvmeDriverManagerClient { - pub fn send_inspect(&self, deferred: Deferred) { - self.sender.send(NvmeDriverRequest::Inspect(deferred)); - } - pub async fn get_namespace(&self, nsid: u32) -> anyhow::Result { let span = tracing::info_span!( "nvme_device_manager_get_namespace", diff --git a/openhcl/underhill_core/src/nvme_manager/manager.rs b/openhcl/underhill_core/src/nvme_manager/manager.rs index 002a195ba6..c849b1481e 100644 --- a/openhcl/underhill_core/src/nvme_manager/manager.rs +++ b/openhcl/underhill_core/src/nvme_manager/manager.rs @@ -198,40 +198,21 @@ struct NvmeWorkerContext { save_restore_supported: bool, #[inspect(skip)] driver_source: VmTaskDriverSource, - #[inspect(skip)] + #[inspect(with = "|x| inspect::adhoc(|req| inspect::iter_by_key(&*x.read()).inspect(req))")] devices: Arc>>, #[inspect(skip)] nvme_driver_spawner: Arc, } #[derive(Inspect)] -#[inspect(extra = "NvmeManagerWorker::inspect_extra")] struct NvmeManagerWorker { - #[inspect(skip)] + #[inspect(with = "Vec::len")] tasks: Vec>, + #[inspect(flatten)] context: NvmeWorkerContext, } impl NvmeManagerWorker { - fn inspect_extra(&self, resp: &mut inspect::Response<'_>) { - resp.child("outstanding-tasks", |req| { - req.value(self.tasks.len()); - }); - - resp.child("devices", |req| { - let devices = self.context.devices.read(); - let mut resp = req.respond(); - for (pci_id, driver) in devices.iter() { - resp.field( - pci_id, - inspect::adhoc(|req| { - driver.client().send_inspect(req.defer()); - }), - ); - } - }); - } - async fn run(&mut self, mut recv: mesh::Receiver) { let (join_span, nvme_keepalive) = loop { let Some(req) = recv.next().await else { diff --git a/openvmm/hvlite_core/src/worker/dispatch.rs b/openvmm/hvlite_core/src/worker/dispatch.rs index 6403d84eb9..d77a1221a9 100644 --- a/openvmm/hvlite_core/src/worker/dispatch.rs +++ b/openvmm/hvlite_core/src/worker/dispatch.rs @@ -2690,9 +2690,8 @@ impl LoadedVm { while let Some(rpc) = worker_rpc.next().await { match rpc { WorkerRpc::Inspect(req) => req.respond(|resp| { - resp.merge(&state_units).merge(inspect::adhoc(|req| { - worker_rpc_send.send(WorkerRpc::Inspect(req.defer())); - })); + resp.merge(&state_units) + .merge(inspect::send(&worker_rpc_send, WorkerRpc::Inspect)); }), rpc => worker_rpc_send.send(rpc), } diff --git a/openvmm/membacking/src/mapping_manager/manager.rs b/openvmm/membacking/src/mapping_manager/manager.rs index 2546db1bd4..495740fcb3 100644 --- a/openvmm/membacking/src/mapping_manager/manager.rs +++ b/openvmm/membacking/src/mapping_manager/manager.rs @@ -24,19 +24,15 @@ use slab::Slab; use std::sync::Arc; /// The mapping manager. -#[derive(Debug)] +#[derive(Debug, Inspect)] pub struct MappingManager { + #[inspect( + flatten, + with = "|x| inspect::send(&x.req_send, MappingRequest::Inspect)" + )] client: MappingManagerClient, } -impl Inspect for MappingManager { - fn inspect(&self, req: inspect::Request<'_>) { - self.client - .req_send - .send(MappingRequest::Inspect(req.defer())); - } -} - impl MappingManager { /// Returns a new mapping manager that can map addresses up to `max_addr`. pub fn new(spawn: impl Spawn, max_addr: u64) -> Self { diff --git a/openvmm/membacking/src/region_manager.rs b/openvmm/membacking/src/region_manager.rs index f3b1b79562..c397dc4fe3 100644 --- a/openvmm/membacking/src/region_manager.rs +++ b/openvmm/membacking/src/region_manager.rs @@ -20,19 +20,15 @@ use thiserror::Error; use vmcore::local_only::LocalOnly; /// The region manager. -#[derive(Debug)] +#[derive(Debug, Inspect)] pub struct RegionManager { + #[inspect( + flatten, + with = "|x| inspect::send(&x.req_send, RegionRequest::Inspect)" + )] client: RegionManagerClient, } -impl Inspect for RegionManager { - fn inspect(&self, req: inspect::Request<'_>) { - self.client - .req_send - .send(RegionRequest::Inspect(req.defer())); - } -} - /// Provides access to the region manager. #[derive(Debug, MeshPayload, Clone)] pub struct RegionManagerClient { diff --git a/support/inspect/src/defer.rs b/support/inspect/src/defer.rs index ff601a67b9..f748fc01c5 100644 --- a/support/inspect/src/defer.rs +++ b/support/inspect/src/defer.rs @@ -9,6 +9,7 @@ use super::Request; use super::Response; use super::SensitivityLevel; use super::UpdateRequest; +use crate::Inspect; use crate::NumberFormat; use crate::RequestParams; use crate::RootParams; @@ -18,6 +19,43 @@ use alloc::boxed::Box; use alloc::string::String; use mesh::MeshPayload; +/// An [`Inspect`] implementation that defers the inspection to another +/// thread or context by sending the request on a [`mesh`] channel. +/// +/// # Usage +/// +/// ```rust +/// enum MyRpc { +/// Inspect(inspect::Deferred), +/// SomeWork(mesh::rpc::Rpc), +/// } +/// +/// fn inspect_remote(sender: &mesh::Sender, req: inspect::Request<'_>) { +/// req.respond().merge(inspect::send(sender, MyRpc::Inspect)); +/// } +/// ``` +pub fn send<'a, S: 'a + mesh::rpc::RpcSend + Copy, F: 'a + Fn(Deferred) -> S::Message>( + sender: S, + map: F, +) -> AsDeferred { + AsDeferred(sender, map) +} + +/// The return type of [`send`]. +pub struct AsDeferred(S, F); + +impl S::Message> Inspect for AsDeferred { + fn inspect(&self, req: Request<'_>) { + self.0.send_rpc(self.1(req.defer())); + } +} + +impl S::Message> InspectMut for AsDeferred { + fn inspect_mut(&mut self, req: Request<'_>) { + self.0.send_rpc(self.1(req.defer())); + } +} + impl Request<'_> { /// Defers the inspection request, producing a value that can be sent to /// another thread or context to continue the inspection asynchronously. diff --git a/support/inspect/src/lib.rs b/support/inspect/src/lib.rs index 1fdc2caa42..227ffdc1ef 100644 --- a/support/inspect/src/lib.rs +++ b/support/inspect/src/lib.rs @@ -165,6 +165,18 @@ pub use initiate::*; /// This can also be used to implement helper functions that implement /// [`Inspect`] to allow complex types to use the the derive macro. /// +/// ### `send = "expr"` +/// +/// Sends a deferred request message for the field so that it can be handled by +/// some remote asynchronous task (potentially on another thread or in another +/// process). The field must be a `mesh::Sender`, and `expr` must be a +/// function that maps from a `Deferred` to `T` (the request type of the +/// sender). +/// +/// This is shorthand for `with = "|x| inspect::send(x, expr)"`, which in turn +/// is roughly the same as `with = "|x| inspect::adhoc(|req| +/// x.send(expr(req.defer())))"`. +/// /// #### Examples /// The following structure has a field that is not normally inspectable, but we /// can use the derive macro with a helper pattern of making a new helper diff --git a/support/inspect_derive/src/lib.rs b/support/inspect_derive/src/lib.rs index f88293919e..03d6bfaffe 100644 --- a/support/inspect_derive/src/lib.rs +++ b/support/inspect_derive/src/lib.rs @@ -122,6 +122,21 @@ fn parse_string_attr(input: ParseStream<'_>) -> syn::Result { input.parse() } +fn parse_wrapped_attr(input: ParseStream<'_>) -> syn::Result { + let _: syn::token::Eq = input.parse()?; + let lit: LitStr = input.parse()?; + return lit.parse(); +} + +fn parse_wrapped_attr_with( + input: ParseStream<'_>, + parser: impl FnOnce(ParseStream<'_>) -> syn::Result, +) -> syn::Result { + let _: syn::token::Eq = input.parse()?; + let lit: LitStr = input.parse()?; + lit.parse_with(parser) +} + impl Parse for StructAttr { fn parse(input: ParseStream<'_>) -> syn::Result { let ident = Ident::parse_any(input)?; @@ -137,18 +152,18 @@ impl Parse for StructAttr { }; Self::Transparent(field_attr) } else if ident == "with" { - let with = parse_string_attr(input)?; - Self::With(with.parse()?) + Self::With(parse_wrapped_attr(input)?) } else if ident == "display" { Self::With(parse_quote_spanned!(ident.span()=> ::inspect::AsDisplay)) } else if ident == "debug" { Self::With(parse_quote_spanned!(ident.span()=> ::inspect::AsDebug)) } else if ident == "extra" { - let with = parse_string_attr(input)?; - Self::Extra(with.parse()?) + Self::Extra(parse_wrapped_attr(input)?) } else if ident == "bound" { - let val = parse_string_attr(input)?; - Self::Bound(val.parse_with(Punctuated::parse_terminated)?) + Self::Bound(parse_wrapped_attr_with( + input, + Punctuated::parse_terminated, + )?) } else if ident == "hex" { Self::Hex } else { @@ -196,8 +211,10 @@ impl Parse for FieldAttr { } else if ident == "sensitive" { Self::Sensitive } else if ident == "with" { - let with = parse_string_attr(input)?; - Self::With(with.parse()?) + Self::With(parse_wrapped_attr(input)?) + } else if ident == "send" { + let map: syn::Expr = parse_wrapped_attr(input)?; + Self::With(parse_quote!(|x| ::inspect::send(x, #map))) } else { return Err(syn::Error::new( ident.span(), @@ -214,8 +231,7 @@ impl Parse for EnumAttr { let kind = if ident == "skip" { Self::Skip } else if ident == "with" { - let with = parse_string_attr(input)?; - Self::With(with.parse()?) + Self::With(parse_wrapped_attr(input)?) } else if ident == "external_tag" { Self::ExternalTag } else if ident == "tag" { @@ -227,11 +243,12 @@ impl Parse for EnumAttr { } else if ident == "debug" { Self::With(parse_quote_spanned!(ident.span()=> ::inspect::AsDebug)) } else if ident == "extra" { - let with = parse_string_attr(input)?; - Self::Extra(with.parse()?) + Self::Extra(parse_wrapped_attr(input)?) } else if ident == "bound" { - let val = parse_string_attr(input)?; - Self::Bound(val.parse_with(Punctuated::parse_terminated)?) + Self::Bound(parse_wrapped_attr_with( + input, + Punctuated::parse_terminated, + )?) } else if ident == "hex" { Self::Hex } else { diff --git a/support/mesh/mesh_process/src/lib.rs b/support/mesh/mesh_process/src/lib.rs index 59a1c1848b..e58105bfa1 100644 --- a/support/mesh/mesh_process/src/lib.rs +++ b/support/mesh/mesh_process/src/lib.rs @@ -242,9 +242,13 @@ async fn node_from_environment() -> anyhow::Result> { /// send.send(String::from("message for new process")); /// # }) /// ``` +#[derive(Inspect)] pub struct Mesh { + #[inspect(rename = "name")] mesh_name: String, + #[inspect(flatten, send = "MeshRequest::Inspect")] request: mesh::Sender, + #[inspect(skip)] task: Task<()>, } @@ -373,12 +377,6 @@ struct NewHostParams { request_send: mesh::Sender, } -impl Inspect for Mesh { - fn inspect(&self, req: inspect::Request<'_>) { - self.request.send(MeshRequest::Inspect(req.defer())); - } -} - impl Mesh { /// Creates a new mesh with the given name. pub fn new(mesh_name: String) -> anyhow::Result { @@ -547,10 +545,10 @@ impl MeshInner { host.pid, ), }) - .merge(inspect::adhoc(|req| { - host.send - .send(HostRequest::Inspect(req.defer())); - })); + .merge(inspect::send( + &host.send, + HostRequest::Inspect, + )); }), ); } diff --git a/support/mesh/mesh_worker/src/worker.rs b/support/mesh/mesh_worker/src/worker.rs index 90885addbf..2cf9467417 100644 --- a/support/mesh/mesh_worker/src/worker.rs +++ b/support/mesh/mesh_worker/src/worker.rs @@ -153,19 +153,16 @@ impl WorkerHostRunner { /// Represents a running [`Worker`] instance providing the ability to restart, /// stop or wait for exit. To launch a worker and get a handle, use /// [`WorkerHost::launch_worker`] -#[derive(Debug, MeshPayload)] +#[derive(Debug, MeshPayload, Inspect)] pub struct WorkerHandle { + #[inspect(skip)] name: String, + #[inspect(flatten, send = "WorkerRpc::Inspect")] send: mesh::Sender>, + #[inspect(skip)] events: mesh::Receiver, } -impl Inspect for WorkerHandle { - fn inspect(&self, req: inspect::Request<'_>) { - self.send.send(WorkerRpc::Inspect(req.defer())) - } -} - impl Stream for WorkerHandle { type Item = WorkerEvent; diff --git a/vm/devices/get/guest_emulation_transport/src/client.rs b/vm/devices/get/guest_emulation_transport/src/client.rs index 2647f1cb10..01657aa53c 100644 --- a/vm/devices/get/guest_emulation_transport/src/client.rs +++ b/vm/devices/get/guest_emulation_transport/src/client.rs @@ -31,14 +31,8 @@ pub struct GuestEmulationTransportClient { version: get_protocol::ProtocolVersion, } -#[derive(Debug)] -struct ProcessLoopControl(mesh::Sender); - -impl Inspect for ProcessLoopControl { - fn inspect(&self, req: inspect::Request<'_>) { - self.0.send(msg::Msg::Inspect(req.defer())); - } -} +#[derive(Debug, Inspect)] +struct ProcessLoopControl(#[inspect(flatten, send = "msg::Msg::Inspect")] mesh::Sender); impl ProcessLoopControl { async fn call( diff --git a/vm/devices/net/mana_driver/src/mana.rs b/vm/devices/net/mana_driver/src/mana.rs index 23a1a3f453..e913de0d93 100644 --- a/vm/devices/net/mana_driver/src/mana.rs +++ b/vm/devices/net/mana_driver/src/mana.rs @@ -48,19 +48,18 @@ enum LinkStatus { } /// A MANA device. +#[derive(Inspect)] pub struct ManaDevice { + #[inspect(skip)] inner: Arc>, + #[inspect(skip)] inspect_task: Task<()>, + #[inspect(skip)] hwc_task: Option>, + #[inspect(flatten, send = "|x| x")] inspect_send: mesh::Sender, } -impl Inspect for ManaDevice { - fn inspect(&self, req: inspect::Request<'_>) { - self.inspect_send.send(req.defer()); - } -} - struct Inner { gdma: Mutex>, dev_id: GdmaDevId, diff --git a/vm/devices/storage/disk_nvme/nvme_driver/src/queue_pair.rs b/vm/devices/storage/disk_nvme/nvme_driver/src/queue_pair.rs index 6736f3e33a..d77aad1426 100644 --- a/vm/devices/storage/disk_nvme/nvme_driver/src/queue_pair.rs +++ b/vm/devices/storage/disk_nvme/nvme_driver/src/queue_pair.rs @@ -45,31 +45,24 @@ use zerocopy::FromZeros; /// Value for unused PRP entries, to catch/mitigate buffer size mismatches. const INVALID_PAGE_ADDR: u64 = !(PAGE_SIZE as u64 - 1); +#[derive(Inspect)] pub(crate) struct QueuePair { + #[inspect(skip)] task: Task, + #[inspect(skip)] cancel: Cancel, + #[inspect(flatten, with = "|x| inspect::send(&x.send, Req::Inspect)")] issuer: Arc, + #[inspect(skip)] mem: MemoryBlock, + #[inspect(skip)] qid: u16, + #[inspect(skip)] sq_entries: u16, + #[inspect(skip)] cq_entries: u16, } -impl Inspect for QueuePair { - fn inspect(&self, req: inspect::Request<'_>) { - let Self { - task: _, - cancel: _, - issuer, - mem: _, - qid: _, - sq_entries: _, - cq_entries: _, - } = self; - issuer.send.send(Req::Inspect(req.defer())); - } -} - impl PendingCommands { const CID_KEY_BITS: u32 = 10; const CID_KEY_MASK: u16 = (1 << Self::CID_KEY_BITS) - 1; diff --git a/vm/devices/storage/nvme/src/workers/coordinator.rs b/vm/devices/storage/nvme/src/workers/coordinator.rs index ab67966a5c..e1cdda1c0a 100644 --- a/vm/devices/storage/nvme/src/workers/coordinator.rs +++ b/vm/devices/storage/nvme/src/workers/coordinator.rs @@ -32,10 +32,15 @@ use vmcore::interrupt::Interrupt; use vmcore::vm_task::VmTaskDriver; use vmcore::vm_task::VmTaskDriverSource; +#[derive(InspectMut)] pub struct NvmeWorkers { + #[inspect(skip)] _task: Task<()>, + #[inspect(flatten, send = "CoordinatorRequest::Inspect")] send: mesh::Sender, + #[inspect(skip)] doorbells: Arc>, + #[inspect(skip)] state: EnableState, } @@ -47,12 +52,6 @@ enum EnableState { Resetting(PendingRpc<()>), } -impl InspectMut for NvmeWorkers { - fn inspect_mut(&mut self, req: inspect::Request<'_>) { - self.send.send(CoordinatorRequest::Inspect(req.defer())); - } -} - impl NvmeWorkers { pub fn new( driver_source: &VmTaskDriverSource, diff --git a/vm/devices/storage/nvme_test/src/workers/coordinator.rs b/vm/devices/storage/nvme_test/src/workers/coordinator.rs index 7a1a3084af..4c1010ed81 100644 --- a/vm/devices/storage/nvme_test/src/workers/coordinator.rs +++ b/vm/devices/storage/nvme_test/src/workers/coordinator.rs @@ -32,10 +32,15 @@ use vmcore::interrupt::Interrupt; use vmcore::vm_task::VmTaskDriver; use vmcore::vm_task::VmTaskDriverSource; +#[derive(InspectMut)] pub struct NvmeWorkers { + #[inspect(skip)] _task: Task<()>, + #[inspect(flatten, send = "CoordinatorRequest::Inspect")] send: mesh::Sender, + #[inspect(skip)] doorbells: Arc>, + #[inspect(skip)] state: EnableState, } @@ -47,12 +52,6 @@ enum EnableState { Resetting(PendingRpc<()>), } -impl InspectMut for NvmeWorkers { - fn inspect_mut(&mut self, req: inspect::Request<'_>) { - self.send.send(CoordinatorRequest::Inspect(req.defer())); - } -} - impl NvmeWorkers { pub fn new( driver_source: &VmTaskDriverSource, diff --git a/vm/devices/vmbus/vmbus_channel/src/channel.rs b/vm/devices/vmbus/vmbus_channel/src/channel.rs index 9e89006473..31c29ce9d7 100644 --- a/vm/devices/vmbus/vmbus_channel/src/channel.rs +++ b/vm/devices/vmbus/vmbus_channel/src/channel.rs @@ -162,8 +162,11 @@ impl ChannelControl { /// /// The channel will be revoked when this is dropped. #[must_use] +#[derive(Inspect)] pub(crate) struct GenericChannelHandle { + #[inspect(flatten, send = "StateRequest::Inspect")] state_req: mesh::Sender, + #[inspect(skip)] task: Task>, } @@ -241,12 +244,6 @@ impl GenericChannelHandle { } } -impl Inspect for GenericChannelHandle { - fn inspect(&self, req: inspect::Request<'_>) { - self.state_req.send(StateRequest::Inspect(req.defer())); - } -} - /// A handle to an offered channel. /// /// The channel will be revoked when this is dropped. diff --git a/vm/devices/vmbus/vmbus_client/src/lib.rs b/vm/devices/vmbus/vmbus_client/src/lib.rs index a330ce4c9e..c8809a4e57 100644 --- a/vm/devices/vmbus/vmbus_client/src/lib.rs +++ b/vm/devices/vmbus/vmbus_client/src/lib.rs @@ -107,9 +107,13 @@ pub trait PollPostMessage: Send { ) -> Poll<()>; } +#[derive(Inspect)] pub struct VmbusClient { + #[inspect(flatten, send = "TaskRequest::Inspect")] task_send: mesh::Sender, + #[inspect(skip)] access: VmbusClientAccess, + #[inspect(skip)] task: Task, } @@ -277,12 +281,6 @@ impl VmbusClient { } } -impl Inspect for VmbusClient { - fn inspect(&self, req: inspect::Request<'_>) { - self.task_send.send(TaskRequest::Inspect(req.defer())); - } -} - #[derive(Debug)] pub struct ConnectResult { pub version: VersionInfo, diff --git a/vm/devices/vmbus/vmbus_relay/src/lib.rs b/vm/devices/vmbus/vmbus_relay/src/lib.rs index e0847e26bf..d15bbd2a92 100644 --- a/vm/devices/vmbus/vmbus_relay/src/lib.rs +++ b/vm/devices/vmbus/vmbus_relay/src/lib.rs @@ -86,8 +86,11 @@ const REQUIRED_FEATURE_FLAGS: FeatureFlags = FeatureFlags::new() /// /// The relay will connect to the host when it first receives a start request through its state /// unit, and will remain connected until it is destroyed. +#[derive(Inspect, Debug)] pub struct HostVmbusTransport { + #[inspect(skip)] _relay_task: Task<()>, + #[inspect(flatten, send = "TaskRequest::Inspect")] task_send: mesh::Sender, } @@ -160,18 +163,6 @@ impl HostVmbusTransport { } } -impl Inspect for HostVmbusTransport { - fn inspect(&self, req: inspect::Request<'_>) { - self.task_send.send(TaskRequest::Inspect(req.defer())); - } -} - -impl Debug for HostVmbusTransport { - fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - writeln!(fmt, "HostVmbusTransport") - } -} - /// State needed to relay host-to-guest interrupts. struct InterruptRelay { /// Event signaled when the host sends an interrupt. @@ -203,17 +194,12 @@ impl Debug for RelayChannelRequest { } } +#[derive(Inspect)] struct RelayChannelInfo { + #[inspect(flatten, send = "RelayChannelRequest::Inspect")] relay_request_send: mesh::Sender, } -impl Inspect for RelayChannelInfo { - fn inspect(&self, req: inspect::Request<'_>) { - self.relay_request_send - .send(RelayChannelRequest::Inspect(req.defer())); - } -} - #[derive(Inspect)] #[inspect(external_tag)] enum ChannelInfo { diff --git a/vm/devices/vmbus/vmbus_server/src/lib.rs b/vm/devices/vmbus/vmbus_server/src/lib.rs index bcbc9b51fd..c54123797b 100644 --- a/vm/devices/vmbus/vmbus_server/src/lib.rs +++ b/vm/devices/vmbus/vmbus_server/src/lib.rs @@ -108,11 +108,17 @@ const VMBUS_MESSAGE_TYPE: u32 = 1; const MAX_CONCURRENT_HVSOCK_REQUESTS: usize = 16; +#[derive(Inspect)] pub struct VmbusServer { + #[inspect(flatten, send = "VmbusRequest::Inspect")] task_send: mesh::Sender, + #[inspect(skip)] control: Arc, + #[inspect(skip)] _message_port: Box, + #[inspect(skip)] _multiclient_message_port: Option>, + #[inspect(skip)] task: Task, } @@ -257,12 +263,6 @@ pub(crate) enum OfferRequest { ForceReset(Rpc<(), ()>), } -impl Inspect for VmbusServer { - fn inspect(&self, req: inspect::Request<'_>) { - self.task_send.send(VmbusRequest::Inspect(req.defer())); - } -} - struct ChannelEvent(Interrupt); impl EventPort for ChannelEvent { diff --git a/vm/vmcore/src/vmtime.rs b/vm/vmcore/src/vmtime.rs index 2fbf5d4c0e..0991be2369 100644 --- a/vm/vmcore/src/vmtime.rs +++ b/vm/vmcore/src/vmtime.rs @@ -29,7 +29,6 @@ use futures_concurrency::future::Race; use futures_concurrency::stream::Merge; use inspect::Inspect; use inspect::InspectMut; -use inspect::adhoc; use mesh::MeshPayload; use mesh::payload::Protobuf; use mesh::rpc::Rpc; @@ -337,11 +336,15 @@ impl TimerState { } /// A time keeper, which tracks the current time and all waiters. -#[derive(Debug)] +#[derive(Debug, InspectMut)] pub struct VmTimeKeeper { + #[inspect(skip)] _task: Task<()>, + #[inspect(flatten, send = "KeeperRequest::Inspect")] req_send: mesh::Sender, + #[inspect(skip)] builder: VmTimeSourceBuilder, + #[inspect(skip)] time: TimeState, } @@ -441,12 +444,6 @@ impl TimeState { } } -impl InspectMut for VmTimeKeeper { - fn inspect_mut(&mut self, req: inspect::Request<'_>) { - self.req_send.send(KeeperRequest::Inspect(req.defer())); - } -} - impl VmTimeKeeper { /// Creates a new time keeper with the specified current guest time. pub fn new(driver: &impl SpawnDriver, uptime: VmTime) -> Self { @@ -583,13 +580,14 @@ impl VmTimeSourceBuilder { /// /// There is one of these per VM time clock (i.e. one per VM). #[derive(Inspect)] -#[inspect(extra = "Self::inspect_extra")] struct PrimaryKeeper { #[inspect(skip)] req_recv: mesh::Receiver, #[inspect(skip)] new_recv: mesh::Receiver, - #[inspect(skip)] + #[inspect( + with = "|x| inspect::iter_by_key(x.iter().map(|(id, sender)| (id, inspect::send(sender, KeeperRequest::Inspect))))" + )] keepers: Vec<(u64, mesh::Sender)>, #[inspect(skip)] next_id: u64, @@ -610,15 +608,6 @@ enum NewKeeperRequest { } impl PrimaryKeeper { - fn inspect_extra(&self, resp: &mut inspect::Response<'_>) { - resp.fields( - "keepers", - self.keepers - .iter() - .map(|&(id, ref s)| (id, adhoc(|req| s.send(KeeperRequest::Inspect(req.defer()))))), - ); - } - async fn run(&mut self) { enum Event { New(NewKeeperRequest), diff --git a/vm/vmgs/vmgs_broker/src/client.rs b/vm/vmgs/vmgs_broker/src/client.rs index 2a4e482010..16e490105f 100644 --- a/vm/vmgs/vmgs_broker/src/client.rs +++ b/vm/vmgs/vmgs_broker/src/client.rs @@ -35,17 +35,12 @@ impl From> for VmgsClientError { } /// Client to interact with a backend-agnostic VMGS instance. -#[derive(Clone)] +#[derive(Clone, Inspect)] pub struct VmgsClient { + #[inspect(flatten, send = "VmgsBrokerRpc::Inspect")] pub(crate) control: mesh_channel::Sender, } -impl Inspect for VmgsClient { - fn inspect(&self, req: inspect::Request<'_>) { - self.control.send(VmgsBrokerRpc::Inspect(req.defer())); - } -} - impl VmgsClient { /// Get allocated and valid bytes from File Control Block for file_id. #[instrument(skip_all, fields(file_id))] diff --git a/vmm_core/src/partition_unit/vp_set.rs b/vmm_core/src/partition_unit/vp_set.rs index eed802df9f..8e95367d7c 100644 --- a/vmm_core/src/partition_unit/vp_set.rs +++ b/vmm_core/src/partition_unit/vp_set.rs @@ -709,23 +709,16 @@ pub struct VpSet { started: bool, } +#[derive(Inspect)] struct Vp { + #[inspect(flatten, send = "|req| VpEvent::State(StateEvent::Inspect(req))")] send: mesh::Sender, + #[inspect(skip)] done: mesh::OneshotReceiver<()>, + #[inspect(flatten)] vp_info: TargetVpInfo, } -impl Inspect for Vp { - fn inspect(&self, req: inspect::Request<'_>) { - req.respond() - .merge(&self.vp_info) - .merge(inspect::adhoc(|req| { - self.send - .send(VpEvent::State(StateEvent::Inspect(req.defer()))) - })); - } -} - impl VpSet { pub fn new(vtl_guest_memory: [Option; NUM_VTLS], halt: Arc) -> Self { let inner = Inner { diff --git a/vmm_core/state_unit/src/lib.rs b/vmm_core/state_unit/src/lib.rs index 5b4dbf2405..1c64bd4717 100644 --- a/vmm_core/state_unit/src/lib.rs +++ b/vmm_core/state_unit/src/lib.rs @@ -334,9 +334,7 @@ impl Inspect for Inner { }); } resp.field("unit_state", unit.state) - .merge(inspect::adhoc(|req| { - unit.send.send(StateRequest::Inspect(req.defer())) - })); + .merge(&inspect::send(&unit.send, StateRequest::Inspect)); }); } }