Skip to content

Commit 7be2574

Browse files
committed
Allow toggling drop behavior in cancellation
1 parent b30d516 commit 7be2574

File tree

9 files changed

+143
-98
lines changed

9 files changed

+143
-98
lines changed

examples/lazy-input/main.rs

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -89,15 +89,18 @@ impl LazyInputDatabase {
8989
fn new(tx: Sender<DebounceEventResult>) -> Self {
9090
let logs: Arc<Mutex<Vec<String>>> = Default::default();
9191
Self {
92-
storage: Storage::new(Some(Box::new({
93-
let logs = logs.clone();
94-
move |event| {
95-
// don't log boring events
96-
if let salsa::EventKind::WillExecute { .. } = event.kind {
97-
logs.lock().unwrap().push(format!("{event:?}"));
92+
storage: Storage::new(
93+
false,
94+
Some(Box::new({
95+
let logs = logs.clone();
96+
move |event| {
97+
// don't log boring events
98+
if let salsa::EventKind::WillExecute { .. } = event.kind {
99+
logs.lock().unwrap().push(format!("{event:?}"));
100+
}
98101
}
99-
}
100-
}))),
102+
})),
103+
),
101104
logs,
102105
files: DashMap::new(),
103106
file_watcher: Arc::new(Mutex::new(

src/database_impl.rs

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,16 @@ impl Default for DatabaseImpl {
1414
fn default() -> Self {
1515
Self {
1616
// Default behavior: tracing debug log the event.
17-
storage: Storage::new(if tracing::enabled!(Level::DEBUG) {
18-
Some(Box::new(|event| {
19-
tracing::debug!("salsa_event({:?})", event)
20-
}))
21-
} else {
22-
None
23-
}),
17+
storage: Storage::new(
18+
false,
19+
if tracing::enabled!(Level::DEBUG) {
20+
Some(Box::new(|event| {
21+
tracing::debug!("salsa_event({:?})", event)
22+
}))
23+
} else {
24+
None
25+
},
26+
),
2427
}
2528
}
2629
}

src/runtime.rs

Lines changed: 37 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@ use crate::loom::sync::atomic::{AtomicBool, Ordering};
1010
use crate::loom::sync::{AtomicMut, Mutex};
1111
use crate::loom::thread::{self, JoinHandle, ThreadId};
1212
use crate::plumbing::MemoDropSender;
13-
use crate::table::memo::{memo_drop_channel, spawn_memo_drop_thread, MemoDropReceiver};
13+
use crate::table::memo::{
14+
memo_drop_channel, run_memo_drops, spawn_memo_drop_thread, MemoDropReceiver,
15+
};
1416
use crate::table::Table;
1517
use crate::{Cancelled, Event, EventKind, Revision};
1618

@@ -41,7 +43,7 @@ pub struct Runtime {
4143
table: Table,
4244
// DROP ORDER: `memo_drop_sender` and `unparker` need to drop before `memo_drop_thread`
4345
// as `memo_drop_thread` exits once `memo_drop_sender` drops and its last parker barrier unparks
44-
memo_drop_unparker: UnparkOnDrop,
46+
memo_drop_unparker: Option<UnparkOnDrop>,
4547
memo_drop_sender: MemoDropSender,
4648
memo_drop_receiver: Exclusive<Option<MemoDropReceiver>>,
4749
memo_drop_thread: JoinOnDrop,
@@ -87,26 +89,6 @@ impl<V> StampedValue<V> {
8789
}
8890
}
8991

90-
impl Default for Runtime {
91-
fn default() -> Self {
92-
let (memo_drop_sender, memo_drop_receiver) = memo_drop_channel();
93-
94-
let parker = Parker::new();
95-
let unparker = UnparkOnDrop(parker.unparker().clone());
96-
memo_drop_sender.park(parker);
97-
Runtime {
98-
revisions: [Revision::start(); Durability::LEN],
99-
revision_canceled: Default::default(),
100-
dependency_graph: Default::default(),
101-
table: Default::default(),
102-
memo_drop_unparker: unparker,
103-
memo_drop_sender,
104-
memo_drop_receiver: Exclusive::new(Some(memo_drop_receiver)),
105-
memo_drop_thread: Default::default(),
106-
}
107-
}
108-
}
109-
11092
impl std::fmt::Debug for Runtime {
11193
fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
11294
fmt.debug_struct("Runtime")
@@ -118,6 +100,28 @@ impl std::fmt::Debug for Runtime {
118100
}
119101

120102
impl Runtime {
103+
pub(crate) fn new(drop_in_thread: bool) -> Self {
104+
let (memo_drop_sender, memo_drop_receiver) = memo_drop_channel();
105+
let memo_drop_unparker = if drop_in_thread {
106+
let parker = Parker::new();
107+
let unparker = UnparkOnDrop(parker.unparker().clone());
108+
memo_drop_sender.park(parker);
109+
Some(unparker)
110+
} else {
111+
None
112+
};
113+
Runtime {
114+
revisions: [Revision::start(); Durability::LEN],
115+
revision_canceled: Default::default(),
116+
dependency_graph: Default::default(),
117+
table: Default::default(),
118+
memo_drop_unparker,
119+
memo_drop_sender,
120+
memo_drop_receiver: Exclusive::new(Some(memo_drop_receiver)),
121+
memo_drop_thread: JoinOnDrop(None),
122+
}
123+
}
124+
121125
#[inline]
122126
pub(crate) fn current_revision(&self) -> Revision {
123127
self.revisions[0]
@@ -181,13 +185,18 @@ impl Runtime {
181185
/// Releases the previous barrier and acquires a new one, effectively kicking off a destruction
182186
/// cycle for all collected memos up to this point.
183187
pub(crate) fn memo_drop_barrier(&mut self) {
184-
if let Some(receiver) = self.memo_drop_receiver.get_mut().take() {
185-
tracing::trace!("Spawning memo drop thread");
186-
self.memo_drop_thread.0 = Some(spawn_memo_drop_thread(receiver));
188+
match &mut self.memo_drop_unparker {
189+
Some(UnparkOnDrop(memo_drop_unparker)) => {
190+
if let Some(receiver) = self.memo_drop_receiver.get_mut().take() {
191+
tracing::trace!("Spawning memo drop thread");
192+
self.memo_drop_thread.0 = Some(spawn_memo_drop_thread(receiver));
193+
}
194+
let parker = Parker::new();
195+
mem::replace(memo_drop_unparker, parker.unparker().clone()).unpark();
196+
self.memo_drop_sender.park(parker);
197+
}
198+
None => run_memo_drops(self.memo_drop_receiver.get_mut().as_mut().unwrap()),
187199
}
188-
let parker = Parker::new();
189-
mem::replace(&mut self.memo_drop_unparker.0, parker.unparker().clone()).unpark();
190-
self.memo_drop_sender.park(parker);
191200
}
192201

193202
/// Block until `other_id` completes executing `database_key`, or return `BlockResult::Cycle`

src/storage.rs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,14 +36,17 @@ impl<Db> Clone for StorageHandle<Db> {
3636

3737
impl<Db: Database> Default for StorageHandle<Db> {
3838
fn default() -> Self {
39-
Self::new(None)
39+
Self::new(false, None)
4040
}
4141
}
4242

4343
impl<Db: Database> StorageHandle<Db> {
44-
pub fn new(event_callback: Option<Box<dyn Fn(crate::Event) + Send + Sync + 'static>>) -> Self {
44+
pub fn new(
45+
drop_in_thread: bool,
46+
event_callback: Option<Box<dyn Fn(crate::Event) + Send + Sync + 'static>>,
47+
) -> Self {
4548
Self {
46-
zalsa_impl: Arc::new(Zalsa::new::<Db>(event_callback)),
49+
zalsa_impl: Arc::new(Zalsa::new::<Db>(drop_in_thread, event_callback)),
4750
coordinate: CoordinateDrop(Arc::new(Coordinate {
4851
clones: Mutex::new(1),
4952
cvar: Default::default(),
@@ -100,17 +103,20 @@ impl RefUnwindSafe for Coordinate {}
100103

101104
impl<Db: Database> Default for Storage<Db> {
102105
fn default() -> Self {
103-
Self::new(None)
106+
Self::new(false, None)
104107
}
105108
}
106109

107110
impl<Db: Database> Storage<Db> {
108111
/// Create a new database storage.
109112
///
110113
/// The `event_callback` function is invoked by the salsa runtime at various points during execution.
111-
pub fn new(event_callback: Option<Box<dyn Fn(crate::Event) + Send + Sync + 'static>>) -> Self {
114+
pub fn new(
115+
drop_in_thread: bool,
116+
event_callback: Option<Box<dyn Fn(crate::Event) + Send + Sync + 'static>>,
117+
) -> Self {
112118
Self {
113-
handle: StorageHandle::new(event_callback),
119+
handle: StorageHandle::new(drop_in_thread, event_callback),
114120
zalsa_local: ZalsaLocal::new(),
115121
}
116122
}

src/table/memo.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,14 @@ pub fn spawn_memo_drop_thread(receiver: MemoDropReceiver) -> thread::JoinHandle<
9595
})
9696
}
9797

98+
pub fn run_memo_drops(receiver: &MemoDropReceiver) {
99+
receiver.0.try_iter().for_each(|e| match e {
100+
MemoDropAction::Drop(memo_drop) => drop(memo_drop),
101+
MemoDropAction::EvictValue(memo_clear) => memo_clear.clear(),
102+
MemoDropAction::Park(_) => panic!("tried to park in `run_memo_drops`"),
103+
});
104+
}
105+
98106
pub fn memo_drop_channel() -> (MemoDropSender, MemoDropReceiver) {
99107
let (tx, rx) = std::sync::mpsc::channel();
100108
(MemoDropSender(tx), MemoDropReceiver(rx))

src/zalsa.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ impl RefUnwindSafe for Zalsa {}
162162

163163
impl Zalsa {
164164
pub(crate) fn new<Db: Database>(
165+
drop_in_thread: bool,
165166
event_callback: Option<Box<dyn Fn(crate::Event) + Send + Sync + 'static>>,
166167
) -> Self {
167168
Self {
@@ -171,7 +172,7 @@ impl Zalsa {
171172
ingredient_to_id_struct_type_id_map: Default::default(),
172173
ingredients_vec: boxcar::Vec::new(),
173174
ingredients_requiring_reset: boxcar::Vec::new(),
174-
runtime: Runtime::default(),
175+
runtime: Runtime::new(drop_in_thread),
175176
memo_ingredient_indices: Default::default(),
176177
event_callback,
177178
}

tests/common/mod.rs

Lines changed: 32 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -83,10 +83,13 @@ impl Default for EventLoggerDatabase {
8383
fn default() -> Self {
8484
let logger = Logger::default();
8585
Self {
86-
storage: Storage::new(Some(Box::new({
87-
let logger = logger.clone();
88-
move |event| logger.push_log(format!("{:?}", event.kind))
89-
}))),
86+
storage: Storage::new(
87+
false,
88+
Some(Box::new({
89+
let logger = logger.clone();
90+
move |event| logger.push_log(format!("{:?}", event.kind))
91+
})),
92+
),
9093
logger,
9194
}
9295
}
@@ -112,16 +115,19 @@ impl Default for DiscardLoggerDatabase {
112115
fn default() -> Self {
113116
let logger = Logger::default();
114117
Self {
115-
storage: Storage::new(Some(Box::new({
116-
let logger = logger.clone();
117-
move |event| match event.kind {
118-
salsa::EventKind::WillDiscardStaleOutput { .. }
119-
| salsa::EventKind::DidDiscard { .. } => {
120-
logger.push_log(format!("salsa_event({:?})", event.kind));
118+
storage: Storage::new(
119+
false,
120+
Some(Box::new({
121+
let logger = logger.clone();
122+
move |event| match event.kind {
123+
salsa::EventKind::WillDiscardStaleOutput { .. }
124+
| salsa::EventKind::DidDiscard { .. } => {
125+
logger.push_log(format!("salsa_event({:?})", event.kind));
126+
}
127+
_ => {}
121128
}
122-
_ => {}
123-
}
124-
}))),
129+
})),
130+
),
125131
logger,
126132
}
127133
}
@@ -147,17 +153,20 @@ impl Default for ExecuteValidateLoggerDatabase {
147153
fn default() -> Self {
148154
let logger = Logger::default();
149155
Self {
150-
storage: Storage::new(Some(Box::new({
151-
let logger = logger.clone();
152-
move |event| match event.kind {
153-
salsa::EventKind::WillExecute { .. }
154-
| salsa::EventKind::WillIterateCycle { .. }
155-
| salsa::EventKind::DidValidateMemoizedValue { .. } => {
156-
logger.push_log(format!("salsa_event({:?})", event.kind));
156+
storage: Storage::new(
157+
false,
158+
Some(Box::new({
159+
let logger = logger.clone();
160+
move |event| match event.kind {
161+
salsa::EventKind::WillExecute { .. }
162+
| salsa::EventKind::WillIterateCycle { .. }
163+
| salsa::EventKind::DidValidateMemoizedValue { .. } => {
164+
logger.push_log(format!("salsa_event({:?})", event.kind));
165+
}
166+
_ => {}
157167
}
158-
_ => {}
159-
}
160-
}))),
168+
})),
169+
),
161170
logger,
162171
}
163172
}

tests/cycle_output.rs

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -87,19 +87,22 @@ impl Default for Database {
8787
fn default() -> Self {
8888
let logger = Logger::default();
8989
Self {
90-
storage: Storage::new(Some(Box::new({
91-
let logger = logger.clone();
92-
move |event| match event.kind {
93-
salsa::EventKind::WillExecute { .. }
94-
| salsa::EventKind::DidValidateMemoizedValue { .. } => {
95-
logger.push_log(format!("salsa_event({:?})", event.kind));
90+
storage: Storage::new(
91+
false,
92+
Some(Box::new({
93+
let logger = logger.clone();
94+
move |event| match event.kind {
95+
salsa::EventKind::WillExecute { .. }
96+
| salsa::EventKind::DidValidateMemoizedValue { .. } => {
97+
logger.push_log(format!("salsa_event({:?})", event.kind));
98+
}
99+
salsa::EventKind::WillCheckCancellation => {}
100+
_ => {
101+
logger.push_log(format!("salsa_event({:?})", event.kind));
102+
}
96103
}
97-
salsa::EventKind::WillCheckCancellation => {}
98-
_ => {
99-
logger.push_log(format!("salsa_event({:?})", event.kind));
100-
}
101-
}
102-
}))),
104+
})),
105+
),
103106
logger,
104107
input: Default::default(),
105108
}

tests/parallel/setup.rs

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -66,20 +66,23 @@ impl Default for Knobs {
6666
let signal_on_did_cancel = Arc::new(AtomicUsize::new(0));
6767

6868
Self {
69-
storage: Storage::new(Some(Box::new({
70-
let signal = signal.clone();
71-
let signal_on_will_block = signal_on_will_block.clone();
72-
let signal_on_did_cancel = signal_on_did_cancel.clone();
73-
move |event| match event.kind {
74-
salsa::EventKind::WillBlockOn { .. } => {
75-
signal.signal(signal_on_will_block.load(Ordering::Acquire));
69+
storage: Storage::new(
70+
false,
71+
Some(Box::new({
72+
let signal = signal.clone();
73+
let signal_on_will_block = signal_on_will_block.clone();
74+
let signal_on_did_cancel = signal_on_did_cancel.clone();
75+
move |event| match event.kind {
76+
salsa::EventKind::WillBlockOn { .. } => {
77+
signal.signal(signal_on_will_block.load(Ordering::Acquire));
78+
}
79+
salsa::EventKind::DidSetCancellationFlag => {
80+
signal.signal(signal_on_did_cancel.load(Ordering::Acquire));
81+
}
82+
_ => {}
7683
}
77-
salsa::EventKind::DidSetCancellationFlag => {
78-
signal.signal(signal_on_did_cancel.load(Ordering::Acquire));
79-
}
80-
_ => {}
81-
}
82-
}))),
84+
})),
85+
),
8386
signal,
8487
signal_on_will_block,
8588
signal_on_did_cancel,

0 commit comments

Comments
 (0)