Skip to content

Commit 5aaf6ba

Browse files
committed Jan 24, 2025
kernel: use lock-free queues for scheduler work submission
1 parent 5e598d7 commit 5aaf6ba

File tree

7 files changed

+471
-76
lines changed

7 files changed

+471
-76
lines changed
 

‎Cargo.lock

+346-11
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ volatile-register = "0.2.2"
6969
buddy_system_allocator = "0.11.0"
7070
hashbrown = { version = "0.15.2", default-features = false, features = ["nightly", "inline-more", "allocator-api2"] }
7171
foldhash = { version = "0.1.4", default-features = false }
72+
nolock = { version = "0.4.1", default-features = false, features = ["queues"] }
7273

7374
bindgen = { git = "https://github.com/oro-os/dep.rust-bindgen.git" }
7475
syn = { version = "2.0.60", features = ["full", "printing"] }

‎oro-arch-x86_64/src/init.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -183,9 +183,11 @@ pub unsafe fn initialize_primary(lapic: Lapic) {
183183
.expect("failed to create root ring instance");
184184

185185
// Create a thread for the entry point.
186-
// TODO(qix-): Allow stack size to be passed in via module command line.
187-
let _thread = Thread::new(&instance, entry_point)
186+
let thread = Thread::new(&instance, entry_point)
188187
.expect("failed to create root ring instance thread");
188+
189+
// Spawn it.
190+
Thread::spawn(thread);
189191
}
190192
}
191193
}

‎oro-kernel/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ oro-dbgutil.workspace = true
3838

3939
hashbrown.workspace = true
4040
foldhash = { workspace = true, default-features = false }
41+
nolock = { workspace = true, default-features = false, features = ["queues"] }
4142

4243
[lints]
4344
workspace = true

‎oro-kernel/src/lib.rs

+46-10
Original file line numberDiff line numberDiff line change
@@ -52,17 +52,20 @@ use core::{
5252
sync::atomic::{AtomicBool, Ordering::SeqCst},
5353
};
5454

55-
use interface::RingInterface;
55+
use nolock::queues::{
56+
DequeueError,
57+
mpmc::bounded::scq::{Receiver, Sender},
58+
};
5659
use oro_macro::assert;
5760
use oro_mem::{
58-
alloc::vec::Vec,
5961
global_alloc::GlobalPfa,
6062
mapper::{AddressSegment, AddressSpace as _, MapError},
6163
pfa::Alloc,
6264
};
63-
use oro_sync::{Lock, TicketMutex};
65+
use oro_sync::TicketMutex;
66+
use tab::Tab;
6467

65-
use self::{arch::Arch, scheduler::Scheduler};
68+
use self::{arch::Arch, interface::RingInterface, scheduler::Scheduler, thread::Thread};
6669

