diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 1c5d9b61f4b..b8c4364f23b 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -24,7 +24,7 @@ keywords = ["io", "async", "non-blocking", "futures"] [features] # Include nothing by default default = [] - +bwos = [] # enable everything full = [ "fs", @@ -37,6 +37,7 @@ full = [ "rt", "rt-multi-thread", "signal", + "stats", "sync", "time", ] @@ -72,6 +73,7 @@ process = [ # Includes basic task execution capabilities rt = [] rt-multi-thread = [ + "bwos", "num_cpus", "rt", ] diff --git a/tokio/src/macros/cfg.rs b/tokio/src/macros/cfg.rs index 52ffc102bf3..f1b50242dc3 100644 --- a/tokio/src/macros/cfg.rs +++ b/tokio/src/macros/cfg.rs @@ -196,7 +196,7 @@ macro_rules! cfg_metrics { $( // For now, metrics is only disabled in loom tests. // When stabilized, it might have a dedicated feature flag. - #[cfg(all(tokio_unstable, not(loom)))] + #[cfg(all(tokio_unstable, not(loom), feature="stats"))] #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))] $item )* @@ -206,7 +206,7 @@ macro_rules! cfg_metrics { macro_rules! cfg_not_metrics { ($($item:item)*) => { $( - #[cfg(not(all(tokio_unstable, not(loom))))] + #[cfg(not(all(tokio_unstable, not(loom), feature = "stats")))] $item )* } @@ -214,7 +214,7 @@ macro_rules! cfg_not_metrics { macro_rules! cfg_not_rt_and_metrics_and_net { ($($item:item)*) => { - $( #[cfg(not(all(feature = "net", feature = "rt", all(tokio_unstable, not(loom)))))]$item )* + $( #[cfg(not(all(feature = "net", feature = "rt", all(tokio_unstable, not(loom), feature = "stats"))))]$item )* } } @@ -373,6 +373,21 @@ macro_rules! cfg_not_rt_multi_thread { } } +macro_rules! cfg_rt_multi_thread_bwos { + ($($item:item)*) => { + $( + #[cfg(all( + tokio_unstable, + feature = "rt-multi-thread", + feature = "bwos", + not(tokio_wasi) + ))] + #[cfg_attr(docsrs, doc(cfg(feature = "bwos")))] + $item + )* + } +} + macro_rules! cfg_taskdump { ($($item:item)*) => { $( diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index dda21a3ae27..2460dc7a381 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -188,11 +188,25 @@ cfg_unstable! { pub(crate) type ThreadNameFn = std::sync::Arc String + Send + Sync + 'static>; +cfg_rt_multi_thread!( + #[derive(Clone, Copy)] + #[allow(dead_code)] + pub(crate) enum MultiThreadFlavor { + /// The default multithreaded tokio runqueue, based on the golang runqueue. + Default, + // There may be more (sub-) variants in the future influencing e.g. queue size + // or stealing strategy + /// A Block-based workstealing queue offering better performance + //#[cfg(all(tokio_unstable, feature = "bwos"))] + Bwos, + } +); + #[derive(Clone, Copy)] pub(crate) enum Kind { CurrentThread, #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] - MultiThread, + MultiThread(MultiThreadFlavor), } impl Builder { @@ -214,15 +228,23 @@ impl Builder { Builder::new(Kind::CurrentThread, 31, EVENT_INTERVAL) } - cfg_not_wasi! { + cfg_rt_multi_thread! { /// Returns a new builder with the multi thread scheduler selected. /// /// Configuration methods can be chained on the return value. - #[cfg(feature = "rt-multi-thread")] - #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))] pub fn new_multi_thread() -> Builder { // The number `61` is fairly arbitrary. I believe this value was copied from golang. - Builder::new(Kind::MultiThread, 61, 61) + Builder::new(Kind::MultiThread(MultiThreadFlavor::Bwos), 61, 61) + } + } + + cfg_rt_multi_thread_bwos! 
{ + /// Returns a new builder with the BWoS multi thread scheduler selected. + /// + /// Configuration methods can be chained on the return value. + pub fn new_multi_thread_bwos() -> Builder { + // The number `61` is copied from `new_multi_thread()`. + Builder::new(Kind::MultiThread(MultiThreadFlavor::Bwos), 61, 61) } } @@ -649,7 +671,7 @@ impl Builder { match &self.kind { Kind::CurrentThread => self.build_current_thread_runtime(), #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] - Kind::MultiThread => self.build_threaded_runtime(), + Kind::MultiThread(flavor) => self.build_threaded_runtime(*flavor), } } @@ -658,7 +680,7 @@ impl Builder { enable_pause_time: match self.kind { Kind::CurrentThread => true, #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] - Kind::MultiThread => false, + Kind::MultiThread(_) => false, }, enable_io: self.enable_io, enable_time: self.enable_time, @@ -1163,7 +1185,7 @@ cfg_test_util! { cfg_rt_multi_thread! { impl Builder { - fn build_threaded_runtime(&mut self) -> io::Result { + fn build_threaded_runtime(&mut self, flavor: MultiThreadFlavor) -> io::Result { use crate::loom::sys::num_cpus; use crate::runtime::{Config, runtime::Scheduler}; use crate::runtime::scheduler::{self, MultiThread}; @@ -1183,6 +1205,7 @@ cfg_rt_multi_thread! { let (scheduler, handle, launch) = MultiThread::new( core_threads, + flavor, driver, driver_handle, blocking_spawner, diff --git a/tokio/src/runtime/scheduler/multi_thread/mod.rs b/tokio/src/runtime/scheduler/multi_thread/mod.rs index fed00b76459..057696d5a3a 100644 --- a/tokio/src/runtime/scheduler/multi_thread/mod.rs +++ b/tokio/src/runtime/scheduler/multi_thread/mod.rs @@ -14,6 +14,8 @@ pub(crate) use park::{Parker, Unparker}; pub(crate) mod queue; +//pub(crate) mod queue; + mod worker; pub(crate) use worker::{Context, Launch}; @@ -27,6 +29,7 @@ use crate::runtime::{ }; use crate::util::RngSeedGenerator; +use crate::runtime::builder::MultiThreadFlavor; use std::fmt; use std::future::Future; @@ -38,6 +41,7 @@ pub(crate) struct MultiThread; impl MultiThread { pub(crate) fn new( size: usize, + flavor: MultiThreadFlavor, driver: Driver, driver_handle: driver::Handle, blocking_spawner: blocking::Spawner, @@ -47,6 +51,7 @@ impl MultiThread { let parker = Parker::new(driver); let (handle, launch) = worker::create( size, + flavor, parker, driver_handle, blocking_spawner, diff --git a/tokio/src/runtime/scheduler/multi_thread/queue.rs b/tokio/src/runtime/scheduler/multi_thread/queue.rs index dd132fb9a6d..0354dbc84a3 100644 --- a/tokio/src/runtime/scheduler/multi_thread/queue.rs +++ b/tokio/src/runtime/scheduler/multi_thread/queue.rs @@ -1,131 +1,39 @@ -//! Run-queue structures to support a work-stealing scheduler - -use crate::loom::cell::UnsafeCell; -use crate::loom::sync::Arc; -use crate::runtime::task::{self, Inject}; -use crate::runtime::MetricsBatch; - -use std::mem::{self, MaybeUninit}; -use std::ptr; -use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release}; - -// Use wider integers when possible to increase ABA resilience. -// -// See issue #5041: . -cfg_has_atomic_u64! { - type UnsignedShort = u32; - type UnsignedLong = u64; - type AtomicUnsignedShort = crate::loom::sync::atomic::AtomicU32; - type AtomicUnsignedLong = crate::loom::sync::atomic::AtomicU64; -} -cfg_not_has_atomic_u64! { - type UnsignedShort = u16; - type UnsignedLong = u32; - type AtomicUnsignedShort = crate::loom::sync::atomic::AtomicU16; - type AtomicUnsignedLong = crate::loom::sync::atomic::AtomicU32; -} - -/// Producer handle. 
May only be used from a single thread. -pub(crate) struct Local { - inner: Arc>, -} - -/// Consumer handle. May be used from many threads. -pub(crate) struct Steal(Arc>); - -pub(crate) struct Inner { - /// Concurrently updated by many threads. - /// - /// Contains two `UnsignedShort` values. The LSB byte is the "real" head of - /// the queue. The `UnsignedShort` in the MSB is set by a stealer in process - /// of stealing values. It represents the first value being stolen in the - /// batch. The `UnsignedShort` indices are intentionally wider than strictly - /// required for buffer indexing in order to provide ABA mitigation and make - /// it possible to distinguish between full and empty buffers. - /// - /// When both `UnsignedShort` values are the same, there is no active - /// stealer. - /// - /// Tracking an in-progress stealer prevents a wrapping scenario. - head: AtomicUnsignedLong, - - /// Only updated by producer thread but read by many threads. - tail: AtomicUnsignedShort, - - /// Elements - buffer: Box<[UnsafeCell>>; LOCAL_QUEUE_CAPACITY]>, -} - -unsafe impl Send for Inner {} -unsafe impl Sync for Inner {} - -#[cfg(not(loom))] -const LOCAL_QUEUE_CAPACITY: usize = 256; - -// Shrink the size of the local queue when using loom. This shouldn't impact -// logic, but allows loom to test more edge cases in a reasonable a mount of -// time. -#[cfg(loom)] -const LOCAL_QUEUE_CAPACITY: usize = 4; - -const MASK: usize = LOCAL_QUEUE_CAPACITY - 1; - -// Constructing the fixed size array directly is very awkward. The only way to -// do it is to repeat `UnsafeCell::new(MaybeUninit::uninit())` 256 times, as -// the contents are not Copy. The trick with defining a const doesn't work for -// generic types. -fn make_fixed_size(buffer: Box<[T]>) -> Box<[T; LOCAL_QUEUE_CAPACITY]> { - assert_eq!(buffer.len(), LOCAL_QUEUE_CAPACITY); - - // safety: We check that the length is correct. - unsafe { Box::from_raw(Box::into_raw(buffer).cast()) } -} - -/// Create a new local run-queue -pub(crate) fn local() -> (Steal, Local) { - let mut buffer = Vec::with_capacity(LOCAL_QUEUE_CAPACITY); - - for _ in 0..LOCAL_QUEUE_CAPACITY { - buffer.push(UnsafeCell::new(MaybeUninit::uninit())); +//cfg_rt_multi_thread_bwos! { +pub(crate) mod bwosq; +//} + +pub(crate) mod tokioq; + +use crate::runtime::builder::MultiThreadFlavor; +use crate::runtime::task::Inject; +use crate::runtime::{task, MetricsBatch}; + +pub(crate) fn local( + flavor: MultiThreadFlavor, +) -> ( + Box + Send + Sync>, + Box + Send + Sync>, +) { + match flavor { + MultiThreadFlavor::Default => tokioq::local(), + //#[cfg(all(tokio_unstable, feature = "bwos"))] + MultiThreadFlavor::Bwos => bwosq::local(), } - - let inner = Arc::new(Inner { - head: AtomicUnsignedLong::new(0), - tail: AtomicUnsignedShort::new(0), - buffer: make_fixed_size(buffer.into_boxed_slice()), - }); - - let local = Local { - inner: inner.clone(), - }; - - let remote = Steal(inner); - - (remote, local) } -impl Local { +pub(crate) trait Owner: Send + Sync { /// Returns true if the queue has entries that can be stolen. - pub(crate) fn is_stealable(&self) -> bool { - !self.inner.is_empty() - } + fn is_stealable(&self) -> bool; - /// How many tasks can be pushed into the queue - pub(crate) fn remaining_slots(&self) -> usize { - self.inner.remaining_slots() - } + /// Returns the maximum capacity of the underlying queue. 
+ fn max_capacity(&self) -> usize; - pub(crate) fn max_capacity(&self) -> usize { - LOCAL_QUEUE_CAPACITY - } + /// Returns a tuple with the lower bound and an Option for the upper bound of remaining + /// slots for enqueuing in the queue. + fn remaining_slots_hint(&self) -> (u16, Option); - /// Returns false if there are any entries in the queue - /// - /// Separate to is_stealable so that refactors of is_stealable to "protect" - /// some tasks from stealing won't affect this - pub(crate) fn has_tasks(&self) -> bool { - !self.inner.is_empty() - } + /// Returns true if there are entries in the queue. + fn has_tasks(&self) -> bool; /// Pushes a batch of tasks to the back of the queue. All tasks must fit in /// the local queue. @@ -133,476 +41,53 @@ impl Local { /// # Panics /// /// The method panics if there is not enough capacity to fit in the queue. - pub(crate) fn push_back(&mut self, tasks: impl ExactSizeIterator>) { - let len = tasks.len(); - assert!(len <= LOCAL_QUEUE_CAPACITY); - - if len == 0 { - // Nothing to do - return; - } - - let head = self.inner.head.load(Acquire); - let (steal, _) = unpack(head); - - // safety: this is the **only** thread that updates this cell. - let mut tail = unsafe { self.inner.tail.unsync_load() }; - - if tail.wrapping_sub(steal) <= (LOCAL_QUEUE_CAPACITY - len) as UnsignedShort { - // Yes, this if condition is structured a bit weird (first block - // does nothing, second returns an error). It is this way to match - // `push_back_or_overflow`. - } else { - panic!() - } - - for task in tasks { - let idx = tail as usize & MASK; - - self.inner.buffer[idx].with_mut(|ptr| { - // Write the task to the slot - // - // Safety: There is only one producer and the above `if` - // condition ensures we don't touch a cell if there is a - // value, thus no consumer. - unsafe { - ptr::write((*ptr).as_mut_ptr(), task); - } - }); - - tail = tail.wrapping_add(1); - } - - self.inner.tail.store(tail, Release); - } + fn push_back(&mut self, tasks: Box> + '_>); /// Pushes a task to the back of the local queue, if there is not enough /// capacity in the queue, this triggers the overflow operation. /// - /// When the queue overflows, half of the curent contents of the queue is + /// When the queue overflows, half of the current contents of the queue is /// moved to the given Injection queue. This frees up capacity for more - /// tasks to be pushed into the local queue. - pub(crate) fn push_back_or_overflow( + /// tasks to be pushed into the local queue. + fn push_back_or_overflow( &mut self, - mut task: task::Notified, + task: task::Notified, inject: &Inject, metrics: &mut MetricsBatch, - ) { - let tail = loop { - let head = self.inner.head.load(Acquire); - let (steal, real) = unpack(head); - - // safety: this is the **only** thread that updates this cell. - let tail = unsafe { self.inner.tail.unsync_load() }; - - if tail.wrapping_sub(steal) < LOCAL_QUEUE_CAPACITY as UnsignedShort { - // There is capacity for the task - break tail; - } else if steal != real { - // Concurrently stealing, this will free up capacity, so only - // push the task onto the inject queue - inject.push(task); - return; - } else { - // Push the current task and half of the queue into the - // inject queue. 
- match self.push_overflow(task, real, tail, inject, metrics) { - Ok(_) => return, - // Lost the race, try again - Err(v) => { - task = v; - } - } - } - }; - - self.push_back_finish(task, tail); - } - - // Second half of `push_back` - fn push_back_finish(&self, task: task::Notified, tail: UnsignedShort) { - // Map the position to a slot index. - let idx = tail as usize & MASK; - - self.inner.buffer[idx].with_mut(|ptr| { - // Write the task to the slot - // - // Safety: There is only one producer and the above `if` - // condition ensures we don't touch a cell if there is a - // value, thus no consumer. - unsafe { - ptr::write((*ptr).as_mut_ptr(), task); - } - }); - - // Make the task available. Synchronizes with a load in - // `steal_into2`. - self.inner.tail.store(tail.wrapping_add(1), Release); - } + ); - /// Moves a batch of tasks into the inject queue. + /// Push a batch of tasks to the back of the local queue /// - /// This will temporarily make some of the tasks unavailable to stealers. - /// Once `push_overflow` is done, a notification is sent out, so if other - /// workers "missed" some of the tasks during a steal, they will get - /// another opportunity. - #[inline(never)] - fn push_overflow( + /// # Safety: + /// + /// The caller must ensure that the queue has enough capacity to accept + /// all tasks, e.g. by calling `can_enqueue` beforehand. + unsafe fn push_back_unchecked( &mut self, - task: task::Notified, - head: UnsignedShort, - tail: UnsignedShort, - inject: &Inject, - metrics: &mut MetricsBatch, - ) -> Result<(), task::Notified> { - /// How many elements are we taking from the local queue. - /// - /// This is one less than the number of tasks pushed to the inject - /// queue as we are also inserting the `task` argument. - const NUM_TASKS_TAKEN: UnsignedShort = (LOCAL_QUEUE_CAPACITY / 2) as UnsignedShort; - - assert_eq!( - tail.wrapping_sub(head) as usize, - LOCAL_QUEUE_CAPACITY, - "queue is not full; tail = {}; head = {}", - tail, - head - ); - - let prev = pack(head, head); - - // Claim a bunch of tasks - // - // We are claiming the tasks **before** reading them out of the buffer. - // This is safe because only the **current** thread is able to push new - // tasks. - // - // There isn't really any need for memory ordering... Relaxed would - // work. This is because all tasks are pushed into the queue from the - // current thread (or memory has been acquired if the local queue handle - // moved). - if self - .inner - .head - .compare_exchange( - prev, - pack( - head.wrapping_add(NUM_TASKS_TAKEN), - head.wrapping_add(NUM_TASKS_TAKEN), - ), - Release, - Relaxed, - ) - .is_err() - { - // We failed to claim the tasks, losing the race. Return out of - // this function and try the full `push` routine again. The queue - // may not be full anymore. - return Err(task); - } - - /// An iterator that takes elements out of the run queue. - struct BatchTaskIter<'a, T: 'static> { - buffer: &'a [UnsafeCell>>; LOCAL_QUEUE_CAPACITY], - head: UnsignedLong, - i: UnsignedLong, - } - impl<'a, T: 'static> Iterator for BatchTaskIter<'a, T> { - type Item = task::Notified; - - #[inline] - fn next(&mut self) -> Option> { - if self.i == UnsignedLong::from(NUM_TASKS_TAKEN) { - None - } else { - let i_idx = self.i.wrapping_add(self.head) as usize & MASK; - let slot = &self.buffer[i_idx]; - - // safety: Our CAS from before has assumed exclusive ownership - // of the task pointers in this range. 
- let task = slot.with(|ptr| unsafe { ptr::read((*ptr).as_ptr()) }); - - self.i += 1; - Some(task) - } - } - } - - // safety: The CAS above ensures that no consumer will look at these - // values again, and we are the only producer. - let batch_iter = BatchTaskIter { - buffer: &self.inner.buffer, - head: head as UnsignedLong, - i: 0, - }; - inject.push_batch(batch_iter.chain(std::iter::once(task))); - - // Add 1 to factor in the task currently being scheduled. - metrics.incr_overflow_count(); - - Ok(()) - } - - /// Pops a task from the local queue. - pub(crate) fn pop(&mut self) -> Option> { - let mut head = self.inner.head.load(Acquire); + tasks: Box> + '_>, + ); - let idx = loop { - let (steal, real) = unpack(head); - - // safety: this is the **only** thread that updates this cell. - let tail = unsafe { self.inner.tail.unsync_load() }; - - if real == tail { - // queue is empty - return None; - } - - let next_real = real.wrapping_add(1); - - // If `steal == real` there are no concurrent stealers. Both `steal` - // and `real` are updated. - let next = if steal == real { - pack(next_real, next_real) - } else { - assert_ne!(steal, next_real); - pack(steal, next_real) - }; - - // Attempt to claim a task. - let res = self - .inner - .head - .compare_exchange(head, next, AcqRel, Acquire); - - match res { - Ok(_) => break real as usize & MASK, - Err(actual) => head = actual, - } - }; - - Some(self.inner.buffer[idx].with(|ptr| unsafe { ptr::read(ptr).assume_init() })) - } + /// Pop one task from the front of the queue. + fn pop(&mut self) -> Option>; } -impl Steal { - pub(crate) fn is_empty(&self) -> bool { - self.0.is_empty() - } +pub(crate) trait Stealer: Send + Sync { + /// Returns true if the queue is empty + /// + /// This function _must_ be accurate and is intended to be used + /// only in non-performance critical settings. + fn is_empty(&self) -> bool; /// Steals half the tasks from self and place them into `dst`. - pub(crate) fn steal_into( + fn steal_into( &self, - dst: &mut Local, + dst: &mut dyn Owner, dst_metrics: &mut MetricsBatch, - ) -> Option> { - // Safety: the caller is the only thread that mutates `dst.tail` and - // holds a mutable reference. - let dst_tail = unsafe { dst.inner.tail.unsync_load() }; - - // To the caller, `dst` may **look** empty but still have values - // contained in the buffer. If another thread is concurrently stealing - // from `dst` there may not be enough capacity to steal. - let (steal, _) = unpack(dst.inner.head.load(Acquire)); - - if dst_tail.wrapping_sub(steal) > LOCAL_QUEUE_CAPACITY as UnsignedShort / 2 { - // we *could* try to steal less here, but for simplicity, we're just - // going to abort. - return None; - } - - // Steal the tasks into `dst`'s buffer. This does not yet expose the - // tasks in `dst`. - let mut n = self.steal_into2(dst, dst_tail); - - if n == 0 { - // No tasks were stolen - return None; - } - - dst_metrics.incr_steal_count(n as u16); - dst_metrics.incr_steal_operations(); - - // We are returning a task here - n -= 1; - - let ret_pos = dst_tail.wrapping_add(n); - let ret_idx = ret_pos as usize & MASK; - - // safety: the value was written as part of `steal_into2` and not - // exposed to stealers, so no other thread can access it. 
- let ret = dst.inner.buffer[ret_idx].with(|ptr| unsafe { ptr::read((*ptr).as_ptr()) }); - - if n == 0 { - // The `dst` queue is empty, but a single task was stolen - return Some(ret); - } - - // Make the stolen items available to consumers - dst.inner.tail.store(dst_tail.wrapping_add(n), Release); - - Some(ret) - } - - // Steal tasks from `self`, placing them into `dst`. Returns the number of - // tasks that were stolen. - fn steal_into2(&self, dst: &mut Local, dst_tail: UnsignedShort) -> UnsignedShort { - let mut prev_packed = self.0.head.load(Acquire); - let mut next_packed; - - let n = loop { - let (src_head_steal, src_head_real) = unpack(prev_packed); - let src_tail = self.0.tail.load(Acquire); - - // If these two do not match, another thread is concurrently - // stealing from the queue. - if src_head_steal != src_head_real { - return 0; - } - - // Number of available tasks to steal - let n = src_tail.wrapping_sub(src_head_real); - let n = n - n / 2; - - if n == 0 { - // No tasks available to steal - return 0; - } - - // Update the real head index to acquire the tasks. - let steal_to = src_head_real.wrapping_add(n); - assert_ne!(src_head_steal, steal_to); - next_packed = pack(src_head_steal, steal_to); - - // Claim all those tasks. This is done by incrementing the "real" - // head but not the steal. By doing this, no other thread is able to - // steal from this queue until the current thread completes. - let res = self - .0 - .head - .compare_exchange(prev_packed, next_packed, AcqRel, Acquire); - - match res { - Ok(_) => break n, - Err(actual) => prev_packed = actual, - } - }; - - assert!( - n <= LOCAL_QUEUE_CAPACITY as UnsignedShort / 2, - "actual = {}", - n - ); - - let (first, _) = unpack(next_packed); - - // Take all the tasks - for i in 0..n { - // Compute the positions - let src_pos = first.wrapping_add(i); - let dst_pos = dst_tail.wrapping_add(i); + ) -> Option>; - // Map to slots - let src_idx = src_pos as usize & MASK; - let dst_idx = dst_pos as usize & MASK; - - // Read the task - // - // safety: We acquired the task with the atomic exchange above. - let task = self.0.buffer[src_idx].with(|ptr| unsafe { ptr::read((*ptr).as_ptr()) }); - - // Write the task to the new slot - // - // safety: `dst` queue is empty and we are the only producer to - // this queue. - dst.inner.buffer[dst_idx] - .with_mut(|ptr| unsafe { ptr::write((*ptr).as_mut_ptr(), task) }); - } - - let mut prev_packed = next_packed; - - // Update `src_head_steal` to match `src_head_real` signalling that the - // stealing routine is complete. - loop { - let head = unpack(prev_packed).1; - next_packed = pack(head, head); - - let res = self - .0 - .head - .compare_exchange(prev_packed, next_packed, AcqRel, Acquire); - - match res { - Ok(_) => return n, - Err(actual) => { - let (actual_steal, actual_real) = unpack(actual); - - assert_ne!(actual_steal, actual_real); - - prev_packed = actual; - } - } - } - } -} - -cfg_metrics! { - impl Steal { - pub(crate) fn len(&self) -> usize { - self.0.len() as _ - } - } -} - -impl Clone for Steal { - fn clone(&self) -> Steal { - Steal(self.0.clone()) - } -} - -impl Drop for Local { - fn drop(&mut self) { - if !std::thread::panicking() { - assert!(self.pop().is_none(), "queue not empty"); - } + cfg_metrics! { + /// Number of tasks in the queue. 
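+ ///
+ /// Implementations may return an estimate rather than an exact count (the BWoS
+ /// queue, for instance, derives this value from relaxed statistics counters).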
+ #[cfg(feature = "stats")] + fn len(&self) -> usize; } } - -impl Inner { - fn remaining_slots(&self) -> usize { - let (steal, _) = unpack(self.head.load(Acquire)); - let tail = self.tail.load(Acquire); - - LOCAL_QUEUE_CAPACITY - (tail.wrapping_sub(steal) as usize) - } - - fn len(&self) -> UnsignedShort { - let (_, head) = unpack(self.head.load(Acquire)); - let tail = self.tail.load(Acquire); - - tail.wrapping_sub(head) - } - - fn is_empty(&self) -> bool { - self.len() == 0 - } -} - -/// Split the head value into the real head and the index a stealer is working -/// on. -fn unpack(n: UnsignedLong) -> (UnsignedShort, UnsignedShort) { - let real = n & UnsignedShort::MAX as UnsignedLong; - let steal = n >> (mem::size_of::() * 8); - - (steal as UnsignedShort, real as UnsignedShort) -} - -/// Join the two head values -fn pack(steal: UnsignedShort, real: UnsignedShort) -> UnsignedLong { - (real as UnsignedLong) | ((steal as UnsignedLong) << (mem::size_of::() * 8)) -} - -#[test] -fn test_local_queue_capacity() { - assert!(LOCAL_QUEUE_CAPACITY - 1 <= u8::MAX as usize); -} diff --git a/tokio/src/runtime/scheduler/multi_thread/queue/bwosq.rs b/tokio/src/runtime/scheduler/multi_thread/queue/bwosq.rs new file mode 100644 index 00000000000..22a225aaa13 --- /dev/null +++ b/tokio/src/runtime/scheduler/multi_thread/queue/bwosq.rs @@ -0,0 +1,159 @@ +use std::convert::TryInto; + +use crate::runtime::scheduler::multi_thread::queue::Owner as OwnerTrait; +use crate::runtime::task::{self, Inject, Notified}; +use crate::runtime::MetricsBatch; + +mod bwosqueue; + +// todo: Discuss using const generics or runtime values. Benchmark performance difference. +const NUM_BLOCKS: usize = 8; +const ELEMENTS_PER_BLOCK: usize = 32; + +/// Producer handle. May only be used from a single thread. +pub(crate) struct Local { + inner: bwosqueue::Owner, NUM_BLOCKS, ELEMENTS_PER_BLOCK>, +} + +/// Consumer handle. May be used from many threads. +pub(crate) struct Steal( + bwosqueue::Stealer, NUM_BLOCKS, ELEMENTS_PER_BLOCK>, +); + +/// Create a new local run-queue +pub(crate) fn local() -> ( + Box + Send + Sync>, + Box + Send + Sync>, +) { + let (owner, stealer) = bwosqueue::new::, NUM_BLOCKS, ELEMENTS_PER_BLOCK>(); + + let local = Local { inner: owner }; + + let remote = Steal(stealer); + + (Box::new(remote), Box::new(local)) +} + +impl super::Owner for Local { + /// Returns true if the queue has entries that can be stolen. + fn is_stealable(&self) -> bool { + self.inner.has_stealable_entries() + } + + fn max_capacity(&self) -> usize { + self::ELEMENTS_PER_BLOCK * self::NUM_BLOCKS + } + + /// Returns true if there are entries in the queue. + fn has_tasks(&self) -> bool { + self.inner.can_consume() + } + + /// Pushes a task to the back of the local queue, skipping the LIFO slot. + fn push_back_or_overflow( + &mut self, + task: task::Notified, + inject: &Inject, + metrics: &mut MetricsBatch, + ) { + if let Err(t) = self.inner.enqueue(task) { + if self.inner.next_block_has_stealers() { + inject.push(t); + } else { + // push overflow of old queue + if let Some(block_iter) = self.inner.dequeue_block() { + inject.push_batch(block_iter.chain(std::iter::once(t))) + } else { + inject.push(t) + } + } + metrics.incr_overflow_count(); + }; + } + + fn push_back(&mut self, tasks: Box> + '_>) { + let len = tasks.len(); + let min_capacity = self.inner.min_remaining_slots(); + assert!(len <= min_capacity); + // SAFETU: We checked the capacity of the queue is sufficient before enqueuing. 
+ unsafe { + self.inner.enqueue_batch_unchecked(tasks); + } + } + + unsafe fn push_back_unchecked( + &mut self, + tasks: Box> + '_>, + ) { + self.inner.enqueue_batch_unchecked(tasks); + } + + fn remaining_slots_hint(&self) -> (u16, Option) { + let min_slots = self.inner.min_remaining_slots(); + debug_assert!(min_slots <= u16::MAX.into()); + // Note: If we do change from a linked list of blocks to an array of blocks, + // we may be able to quickly calculate an approximate upper bound based + // on the consumer cache _index_. + (min_slots as u16, None) + } + + fn pop(&mut self) -> Option> { + self.inner.dequeue() + } +} + +impl Drop for Local { + fn drop(&mut self) { + if !std::thread::panicking() { + assert!(self.pop().is_none(), "queue not empty"); + } + } +} + +impl super::Stealer for Steal { + fn is_empty(&self) -> bool { + self.0.is_empty() + } + + /// Steals one block from self and place them into `dst`. + fn steal_into( + &self, + dst: &mut dyn OwnerTrait, + dst_metrics: &mut MetricsBatch, + ) -> Option> { + // In the rare case that the `dst` queue is at the same time also full, because the + // producer is blocked waiting on a stealer we only attempt to steal a single task + if dst.remaining_slots_hint().0 < ELEMENTS_PER_BLOCK as u16 { + dst_metrics.incr_steal_count(1); + dst_metrics.incr_steal_operations(); + // We could evaluate stealing exactly the amount of remaining slots + 1. + return self.0.steal(); + } + + if let Some(mut stolen_tasks) = self.0.steal_block() { + let num_stolen = stolen_tasks.len(); + let first = stolen_tasks.next(); + debug_assert!(first.is_some()); + unsafe { dst.push_back_unchecked(Box::new(stolen_tasks)) } + dst_metrics.incr_steal_count(num_stolen.try_into().unwrap()); + dst_metrics.incr_steal_operations(); + first + } else { + None + } + } + + cfg_metrics! { + /// Approximate queue length + fn len(&self) -> usize { + self.0.estimated_queue_entries() + } + + } +} + +impl Clone for Steal { + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} diff --git a/tokio/src/runtime/scheduler/multi_thread/queue/bwosq/bwosqueue/bwos_queue.rs b/tokio/src/runtime/scheduler/multi_thread/queue/bwosq/bwosqueue/bwos_queue.rs new file mode 100644 index 00000000000..2a07ff60e36 --- /dev/null +++ b/tokio/src/runtime/scheduler/multi_thread/queue/bwosq/bwosqueue/bwos_queue.rs @@ -0,0 +1,201 @@ +use super::metadata::AtomicIndexAndVersion; +use crate::loom::{cell::UnsafeCell, sync::Arc}; +use crate::util::array_init; +use crate::util::cache_padded::CachePadded; +use core::{marker::PhantomPinned, mem::MaybeUninit, pin::Pin, ptr::null}; + +#[cfg(feature = "stats")] +mod bwsstats { + // The statistics don't influence correctness, so don't model them with loom. 
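+ // Note: `owner_counter` is only modified by the single `Owner` (a Relaxed load
+ // followed by a store), while `total_stolen` uses `fetch_add` because several
+ // stealers may update it concurrently.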
+ use crate::util::cache_padded::CachePadded; + use std::sync::atomic::{AtomicUsize, Ordering::Relaxed}; + + pub(crate) struct BwsStats { + owner_counter: CachePadded, + total_stolen: CachePadded, + } + + impl BwsStats { + pub(crate) fn new() -> Self { + Self { + owner_counter: CachePadded::new(AtomicUsize::new(0)), + total_stolen: CachePadded::new(AtomicUsize::new(0)), + } + } + + #[inline] + pub(crate) fn increment_enqueued(&self, rhs: usize) { + let curr = self.owner_counter.load(Relaxed); + let new = curr.wrapping_add(rhs); + self.owner_counter.store(new, Relaxed); + } + #[inline] + pub(crate) fn increment_dequeued(&self, rhs: usize) { + let curr = self.owner_counter.load(Relaxed); + let new = curr.wrapping_sub(rhs); + self.owner_counter.store(new, Relaxed); + } + + #[inline] + pub(crate) fn increment_stolen(&self, rhs: usize) { + self.total_stolen.fetch_add(rhs, Relaxed); + } + + /// Returns the _estimated_ number of currently enqueued items. + #[inline] + pub(crate) fn curr_enqueued(&self) -> usize { + let owner_cnt = self.owner_counter.load(Relaxed); + let total_stolen = self.total_stolen.load(Relaxed); + + owner_cnt.saturating_sub(total_stolen) + } + } +} + +#[cfg(feature = "stats")] +pub(crate) use bwsstats::*; + +pub(crate) struct BwsQueue { + pub(crate) blocks: CachePadded<[Block; NUM_BLOCKS]>, + #[cfg(feature = "stats")] + pub(crate) stats: CachePadded, + _pin: PhantomPinned, +} + +pub(crate) struct Block { + /// The index and version of the next writable entry in the block + /// + /// index == NE signals that the producer has already fully written this block. + /// `committed` is only written to by the single producer ([Owner](super::Owner)). + pub(crate) committed: CachePadded>, + /// The index and version of the next readable entry in the block + /// + /// If consumed == committed, then there are not items that can be read in this block. + /// `consumed` is only written by the single consumer ([Owner](super::Owner)). + pub(crate) consumed: CachePadded>, + /// stealer-head - We ensure that consumer and stealer are never on same block + pub(crate) reserved: CachePadded>, + /// stealer-tail - stealing finished + pub(crate) stolen: CachePadded>, + /// Block specific configuration, including a reference to the next block in the bwosqueue. + conf: CachePadded>, + /// The storage for all entries in this block + pub(crate) entries: CachePadded<[UnsafeCell>; NE]>, +} + +struct BlockConfig { + /// true if this Block is the HEAD of the queue. + beginning: bool, + /// Blocks are linked together as a linked list via the `next` pointer to speed up accessing + /// the next block. The pointer is fixed, but needs to be initialized after the Block has + /// been put behind a shared reference in pinned memory, since we can't directly initialize + /// and pin memory on the heap. 
+ next: UnsafeCell<*const Block>, +} + +impl BlockConfig { + fn new(idx: usize) -> BlockConfig { + BlockConfig { + beginning: idx == 0, + next: UnsafeCell::new(null()), + } + } +} + +impl Block { + fn new(idx: usize) -> Block { + let is_queue_head = idx == 0; + Block { + committed: CachePadded::new(AtomicIndexAndVersion::new_owner(is_queue_head)), + consumed: CachePadded::new(AtomicIndexAndVersion::new_owner(is_queue_head)), + reserved: CachePadded::new(AtomicIndexAndVersion::new_stealer(is_queue_head)), + stolen: CachePadded::new(AtomicIndexAndVersion::new_stealer(is_queue_head)), + conf: CachePadded::new(BlockConfig::new(idx)), + entries: CachePadded::new(array_init(|_| UnsafeCell::new(MaybeUninit::uninit()))), + } + } + + /// Returns the next Block in the BWoS queue + #[inline(always)] + pub(crate) fn next(&self) -> *const Self { + // SAFETY: The next pointer is static and valid after initialization of the queue for + // the whole lifetime of the queue. + unsafe { self.conf.next.with(|next| *next) } + } + + /// true if this block is the head of the BWoS queue + #[inline(always)] + pub(crate) fn is_head(&self) -> bool { + self.conf.beginning + } +} + +impl + BwsQueue +{ + #[cfg(const_assert)] + const _ASSERT_NUM_BLOCKS_POW2: () = assert!(NUM_BLOCKS.is_power_of_two()); + #[cfg(const_assert)] + const _ASSERT_NUM_GREATER_1: () = assert!(NUM_BLOCKS > 1); + + pub(crate) fn new() -> Pin> { + // We need to "use" the assertions here, otherwise the compile-time assertions are ignored. + #[cfg(const_assert)] + #[allow(clippy::let_unit_value)] + let _ = Self::_ASSERT_NUM_BLOCKS_POW2; + #[cfg(const_assert)] + #[allow(clippy::let_unit_value)] + let _ = Self::_ASSERT_NUM_GREATER_1; + + // First create and pin the queue on the heap + let q = Arc::pin(BwsQueue { + blocks: CachePadded::new(array_init(|idx| Block::new(idx))), + #[cfg(feature = "stats")] + stats: CachePadded::new(BwsStats::new()), + _pin: PhantomPinned, + }); + // Now initialize the fast-path pointers + let blocks: &[Block; NUM_BLOCKS] = &q.blocks; + for block_window in blocks.windows(2) { + // Note: This cannot panic since we asserted at compile-time that BwsQueue has at least + // 2 blocks + let curr_block = block_window.get(0).expect("INVALID_NUM_BLOCKS"); + let next_block = block_window.get(1).expect("INVALID_NUM_BLOCKS"); + // SAFETY: Since our array of blocks is already behind an `Arc` and `Pin`ned we can't + // initialize the pointers with safe code, but we do know that at this point in time + // no concurrent mutable access is possible, since there are no other references. + unsafe { + curr_block.conf.next.with_mut(|next_ptr| { + (*next_ptr) = next_block; + }); + } + } + + let first_block = blocks.first().expect("INVALID_NUM_BLOCKS"); + let last_block = blocks.last().expect("INVALID_NUM_BLOCKS"); + + // SAFETY: There are no other active references to the curr and next block and no + // concurrent access is possible here. + unsafe { + last_block.conf.next.with_mut(|next_ptr| { + (*next_ptr) = first_block; + }); + } + // Now all fields in the Queue are initialized correctly + q + } + + /// The estimated number of elements currently enqueued. + /// + /// Items which are currently being stolen do not count towards the length, + /// so this method is not suited to determine if the queue is full. 
+ #[cfg(feature = "stats")] + pub(crate) fn estimated_len(&self) -> usize { + self.stats.curr_enqueued() + } + + // #[cfg(feature = "stats")] + // pub(crate) fn is_empty(&self) -> bool { + // self.estimated_len() == 0 + // } +} diff --git a/tokio/src/runtime/scheduler/multi_thread/queue/bwosq/bwosqueue/metadata.rs b/tokio/src/runtime/scheduler/multi_thread/queue/bwosq/bwosqueue/metadata.rs new file mode 100644 index 00000000000..ef0383f7798 --- /dev/null +++ b/tokio/src/runtime/scheduler/multi_thread/queue/bwosq/bwosqueue/metadata.rs @@ -0,0 +1,235 @@ +//! Contains metadata for the block configuration + +use crate::loom::sync::atomic::{AtomicUsize, Ordering}; +use core::fmt::{Debug, Formatter}; + +/// A container for the current block index and block version +/// +/// `NE` is the number of elements in a block (index `0..NE`). `index == NE` marks a full block. +/// +/// Bits `0..=NE_LOG_CEIL`, where `NE_LOG_CEIL` is `(NE+1).next_power_of_two()).log2()` +/// are reserved for the index. +/// Bits `(NE_LOG_CEIL + 1)..` are used for the block version. The version field is +/// used to detect [ABA](https://en.wikipedia.org/wiki/ABA_problem) situations when accessing queue entries. +#[repr(transparent)] +#[derive(PartialEq, Eq, Copy, Clone)] +pub(crate) struct IndexAndVersion(usize); + +/// The index of the current element in the block +/// +/// 0 represents an empty block while NE represents a full block. +#[repr(transparent)] +pub(crate) struct Index(usize); + +impl Index { + /// Creates an Index for an empty block + #[inline(always)] + pub(crate) fn empty() -> Self { + Self(0) + } + + /// Creates an Index for a full block + #[inline(always)] + pub(crate) fn full() -> Self { + Self(NUM_ELEMENTS_PER_BLOCK) + } + + /// True if the block is full + #[inline(always)] + pub(crate) fn is_full(&self) -> bool { + self.0 == NUM_ELEMENTS_PER_BLOCK + } + + /// True if the block is empty + #[inline(always)] + #[allow(dead_code)] + pub(crate) fn is_empty(&self) -> bool { + self.0 == 0 + } +} + +// todo: use atomic usize after fixing overflow problem to support 32bit +#[repr(transparent)] +pub(crate) struct AtomicIndexAndVersion(AtomicUsize); + +impl IndexAndVersion<{ NE }> { + // 0 elements per block make no sense + #[cfg(const_assert)] + const _ASSERT_NE_GREATER_ZERO: () = assert!(NE > 0); + #[cfg(const_assert)] + const MIN_VERSION_BITS: u32 = 1; + // Subtract 1 to get the maximum number representable by that amount of bits and subtract another one to allow for + // representing the full block state (`idx == NE`). + #[cfg(const_assert)] + const MAX_NE: usize = 2_usize.pow(usize::BITS - Self::MIN_VERSION_BITS) - 2; + #[cfg(const_assert)] + const _ASSERT_NE_MAX: () = assert!(NE <= Self::MAX_NE); + + #[inline(always)] + fn raw(&self) -> usize { + self.0 + } + + #[inline(always)] + fn max_version() -> usize { + let num_version_bits = usize::BITS - Self::ne_log() as u32; + 2_usize.pow(num_version_bits).wrapping_sub(1) + } + + /// Number of bits used for the Number of elements in a block + /// + /// Guaranteed to be at least 1. + #[inline] + fn ne_log() -> usize { + #[cfg(const_assert)] + #[allow(clippy::let_unit_value)] + let _ = Self::_ASSERT_NE_GREATER_ZERO; + #[cfg(const_assert)] + #[allow(clippy::let_unit_value)] + let _ = Self::_ASSERT_NE_MAX; + // (const) integer logarithm is not stable yet, so we need to use floating point and + // rely on the compiler to optimize this away at compile time. 
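+ // For example, with NE = 32 entries per block: (32 + 1).next_power_of_two() == 64
+ // and log2(64) == 6, so 6 bits are reserved for the index values 0..=32.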
+ ((NE + 1).next_power_of_two() as f32).log2() as usize + } + + #[inline(always)] + pub(crate) fn new(version: usize, index: Index) -> Self { + debug_assert!(version <= Self::max_version()); + + Self(version.wrapping_shl(Self::ne_log() as u32) | index.0) + } + + #[inline(always)] + fn from_raw(raw: usize) -> Self { + Self(raw) + } + + #[inline(always)] + pub(crate) fn version(&self) -> usize { + self.0.wrapping_shr(Self::ne_log() as u32) + } + + /// Increment the version by one if this is the first block and reset index + #[inline] + pub(crate) fn next_version(&self, is_first_block: bool) -> Self { + let cur_version_shifted = self.0 & Self::version_mask(); + let first_bit_pos_version = Self::ne_log() as u32; + let new_version_shifted = cur_version_shifted + .wrapping_add((is_first_block as usize).wrapping_shl(first_bit_pos_version)); + // index is now zeroed. + Self(new_version_shifted) + } + + /// A bitmask for the bits used for the block index + #[inline(always)] + fn index_mask() -> usize { + // ne_log will be at least 1, so the subtraction will never wrap around + 1_usize.wrapping_shl(Self::ne_log() as u32) - 1 + } + + #[inline(always)] + fn version_mask() -> usize { + !Self::index_mask() + } + + #[inline(always)] + pub(crate) fn index(&self) -> Index { + // We are sure that the index we stored is valid + Index(self.raw_index()) + } + + #[inline(always)] + pub(crate) fn raw_index(&self) -> usize { + self.0 & Self::index_mask() + } + + /// Increment the Index by `rhs`. + /// + /// # Safety + /// + /// The caller be sure that the result of self + rhs is <= NE. + #[inline(always)] + pub(crate) unsafe fn index_add_unchecked(&self, rhs: usize) -> Self { + debug_assert!(self.raw_index() + rhs <= NE); + Self(self.0.wrapping_add(rhs)) + } +} + +impl Debug for IndexAndVersion<{ NE }> { + fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { + f.debug_struct("IndexAndVersion") + .field("Index", &self.raw_index()) + .field("Version", &self.version()) + .finish() + } +} + +impl AtomicIndexAndVersion<{ NE }> { + #[inline(always)] + pub(crate) fn load(&self, order: Ordering) -> IndexAndVersion { + let v = self.0.load(order); + IndexAndVersion::from_raw(v) + } + + /// Creates a new instance for an `Owner` field (producer or consumer + pub(crate) fn new_owner(is_queue_head: bool) -> Self { + let empty_val: IndexAndVersion = if is_queue_head { + // The first block (head) starts at version one and with an empty index + // to indicate readiness to produce/consume once values where produced. + IndexAndVersion::new(1, Index::empty()) + } else { + // The remaining blocks start one version behind and are marked as fully + // produced/consumed. + IndexAndVersion::new(0, Index::full()) + }; + Self(AtomicUsize::new(empty_val.raw())) + } + + /// Creates a new instance for a `Stealer` field. The main difference to + /// [new_owner](Self::new_owner) is that the stealer is always initialized as full, + /// i.e. not ready for stealing. This is because the queue head is reserved for the + /// consumer and the stealer may not steal from the same block the consumer is on. 
+ pub(crate) fn new_stealer(is_queue_head: bool) -> Self { + let full_val: IndexAndVersion = + IndexAndVersion::new(is_queue_head as usize, Index::full()); + Self(AtomicUsize::new(full_val.raw())) + } + + #[inline(always)] + pub(crate) fn fetch_add(&self, val: usize, order: Ordering) -> IndexAndVersion { + let old = self.0.fetch_add(val, order); + IndexAndVersion::from_raw(old) + } + + #[inline(always)] + pub(crate) fn compare_exchange_weak( + &self, + current: IndexAndVersion, + new: IndexAndVersion, + success: Ordering, + failure: Ordering, + ) -> Result, IndexAndVersion> { + self.0 + .compare_exchange_weak(current.raw(), new.raw(), success, failure) + .map_err(IndexAndVersion::from_raw) + .map(IndexAndVersion::from_raw) + } + + #[inline(always)] + pub(crate) fn store(&self, val: IndexAndVersion, order: Ordering) { + self.0.store(val.raw(), order) + } + + #[inline(always)] + pub(crate) fn swap(&self, val: IndexAndVersion, order: Ordering) -> IndexAndVersion { + let old = self.0.swap(val.raw(), order); + IndexAndVersion::from_raw(old) + } +} + +impl Debug for AtomicIndexAndVersion<{ NE }> { + fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { + let val = self.load(Ordering::SeqCst); + f.write_fmt(format_args!("{:?}", val)) + } +} diff --git a/tokio/src/runtime/scheduler/multi_thread/queue/bwosq/bwosqueue/mod.rs b/tokio/src/runtime/scheduler/multi_thread/queue/bwosq/bwosqueue/mod.rs new file mode 100644 index 00000000000..a0048027b79 --- /dev/null +++ b/tokio/src/runtime/scheduler/multi_thread/queue/bwosq/bwosqueue/mod.rs @@ -0,0 +1,824 @@ +//! The BWoS queue is a fast block-based work stealing queue for parallel processing. +//! +//! The BWoS queue is based on the [BBQ] (Block-based Bounded Queue) and is specially designed for the +//! workstealing scenario. Based on the real-world observation that the "stealing" operation is +//! rare and most of the operations are local enqueues and dequeues this queue implementation +//! offers a single [Owner] which can enqueue and dequeue without any heavy synchronization mechanisms +//! on the fast path. Concurrent stealing is possible and does not slow done the Owner too much. +//! This allows stealing policies which steal single items or in small batches. +//! +//! # Queue Semantics +//! +//! - The block-based design reduces the synchronization requirements on the fast-path +//! inside a block and moves the heavy synchronization operations necessary to support +//! multiple stealers to the slow-path when transitioning to the next block. +//! - The producer (enqueue) may not advance to the next block if the consumer or a stealer +//! is still operating on that block. This allows the producer to remove producer-consumer/stealer +//! synchronization from its fast-path operations, but reduces the queue capacity by +//! at most one block. +//! - Stealers may not steal from the same block as the consumer. This allows the consumer +//! to remove consumer-stealer synchronization from its fast-path operations, but means +//! one block is not available for stealing. +//! - Consumers may "take-over" the next block preventing stealers from stealing in that +//! block after the take-over. Stealers will still proceed with already in-progress steal +//! operations in this block. +//! - This queue implementation puts the producer and consumer into a shared Owner struct, +//! +//! [BBQ]: https://www.usenix.org/conference/atc22/presentation/wang-jiawei +//! +//! # Todo: +//! 
- Instead of const generics we could use a boxed slice for a dynamically sized array. +//! The performance impact be benchmarked though, since this will result in multiple operations +//! not being able to be calculated at compile-time anymore. + +#![deny(unsafe_op_in_unsafe_fn)] +#![warn(unreachable_pub)] +#![allow(dead_code)] + +use core::{ + marker::{Send, Sync}, + pin::Pin, +}; +use std::fmt::Formatter; +use std::mem::MaybeUninit; + +mod bwos_queue; +mod metadata; + +use crate::loom::cell::UnsafeCell; +use crate::loom::sync::atomic::{ + AtomicUsize, + Ordering::{Acquire, Relaxed, Release}, +}; +use crate::loom::sync::Arc; +use crate::util::cache_padded::CachePadded; +use bwos_queue::{Block, BwsQueue}; +use metadata::{Index, IndexAndVersion}; + +/// The Owner interface to the BWoS queue +/// +/// The owner is both the single producer and single consumer. +pub(crate) struct Owner { + /// Producer cache (single producer)- points to block in self.queue. + pcache: CachePadded<*const Block>, + /// Consumer cache (single consumer) - points to block in self.queue. + ccache: CachePadded<*const Block>, + /// `Arc` to the actual queue to ensure the queue lives at least as long as the Owner. + #[allow(dead_code)] + queue: Pin>>, +} + +/// A Stealer interface to the BWoS queue +/// +/// There may be multiple stealers. Stealers share the stealer position which is used to quickly look up +/// the next block for attempted stealing. +pub(crate) struct Stealer { + /// The actual stealer position is `self.spos % NUM_BLOCKS`. The position is incremented beyond + /// `NUM_BLOCKS` to detect ABA problems. + spos: CachePadded>, + queue: Pin>>, +} + +/// An iterator over elements of one Block. +/// +/// The iterator borrows all elements up to `committed` to allows batched +/// operations on the elements. When the iterator is dropped the entries +/// are marked as consumed in one atomic operation. +pub(crate) struct BlockIter<'a, E, const ENTRIES_PER_BLOCK: usize> { + buffer: &'a [UnsafeCell>; ENTRIES_PER_BLOCK], + /// Index if the next to be consumed entry in the buffer. + i: usize, + /// Number of committed entries in the buffer. + committed: usize, +} + +/// An iterator over elements of one Block of a stealer +/// +/// Marks the stolen entries as stolen once the iterator has been consumed. +pub(crate) struct StealerBlockIter<'a, E, const ENTRIES_PER_BLOCK: usize> { + /// Stealer Block + stealer_block: &'a Block, + /// Remember how many entries where reserved for the Drop implementation + num_reserved: usize, + /// reserved index of the block. We own the entries from `i..block_reserved` + block_reserved: usize, + /// curr index in the block + i: usize, +} + +unsafe impl Send + for Owner +{ +} + +// todo: is this really needed? +unsafe impl Sync + for Owner +{ +} + +unsafe impl Send + for Stealer +{ +} + +unsafe impl Sync + for Stealer +{ +} + +impl + Owner +{ + /// Try to enqueue `t` into the FIFO queue. + /// + /// If the queue is full, `Err(t)` is returned to the caller. + #[inline(always)] + pub(crate) fn enqueue(&mut self, t: E) -> Result<(), E> { + loop { + // SAFETY: `pcache` always points to a valid `Block` in the queue. We never create a mutable reference + // to a Block, so it is safe to construct a shared reference here. + let blk = unsafe { &**self.pcache }; + + // Load the index of the next free queue entry for the producer. `committed` is only written to by the + // single producer, so `Relaxed` reading is fine. 
+ let committed = blk.committed.load(Relaxed); + let committed_idx = committed.raw_index(); + + // Fastpath (the block is not full): Due to the slowpath checks we know that the entire remaining block + // is available to the producer and do not need to check the consumed index in the fastpath. + if let Some(entry_cell) = blk.entries.get(committed_idx) { + // SAFETY: We checked the entry is available for writing and the index can be + // post-incremented unconditionally since `index == NE` is valid and means the block + // is full. + let committed_new = unsafe { + entry_cell.with_mut(|uninit_entry| uninit_entry.write(MaybeUninit::new(t))); + committed.index_add_unchecked(1) + }; + // Synchronizes with `Acquire` ordering on the stealer side. + blk.committed.store(committed_new, Release); + #[cfg(feature = "stats")] + self.queue.stats.increment_enqueued(1); + return Ok(()); + } + + /* slow path, move to the next block */ + let nblk = unsafe { &*blk.next() }; + let next = committed.next_version(nblk.is_head()); + + /* check if next block is ready */ + if !self.is_next_block_writable(nblk, next.version()) { + return Err(t); + }; + + /* reset cursor and advance block */ + nblk.committed.store(next, Relaxed); + nblk.stolen.store(next, Relaxed); + // Ensures the writes to `committed` and `stolen` are visible when `reserved` is loaded. + nblk.reserved.store(next, Release); + *self.pcache = nblk; + } + } + + /// Enqueue a batch of items without capacity checks + /// + /// # Safety + /// + /// The caller must ensure that the queue has sufficient remaining capacity + /// to enqueue all items in the iterator. + pub(crate) unsafe fn enqueue_batch_unchecked( + &mut self, + mut iter: Box + '_>, + ) { + loop { + if iter.len() == 0 { + return; + } + // SAFETY: `pcache` always points to a valid `Block` in the queue. We never create a mutable reference + // to a Block, so it is safe to construct a shared reference here. + let blk = unsafe { &**self.pcache }; + + // Load the index of the next free queue entry for the producer. `committed` is only written to by the + // single producer, so `Relaxed` reading is fine. + let committed = blk.committed.load(Relaxed); + let start_idx = committed.raw_index(); + let end = core::cmp::min(ENTRIES_PER_BLOCK, start_idx.wrapping_add(iter.len())); + if start_idx < ENTRIES_PER_BLOCK { + let count = end - start_idx; + for idx in start_idx..end { + // Fastpath (the block is not full): Due to the slowpath checks we know that the entire remaining block + // is available to the producer and do not need to check the consumed index in the + // fastpath. + let entry = iter.next().expect("Iterator magically lost an item"); + blk.entries[idx].with_mut(|uninit_entry| unsafe { + uninit_entry.write(MaybeUninit::new(entry)) + }); + } + // SAFETY: We checked that the addition will not overflow beyond ENTRIES_PER_BLOCK + let new_committed = unsafe { committed.index_add_unchecked(count) }; + blk.committed.store(new_committed, Release); + #[cfg(feature = "stats")] + self.queue.stats.increment_enqueued(count); + if end < ENTRIES_PER_BLOCK { + continue; + } + } + /* slow path, move to the next block */ + let nblk = unsafe { &*blk.next() }; + let next = committed.next_version(nblk.is_head()); + + // The caller promises they already confirmed the next block is ready, so we only + // debug assert. + debug_assert!( + self.is_next_block_writable(nblk, next.version()), + "Precondition of unchecked enqueue function violated." 
+ ); + + /* reset cursor and advance block */ + nblk.committed.store(next, Relaxed); + nblk.stolen.store(next, Relaxed); + // The changes to `committed` and `stolen` must be visible when reserved is changed. + nblk.reserved.store(next, Release); + *self.pcache = nblk; + } + } + /// true if the next block is ready for the producer to start writing. + fn is_next_block_writable( + &self, + next_blk: &Block, + next_block_version: usize, + ) -> bool { + let expected_version = next_block_version.wrapping_sub(1); + let consumed = next_blk.consumed.load(Relaxed); + let is_consumed = consumed.index().is_full() && expected_version == consumed.version(); + + // The next block must be already _fully_ consumed, since we do not want to checked the `consumed` index + // in the enqueue fastpath! + if !is_consumed { + return false; + } + // The producer must wait until the next block has no active stealers. + let stolen = next_blk.stolen.load(Acquire); + if !stolen.index().is_full() || stolen.version() != expected_version { + return false; + } + true + } +} + +impl + Owner +{ + /// Try to dequeue the oldest element in the queue. + #[inline(always)] + pub(crate) fn dequeue(&mut self) -> Option { + let (blk, consumed) = self.get_consumer_block()?; + + // We trust that the correct index is passed to us here. + let entry_cell = &blk.entries[consumed.raw_index()]; + // SAFETY: We know there is an entry to dequeue, so we know the entry is a valid initialized `E`. + let item = unsafe { entry_cell.with(|entry| entry.read().assume_init()) }; + // SAFETY: We already checked that `consumed_idx < ENTRIES_PER_BLOCK`. + let new_consumed = unsafe { consumed.index_add_unchecked(1) }; + blk.consumed.store(new_consumed, Relaxed); + #[cfg(feature = "stats")] + self.queue.stats.increment_dequeued(1); + Some(item) + } + + /// Try to dequeue all remaining committed entries in the current block. + pub(crate) fn dequeue_block(&mut self) -> Option> { + let (blk, consumed) = self.get_consumer_block()?; + + let committed = blk.committed.load(Relaxed); + + // We are claiming the tasks **before** reading them out of the buffer. + // This is safe because only the **current** thread is able to push new + // tasks. + // + // There isn't really any need for memory ordering... Relaxed would + // work. This is because all tasks are pushed into the queue from the + // current thread (or memory has been acquired if the local queue handle + // moved). + blk.consumed.store(committed, Relaxed); + + Some(BlockIter { + buffer: &blk.entries, + i: consumed.raw_index(), + committed: committed.raw_index(), + }) + } + + // returns true on success, false when advancing not possible. + fn try_advance_consumer_block( + &mut self, + next_block: &Block, + curr_consumed: IndexAndVersion, + ) -> bool { + let next_cons_vsn = curr_consumed + .version() + .wrapping_add(next_block.is_head() as usize); + + // The reserved field is updated last in `enqueue()`. It is only updated by the producer + // (`Owner`), so `Relaxed` is sufficient. If the actual reserved version is not equal to the + // expected next consumer version, then the producer has not advanced to the next block yet + // and we must wait. + let next_reserved_vsn = next_block.reserved.load(Relaxed).version(); + if next_reserved_vsn != next_cons_vsn { + debug_assert!(next_reserved_vsn == next_cons_vsn.wrapping_sub(1)); + return false; + } + + /* stop stealers */ + let reserved_new = IndexAndVersion::new(next_cons_vsn, Index::full()); + // todo: Why can this be Relaxed? 
+ let reserved_old = next_block.reserved.swap(reserved_new, Relaxed); + debug_assert_eq!(reserved_old.version(), next_cons_vsn); + let reserved_old_idx = reserved_old.raw_index(); + + // Number of entries that can't be stolen anymore because we stopped stealing. + let num_consumer_owned = ENTRIES_PER_BLOCK.saturating_sub(reserved_old_idx); + // Increase `stolen`, by the number of entries that can't be stolen anymore and are now up to the + // consumer to deqeuue. This ensures that, once the stealers have finished stealing the already reserved + // entries, `nblk.stolen == ENTRIES_PER_BLOCK` holds, i.e. this block is marked as having no active + // stealers, which will allow the producer to the enter this block again (in the next round). + next_block.stolen.fetch_add(num_consumer_owned, Relaxed); + + /* advance the block and try again */ + // The consumer must skip already reserved entries. + next_block.consumed.store(reserved_old, Relaxed); + *self.ccache = next_block; + true + } + + /// Advance consumer to the next block, unless the producer has not reached the block yet. + fn can_advance_consumer_block( + &self, + next_block: &Block, + curr_consumed: IndexAndVersion, + ) -> bool { + let next_cons_vsn = curr_consumed + .version() + .wrapping_add(next_block.is_head() as usize); + // The reserved field is updated last in `enqueue()`. It is only updated by the producer + // (`Owner`), so `Relaxed` is sufficient. If the actual reserved version is not equal to the + // expected next consumer version, then the producer has not advanced to the next block yet + // and we must wait. + let next_reserved_vsn = next_block.reserved.load(Relaxed).version(); + if next_reserved_vsn != next_cons_vsn { + debug_assert!(next_reserved_vsn == next_cons_vsn.wrapping_sub(1)); + return false; + } + true + } + + /// Check if there are any entries in the next block that are currently being stolen. + pub(crate) fn next_block_has_stealers(&self) -> bool { + // SAFETY: `pcache` always points to a valid `Block` in the queue. We never create a mutable reference + // to a Block, so it is safe to construct a shared reference here. + let blk = unsafe { &**self.pcache }; + let reserved = blk.reserved.load(Relaxed); + let stolen = blk.stolen.load(Relaxed); + // If reserved and stolen don't match then there is still an active stealer in the block. + stolen != reserved + } + + /// Check if there are entries that can be stolen from the queue. + /// + /// Note that stealing may still fail for a number of reasons even if this function returned true + pub(crate) fn has_stealable_entries(&self) -> bool { + // If the consumer is not on the same block as the producer, then there + // is at least the producer block that can be stolen from. + if self.pcache == self.ccache { + return false; + } + + // SAFETY: `pcache` always points to a valid `Block` in the queue. We never create a mutable reference + // to a Block, so it is safe to construct a shared reference here. + let blk = unsafe { &**self.pcache }; + let committed = blk.committed.load(Relaxed); + // Probably we could get away with Relaxed here too, since a false positive + // isn't that bad since users anyway have to expect concurrent stealing. + let reserved = blk.reserved.load(Acquire); + // For the current FIFO stealing policy it is sufficient to only check + // if all items in the producer block have been reserved yet. + reserved != committed + } + + /// Returns the minimum amount of slots that are free and can be enqueued. 
+ /// + /// For performance reasons this implementation will only check the current and next + /// block. This is sufficient for stealing policies which steal at most 1 block at a + /// time. + pub(crate) fn min_remaining_slots(&self) -> usize { + // SAFETY: self.pcache always points to a valid Block. + let current_block: &Block = unsafe { &*(*self.pcache) }; + // The committed field is only updated by the owner. + let committed_idx = current_block.committed.load(Relaxed).raw_index(); + // Free slots in the current block ( 0 <= free_slots <= ENTRIES_PER_BLOCK) + let mut free_slots = ENTRIES_PER_BLOCK - committed_idx; + + // SAFETY: The next pointer is always valid + let next_block = unsafe { &*current_block.next() }; + let committed = next_block.committed.load(Relaxed); + if self.is_next_block_writable(next_block, committed.version()) { + free_slots += ENTRIES_PER_BLOCK; + } + free_slots + } + + /// `true` if there is at least one entry that can be dequeued. + /// + /// It is possible that a dequeue can still fail, since the item was stolen after we checked + /// and before the consumer advanced to the block in question. + pub(crate) fn can_consume(&self) -> bool { + // SAFETY: `ccache` always points to a valid `Block` in the queue. We never create a mutable reference + // to a Block, so it is safe to construct a shared reference here. + let current_blk_cache = unsafe { &**self.ccache }; + let mut blk = current_blk_cache; + for _ in 0..NUM_BLOCKS + 1 { + // check if the block is fully consumed already + let consumed = blk.consumed.load(Relaxed); + let consumed_idx = consumed.raw_index(); + + // Fastpath (Block is not fully consumed yet) + if consumed_idx < ENTRIES_PER_BLOCK { + // we know the block is not full, but we must first check if there is an entry to + // dequeue. + let committed_idx = blk.committed.load(Relaxed).raw_index(); + if consumed_idx == committed_idx { + return false; + } + + /* There is an entry to dequeue */ + return true; + } + + /* Slow-path */ + + /* Consumer head may never pass the Producer head and Consumer/Stealer tail */ + let nblk = unsafe { &*blk.next() }; + if self.can_advance_consumer_block(nblk, consumed) { + blk = nblk; + } else { + return false; + } + /* We advanced to the next block - loop around and try again */ + } + // Since there is no concurrent enqueuing and the buffer is bounded, we should reach + // one of the exit conditions in at most NUM_BLOCKS iterations. + unreachable!() + } + + fn get_consumer_block( + &mut self, + ) -> Option<( + &Block, + IndexAndVersion, + )> { + // SAFETY: `ccache` always points to a valid `Block` in the queue. We never create a mutable reference + // to a Block, so it is safe to construct a shared reference here. + let current_blk_cache = unsafe { &**self.ccache }; + let mut blk = current_blk_cache; + // The +1 is necessary to advance again to our original starting block, this time with a + // new version. This can happen in the edge-case that all items in the queue where stolen. + for _ in 0..NUM_BLOCKS + 1 { + // check if the block is fully consumed already + let consumed = blk.consumed.load(Relaxed); + let consumed_idx = consumed.raw_index(); + + // Fastpath (Block is not fully consumed yet) + if consumed_idx < ENTRIES_PER_BLOCK { + // we know the block is not full, but we must first check if there is an entry to + // dequeue. 
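+ // `consumed == committed` means every entry the producer has published in this
+ // block was already dequeued, so there is nothing left for the consumer.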
+ let committed_idx = blk.committed.load(Relaxed).raw_index(); + if consumed_idx == committed_idx { + return None; + } + + /* There is an entry to dequeue */ + return Some((blk, consumed)); + } + + /* Slow-path */ + + /* Consumer head may never pass the Producer head and Consumer/Stealer tail */ + let nblk = unsafe { &*blk.next() }; + if self.try_advance_consumer_block(nblk, consumed) { + blk = nblk; + } else { + return None; + } + /* We advanced to the next block - loop around and try again */ + } + // Since there is no concurrent enqueuing and the buffer is bounded, we should reach + // one of the exit conditions in at most NUM_BLOCKS+1 iterations. + unreachable!() + } +} + +impl Clone + for Stealer +{ + fn clone(&self) -> Self { + Self { + spos: self.spos.clone(), + queue: self.queue.clone(), + } + } +} + +impl + Stealer +{ + /// Try to steal a single item from the queue + #[inline] + pub(crate) fn steal(&self) -> Option { + loop { + let (blk, curr_spos) = self.curr_block(); + + /* check if the block is fully reserved */ + let reserved = blk.reserved.load(Acquire); + let reserved_idx = reserved.raw_index(); + + if reserved_idx < ENTRIES_PER_BLOCK { + /* check if we have an entry to occupy */ + let committed = blk.committed.load(Acquire); + let committed_idx = committed.raw_index(); + if reserved_idx == committed_idx { + return None; + } + // SAFETY: We checked before that `reserved_idx` < ENTRIES_PER_BLOCK, so the index + // can't overflow. + let new_reserved = unsafe { reserved.index_add_unchecked(1) }; + let _ = blk + .reserved + .compare_exchange_weak(reserved, new_reserved, Release, Relaxed) + .ok()?; + + /* we got the entry */ + + #[cfg(feature = "stats")] + self.queue.stats.increment_stolen(1); + + // SAFETY: We know the entry is a valid and initialized `E` and is now exclusively owned by us. + let t = + unsafe { blk.entries[reserved_idx].with(|entry| entry.read().assume_init()) }; + // `t` is now owned by us so we mark the stealing as finished. Synchronizes with the Owner Acquire. + let old_stolen = blk.stolen.fetch_add(1, Release); + debug_assert!(old_stolen.raw_index() < ENTRIES_PER_BLOCK); + return Some(t); + } + + // Slow-path: The current block is already fully reserved. Try to advance to the next block + if !self.can_advance(blk, reserved) { + return None; + } + self.try_advance_spos(curr_spos); + } + } + + /// Get the current stealer `Block` and the corresponding stealer position (`spos`) + /// + /// The returned `spos` can be larger than `NUM_BLOCKS` to detect [ABA](https://en.wikipedia.org/wiki/ABA_problem) + /// situations. + fn curr_block(&self) -> (&Block, usize) { + let curr_spos = self.spos.load(Relaxed); + // spos increments beyond NUM_BLOCKS to prevent ABA problems. + let block_idx = curr_spos % NUM_BLOCKS; + let blk: &Block = &self.queue.blocks[block_idx]; + (blk, curr_spos) + } + + /// Try to steal a block from `self`. + /// + /// Tries to steal a full block from `self`. If the block is not fully + /// committed yet it will steal up to and including the last committed entry + /// of that block. 
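+ ///
+ /// Returns an iterator over the stolen entries, or `None` if there is nothing to
+ /// steal or the reservation race was lost to another stealer.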
+ #[inline] + pub(crate) fn steal_block(&self) -> Option> { + loop { + let (blk, curr_spos) = self.curr_block(); + + /* check if the block is fully reserved */ + let reserved = blk.reserved.load(Acquire); + let reserved_idx = reserved.raw_index(); + + if reserved_idx < ENTRIES_PER_BLOCK { + /* check if we have an entry to occupy */ + let committed = blk.committed.load(Acquire); + let committed_idx = committed.raw_index(); + if reserved_idx == committed_idx { + return None; + } + + // Try to steal the block up to the latest committed entry + let reserve_res = blk + .reserved + .compare_exchange_weak(reserved, committed, Release, Relaxed); + + if reserve_res.is_err() { + return None; + } + + let num_reserved = committed_idx - reserved_idx; + // From the statistics perspective we consider the reserved range to already be + // stolen, since it is not available for the consumer or other stealers anymore. + #[cfg(feature = "stats")] + self.queue.stats.increment_stolen(num_reserved); + return Some(StealerBlockIter { + stealer_block: blk, + block_reserved: committed_idx, + i: reserved_idx, + num_reserved, + }); + } + + // Slow-path: The current block is already fully reserved. Try to advance to next block + if !self.can_advance(blk, reserved) { + return None; + } + self.try_advance_spos(curr_spos); + } + } + + /// `true` if there is at least one entry that can be stolen + /// + /// Note: Calling this function is expensive! Try to avoid it. + /// This is intended solely for the case where the worker owning the queue + /// is parked, and we need to check if there is work remaining in their queue. + pub(crate) fn is_empty(&self) -> bool { + let spos = self.spos.load(Acquire); + for i in 0..NUM_BLOCKS + 1 { + let blk = &self.queue.blocks[spos.wrapping_add(i) % NUM_BLOCKS]; + // check if the block is fully consumed already + let consumed = blk.reserved.load(Acquire); + let consumed_idx = consumed.raw_index(); + + // Fastpath (Block is not fully consumed yet) + if consumed_idx < ENTRIES_PER_BLOCK { + let committed_idx = blk.committed.load(Acquire).raw_index(); + if consumed_idx == committed_idx { + return true; + } + + /* There is an entry to dequeue */ + return false; + } + + /* Advance to next block */ + + if !self.can_advance(blk, consumed) { + return true; + } + } + // Since there is no concurrent enqueuing and the buffer is bounded, we should reach + // one of the exit conditions in at most NUM_BLOCKS iterations. + unreachable!() + } + + /// True if the stealer can advance to the next block + fn can_advance( + &self, + curr_block: &Block, + curr_reserved: IndexAndVersion, + ) -> bool { + /* r_head never pass the w_head and r_tail */ + let nblk = unsafe { &*curr_block.next() }; + let next_expect_vsn = curr_reserved.version() + nblk.is_head() as usize; + let next_actual_vsn = nblk.reserved.load(Relaxed).version(); + next_expect_vsn == next_actual_vsn + } + + /// Try and advance `spos` to the next block. + /// + /// We are not interested in the failure case, since the next stealer can just try again. + fn try_advance_spos(&self, curr_spos: usize) { + // Ignore result. Failure means a different stealer succeeded in updating + // the stealer block index. In case of a sporadic failure the next stealer will try again. + let _ = + self.spos + .compare_exchange_weak(curr_spos, curr_spos.wrapping_add(1), Relaxed, Relaxed); + } + + /// The estimated number of entries currently enqueued. 
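+ ///
+ /// The value is inherently racy and may already be stale when it is read; it is
+ /// intended for metrics and load-balancing heuristics only.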
+ #[cfg(feature = "stats")] + pub(crate) fn estimated_queue_entries(&self) -> usize { + self.queue.estimated_len() + } +} + +impl<'a, E, const ENTRIES_PER_BLOCK: usize> Iterator for BlockIter<'a, E, ENTRIES_PER_BLOCK> { + type Item = E; + + #[inline] + fn next(&mut self) -> Option { + let i = self.i; + self.i += 1; + if i < self.committed { + self.buffer.get(i).map(|entry_cell| { + entry_cell.with(|entry| { + // SAFETY: we claimed the entries + unsafe { entry.read().assume_init() } + }) + }) + } else { + None + } + } +} + +impl<'a, E, const ENTRIES_PER_BLOCK: usize> Iterator + for StealerBlockIter<'a, E, ENTRIES_PER_BLOCK> +{ + type Item = E; + + #[inline] + fn next(&mut self) -> Option { + if self.i < self.block_reserved { + let entry = self.stealer_block.entries[self.i].with(|entry| { + // SAFETY: we claimed the entries + unsafe { entry.read().assume_init() } + }); + self.i += 1; + Some(entry) + } else { + None + } + } + + fn size_hint(&self) -> (usize, Option) { + let len = self.len(); + (len, Some(len)) + } +} + +impl<'a, E, const ENTRIES_PER_BLOCK: usize> ExactSizeIterator + for StealerBlockIter<'a, E, ENTRIES_PER_BLOCK> +{ +} + +impl<'a, E, const ENTRIES_PER_BLOCK: usize> Drop for StealerBlockIter<'a, E, ENTRIES_PER_BLOCK> { + fn drop(&mut self) { + // Ensure `Drop` is called on any items that where not consumed, by consuming the iterator, + // which implicitly dequeues all items + while self.next().is_some() {} + self.stealer_block + .stolen + .fetch_add(self.num_reserved, Release); + } +} + +impl<'a, E, const ENTRIES_PER_BLOCK: usize> StealerBlockIter<'a, E, ENTRIES_PER_BLOCK> { + pub(crate) fn len(&self) -> usize { + self.block_reserved - self.i + } +} + +impl<'a, E, const ENTRIES_PER_BLOCK: usize> core::fmt::Debug + for StealerBlockIter<'a, E, ENTRIES_PER_BLOCK> +{ + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.write_fmt(format_args!( + "StealerBlockIter over {} entries", + self.block_reserved - self.i + )) + } +} + +/// Create a new BWoS queue and return the [Owner] and a [Stealer] instance +/// +/// `NUM_BLOCKS` must be a power two and at least 2. `ENTRIES_PER_BLOCK` can be freely chosen (non-zero). +/// The total length of the queue is `NUM_BLOCKS * ENTRIES_PER_BLOCK` and must not be more than `usize::MAX`. +/// +/// ## Performance considerations +/// +/// The Owner throughput will improve with a larger `ENTRIES_PER_BLOCK` value. +/// Thieves however will prefer a higher `NUM_BLOCKS` count since it makes it easier to +/// steal a whole block. +pub(crate) fn new() -> ( + Owner, + Stealer, +) { + assert!(NUM_BLOCKS.checked_mul(ENTRIES_PER_BLOCK).is_some()); + assert!(NUM_BLOCKS.is_power_of_two()); + assert!(NUM_BLOCKS >= 1); + assert!(ENTRIES_PER_BLOCK >= 1); + + let q: Pin>> = BwsQueue::new(); + let first_block = &q.blocks[0]; + + let stealer_position = Arc::new(AtomicUsize::new(0)); + + ( + Owner { + pcache: CachePadded::new(first_block), + ccache: CachePadded::new(first_block), + queue: q.clone(), + }, + Stealer { + spos: CachePadded::new(stealer_position), + queue: q, + }, + ) +} diff --git a/tokio/src/runtime/scheduler/multi_thread/queue/tokioq.rs b/tokio/src/runtime/scheduler/multi_thread/queue/tokioq.rs new file mode 100644 index 00000000000..a3a66e8ef85 --- /dev/null +++ b/tokio/src/runtime/scheduler/multi_thread/queue/tokioq.rs @@ -0,0 +1,752 @@ +//! 
Run-queue structures based on the golang run-queue to support a work-stealing scheduler + +use crate::loom::cell::UnsafeCell; +use crate::loom::sync::Arc; +use crate::runtime::task::{self, Inject, Notified}; +use crate::runtime::MetricsBatch; + +use crate::runtime::scheduler::multi_thread::queue::{Owner, Stealer}; +use std::mem::{self, MaybeUninit}; +use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release}; +use std::{cmp, ptr}; + +// Use wider integers when possible to increase ABA resilience. +// +// See issue #5041: . +cfg_has_atomic_u64! { + type UnsignedShort = u32; + type UnsignedLong = u64; + type AtomicUnsignedShort = crate::loom::sync::atomic::AtomicU32; + type AtomicUnsignedLong = crate::loom::sync::atomic::AtomicU64; +} +cfg_not_has_atomic_u64! { + type UnsignedShort = u16; + type UnsignedLong = u32; + type AtomicUnsignedShort = crate::loom::sync::atomic::AtomicU16; + type AtomicUnsignedLong = crate::loom::sync::atomic::AtomicU32; +} + +/// Producer handle. May only be used from a single thread. +pub(crate) struct Local { + inner: Arc>, +} + +/// Consumer handle. May be used from many threads. +pub(crate) struct Steal(Arc>); + +pub(crate) struct Inner { + /// Concurrently updated by many threads. + /// + /// Contains two `UnsignedShort` values. The LSB byte is the "real" head of + /// the queue. The `UnsignedShort` in the MSB is set by a stealer in process + /// of stealing values. It represents the first value being stolen in the + /// batch. The `UnsignedShort` indices are intentionally wider than strictly + /// required for buffer indexing in order to provide ABA mitigation and make + /// it possible to distinguish between full and empty buffers. + /// + /// When both `UnsignedShort` values are the same, there is no active + /// stealer. + /// + /// Tracking an in-progress stealer prevents a wrapping scenario. + head: AtomicUnsignedLong, + + /// Only updated by producer thread but read by many threads. + tail: AtomicUnsignedShort, + + /// Elements + buffer: Box<[UnsafeCell>>; LOCAL_QUEUE_CAPACITY]>, +} + +unsafe impl Send for Inner {} +unsafe impl Sync for Inner {} + +#[cfg(not(loom))] +const LOCAL_QUEUE_CAPACITY: usize = 256; + +// Shrink the size of the local queue when using loom. This shouldn't impact +// logic, but allows loom to test more edge cases in a reasonable a mount of +// time. +#[cfg(loom)] +const LOCAL_QUEUE_CAPACITY: usize = 4; + +const MASK: usize = LOCAL_QUEUE_CAPACITY - 1; + +// Constructing the fixed size array directly is very awkward. The only way to +// do it is to repeat `UnsafeCell::new(MaybeUninit::uninit())` 256 times, as +// the contents are not Copy. The trick with defining a const doesn't work for +// generic types. +fn make_fixed_size(buffer: Box<[T]>) -> Box<[T; LOCAL_QUEUE_CAPACITY]> { + assert_eq!(buffer.len(), LOCAL_QUEUE_CAPACITY); + + // safety: We check that the length is correct. + unsafe { Box::from_raw(Box::into_raw(buffer).cast()) } +} + +#[clippy::has_significant_drop] +struct StealerIterator<'a, T: 'static> { + stealer: &'a Steal, + // `tail` index of the stealer in the queue. 
Should not change + stolen_tail: UnsignedShort, + /// current position in the iterator + cur_pos: UnsignedShort, + /// Head of the stealer (one element past the last reserved item) + head: UnsignedShort, +} + +impl<'a, T> Iterator for StealerIterator<'a, T> { + type Item = task::Notified; + + fn next(&mut self) -> Option { + // tail will always be behind head, but head could have wrapped around already, + // so calculate `new_tail` before comparing with head. + let new_tail = self.stolen_tail.wrapping_add(self.cur_pos); + if new_tail < self.head { + let idx = (new_tail as usize) & MASK; + let task = self.stealer.0.buffer[idx].with(|ptr| unsafe { ptr::read((*ptr).as_ptr()) }); + self.cur_pos += 1; + Some(task) + } else { + None + } + } + + fn size_hint(&self) -> (usize, Option) { + let new_tail = self.stolen_tail.wrapping_add(self.cur_pos); + let len = self.head.saturating_sub(new_tail).try_into().unwrap(); + (len, Some(len)) + } +} + +impl<'a, T> ExactSizeIterator for StealerIterator<'a, T> {} + +impl<'a, T> Drop for StealerIterator<'a, T> { + fn drop(&mut self) { + debug_assert_eq!(self.len(), 0); + // This is the value of head if no further enqueues happened concurrently. + let mut prev_packed = pack(self.stolen_tail, self.head); + + let mut new_real = self.head; + + // Update `head_steal` to match `head_real` signalling that the + // stealing routine is complete. + loop { + let next_packed = pack(new_real, new_real); + + let res = + self.stealer + .0 + .head + .compare_exchange(prev_packed, next_packed, AcqRel, Acquire); + + match res { + Ok(_) => return, + Err(actual) => { + let (actual_steal, actual_real) = unpack(actual); + + assert_ne!(actual_steal, actual_real); + // We don't concurrently steal, so the actual steal shouldn't have changed. + debug_assert_eq!(self.stolen_tail, actual_steal); + prev_packed = actual; + new_real = actual_real; + } + } + } + } +} + +/// Create a new local run-queue +pub(crate) fn local() -> ( + Box + Send + Sync>, + Box + Send + Sync>, +) { + let mut buffer = Vec::with_capacity(LOCAL_QUEUE_CAPACITY); + + for _ in 0..LOCAL_QUEUE_CAPACITY { + buffer.push(UnsafeCell::new(MaybeUninit::uninit())); + } + + let inner = Arc::new(Inner { + head: AtomicUnsignedLong::new(0), + tail: AtomicUnsignedShort::new(0), + buffer: make_fixed_size(buffer.into_boxed_slice()), + }); + + let local = Local { + inner: inner.clone(), + }; + + let remote = Steal(inner); + + ( + Box::new(remote) as Box + Send + Sync>, + Box::new(local) as Box + Send + Sync>, + ) +} + +impl Local { + /// Moves a batch of tasks into the inject queue. + /// + /// This will temporarily make some of the tasks unavailable to stealers. + /// Once `push_overflow` is done, a notification is sent out, so if other + /// workers "missed" some of the tasks during a steal, they will get + /// another opportunity. + #[inline(never)] + fn push_overflow( + &mut self, + task: task::Notified, + head: UnsignedShort, + tail: UnsignedShort, + inject: &Inject, + metrics: &mut MetricsBatch, + ) -> Result<(), task::Notified> { + /// How many elements are we taking from the local queue. + /// + /// This is one less than the number of tasks pushed to the inject + /// queue as we are also inserting the `task` argument. 
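+ ///
+ /// With the non-loom `LOCAL_QUEUE_CAPACITY` of 256 this moves 128 queued tasks
+ /// (plus the task currently being pushed) to the inject queue.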
+ const NUM_TASKS_TAKEN: UnsignedShort = (LOCAL_QUEUE_CAPACITY / 2) as UnsignedShort; + + assert_eq!( + tail.wrapping_sub(head) as usize, + LOCAL_QUEUE_CAPACITY, + "queue is not full; tail = {}; head = {}", + tail, + head + ); + + let prev = pack(head, head); + + // Claim a bunch of tasks + // + // We are claiming the tasks **before** reading them out of the buffer. + // This is safe because only the **current** thread is able to push new + // tasks. + // + // There isn't really any need for memory ordering... Relaxed would + // work. This is because all tasks are pushed into the queue from the + // current thread (or memory has been acquired if the local queue handle + // moved). + if self + .inner + .head + .compare_exchange( + prev, + pack( + head.wrapping_add(NUM_TASKS_TAKEN), + head.wrapping_add(NUM_TASKS_TAKEN), + ), + Release, + Relaxed, + ) + .is_err() + { + // We failed to claim the tasks, losing the race. Return out of + // this function and try the full `push` routine again. The queue + // may not be full anymore. + return Err(task); + } + + /// An iterator that takes elements out of the run queue. + struct BatchTaskIter<'a, T: 'static> { + buffer: &'a [UnsafeCell>>; LOCAL_QUEUE_CAPACITY], + head: UnsignedLong, + i: UnsignedLong, + } + impl<'a, T: 'static> Iterator for BatchTaskIter<'a, T> { + type Item = task::Notified; + + #[inline] + fn next(&mut self) -> Option> { + if self.i == UnsignedLong::from(NUM_TASKS_TAKEN) { + None + } else { + let i_idx = self.i.wrapping_add(self.head) as usize & MASK; + let slot = &self.buffer[i_idx]; + + // safety: Our CAS from before has assumed exclusive ownership + // of the task pointers in this range. + let task = slot.with(|ptr| unsafe { ptr::read((*ptr).as_ptr()) }); + + self.i += 1; + Some(task) + } + } + } + + // safety: The CAS above ensures that no consumer will look at these + // values again, and we are the only producer. + let batch_iter = BatchTaskIter { + buffer: &self.inner.buffer, + head: head as UnsignedLong, + i: 0, + }; + inject.push_batch(batch_iter.chain(std::iter::once(task))); + + // Add 1 to factor in the task currently being scheduled. + metrics.incr_overflow_count(); + + Ok(()) + } +} + +impl super::Owner for Local { + /// Returns true if the queue has entries that can be stolen. + fn is_stealable(&self) -> bool { + !self.inner.is_empty() + } + + fn max_capacity(&self) -> usize { + LOCAL_QUEUE_CAPACITY + } + + /// Returns false if there are any entries in the queue + /// + /// Separate to is_stealable so that refactors of is_stealable to "protect" + /// some tasks from stealing won't affect this + fn has_tasks(&self) -> bool { + !self.inner.is_empty() + } + + /// Pushes a task to the back of the local queue, skipping the LIFO slot. + fn push_back_or_overflow( + &mut self, + mut task: task::Notified, + inject: &Inject, + metrics: &mut MetricsBatch, + ) { + let tail = loop { + let head = self.inner.head.load(Acquire); + let (steal, real) = unpack(head); + + // safety: this is the **only** thread that updates this cell. + let tail = unsafe { self.inner.tail.unsync_load() }; + + if tail.wrapping_sub(steal) < LOCAL_QUEUE_CAPACITY as UnsignedShort { + // There is capacity for the task + break tail; + } else if steal != real { + // Concurrently stealing, this will free up capacity, so only + // push the task onto the inject queue + inject.push(task); + return; + } else { + // Push the current task and half of the queue into the + // inject queue. 
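+ // `push_overflow` hands the task back if a stealer claimed entries concurrently;
+ // in that case we loop around and re-evaluate the capacity check.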
+ match self.push_overflow(task, real, tail, inject, metrics) { + Ok(_) => return, + // Lost the race, try again + Err(v) => { + task = v; + } + } + } + }; + + // Map the position to a slot index. + let idx = tail as usize & MASK; + + self.inner.buffer[idx].with_mut(|ptr| { + // Write the task to the slot + // + // Safety: There is only one producer and the above `if` + // condition ensures we don't touch a cell if there is a + // value, thus no consumer. + unsafe { + ptr::write((*ptr).as_mut_ptr(), task); + } + }); + + // Make the task available. Synchronizes with a load in + // `steal_into2`. + self.inner.tail.store(tail.wrapping_add(1), Release); + } + + // fn len(&self) -> usize { + // // Safety: We own the queue and thus are the only ones that could potentially mutate + // // `inner.tail`. + // let dst_tail = unsafe { self.inner.tail.unsync_load() }; + // + // // To the caller, `dst` may **look** empty but still have values + // // contained in the buffer. If another thread is concurrently stealing + // // from `dst` there may not be enough capacity to steal. + // let (steal, real_head) = unpack(self.inner.head.load(Acquire)); + // } + + #[deny(unsafe_op_in_unsafe_fn)] + unsafe fn push_back_unchecked( + &mut self, + tasks: Box> + '_>, + ) { + // Safety: this is the **only** thread that updates this cell. + let tail = unsafe { self.inner.tail.unsync_load() }; + let len = tasks.len(); + for (i, task) in tasks.enumerate() { + let idx = (tail as usize).wrapping_add(i) & MASK; + // Write the task to the new slot + // + // Safety: We are the queue Owner and the caller assures the queue has sufficient capacity. + self.inner.buffer[idx].with_mut(|ptr| unsafe { ptr::write((*ptr).as_mut_ptr(), task) }); + } + + // Make the stolen items available to consumers + self.inner + .tail + .store((tail as usize).wrapping_add(len) as UnsignedShort, Release); + } + + /// Returns a tuple with the lower bound and an Option for the upper bound of remaining + /// slots for enqueuing in the queue. + fn remaining_slots_hint(&self) -> (u16, Option) { + // Safety: We own the queue and thus are the only ones that could potentially mutate + // `inner.tail`. + let tail = unsafe { self.inner.tail.unsync_load() }; + + // To the caller, `dst` may **look** empty but still have values + // contained in the buffer. If another thread is concurrently stealing + // from `dst` there may not be enough capacity to steal. + let (steal, _real_head) = unpack(self.inner.head.load(Acquire)); + + // `steal` is behind `real_head` when there is an in-progress steal, otherwise it is + // equal to `real_head`. `tail` - `steal` is the amount of queue slots currently used. + // `tail` is always larger then `steal`, since the counter is monotonically increasing, + // at least until it wraps around at `UnsignedShort::MAX`. wrapping_sub always gives the + // correct difference. + let capacity = LOCAL_QUEUE_CAPACITY as UnsignedShort - (tail.wrapping_sub(steal)); + (capacity as u16, Some(capacity as u16)) + } + + /// Pops a task from the local queue. + fn pop(&mut self) -> Option> { + let mut head = self.inner.head.load(Acquire); + + let idx = loop { + let (steal, real) = unpack(head); + + // safety: this is the **only** thread that updates this cell. + let tail = unsafe { self.inner.tail.unsync_load() }; + + if real == tail { + // queue is empty + return None; + } + + let next_real = real.wrapping_add(1); + + // If `steal == real` there are no concurrent stealers. Both `steal` + // and `real` are updated. 
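+ // Otherwise a steal is in progress: only the real head advances and the steal
+ // half is left untouched, so the stealer keeps ownership of its claimed range.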
+ let next = if steal == real { + pack(next_real, next_real) + } else { + assert_ne!(steal, next_real); + pack(steal, next_real) + }; + + // Attempt to claim a task. + let res = self + .inner + .head + .compare_exchange(head, next, AcqRel, Acquire); + + match res { + Ok(_) => break real as usize & MASK, + Err(actual) => head = actual, + } + }; + + Some(self.inner.buffer[idx].with(|ptr| unsafe { ptr::read(ptr).assume_init() })) + } + + fn push_back(&mut self, tasks: Box> + '_>) { + let len = tasks.len(); + assert!(len <= LOCAL_QUEUE_CAPACITY); + + if len == 0 { + // Nothing to do + return; + } + + let head = self.inner.head.load(Acquire); + let (steal, _) = unpack(head); + + // safety: this is the **only** thread that updates this cell. + let mut tail = unsafe { self.inner.tail.unsync_load() }; + + if tail.wrapping_sub(steal) <= (LOCAL_QUEUE_CAPACITY - len) as UnsignedShort { + // Yes, this if condition is structured a bit weird (first block + // does nothing, second returns an error). It is this way to match + // `push_back_or_overflow`. + } else { + panic!() + } + + for task in tasks { + let idx = tail as usize & MASK; + + self.inner.buffer[idx].with_mut(|ptr| { + // Write the task to the slot + // + // Safety: There is only one producer and the above `if` + // condition ensures we don't touch a cell if there is a + // value, thus no consumer. + unsafe { + ptr::write((*ptr).as_mut_ptr(), task); + } + }); + + tail = tail.wrapping_add(1); + } + + self.inner.tail.store(tail, Release); + } +} + +impl super::Stealer for Steal { + fn is_empty(&self) -> bool { + self.0.is_empty() + } + + /// Steals half the tasks from self and place them into `dst`. + fn steal_into( + &self, + dst: &mut dyn Owner, + dst_metrics: &mut MetricsBatch, + ) -> Option> { + // todo: add a size_hint function similiar like ExactSizeItrator + let (min_remaining_slots, _) = dst.remaining_slots_hint(); + + if min_remaining_slots == 0 { + // we *could* try to steal to steal 1 to prevent going to idle, + // but it's probably not worth the overhead it causes on the destination + // queue. + return None; + } + + let mut stolen_tasks = self.steal_half_max(min_remaining_slots)?; + debug_assert!(stolen_tasks.len() <= u16::MAX as usize); + dst_metrics.incr_steal_count(stolen_tasks.len() as u16); + dst_metrics.incr_steal_operations(); + + // We take the first task from the iterator to directly return it. + let first = stolen_tasks.next()?; + + if stolen_tasks.len() > 0 { + // Safety: We checked that `dst` has sufficient capacity, and we are the owner + // thread, so the capacity can only have increased in the meantime. + unsafe { dst.push_back_unchecked(Box::new(stolen_tasks)) } + } + + Some(first) + } + + cfg_metrics! { + fn len(&self) -> usize { + self.0.len() as _ + } + } +} + +impl Steal { + /// Steal half of the queues item, but not more than `max`. + fn steal_half_max(&self, max: u16) -> Option> { + let mut prev_packed = self.0.head.load(Acquire); + let mut next_packed; + + let (steal_head, real_head) = loop { + let (src_head_steal, src_head_real) = unpack(prev_packed); + let src_tail = self.0.tail.load(Acquire); + // If these two do not match, another thread is concurrently + // stealing from the queue. 
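+ // Only one steal can be in flight per queue, so rather than spinning we bail
+ // out and let the caller try a different victim.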
+ if src_head_steal != src_head_real { + return None; + } + // Number of available tasks to steal + let n = src_tail.wrapping_sub(src_head_real); + let n = n - n / 2; + let n = cmp::min(n, max as UnsignedShort); + + if n == 0 { + // No tasks available to steal + return None; + } + // Update the real head index to acquire the tasks. + let steal_to = src_head_real.wrapping_add(n); + assert_ne!(src_head_steal, steal_to); + next_packed = pack(src_head_steal, steal_to); + + // Claim all those tasks. This is done by incrementing the "real" + // head but not the steal. By doing this, no other thread is able to + // steal from this queue until the current thread completes. + let res = self + .0 + .head + .compare_exchange(prev_packed, next_packed, AcqRel, Acquire); + + match res { + Ok(_) => { + break (src_head_steal, steal_to); + } + Err(actual) => prev_packed = actual, + } + }; + + let n = real_head.wrapping_sub(steal_head); + assert!( + n <= LOCAL_QUEUE_CAPACITY as UnsignedShort / 2, + "actual = {}", + n + ); + + Some(StealerIterator { + stealer: self, + stolen_tail: steal_head, + cur_pos: 0, + head: real_head, + }) + } + + // // Steal tasks from `self`, placing them into `dst`. Returns the number of + // // tasks that were stolen. + // fn steal_into2(&self, dst: &mut Local, dst_tail: UnsignedShort) -> UnsignedShort { + // let mut prev_packed = self.0.head.load(Acquire); + // let mut next_packed; + // + // let n = loop { + // let (src_head_steal, src_head_real) = unpack(prev_packed); + // let src_tail = self.0.tail.load(Acquire); + // + // // If these two do not match, another thread is concurrently + // // stealing from the queue. + // if src_head_steal != src_head_real { + // return 0; + // } + // + // // Number of available tasks to steal + // let n = src_tail.wrapping_sub(src_head_real); + // let n = n - n / 2; + // + // if n == 0 { + // // No tasks available to steal + // return 0; + // } + // + // // Update the real head index to acquire the tasks. + // let steal_to = src_head_real.wrapping_add(n); + // assert_ne!(src_head_steal, steal_to); + // next_packed = pack(src_head_steal, steal_to); + // + // // Claim all those tasks. This is done by incrementing the "real" + // // head but not the steal. By doing this, no other thread is able to + // // steal from this queue until the current thread completes. + // let res = self + // .0 + // .head + // .compare_exchange(prev_packed, next_packed, AcqRel, Acquire); + // + // match res { + // Ok(_) => break n, + // Err(actual) => prev_packed = actual, + // } + // }; + // + // assert!( + // n <= LOCAL_QUEUE_CAPACITY as UnsignedShort / 2, + // "actual = {}", + // n + // ); + // + // let (first, _) = unpack(next_packed); + // + // // Take all the tasks + // for i in 0..n { + // // Compute the positions + // let src_pos = first.wrapping_add(i); + // let dst_pos = dst_tail.wrapping_add(i); + // + // // Map to slots + // let src_idx = src_pos as usize & MASK; + // let dst_idx = dst_pos as usize & MASK; + // + // // Read the task + // // + // // safety: We acquired the task with the atomic exchange above. + // let task = self.0.buffer[src_idx].with(|ptr| unsafe { ptr::read((*ptr).as_ptr()) }); + // + // // Write the task to the new slot + // // + // // safety: `dst` queue is empty and we are the only producer to + // // this queue. 
+ // dst.inner.buffer[dst_idx] + // .with_mut(|ptr| unsafe { ptr::write((*ptr).as_mut_ptr(), task) }); + // } + // + // let mut prev_packed = next_packed; + // + // // Update `src_head_steal` to match `src_head_real` signalling that the + // // stealing routine is complete. + // loop { + // let head = unpack(prev_packed).1; + // next_packed = pack(head, head); + // + // let res = self + // .0 + // .head + // .compare_exchange(prev_packed, next_packed, AcqRel, Acquire); + // + // match res { + // Ok(_) => return n, + // Err(actual) => { + // let (actual_steal, actual_real) = unpack(actual); + // + // assert_ne!(actual_steal, actual_real); + // + // prev_packed = actual; + // } + // } + // } + // } +} + +impl Clone for Steal { + fn clone(&self) -> Steal { + Steal(self.0.clone()) + } +} + +impl Inner { + fn len(&self) -> UnsignedShort { + let (_, head) = unpack(self.head.load(Acquire)); + let tail = self.tail.load(Acquire); + + tail.wrapping_sub(head) + } + + fn is_empty(&self) -> bool { + self.len() == 0 + } +} + +/// Split the head value into the real head and the index a stealer is working +/// on. +fn unpack(n: UnsignedLong) -> (UnsignedShort, UnsignedShort) { + let real = n & UnsignedShort::MAX as UnsignedLong; + let steal = n >> (mem::size_of::() * 8); + + (steal as UnsignedShort, real as UnsignedShort) +} + +/// Join the two head values +fn pack(steal: UnsignedShort, real: UnsignedShort) -> UnsignedLong { + (real as UnsignedLong) | ((steal as UnsignedLong) << (mem::size_of::() * 8)) +} + +impl Drop for Local { + fn drop(&mut self) { + if !std::thread::panicking() { + assert!(self.pop().is_none(), "queue not empty"); + } + } +} + +#[test] +fn test_local_queue_capacity() { + assert!(LOCAL_QUEUE_CAPACITY - 1 <= u8::MAX as usize); +} diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index bedaff39e4f..23241db74c6 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -67,6 +67,7 @@ use crate::runtime::{ use crate::util::atomic_cell::AtomicCell; use crate::util::rand::{FastRand, RngSeedGenerator}; +use crate::runtime::builder::MultiThreadFlavor; use std::cell::RefCell; use std::time::Duration; @@ -99,7 +100,7 @@ struct Core { lifo_enabled: bool, /// The worker-local run queue. - run_queue: queue::Local>, + run_queue: Box> + Send + Sync>, /// True if the worker is currently searching for more work. Searching /// involves attempting to steal from other workers. @@ -163,7 +164,7 @@ pub(super) struct Shared { /// Used to communicate with a worker from other threads. struct Remote { /// Steals tasks from this worker. - steal: queue::Steal>, + steal: Box> + Send + Sync>, /// Unparks the associated worker thread unpark: Unparker, @@ -200,6 +201,7 @@ const MAX_LIFO_POLLS_PER_TICK: usize = 3; pub(super) fn create( size: usize, + flavor: MultiThreadFlavor, park: Parker, driver_handle: driver::Handle, blocking_spawner: blocking::Spawner, @@ -212,7 +214,7 @@ pub(super) fn create( // Create the local queues for _ in 0..size { - let (steal, run_queue) = queue::local(); + let (steal, run_queue) = queue::local(flavor); let park = park.clone(); let unpark = park.unpark(); @@ -679,7 +681,7 @@ impl Core { // `run_queue.push_back` below, there will be *at least* `cap` // available slots in the queue. 
let cap = usize::min( - self.run_queue.remaining_slots(), + self.run_queue.remaining_slots_hint().0 as usize, self.run_queue.max_capacity() / 2, ); @@ -693,11 +695,15 @@ impl Core { let mut tasks = worker.inject().pop_n(n); - // Pop the first task to return immedietly + // Pop the first task to return immediately let ret = tasks.next(); // Push the rest of the on the run queue - self.run_queue.push_back(tasks); + // SAFETY: We checked the minimum capacity of the queue in advance, and + // don't enqueue more than that. Concurrent enqueues are not possible. + unsafe { + self.run_queue.push_back_unchecked(Box::new(tasks)); + } ret } @@ -732,7 +738,7 @@ impl Core { let target = &worker.handle.shared.remotes[i]; if let Some(task) = target .steal - .steal_into(&mut self.run_queue, &mut self.metrics) + .steal_into(&mut *self.run_queue, &mut self.metrics) { return Some(task); } diff --git a/tokio/src/runtime/task/list.rs b/tokio/src/runtime/task/list.rs index fb7dbdc1d95..80bda9e66cd 100644 --- a/tokio/src/runtime/task/list.rs +++ b/tokio/src/runtime/task/list.rs @@ -157,8 +157,10 @@ impl OwnedTasks { } } - pub(crate) fn active_tasks_count(&self) -> usize { - self.inner.lock().list.count() + cfg_metrics! { + pub(crate) fn active_tasks_count(&self) -> usize { + self.inner.lock().list.count() + } } pub(crate) fn remove(&self, task: &Task) -> Option> { diff --git a/tokio/src/runtime/tests/loom_queue.rs b/tokio/src/runtime/tests/loom_queue.rs index 58c17ad65c2..be610002d97 100644 --- a/tokio/src/runtime/tests/loom_queue.rs +++ b/tokio/src/runtime/tests/loom_queue.rs @@ -2,27 +2,28 @@ use crate::runtime::scheduler::multi_thread::queue; use crate::runtime::task::Inject; use crate::runtime::tests::NoopSchedule; use crate::runtime::MetricsBatch; +use std::sync::Arc; +use crate::runtime::builder::MultiThreadFlavor; use loom::thread; fn metrics_batch() -> MetricsBatch { MetricsBatch::new(&crate::runtime::WorkerMetrics::new()) } -#[test] -fn basic() { - loom::model(|| { - let (steal, mut local) = queue::local(); +fn basic_flavor(flavor: MultiThreadFlavor) { + loom::model(move || { + let (steal, mut local) = queue::local(flavor); let inject = Inject::new(); let mut metrics = metrics_batch(); let th = thread::spawn(move || { let mut metrics = metrics_batch(); - let (_, mut local) = queue::local(); + let (_, mut local) = queue::local(flavor); let mut n = 0; for _ in 0..3 { - if steal.steal_into(&mut local, &mut metrics).is_some() { + if steal.steal_into(&mut *local, &mut metrics).is_some() { n += 1; } @@ -66,18 +67,28 @@ fn basic() { } #[test] -fn steal_overflow() { - loom::model(|| { - let (steal, mut local) = queue::local(); +fn basic_default() { + basic_flavor(MultiThreadFlavor::Default); +} + +//#[cfg(all(tokio_unstable, feature = "bwos"))] +#[test] +fn basic_bwos() { + basic_flavor(MultiThreadFlavor::Bwos); +} + +fn steal_overflow(flavor: MultiThreadFlavor) { + loom::model(move || { + let (steal, mut local) = queue::local(flavor); let inject = Inject::new(); let mut metrics = metrics_batch(); let th = thread::spawn(move || { let mut metrics = metrics_batch(); - let (_, mut local) = queue::local(); + let (_, mut local) = queue::local(flavor); let mut n = 0; - if steal.steal_into(&mut local, &mut metrics).is_some() { + if steal.steal_into(&mut *local, &mut metrics).is_some() { n += 1; } @@ -118,14 +129,27 @@ fn steal_overflow() { } #[test] -fn multi_stealer() { +fn steal_overflow_default() { + steal_overflow(MultiThreadFlavor::Default) +} + +//#[cfg(all(tokio_unstable, feature = "bwos"))] +#[test] +fn 
steal_overflow_bwos() { + steal_overflow(MultiThreadFlavor::Bwos) +} + +fn multi_stealer_flavor(flavor: MultiThreadFlavor) { const NUM_TASKS: usize = 5; - fn steal_tasks(steal: queue::Steal) -> usize { + fn steal_tasks( + steal: Arc>, + flavor: MultiThreadFlavor, + ) -> usize { let mut metrics = metrics_batch(); - let (_, mut local) = queue::local(); + let (_, mut local) = queue::local(flavor); - if steal.steal_into(&mut local, &mut metrics).is_none() { + if steal.steal_into(&mut *local, &mut metrics).is_none() { return 0; } @@ -138,8 +162,8 @@ fn multi_stealer() { n } - loom::model(|| { - let (steal, mut local) = queue::local(); + loom::model(move || { + let (steal, mut local) = queue::local(flavor); let inject = Inject::new(); let mut metrics = metrics_batch(); @@ -149,12 +173,14 @@ fn multi_stealer() { local.push_back_or_overflow(task, &inject, &mut metrics); } + let steal: Arc + Send + Sync> = steal.into(); + let th1 = { let steal = steal.clone(); - thread::spawn(move || steal_tasks(steal)) + thread::spawn(move || steal_tasks(steal, flavor)) }; - let th2 = thread::spawn(move || steal_tasks(steal)); + let th2 = thread::spawn(move || steal_tasks(steal, flavor)); let mut n = 0; @@ -174,11 +200,21 @@ fn multi_stealer() { } #[test] -fn chained_steal() { - loom::model(|| { +fn multi_stealer_default() { + multi_stealer_flavor(MultiThreadFlavor::Default) +} + +//#[cfg(all(tokio_unstable, feature = "bwos"))] +#[test] +fn multi_stealer_bwos() { + multi_stealer_flavor(MultiThreadFlavor::Bwos) +} + +fn chained_steal(flavor: MultiThreadFlavor) { + loom::model(move || { let mut metrics = metrics_batch(); - let (s1, mut l1) = queue::local(); - let (s2, mut l2) = queue::local(); + let (s1, mut l1) = queue::local(flavor); + let (s2, mut l2) = queue::local(flavor); let inject = Inject::new(); // Load up some tasks @@ -193,8 +229,8 @@ fn chained_steal() { // Spawn a task to steal from **our** queue let th = thread::spawn(move || { let mut metrics = metrics_batch(); - let (_, mut local) = queue::local(); - s1.steal_into(&mut local, &mut metrics); + let (_, mut local) = queue::local(flavor); + s1.steal_into(&mut *local, &mut metrics); while local.pop().is_some() {} }); @@ -202,7 +238,7 @@ fn chained_steal() { // Drain our tasks, then attempt to steal while l1.pop().is_some() {} - s2.steal_into(&mut l1, &mut metrics); + s2.steal_into(&mut *l1, &mut metrics); th.join().unwrap(); @@ -211,3 +247,14 @@ fn chained_steal() { while inject.pop().is_some() {} }); } + +#[test] +fn chained_steal_default() { + chained_steal(MultiThreadFlavor::Default) +} + +//#[cfg(all(tokio_unstable, feature = "bwos"))] +#[test] +fn chained_steal_bwos() { + chained_steal(MultiThreadFlavor::Bwos) +} diff --git a/tokio/src/runtime/tests/queue.rs b/tokio/src/runtime/tests/queue.rs index 09f249e9ea4..865ed769afa 100644 --- a/tokio/src/runtime/tests/queue.rs +++ b/tokio/src/runtime/tests/queue.rs @@ -2,6 +2,7 @@ use crate::runtime::scheduler::multi_thread::queue; use crate::runtime::task::{self, Inject, Schedule, Task}; use crate::runtime::MetricsBatch; +use crate::runtime::builder::MultiThreadFlavor; use std::thread; use std::time::Duration; @@ -28,7 +29,7 @@ fn metrics_batch() -> MetricsBatch { #[test] fn fits_256_one_at_a_time() { - let (_, mut local) = queue::local(); + let (_, mut local) = queue::local(MultiThreadFlavor::Default); let inject = Inject::new(); let mut metrics = metrics_batch(); @@ -48,12 +49,12 @@ fn fits_256_one_at_a_time() { #[test] fn fits_256_all_at_once() { - let (_, mut local) = queue::local(); + let (_, mut 
local) = queue::local(MultiThreadFlavor::Default); let mut tasks = (0..256) .map(|_| super::unowned(async {}).0) .collect::>(); - local.push_back(tasks.drain(..)); + local.push_back(Box::new(tasks.drain(..))); let mut i = 0; while local.pop().is_some() { @@ -65,16 +66,16 @@ fn fits_256_all_at_once() { #[test] fn fits_256_all_in_chunks() { - let (_, mut local) = queue::local(); + let (_, mut local) = queue::local(MultiThreadFlavor::Default); let mut tasks = (0..256) .map(|_| super::unowned(async {}).0) .collect::>(); - local.push_back(tasks.drain(..10)); - local.push_back(tasks.drain(..100)); - local.push_back(tasks.drain(..46)); - local.push_back(tasks.drain(..100)); + local.push_back(Box::new(tasks.drain(..10))); + local.push_back(Box::new(tasks.drain(..100))); + local.push_back(Box::new(tasks.drain(..46))); + local.push_back(Box::new(tasks.drain(..100))); let mut i = 0; while local.pop().is_some() { @@ -86,7 +87,7 @@ fn fits_256_all_in_chunks() { #[test] fn overflow() { - let (_, mut local) = queue::local(); + let (_, mut local) = queue::local(MultiThreadFlavor::Default); let inject = Inject::new(); let mut metrics = metrics_batch(); @@ -116,8 +117,8 @@ fn overflow() { fn steal_batch() { let mut metrics = metrics_batch(); - let (steal1, mut local1) = queue::local(); - let (_, mut local2) = queue::local(); + let (steal1, mut local1) = queue::local(MultiThreadFlavor::Default); + let (_, mut local2) = queue::local(MultiThreadFlavor::Default); let inject = Inject::new(); for _ in 0..4 { @@ -125,7 +126,7 @@ fn steal_batch() { local1.push_back_or_overflow(task, &inject, &mut metrics); } - assert!(steal1.steal_into(&mut local2, &mut metrics).is_some()); + assert!(steal1.steal_into(&mut *local2, &mut metrics).is_some()); cfg_metrics! { assert_metrics!(metrics, steal_count == 2); @@ -163,16 +164,16 @@ fn stress1() { let mut metrics = metrics_batch(); for _ in 0..NUM_ITER { - let (steal, mut local) = queue::local(); + let (steal, mut local) = queue::local(MultiThreadFlavor::Default); let inject = Inject::new(); let th = thread::spawn(move || { let mut metrics = metrics_batch(); - let (_, mut local) = queue::local(); + let (_, mut local) = queue::local(MultiThreadFlavor::Default); let mut n = 0; for _ in 0..NUM_STEAL { - if steal.steal_into(&mut local, &mut metrics).is_some() { + if steal.steal_into(&mut *local, &mut metrics).is_some() { n += 1; } @@ -226,16 +227,16 @@ fn stress2() { let mut metrics = metrics_batch(); for _ in 0..NUM_ITER { - let (steal, mut local) = queue::local(); + let (steal, mut local) = queue::local(MultiThreadFlavor::Default); let inject = Inject::new(); let th = thread::spawn(move || { let mut stats = metrics_batch(); - let (_, mut local) = queue::local(); + let (_, mut local) = queue::local(MultiThreadFlavor::Default); let mut n = 0; for _ in 0..NUM_STEAL { - if steal.steal_into(&mut local, &mut stats).is_some() { + if steal.steal_into(&mut *local, &mut stats).is_some() { n += 1; } diff --git a/tokio/src/util/array_init.rs b/tokio/src/util/array_init.rs new file mode 100644 index 00000000000..09de603b9c9 --- /dev/null +++ b/tokio/src/util/array_init.rs @@ -0,0 +1,23 @@ +/// Simplified version of https://github.com/Manishearth/array-init for an infallible +/// initializer +use std::mem::MaybeUninit; + +#[inline] +pub(crate) fn array_init(mut initializer: F) -> [T; N] +where + F: FnMut(usize) -> T, +{ + let mut array: MaybeUninit<[T; N]> = MaybeUninit::uninit(); + // pointer to array = *mut [T; N] <-> *mut T = pointer to first element + let mut ptr_i = array.as_mut_ptr() as 
*mut T; + for i in 0..N { + let value_i = initializer(i); + // SAFETY: We are initialising the array entry by entry + unsafe { + ptr_i.write(value_i); + ptr_i = ptr_i.add(1); + } + } + // SAFETY: We have finished initialising the array. + unsafe { array.assume_init() } +} diff --git a/tokio/src/util/cache_padded.rs b/tokio/src/util/cache_padded.rs new file mode 100644 index 00000000000..47ae95286f4 --- /dev/null +++ b/tokio/src/util/cache_padded.rs @@ -0,0 +1,91 @@ +// Implementation copied from crossbeam-utils on May, 23 2023: +// https://github.com/crossbeam-rs/crossbeam/blob/cbaa0b14697cc9550b38dca12e0cfed2b69356cc/crossbeam-utils/src/cache_padded.rs +#![allow(dead_code)] + +use core::fmt; +use core::ops::{Deref, DerefMut}; + +#[derive(Clone, Copy, Default, Hash, PartialEq, Eq)] +#[cfg_attr( + any( + target_arch = "x86_64", + target_arch = "aarch64", + target_arch = "powerpc64", + ), + repr(align(128)) +)] +#[cfg_attr( + any( + target_arch = "arm", + target_arch = "mips", + target_arch = "mips64", + target_arch = "riscv32", + target_arch = "riscv64", + target_arch = "sparc", + target_arch = "hexagon", + ), + repr(align(32)) +)] +#[cfg_attr(target_arch = "m68k", repr(align(16)))] +#[cfg_attr(target_arch = "s390x", repr(align(256)))] +#[cfg_attr( + not(any( + target_arch = "x86_64", + target_arch = "aarch64", + target_arch = "powerpc64", + target_arch = "arm", + target_arch = "mips", + target_arch = "mips64", + target_arch = "riscv32", + target_arch = "riscv64", + target_arch = "sparc", + target_arch = "hexagon", + target_arch = "m68k", + target_arch = "s390x", + )), + repr(align(64)) +)] +pub(crate) struct CachePadded { + value: T, +} + +unsafe impl Send for CachePadded {} +unsafe impl Sync for CachePadded {} + +impl CachePadded { + pub(crate) const fn new(t: T) -> CachePadded { + CachePadded:: { value: t } + } + + pub(crate) fn into_inner(self) -> T { + self.value + } +} + +impl Deref for CachePadded { + type Target = T; + + fn deref(&self) -> &T { + &self.value + } +} + +impl DerefMut for CachePadded { + fn deref_mut(&mut self) -> &mut T { + &mut self.value + } +} + +impl fmt::Debug for CachePadded { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("CachePadded") + .field("value", &self.value) + .finish() + } +} + +impl From for CachePadded { + fn from(t: T) -> Self { + CachePadded::new(t) + } +} diff --git a/tokio/src/util/linked_list.rs b/tokio/src/util/linked_list.rs index ca6ef0e7b79..880369c3eab 100644 --- a/tokio/src/util/linked_list.rs +++ b/tokio/src/util/linked_list.rs @@ -270,6 +270,7 @@ impl CountedLinkedList { val } + #[cfg(any(all(tokio_unstable, not(loom), feature = "stats"), test))] pub(crate) fn count(&self) -> usize { self.count } diff --git a/tokio/src/util/mod.rs b/tokio/src/util/mod.rs index b1afc5716b9..edadb345899 100644 --- a/tokio/src/util/mod.rs +++ b/tokio/src/util/mod.rs @@ -73,6 +73,12 @@ cfg_rt_multi_thread! { pub(crate) use try_lock::TryLock; } +//cfg_rt_multi_thread_bwos! { +mod array_init; +pub(crate) mod cache_padded; +pub(crate) use array_init::array_init; +//} + pub(crate) mod trace; pub(crate) mod error;
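
The original golang-style queue is retained as `queue/tokioq.rs`; its wide `head` word packs the index a stealer started from and the real head into a single atomic so that an in-flight steal blocks wrap-around. The standalone sketch below mirrors that encoding using the 64-bit-target types (`u32` indices inside a `u64` word); the type aliases and the `main` driver are illustrative and not part of the patch.

// Standalone sketch of the packed-head encoding used by the golang-style queue.
// The lower half of the wide word holds the "real" head, the upper half holds
// the head an in-flight stealer started from.
type Short = u32;
type Long = u64;

const BITS: u32 = (std::mem::size_of::<Short>() * 8) as u32;

fn pack(steal: Short, real: Short) -> Long {
    (real as Long) | ((steal as Long) << BITS)
}

fn unpack(n: Long) -> (Short, Short) {
    let real = (n & Short::MAX as Long) as Short;
    let steal = (n >> BITS) as Short;
    (steal, real)
}

fn main() {
    // No stealer active: both halves are equal.
    let idle = pack(7, 7);
    assert_eq!(unpack(idle), (7, 7));

    // A stealer claimed entries 7..12: the real head moved ahead while the
    // steal half still records where the in-flight steal began.
    let stealing = pack(7, 12);
    assert_eq!(unpack(stealing), (7, 12));

    // The indices wrap; `wrapping_sub` still yields the correct length.
    let len = 3u32.wrapping_sub(u32::MAX - 1);
    assert_eq!(len, 5);
    println!("ok");
}

Keeping these indices wider than strictly needed for buffer indexing, and masking with `MASK` only when touching the buffer, is what provides the ABA resilience referenced at the top of both queue files.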