diff --git a/Cargo.lock b/Cargo.lock
index 440565e8c1..5b39d224b3 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4880,6 +4880,7 @@ dependencies = [
  "pci_core",
  "pci_resources",
  "scsi_buffers",
+ "slab",
  "task_control",
  "thiserror 2.0.16",
  "tracelimit",
diff --git a/vm/devices/storage/nvme_test/Cargo.toml b/vm/devices/storage/nvme_test/Cargo.toml
index 1e0f49d914..916b3999fd 100644
--- a/vm/devices/storage/nvme_test/Cargo.toml
+++ b/vm/devices/storage/nvme_test/Cargo.toml
@@ -30,6 +30,7 @@ async-trait.workspace = true
 futures.workspace = true
 futures-concurrency.workspace = true
 parking_lot.workspace = true
+slab.workspace = true
 task_control.workspace = true
 thiserror.workspace = true
 tracelimit.workspace = true
diff --git a/vm/devices/storage/nvme_test/src/queue.rs b/vm/devices/storage/nvme_test/src/queue.rs
index 5ac0d3b8b5..7b46ae9266 100644
--- a/vm/devices/storage/nvme_test/src/queue.rs
+++ b/vm/devices/storage/nvme_test/src/queue.rs
@@ -252,13 +252,9 @@ pub enum QueueError {
 }
 
 impl SubmissionQueue {
-    pub fn new(
-        doorbells: Arc>,
-        db_id: u16,
-        gpa: u64,
-        len: u16,
-        mem: GuestMemory,
-    ) -> Self {
+    pub fn new(cq: &CompletionQueue, db_id: u16, gpa: u64, len: u16) -> Self {
+        let doorbells = cq.head.doorbells.clone();
+        let mem = cq.mem.clone();
         doorbells.read().write(db_id, 0);
         Self {
             tail: DoorbellState::new(doorbells, db_id, len.into()),
diff --git a/vm/devices/storage/nvme_test/src/workers/admin.rs b/vm/devices/storage/nvme_test/src/workers/admin.rs
index 1ba6959f35..e209d5b458 100644
--- a/vm/devices/storage/nvme_test/src/workers/admin.rs
+++ b/vm/devices/storage/nvme_test/src/workers/admin.rs
@@ -98,9 +98,9 @@ pub struct AdminState {
     admin_sq: SubmissionQueue,
     admin_cq: CompletionQueue,
     #[inspect(with = "|x| inspect::iter_by_index(x).map_key(|x| x + 1)")]
-    io_sqs: Vec<IoSq>,
+    io_sqs: Vec<Option<IoSq>>,
     #[inspect(with = "|x| inspect::iter_by_index(x).map_key(|x| x + 1)")]
-    io_cqs: Vec<Option<IoCq>>,
+    io_cqs: Vec<IoCq>,
     #[inspect(skip)]
     sq_delete_response: mesh::Receiver<u16>,
     #[inspect(iter_by_index)]
@@ -126,21 +126,16 @@ struct ChangedNamespace {
 
 #[derive(Inspect)]
 struct IoSq {
-    #[inspect(flatten)]
-    task: TaskControl<IoHandler, IoState>,
-    driver: VmTaskDriver,
     pending_delete_cid: Option<u16>,
-    cqid: Option<u16>,
+    sq_idx: usize,
+    cqid: u16,
 }
 
 #[derive(Inspect)]
 struct IoCq {
-    #[inspect(hex)]
-    gpa: u64,
-    #[inspect(hex)]
-    len: u16,
-    interrupt: Option<u16>,
-    sqid: Option<u16>,
+    driver: VmTaskDriver,
+    #[inspect(flatten)]
+    task: TaskControl<IoHandler, IoState>,
 }
 
 impl AdminState {
@@ -165,22 +160,18 @@ impl AdminState {
             })
             .collect();
 
+        let admin_cq = CompletionQueue::new(
+            handler.config.doorbells.clone(),
+            1,
+            handler.config.mem.clone(),
+            Some(handler.config.interrupts[0].clone()),
+            acq,
+            acqs,
+        );
+
         let mut state = Self {
-            admin_sq: SubmissionQueue::new(
-                handler.config.doorbells.clone(),
-                0,
-                asq,
-                asqs,
-                handler.config.mem.clone(),
-            ),
-            admin_cq: CompletionQueue::new(
-                handler.config.doorbells.clone(),
-                1,
-                handler.config.mem.clone(),
-                Some(handler.config.interrupts[0].clone()),
-                acq,
-                acqs,
-            ),
+            admin_sq: SubmissionQueue::new(&admin_cq, 0, asq, asqs),
+            admin_cq,
             io_sqs: Vec::new(),
             io_cqs: Vec::new(),
             sq_delete_response: Default::default(),
@@ -199,11 +190,11 @@ impl AdminState {
     ///
     /// This future may be dropped and reissued.
     pub async fn drain(&mut self) {
-        for sq in &mut self.io_sqs {
-            sq.task.stop().await;
-            if let Some(state) = sq.task.state_mut() {
+        for cq in &mut self.io_cqs {
+            cq.task.stop().await;
+            if let Some(state) = cq.task.state_mut() {
                 state.drain().await;
-                sq.task.remove();
+                cq.task.remove();
             }
         }
     }
 
@@ -211,32 +202,28 @@ impl AdminState {
     /// Caller must ensure that no queues are active.
     fn set_max_queues(&mut self, handler: &AdminHandler, num_sqs: u16, num_cqs: u16) {
         self.io_sqs.truncate(num_sqs.into());
-        self.io_sqs
-            .extend((self.io_sqs.len()..num_sqs.into()).map(|i| {
-                // This driver doesn't explicitly do any IO (that's handled by
-                // the storage backends), so the target VP doesn't matter. But
-                // set it anyway as a hint to the backend that this queue needs
-                // its own thread.
-                let driver = handler
-                    .config
-                    .driver_source
-                    .builder()
-                    .run_on_target(false)
-                    .target_vp(0)
-                    .build("nvme");
-
-                IoSq {
-                    task: TaskControl::new(IoHandler::new(
-                        handler.config.mem.clone(),
-                        i as u16 + 1,
-                        self.sq_delete_response.sender(),
-                    )),
-                    pending_delete_cid: None,
-                    cqid: None,
-                    driver,
-                }
-            }));
-        self.io_cqs.resize_with(num_cqs.into(), || None);
+        self.io_sqs.resize_with(num_sqs.into(), || None);
+        self.io_cqs.resize_with(num_cqs.into(), || {
+            // This driver doesn't explicitly do any IO (that's handled by
+            // the storage backends), so the target VP doesn't matter. But
+            // set it anyway as a hint to the backend that this queue needs
+            // its own thread.
+            let driver = handler
+                .config
+                .driver_source
+                .builder()
+                .run_on_target(false)
+                .target_vp(0)
+                .build("nvme");
+
+            IoCq {
+                driver,
+                task: TaskControl::new(IoHandler::new(
+                    handler.config.mem.clone(),
+                    self.sq_delete_response.sender(),
+                )),
+            }
+        });
     }
 
     fn add_changed_namespace(&mut self, nsid: u32) {
@@ -252,13 +239,13 @@ impl AdminState {
         namespace: &Arc<Namespace>,
     ) {
         // Update the IO queues.
-        for sq in &mut self.io_sqs {
-            let io_running = sq.task.stop().await;
-            if let Some(io_state) = sq.task.state_mut() {
+        for cq in &mut self.io_cqs {
+            let io_running = cq.task.stop().await;
+            if let Some(io_state) = cq.task.state_mut() {
                 io_state.add_namespace(nsid, namespace.clone());
             }
             if io_running {
-                sq.task.start();
+                cq.task.start();
             }
         }
 
@@ -280,13 +267,13 @@ impl AdminState {
 
     async fn remove_namespace(&mut self, nsid: u32) {
         // Update the IO queues.
-        for sq in &mut self.io_sqs {
-            let io_running = sq.task.stop().await;
-            if let Some(io_state) = sq.task.state_mut() {
+        for cq in &mut self.io_cqs {
+            let io_running = cq.task.stop().await;
+            if let Some(io_state) = cq.task.state_mut() {
                 io_state.remove_namespace(nsid);
             }
             if io_running {
-                sq.task.start();
+                cq.task.start();
            }
         }
 
@@ -523,12 +510,14 @@ impl AdminHandler {
                 .map(|()| Some(Default::default())),
             spec::AdminOpcode::CREATE_IO_SUBMISSION_QUEUE => self
                 .handle_create_io_submission_queue(state, &command)
+                .await
                 .map(|()| Some(Default::default())),
             spec::AdminOpcode::DELETE_IO_COMPLETION_QUEUE => self
                 .handle_delete_io_completion_queue(state, &command)
                 .map(|()| Some(Default::default())),
             spec::AdminOpcode::DELETE_IO_SUBMISSION_QUEUE => {
                 self.handle_delete_io_submission_queue(state, &command)
+                    .await
             }
             spec::AdminOpcode::ASYNCHRONOUS_EVENT_REQUEST => {
                 self.handle_asynchronous_event_request(state, &command)
@@ -568,18 +557,8 @@ impl AdminHandler {
             }
             Event::SqDeleteComplete(sqid) => {
                 let sq = &mut state.io_sqs[sqid as usize - 1];
-                let cid = sq.pending_delete_cid.take().unwrap();
-                let cqid = sq.cqid.take().unwrap();
-                sq.task.stop().await;
-                sq.task.remove();
-                assert_eq!(
-                    state.io_cqs[cqid as usize - 1]
-                        .as_mut()
-                        .unwrap()
-                        .sqid
-                        .take(),
-                    Some(sqid)
-                );
+                let cid = sq.as_mut().unwrap().pending_delete_cid.take().unwrap();
+                *sq = None;
                 (None, cid, Default::default())
             }
             Event::NamespaceChange(nsid) => {
@@ -764,8 +743,8 @@ impl AdminHandler {
         // Note that we don't support non-zero cdw10.save, since ONCS.save == 0.
         match spec::Feature(cdw10.fid()) {
             spec::Feature::NUMBER_OF_QUEUES => {
-                if state.io_sqs.iter().any(|sq| sq.task.has_state())
-                    || state.io_cqs.iter().any(|cq| cq.is_some())
+                if state.io_sqs.iter().any(|sq| sq.is_some())
+                    || state.io_cqs.iter().any(|cq| cq.task.has_state())
                 {
                     return Err(spec::Status::COMMAND_SEQUENCE_ERROR.into());
                 }
@@ -849,15 +828,9 @@ impl AdminHandler {
             return Err(spec::Status::INVALID_FIELD_IN_COMMAND.into());
         }
         let cqid = cdw10.qid();
-        let io_queue = state
-            .io_cqs
-            .get_mut((cqid as usize).wrapping_sub(1))
-            .ok_or(InvalidQueueIdentifier {
-                qid: cqid,
-                reason: InvalidQueueIdentifierReason::Oob,
-            })?;
+        let cq = &mut state.io_cqs[(cqid as usize).wrapping_sub(1)];
 
-        if io_queue.is_some() {
+        if cq.task.has_state() {
             return Err(InvalidQueueIdentifier {
                 qid: cqid,
                 reason: InvalidQueueIdentifierReason::InUse,
@@ -880,16 +853,25 @@ impl AdminHandler {
             return Err(spec::Status::INVALID_QUEUE_SIZE.into());
         }
 
-        *io_queue = Some(IoCq {
+        let interrupt = interrupt.map(|iv| self.config.interrupts[iv as usize].clone());
+        let namespaces = self.namespaces.clone();
+
+        let io_state = IoState::new(
+            &self.config.mem,
+            self.config.doorbells.clone(),
             gpa,
-            len: len0 + 1,
+            len0 + 1,
+            cqid,
             interrupt,
-            sqid: None,
-        });
+            namespaces,
+        );
+
+        cq.task.insert(&cq.driver, "nvme-io", io_state);
+        cq.task.start();
         Ok(())
     }
 
-    fn handle_create_io_submission_queue(
+    async fn handle_create_io_submission_queue(
         &mut self,
         state: &mut AdminState,
         command: &spec::Command,
@@ -908,7 +890,7 @@ impl AdminHandler {
                 reason: InvalidQueueIdentifierReason::Oob,
             })?;
 
-        if sq.task.has_state() {
+        if sq.is_some() {
             return Err(InvalidQueueIdentifier {
                 qid: sqid,
                 reason: InvalidQueueIdentifierReason::InUse,
@@ -917,17 +899,9 @@ impl AdminHandler {
         }
 
         let cqid = cdw11.cqid();
-        let cq = state
-            .io_cqs
-            .get_mut((cqid as usize).wrapping_sub(1))
-            .and_then(|x| x.as_mut())
-            .ok_or(spec::Status::COMPLETION_QUEUE_INVALID)?;
-
-        // Don't allow sharing completion queues. This isn't spec compliant
-        // but it simplifies the device significantly and OSes don't seem to
-        // mind. This could be fixed by having a slower path when completion
-        // queues are shared.
-        if cq.sqid.is_some() {
+        let cq = &mut state.io_cqs[(cqid as usize).wrapping_sub(1)];
+
+        if !cq.task.has_state() {
             return Err(spec::Status::COMPLETION_QUEUE_INVALID.into());
         }
 
@@ -937,33 +911,24 @@ impl AdminHandler {
             return Err(spec::Status::INVALID_QUEUE_SIZE.into());
         }
 
-        cq.sqid = Some(sqid);
-        sq.cqid = Some(cqid);
-        let interrupt = cq
-            .interrupt
-            .map(|iv| self.config.interrupts[iv as usize].clone());
-        let namespaces = self.namespaces.clone();
-        let sq_len = len0 + 1;
-        let cq_gpa = cq.gpa;
-        let cq_len = cq.len;
-        let state = IoState::new(
-            &self.config.mem,
-            self.config.doorbells.clone(),
-            sq_gpa,
-            sq_len,
-            sqid,
-            cq_gpa,
-            cq_len,
+        let running = cq.task.stop().await;
+        let sq_idx = cq
+            .task
+            .state_mut()
+            .unwrap()
+            .create_sq(sqid, sq_gpa, len0 + 1);
+        if running {
+            cq.task.start();
+        }
+        *sq = Some(IoSq {
+            sq_idx,
+            pending_delete_cid: None,
             cqid,
-            interrupt,
-            namespaces,
-        );
-        sq.task.insert(&sq.driver, "nvme-io", state);
-        sq.task.start();
+        });
         Ok(())
     }
 
-    fn handle_delete_io_submission_queue(
+    async fn handle_delete_io_submission_queue(
         &self,
         state: &mut AdminState,
         command: &spec::Command,
@@ -976,9 +941,14 @@ impl AdminHandler {
             .ok_or(InvalidQueueIdentifier {
                 qid: sqid,
                 reason: InvalidQueueIdentifierReason::Oob,
+            })?
+            .as_mut()
+            .ok_or(InvalidQueueIdentifier {
+                qid: sqid,
+                reason: InvalidQueueIdentifierReason::NotInUse,
             })?;
 
-        if !sq.task.has_state() || sq.pending_delete_cid.is_some() {
+        if sq.pending_delete_cid.is_some() {
             return Err(InvalidQueueIdentifier {
                 qid: sqid,
                 reason: InvalidQueueIdentifierReason::NotInUse,
@@ -986,8 +956,12 @@ impl AdminHandler {
             .into());
         }
 
-        sq.task
-            .update_with(|sq, sq_state| sq.delete(sq_state.unwrap()));
+        let cq = &mut state.io_cqs[(sq.cqid as usize).wrapping_sub(1)];
+        let running = cq.task.stop().await;
+        cq.task.state_mut().unwrap().delete_sq(sq.sq_idx);
+        if running {
+            cq.task.start();
+        }
         sq.pending_delete_cid = Some(command.cdw0.cid());
         Ok(None)
     }
@@ -999,23 +973,27 @@ impl AdminHandler {
     ) -> Result<(), NvmeError> {
         let cdw10: spec::Cdw10DeleteIoQueue = command.cdw10.into();
         let cqid = cdw10.qid();
-        let cq = state
-            .io_cqs
-            .get_mut((cqid as usize).wrapping_sub(1))
-            .ok_or(InvalidQueueIdentifier {
+        let cq = &mut state.io_cqs[(cqid as usize).wrapping_sub(1)];
+
+        let _active_cq = if !cq.task.has_state() {
+            return Err(InvalidQueueIdentifier {
                 qid: cqid,
-                reason: InvalidQueueIdentifierReason::Oob,
-            })?;
+                reason: InvalidQueueIdentifierReason::NotInUse,
+            }
+            .into());
+        };
+
+        // Check if any submission queues are still using this completion queue
+        let has_active_sqs = state
+            .io_sqs
+            .iter()
+            .any(|sq| sq.as_ref().map_or(false, |sq| sq.cqid == cqid));
 
-        let active_cq = cq.as_ref().ok_or(InvalidQueueIdentifier {
-            qid: cqid,
-            reason: InvalidQueueIdentifierReason::NotInUse,
-        })?;
-        if active_cq.sqid.is_some() {
+        if has_active_sqs {
             return Err(spec::Status::INVALID_QUEUE_DELETION.into());
         }
 
-        *cq = None;
+        cq.task.remove();
         Ok(())
     }
diff --git a/vm/devices/storage/nvme_test/src/workers/io.rs b/vm/devices/storage/nvme_test/src/workers/io.rs
index 03a3f8c944..e7a2c3779a 100644
--- a/vm/devices/storage/nvme_test/src/workers/io.rs
+++ b/vm/devices/storage/nvme_test/src/workers/io.rs
@@ -13,69 +13,72 @@ use crate::queue::SubmissionQueue;
 use crate::spec;
 use crate::spec::nvm;
 use crate::workers::MAX_DATA_TRANSFER_SIZE;
+use futures::FutureExt;
+use futures::StreamExt;
 use futures_concurrency::future::Race;
 use guestmem::GuestMemory;
 use inspect::Inspect;
 use parking_lot::RwLock;
+use slab::Slab;
 use std::collections::BTreeMap;
+use std::collections::VecDeque;
 use std::future::Future;
-use std::future::pending;
 use std::future::poll_fn;
 use std::pin::Pin;
 use std::sync::Arc;
+use std::task::Poll;
 use task_control::AsyncRun;
 use task_control::Cancelled;
 use task_control::InspectTask;
 use task_control::StopTask;
 use thiserror::Error;
+use tracelimit;
 use unicycle::FuturesUnordered;
 use vmcore::interrupt::Interrupt;
 
 #[derive(Inspect)]
 pub struct IoHandler {
     mem: GuestMemory,
-    sqid: u16,
     #[inspect(skip)]
     admin_response: mesh::Sender<u16>,
 }
 
 #[derive(Inspect)]
 pub struct IoState {
-    sq: SubmissionQueue,
+    #[inspect(with = "|x| inspect::iter_by_key(x.iter().map(|(_, sq)| (sq.sqid, sq)))")]
+    sqs: Slab<SqState>,
     cq: CompletionQueue,
     #[inspect(skip)]
     namespaces: BTreeMap<u32, Arc<Namespace>>,
     #[inspect(skip)]
     ios: FuturesUnordered<Pin<Box<dyn Future<Output = IoResult> + Send>>>,
-    io_count: usize,
-    queue_state: IoQueueState,
+    #[inspect(with = "VecDeque::len")]
+    completions: VecDeque<IoResult>,
 }
 
 #[derive(Inspect)]
-enum IoQueueState {
-    Active,
-    Deleting,
-    Deleted,
+struct SqState {
+    sqid: u16,
+    sq: SubmissionQueue,
+    io_count: usize,
+    deleting: bool,
 }
 
 impl IoState {
     pub fn new(
         mem: &GuestMemory,
         doorbell: Arc>,
-        sq_gpa: u64,
-        sq_len: u16,
-        sq_id: u16,
         cq_gpa: u64,
         cq_len: u16,
-        cq_id: u16,
+        cqid: u16,
         interrupt: Option<Interrupt>,
         namespaces: BTreeMap<u32, Arc<Namespace>>,
     ) -> Self {
         Self {
-            sq: SubmissionQueue::new(doorbell.clone(), sq_id * 2, sq_gpa, sq_len, mem.clone()),
+            sqs: Slab::new(),
             cq: CompletionQueue::new(
                 doorbell,
-                cq_id * 2 + 1,
+                cqid * 2 + 1,
                 mem.clone(),
                 interrupt,
                 cq_gpa,
@@ -83,8 +86,7 @@ impl IoState {
             ),
             namespaces,
             ios: FuturesUnordered::new(),
-            io_count: 0,
-            queue_state: IoQueueState::Active,
+            completions: VecDeque::new(),
         }
     }
 
@@ -96,21 +98,46 @@ impl IoState {
         let _ = self.namespaces.remove(&nsid).unwrap();
     }
 
+    pub fn has_sqs(&self) -> bool {
+        !self.sqs.is_empty()
+    }
+
+    pub fn create_sq(&mut self, sqid: u16, sq_gpa: u64, sq_len: u16) -> usize {
+        self.sqs.insert(SqState {
+            sq: SubmissionQueue::new(&self.cq, sqid * 2, sq_gpa, sq_len),
+            deleting: false,
+            sqid,
+            io_count: 0,
+        })
+    }
+
+    pub fn delete_sq(&mut self, sq_idx: usize) {
+        let sq = &mut self.sqs[sq_idx];
+        sq.deleting = true;
+        self.completions.retain(|io_result| {
+            if io_result.sq_idx != sq_idx {
+                return true;
+            }
+            sq.io_count -= 1;
+            tracelimit::warn_ratelimited!("dropped i/o completion during queue deletion");
+            false
+        });
+    }
+
     /// Drains any pending IOs.
     ///
     /// This future may be dropped and reissued.
     pub async fn drain(&mut self) {
-        while self.ios.next().await.is_some() {
-            self.io_count -= 1;
+        while let Some(io_result) = self.ios.next().await {
+            self.sqs[io_result.sq_idx].io_count -= 1;
         }
     }
 }
 
 struct IoResult {
-    nsid: u32,
+    sq_idx: usize,
     cid: u16,
-    opcode: nvm::NvmOpcode,
-    result: Result<CommandResult, NvmeError>,
+    result: CommandResult,
 }
 
 impl AsyncRun<IoState> for IoHandler {
@@ -139,120 +166,119 @@ enum HandlerError {
 }
 
 impl IoHandler {
-    pub fn new(mem: GuestMemory, sqid: u16, admin_response: mesh::Sender<u16>) -> Self {
+    pub fn new(mem: GuestMemory, admin_response: mesh::Sender<u16>) -> Self {
         Self {
             mem,
-            sqid,
             admin_response,
         }
     }
 
-    pub fn delete(&mut self, state: &mut IoState) {
-        match state.queue_state {
-            IoQueueState::Active => state.queue_state = IoQueueState::Deleting,
-            IoQueueState::Deleting | IoQueueState::Deleted => {}
-        }
-    }
-
     async fn process(&mut self, state: &mut IoState) -> Result<(), HandlerError> {
         loop {
-            let deleting = match state.queue_state {
-                IoQueueState::Active => {
-                    // Wait for a completion to be ready. This will be necessary either
-                    // to post an immediate result or to post an IO completion. It's not
-                    // strictly necessary to start a new IO, but handling that special
-                    // case is not worth the complexity.
-                    poll_fn(|cx| state.cq.poll_ready(cx)).await?;
-                    false
-                }
-                IoQueueState::Deleting => {
-                    if state.ios.is_empty() {
-                        self.admin_response.send(self.sqid);
-                        state.queue_state = IoQueueState::Deleted;
-                        break;
-                    }
-                    true
-                }
-                IoQueueState::Deleted => break,
-            };
-
             enum Event {
-                Sq(Result<spec::Command, QueueError>),
+                Sq(usize, Result<spec::Command, QueueError>),
                 Io(IoResult),
+                CompletionReady(Result<IoResult, QueueError>),
+                Deleted(usize),
             }
 
-            let next_sqe = async {
-                if state.io_count < MAX_IO_QUEUE_DEPTH && !deleting {
-                    Event::Sq(poll_fn(|cx| state.sq.poll_next(cx)).await)
-                } else {
-                    pending().await
+            let event = poll_fn(|cx| {
+                for (sq_idx, sq) in &mut state.sqs {
+                    if !sq.deleting && sq.io_count < MAX_IO_QUEUE_DEPTH {
+                        if let Poll::Ready(r) = sq.sq.poll_next(cx) {
+                            return Poll::Ready(Event::Sq(sq_idx, r));
+                        }
+                    } else if sq.deleting && sq.io_count == 0 {
+                        return Poll::Ready(Event::Deleted(sq_idx));
+                    }
                 }
-            };
-
-            let next_io_completion = async {
-                if state.ios.is_empty() {
-                    pending().await
-                } else {
-                    Event::Io(state.ios.next().await.unwrap())
+                if let Poll::Ready(Some(r)) = state.ios.poll_next_unpin(cx) {
+                    return Poll::Ready(Event::Io(r));
                 }
-            };
+                if !state.completions.is_empty() {
+                    if let Poll::Ready(r) = state.cq.poll_ready(cx) {
+                        return Poll::Ready(Event::CompletionReady(
+                            r.map(|()| state.completions.pop_front().unwrap()),
+                        ));
+                    }
+                }
+                Poll::Pending
+            })
+            .await;
 
-            let event = (next_sqe, next_io_completion).race().await;
-            let (cid, result) = match event {
-                Event::Io(io_result) => {
-                    state.io_count -= 1;
-                    let result = match io_result.result {
-                        Ok(cr) => cr,
-                        Err(err) => {
-                            tracelimit::warn_ratelimited!(
-                                error = &err as &dyn std::error::Error,
-                                cid = io_result.cid,
-                                nsid = io_result.nsid,
-                                opcode = ?io_result.opcode,
-                                "io error"
-                            );
-                            err.into()
-                        }
-                    };
-                    (io_result.cid, result)
+            let io_result = match event {
+                Event::Io(io_result) => io_result,
+                Event::CompletionReady(r) => r?,
+                Event::Deleted(sq_idx) => {
+                    let sq = state.sqs.remove(sq_idx);
+                    self.admin_response.send(sq.sqid);
+                    continue;
                 }
-                Event::Sq(r) => {
+                Event::Sq(sq_idx, r) => {
                     let command = r?;
                     let cid = command.cdw0.cid();
                     if let Some(ns) = state.namespaces.get(&command.nsid) {
                         let ns = ns.clone();
                         let io = Box::pin(async move {
-                            let result = ns.nvm_command(MAX_DATA_TRANSFER_SIZE, &command).await;
+                            let result = ns
+                                .nvm_command(MAX_DATA_TRANSFER_SIZE, &command)
+                                .await
+                                .unwrap_or_else(|err| {
+                                    tracelimit::warn_ratelimited!(
+                                        error = &err as &dyn std::error::Error,
+                                        cid,
+                                        nsid = command.nsid,
+                                        opcode = ?nvm::NvmOpcode(command.cdw0.opcode()),
+                                        "io error"
+                                    );
+                                    err.into()
+                                });
                             IoResult {
-                                nsid: command.nsid,
-                                opcode: nvm::NvmOpcode(command.cdw0.opcode()),
+                                sq_idx,
                                 cid,
                                 result,
                             }
                         });
                         state.ios.push(io);
-                        state.io_count += 1;
+                        state.sqs[sq_idx].io_count += 1;
                         continue;
                     }
-                    (cid, spec::Status::INVALID_NAMESPACE_OR_FORMAT.into())
+                    IoResult {
+                        cid,
+                        sq_idx,
+                        result: spec::Status::INVALID_NAMESPACE_OR_FORMAT.into(),
+                    }
                 }
             };
 
+            let sq = &mut state.sqs[io_result.sq_idx];
             let completion = spec::Completion {
-                dw0: result.dw[0],
-                dw1: result.dw[1],
-                sqhd: state.sq.sqhd(),
-                sqid: self.sqid,
-                cid,
-                status: spec::CompletionStatus::new().with_status(result.status.0),
+                dw0: io_result.result.dw[0],
+                dw1: io_result.result.dw[1],
+                sqhd: sq.sq.sqhd(),
+                sqid: sq.sqid,
+                cid: io_result.cid,
+                status: spec::CompletionStatus::new().with_status(io_result.result.status.0),
             };
-            if !state.cq.write(completion)? {
-                assert!(deleting);
-                tracelimit::warn_ratelimited!("dropped i/o completion during queue deletion");
+
+            match state.cq.write(completion) {
+                Ok(true) => {}
+                Ok(false) => {
+                    if !sq.deleting {
+                        state.completions.push_back(io_result);
+                        continue;
+                    }
+                    tracelimit::warn_ratelimited!("dropped i/o completion during queue deletion");
+                }
+                Err(err) => {
+                    state.completions.push_back(io_result);
+                    return Err(err.into());
+                }
             }
+
+            sq.io_count -= 1;
         }
-        Ok(())
     }
 }
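
The heart of the rework is the per-CQ worker loop in io.rs above: one task now walks every submission queue bound to its completion queue inside a single poll_fn, issues new commands only while a queue is below MAX_IO_QUEUE_DEPTH, parks completions when the guest's CQ has no room, and reports a deleting queue as drained once its in-flight count reaches zero. Below is a minimal, self-contained sketch of that selection shape using only std, slab, and futures; Sq, Event, and next_event are illustrative stand-ins, not items from this patch.

// Minimal sketch of the "one task polls many submission queues" shape used by
// IoHandler::process above. All names here are hypothetical stand-ins.
use slab::Slab;
use std::collections::VecDeque;
use std::future::poll_fn;
use std::task::Poll;

struct Sq {
    sqid: u16,
    pending: VecDeque<u16>, // command IDs the guest has queued (toy stand-in)
    io_count: usize,        // IOs currently in flight for this queue
    deleting: bool,         // set when the admin worker asked to delete this SQ
}

enum Event {
    // (slab index, next command id to issue)
    Command(usize, u16),
    // every in-flight IO for a deleting queue has drained; carries its sqid
    Deleted(u16),
    // nothing can make progress right now
    Idle,
}

async fn next_event(sqs: &mut Slab<Sq>, max_depth: usize) -> Event {
    poll_fn(|_cx| {
        for (idx, sq) in sqs.iter_mut() {
            if !sq.deleting && sq.io_count < max_depth {
                if let Some(cid) = sq.pending.pop_front() {
                    sq.io_count += 1;
                    return Poll::Ready(Event::Command(idx, cid));
                }
            } else if sq.deleting && sq.io_count == 0 {
                // The real loop removes the slab entry and notifies the admin
                // worker with sq.sqid at this point.
                return Poll::Ready(Event::Deleted(sq.sqid));
            }
        }
        // The real loop also polls in-flight IOs and completion-queue space
        // here and returns Poll::Pending; the sketch just reports idleness.
        Poll::Ready(Event::Idle)
    })
    .await
}

fn main() {
    let mut sqs = Slab::new();
    let idx = sqs.insert(Sq {
        sqid: 1,
        pending: VecDeque::from([7]),
        io_count: 0,
        deleting: false,
    });
    // futures::executor::block_on drives the poll_fn future to completion.
    match futures::executor::block_on(next_event(&mut sqs, 64)) {
        Event::Command(i, cid) => assert_eq!((i, cid), (idx, 7)),
        _ => unreachable!(),
    }
}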