6770
/// Core-local instance of the Oro kernel.
6871
///
@@ -239,8 +242,10 @@ impl<A: Arch> Kernel<A> {
239242
/// Global state shared by all [`Kernel`] instances across
240243
/// core boot/powerdown/bringup cycles.
241244
pub struct KernelState<A: Arch> {
242-
/// List of all threads.
243-
threads: TicketMutex<Vec<tab::Tab<thread::Thread<A>>>>,
245+
/// Unclaimed thread deque sender.
246+
thread_tx: Sender<Tab<Thread<A>>>,
247+
/// Unclaimed thread deque receiver.
248+
thread_rx: Receiver<Tab<Thread<A>>>,
244249
/// The root ring.
245250
root_ring: tab::Tab<ring::Ring<A>>,
246251
/// Whether or not the root ring has been initialized.
@@ -271,9 +276,12 @@ impl<A: Arch> KernelState<A> {
271276

272277
let root_ring = ring::Ring::<A>::new_root()?;
273278

279+
let (thread_rx, thread_tx) = nolock::queues::mpmc::bounded::scq::queue(128);
280+
274281
this.write(Self {
282+
thread_tx,
283+
thread_rx,
275284
root_ring,
276-
threads: TicketMutex::default(),
277285
has_initialized_root: AtomicBool::new(false),
278286
});
279287

@@ -285,9 +293,37 @@ impl<A: Arch> KernelState<A> {
285293
&self.root_ring
286294
}
287295

288-
/// Returns a reference to the mutex-guarded list of threads.
289-
pub fn threads(&self) -> &impl Lock<Target = Vec<tab::Tab<thread::Thread<A>>>> {
290-
&self.threads
296+
/// Submits a thread to the kernel state to be claimed by
297+
/// the next 'free' scheduler.
298+
///
299+
/// # Panics
300+
/// This function panics if the unclaimed thread queue is full.
301+
///
302+
/// For now, this is acceptable, but will be relaxed in the future
303+
/// as the scheduler is fleshed out a bit more.
304+
pub fn submit_unclaimed_thread(&self, thread: Tab<Thread<A>>) {
305+
// Tell the thread it's been deallocated.
306+
unsafe {
307+
Thread::<A>::deallocate(&thread);
308+
}
309+
310+
match self.thread_tx.try_enqueue(thread) {
311+
Ok(t) => t,
312+
Err((err, _)) => panic!("thread queue full or disconnected: {err:?}"),
313+
};
314+
}
315+
316+
/// Tries to take the next unclaimed thread.
317+
#[expect(clippy::missing_panics_doc)]
318+
pub fn try_claim_thread(&self) -> Option<Tab<Thread<A>>> {
319+
match self.thread_rx.try_dequeue() {
320+
Ok(thread) => Some(thread),
321+
Err(DequeueError::Closed) => {
322+
// NOTE(qix-): Should never happen.
323+
panic!("thread queue disconnected");
324+
}
325+
Err(DequeueError::Empty) => None,
326+
}
291327
}
292328
}
293329

‎oro-kernel/src/scheduler.rs

+53-45
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,17 @@
11
//! Houses types, traits and functionality for the Oro kernel scheduler.
22
3-
use oro_mem::alloc::vec::Vec;
4-
use oro_sync::Lock;
3+
use nolock::queues::{
4+
DequeueError,
5+
spsc::unbounded::{UnboundedReceiver as Receiver, UnboundedSender as Sender},
6+
};
7+
use oro_debug::dbg_warn;
58

69
use crate::{
710
Kernel,
811
arch::{Arch, CoreHandle},
912
syscall::{InFlightSystemCall, InterfaceResponse, SystemCallRequest, SystemCallResponse},
1013
tab::Tab,
11-
thread::{RunState, ScheduleAction, ScheduleError, Thread},
14+
thread::{RunState, ScheduleAction, Thread},
1215
};
1316

1417
/// Main scheduler state machine.
@@ -22,11 +25,13 @@ use crate::{
2225
/// itself.
2326
pub struct Scheduler<A: Arch> {
2427
/// A reference to the kernel instance.
25-
kernel: &'static Kernel<A>,
28+
kernel: &'static Kernel<A>,
2629
/// The current thread, if there is one being executed.
27-
current: Option<Tab<Thread<A>>>,
28-
/// The index of the next thread to execute.
29-
next_index: usize,
30+
current: Option<Tab<Thread<A>>>,
31+
/// The thread queue transfer side.
32+
thread_tx: Sender<Tab<Thread<A>>>,
33+
/// The thread queue receive side.
34+
thread_rx: Receiver<Tab<Thread<A>>>,
3035
}
3136

3237
// XXX(qix-): Temporary workaround to make things compile
@@ -37,10 +42,13 @@ unsafe impl<A: Arch> Sync for Scheduler<A> {}
3742
impl<A: Arch> Scheduler<A> {
3843
/// Creates a new scheduler instance.
3944
pub(crate) fn new(kernel: &'static Kernel<A>) -> Self {
45+
let (thread_rx, thread_tx) = nolock::queues::spsc::unbounded::queue();
46+
4047
Self {
4148
kernel,
4249
current: None,
43-
next_index: 0,
50+
thread_tx,
51+
thread_rx,
4452
}
4553
}
4654

@@ -72,48 +80,39 @@ impl<A: Arch> Scheduler<A> {
7280
thread
7381
.with_mut(|t| t.try_pause(self.kernel.id()))
7482
.expect("thread pause failed");
75-
}
76-
77-
// XXX(qix-): This is a terrible design but gets the job done for now.
78-
// XXX(qix-): Every single core will be competing for a list of the same threads
79-
// XXX(qix-): until a thread migration system is implemented.
80-
let mut thread_list = self.kernel.state().threads().lock();
81-
82-
// XXX(qix-): An even worse design, but it's a temporary workaround
83-
// XXX(qix-): to deal with a deadlock. The scheduling algorithm will
84-
// XXX(qix-): get a huge refactor at some point in the near future.
85-
let mut dead_threads = Vec::new();
8683

87-
while self.next_index < thread_list.len() {
88-
let thread = &thread_list[self.next_index];
89-
self.next_index += 1;
90-
91-
match thread.with_mut(|t| t.try_schedule(self.kernel.id())) {
92-
Ok(action) => {
93-
// Select it for execution.
94-
self.current = Some(thread.clone());
95-
return Some((thread.clone(), action));
96-
}
97-
Err(ScheduleError::Terminated) => {
98-
dead_threads.push(self.next_index - 1);
99-
}
100-
_ => {}
84+
if let Err((thread, err)) = self.thread_tx.enqueue(thread) {
85+
// We failed to schedule the thread.
86+
dbg_warn!(
87+
"scheduler {} failed to requeue thread {:#016X}: {err:?}",
88+
self.kernel.id(),
89+
thread.id()
90+
);
91+
self.kernel.state().submit_unclaimed_thread(thread);
10192
}
10293
}
10394

104-
// For any dead threads we found, remove them from the thread list.
105-
for thread_idx in dead_threads {
106-
// XXX(qix-): This will inevitably screw up the schedule order
107-
// XXX(qix-): for all schedulers in the system but it works for
108-
// XXX(qix-): now. The scheduler will get a huge refactor at some
109-
// XXX(qix-): point in the near future.
110-
thread_list.swap_remove(thread_idx);
111-
}
112-
113-
drop(thread_list);
95+
loop {
96+
let selected = match self.kernel.state().try_claim_thread() {
97+
Some(thread) => thread,
98+
None => {
99+
match self.thread_rx.try_dequeue() {
100+
Ok(thread) => thread,
101+
Err(DequeueError::Closed) => panic!("thread queue closed"),
102+
Err(DequeueError::Empty) => {
103+
return None;
104+
}
105+
}
106+
}
107+
};
114108

115-
self.next_index = 0;
116-
None
109+
// Take action if needed, otherwise skip the thread; it'll be re-queued when
110+
// it needs to be.
111+
if let Ok(action) = selected.with_mut(|t| t.try_schedule(self.kernel.id())) {
112+
self.current = Some(selected.clone());
113+
return Some((selected, action));
114+
}
115+
}
117116
}
118117

119118
/// Called whenever the architecture has reached a codepath
@@ -246,6 +245,15 @@ impl<A: Arch> Scheduler<A> {
246245
}
247246
}
248247

248+
impl<A: Arch> Drop for Scheduler<A> {
249+
fn drop(&mut self) {
250+
// Drain the thread queue.
251+
while let Ok(thread) = self.thread_rx.try_dequeue() {
252+
self.kernel.state().submit_unclaimed_thread(thread);
253+
}
254+
}
255+
}
256+
249257
/// Indicates the type of context switch to be taken by an event caller
250258
/// (typically, the architecture).
251259
///

‎oro-kernel/src/thread.rs

+20-8
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,10 @@ use oro_mem::{
1515
mapper::{AddressSegment, AddressSpace as _, MapError, UnmapError},
1616
pfa::Alloc,
1717
};
18-
use oro_sync::Lock;
1918
use oro_sysabi::{key, syscall::Error as SysError};
2019

2120
use crate::{
22-
AddressSpace, Kernel, UserHandle,
21+
AddressSpace, UserHandle,
2322
arch::{Arch, ThreadHandle},
2423
instance::Instance,
2524
ring::Ring,
@@ -197,12 +196,6 @@ impl<A: Arch> Thread<A> {
197196

198197
instance.with_mut(|instance| instance.threads.insert(tab.id(), tab.clone()));
199198

200-
Kernel::<A>::get()
201-
.state()
202-
.threads()
203-
.lock()
204-
.push(tab.clone());
205-
206199
Ok(tab)
207200
}
208201

@@ -422,6 +415,25 @@ impl<A: Arch> Thread<A> {
422415
}
423416
}
424417

418+
/// Spawns the thread. If the thread has already been spawned,
419+
/// this function does nothing.
420+
#[inline]
421+
pub fn spawn(this: Tab<Thread<A>>) {
422+
if this.with(|t| matches!(t.state, State::Unallocated)) {
423+
crate::Kernel::<A>::get()
424+
.state()
425+
.submit_unclaimed_thread(this);
426+
}
427+
}
428+
429+
/// Tells the thread it's been deallocated by a scheduler.
430+
///
431+
/// # Safety
432+
/// The caller must ensure that the thread is not actively running on any core.
433+
pub(crate) unsafe fn deallocate(this: &Tab<Thread<A>>) {
434+
this.with_mut(|t| t.state = State::Unallocated);
435+
}
436+
425437
/// Returns a reference to the thread's data table.
426438
#[inline]
427439
pub fn data(&self) -> &TypeTable {

0 commit comments

Comments
 (0)
Please sign in to comment.