Skip to content

Commit 0c2282f

Browse files
author
Stjepan Glavina
authored
Optimization: a slot for the next task to run (#529)
* Optimization: a slot for the next task to run * Only notify workers when a task is pushed into a queue
1 parent d546ee3 commit 0c2282f

File tree

4 files changed

+91
-50
lines changed

4 files changed

+91
-50
lines changed

benches/mutex.rs

+1-3
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,7 @@
22

33
extern crate test;
44

5-
use std::sync::Arc;
6-
7-
use async_std::sync::Mutex;
5+
use async_std::sync::{Arc, Mutex};
86
use async_std::task;
97
use test::Bencher;
108

src/sync/mutex.rs

+1
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,7 @@ impl<T> Mutex<T> {
170170
/// #
171171
/// # })
172172
/// ```
173+
#[inline]
173174
pub fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
174175
if !self.locked.swap(true, Ordering::SeqCst) {
175176
Some(MutexGuard(self))

src/sync/waker_set.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ impl WakerSet {
6060
}
6161

6262
/// Inserts a waker for a blocked operation and returns a key associated with it.
63+
#[cold]
6364
pub fn insert(&self, cx: &Context<'_>) -> usize {
6465
let w = cx.waker().clone();
6566
let mut inner = self.lock();
@@ -70,6 +71,7 @@ impl WakerSet {
7071
}
7172

7273
/// Removes the waker of an operation.
74+
#[cold]
7375
pub fn remove(&self, key: usize) {
7476
let mut inner = self.lock();
7577

@@ -81,6 +83,7 @@ impl WakerSet {
8183
/// Removes the waker of a cancelled operation.
8284
///
8385
/// Returns `true` if another blocked operation from the set was notified.
86+
#[cold]
8487
pub fn cancel(&self, key: usize) -> bool {
8588
let mut inner = self.lock();
8689

@@ -147,6 +150,7 @@ impl WakerSet {
147150
/// Notifies blocked operations, either one or all of them.
148151
///
149152
/// Returns `true` if at least one operation was notified.
153+
#[cold]
150154
fn notify(&self, n: Notify) -> bool {
151155
let mut inner = &mut *self.lock();
152156
let mut notified = false;
@@ -172,7 +176,6 @@ impl WakerSet {
172176
}
173177

174178
/// Locks the list of entries.
175-
#[cold]
176179
fn lock(&self) -> Lock<'_> {
177180
let backoff = Backoff::new();
178181
while self.flag.fetch_or(LOCKED, Ordering::Acquire) & LOCKED != 0 {

src/task/executor/pool.rs

+85-46
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
1-
use std::cell::UnsafeCell;
1+
use std::cell::Cell;
22
use std::iter;
33
use std::thread;
44
use std::time::Duration;
55

66
use crossbeam_deque::{Injector, Stealer, Worker};
77
use once_cell::sync::Lazy;
8+
use once_cell::unsync::OnceCell;
89

910
use crate::task::executor::Sleepers;
1011
use crate::task::Runnable;
@@ -32,9 +33,18 @@ static POOL: Lazy<Pool> = Lazy::new(|| {
3233
let worker = Worker::new_fifo();
3334
stealers.push(worker.stealer());
3435

36+
let proc = Processor {
37+
worker,
38+
slot: Cell::new(None),
39+
slot_runs: Cell::new(0),
40+
};
41+
3542
thread::Builder::new()
3643
.name("async-std/executor".to_string())
37-
.spawn(|| abort_on_panic(|| main_loop(worker)))
44+
.spawn(|| {
45+
let _ = PROCESSOR.with(|p| p.set(proc));
46+
abort_on_panic(|| main_loop());
47+
})
3848
.expect("cannot start a thread driving tasks");
3949
}
4050

@@ -45,59 +55,75 @@ static POOL: Lazy<Pool> = Lazy::new(|| {
4555
}
4656
});
4757

58+
/// The state of a worker thread.
59+
struct Processor {
60+
/// The local task queue.
61+
worker: Worker<Runnable>,
62+
63+
/// Contains the next task to run as an optimization that skips queues.
64+
slot: Cell<Option<Runnable>>,
65+
66+
/// How many times in a row tasks have been taken from the slot rather than the queue.
67+
slot_runs: Cell<u32>,
68+
}
69+
4870
thread_local! {
49-
/// Local task queue associated with the current worker thread.
50-
static QUEUE: UnsafeCell<Option<Worker<Runnable>>> = UnsafeCell::new(None);
71+
/// Worker thread state.
72+
static PROCESSOR: OnceCell<Processor> = OnceCell::new();
5173
}
5274

5375
/// Schedules a new runnable task for execution.
5476
pub(crate) fn schedule(task: Runnable) {
55-
QUEUE.with(|queue| {
56-
let local = unsafe { (*queue.get()).as_ref() };
57-
58-
// If the current thread is a worker thread, push the task into its local task queue.
59-
// Otherwise, push it into the global task queue.
60-
match local {
61-
None => POOL.injector.push(task),
62-
Some(q) => q.push(task),
77+
PROCESSOR.with(|proc| {
78+
// If the current thread is a worker thread, store it into its task slot or push it into
79+
// its local task queue. Otherwise, push it into the global task queue.
80+
match proc.get() {
81+
Some(proc) => {
82+
// Replace the task in the slot.
83+
if let Some(task) = proc.slot.replace(Some(task)) {
84+
// If the slot already contained a task, push it into the local task queue.
85+
proc.worker.push(task);
86+
POOL.sleepers.notify_one();
87+
}
88+
}
89+
None => {
90+
POOL.injector.push(task);
91+
POOL.sleepers.notify_one();
92+
}
6393
}
64-
});
65-
66-
// Notify a sleeping worker that new work just came in.
67-
POOL.sleepers.notify_one();
94+
})
6895
}
6996

7097
/// Main loop running a worker thread.
71-
fn main_loop(local: Worker<Runnable>) {
72-
// Initialize the local task queue.
73-
QUEUE.with(|queue| unsafe { *queue.get() = Some(local) });
98+
fn main_loop() {
99+
/// Number of yields when no runnable task is found.
100+
const YIELDS: u32 = 3;
101+
/// Number of short sleeps when no runnable task is found.
102+
const SLEEPS: u32 = 1;
74103

75104
// The number of times the thread didn't find work in a row.
76-
let mut step = 0;
105+
let mut fails = 0;
77106

78107
loop {
79108
// Try to find a runnable task.
80109
match find_runnable() {
81110
Some(task) => {
82-
// Found. Now run the task.
111+
fails = 0;
112+
113+
// Run the found task.
83114
task.run();
84-
step = 0;
85115
}
86116
None => {
117+
fails += 1;
118+
87119
// Yield the current thread or put it to sleep.
88-
match step {
89-
0..=2 => {
90-
thread::yield_now();
91-
step += 1;
92-
}
93-
3 => {
94-
thread::sleep(Duration::from_micros(10));
95-
step += 1;
96-
}
97-
_ => {
98-
POOL.sleepers.wait();
99-
step = 0;
100-
}
120+
if fails <= YIELDS {
121+
thread::yield_now();
122+
} else if fails <= YIELDS + SLEEPS {
123+
thread::sleep(Duration::from_micros(10));
124+
} else {
125+
POOL.sleepers.wait();
126+
fails = 0;
101127
}
102128
}
103129
}
@@ -106,29 +132,42 @@ fn main_loop(local: Worker<Runnable>) {
106132

107133
/// Find the next runnable task.
108134
fn find_runnable() -> Option<Runnable> {
109-
let pool = &*POOL;
110-
111-
QUEUE.with(|queue| {
112-
let local = unsafe { (*queue.get()).as_ref().unwrap() };
135+
/// Maximum number of times the slot can be used in a row.
136+
const SLOT_LIMIT: u32 = 16;
137+
138+
PROCESSOR.with(|proc| {
139+
let proc = proc.get().unwrap();
140+
141+
// Try taking a task from the slot.
142+
let runs = proc.slot_runs.get();
143+
if runs < SLOT_LIMIT {
144+
if let Some(task) = proc.slot.take() {
145+
proc.slot_runs.set(runs + 1);
146+
return Some(task);
147+
}
148+
}
149+
proc.slot_runs.set(0);
113150

114151
// Pop a task from the local queue, if not empty.
115-
local.pop().or_else(|| {
152+
proc.worker.pop().or_else(|| {
116153
// Otherwise, we need to look for a task elsewhere.
117154
iter::repeat_with(|| {
118155
// Try stealing a batch of tasks from the global queue.
119-
pool.injector
120-
.steal_batch_and_pop(&local)
156+
POOL.injector
157+
.steal_batch_and_pop(&proc.worker)
121158
// Or try stealing a batch of tasks from one of the other threads.
122159
.or_else(|| {
123160
// First, pick a random starting point in the list of local queues.
124-
let len = pool.stealers.len();
161+
let len = POOL.stealers.len();
125162
let start = random(len as u32) as usize;
126163

127164
// Try stealing a batch of tasks from each local queue starting from the
128165
// chosen point.
129-
let (l, r) = pool.stealers.split_at(start);
130-
let rotated = r.iter().chain(l.iter());
131-
rotated.map(|s| s.steal_batch_and_pop(&local)).collect()
166+
let (l, r) = POOL.stealers.split_at(start);
167+
let stealers = r.iter().chain(l.iter());
168+
stealers
169+
.map(|s| s.steal_batch_and_pop(&proc.worker))
170+
.collect()
132171
})
133172
})
134173
// Loop while no task was stolen and any steal operation needs to be retried.

0 commit comments

Comments
 (0)