Skip to content

Commit 7cd707a

Browse files
committed
use callback lock to watch internal state
1 parent ff35704 commit 7cd707a

File tree

1 file changed

+61
-27
lines changed

1 file changed

+61
-27
lines changed

src/store/bao_file.rs

+61-27
Original file line numberDiff line numberDiff line change
@@ -340,12 +340,18 @@ impl<T> BaoFileHandleWeak<T> {
340340
}
341341
}
342342

343+
/// a type alias which represents the callback which is executed after
344+
/// the write guard is dropped
345+
type AfterLockWriteCb<T> = Box<dyn Fn(&BaoFileStorage<T>) + Send + Sync + 'static>;
346+
343347
/// The inner part of a bao file handle.
344348
#[derive(Debug)]
345349
pub struct BaoFileHandleInner<T> {
346-
pub(crate) storage: tokio::sync::RwLock<BaoFileStorage<T>>,
350+
pub(crate) storage:
351+
crate::util::callback_lock::CallbackLock<BaoFileStorage<T>, AfterLockWriteCb<T>>,
347352
config: Arc<BaoFileConfig>,
348353
hash: Hash,
354+
rx: tokio::sync::watch::Receiver<StorageMeta>,
349355
}
350356

351357
/// A cheaply cloneable handle to a bao file, including the hash and the configuration.
@@ -511,21 +517,55 @@ enum HandleChange {
511517
// later: size verified
512518
}
513519

520+
/// struct which stores simple metadata about the [BaoFileHandle] in a way that is
521+
/// accessible in synchronous function calls
522+
#[derive(Debug)]
523+
struct StorageMeta {
524+
complete: bool,
525+
size: Result<u64, io::Error>,
526+
}
527+
528+
impl StorageMeta {
529+
fn new<T: bao_tree::io::sync::ReadAt>(storage: &BaoFileStorage<T>) -> Self {
530+
let size = match storage {
531+
BaoFileStorage::Complete(mem) => Ok(mem.data_size()),
532+
BaoFileStorage::IncompleteMem(mem) => Ok(mem.current_size()),
533+
BaoFileStorage::IncompleteFile(file) => file.current_size(),
534+
};
535+
StorageMeta {
536+
complete: matches!(storage, BaoFileStorage::Complete(_)),
537+
size,
538+
}
539+
}
540+
}
541+
514542
impl<T> BaoFileHandle<T>
515543
where
516544
T: bao_tree::io::sync::ReadAt,
517545
{
546+
/// internal helper function to initialize a new instance of self
547+
fn new_inner(storage: BaoFileStorage<T>, config: Arc<BaoFileConfig>, hash: Hash) -> Self {
548+
let (tx, rx) = tokio::sync::watch::channel(StorageMeta::new(&storage));
549+
Self(Arc::new(BaoFileHandleInner {
550+
storage: crate::util::callback_lock::CallbackLock::new(
551+
storage,
552+
Box::new(move |storage: &BaoFileStorage<T>| {
553+
let _ = tx.send(StorageMeta::new(storage));
554+
}),
555+
),
556+
config,
557+
hash,
558+
rx,
559+
}))
560+
}
561+
518562
/// Create a new bao file handle.
519563
///
520564
/// This will create a new file handle with an empty memory storage.
521565
/// Since there are very likely to be many of these, we use an arc rwlock
522566
pub fn incomplete_mem(config: Arc<BaoFileConfig>, hash: Hash) -> Self {
523567
let storage = BaoFileStorage::incomplete_mem();
524-
Self(Arc::new(BaoFileHandleInner {
525-
storage: tokio::sync::RwLock::new(storage),
526-
config,
527-
hash,
528-
}))
568+
Self::new_inner(storage, config, hash)
529569
}
530570

531571
/// Create a new bao file handle with a partial file.
@@ -536,11 +576,7 @@ where
536576
outboard: create_read_write(&paths.outboard)?,
537577
sizes: create_read_write(&paths.sizes)?,
538578
});
539-
Ok(Self(Arc::new(BaoFileHandleInner {
540-
storage: tokio::sync::RwLock::new(storage),
541-
config,
542-
hash,
543-
})))
579+
Ok(Self::new_inner(storage, config, hash))
544580
}
545581

546582
/// Create a new complete bao file handle.
@@ -551,11 +587,7 @@ where
551587
outboard: MemOrFile<Bytes, FileAndSize<T>>,
552588
) -> Self {
553589
let storage = BaoFileStorage::Complete(CompleteStorage { data, outboard });
554-
Self(Arc::new(BaoFileHandleInner {
555-
storage: tokio::sync::RwLock::new(storage),
556-
config,
557-
hash,
558-
}))
590+
Self::new_inner(storage, config, hash)
559591
}
560592

561593
/// Transform the storage in place. If the transform fails, the storage will
@@ -573,10 +605,7 @@ where
573605

574606
/// True if the file is complete.
575607
pub fn is_complete(&self) -> bool {
576-
matches!(
577-
self.storage.try_read().unwrap().deref(),
578-
BaoFileStorage::Complete(_)
579-
)
608+
self.rx.borrow().deref().complete
580609
}
581610

582611
/// An AsyncSliceReader for the data file.
@@ -596,18 +625,23 @@ where
596625
}
597626

598627
/// The most precise known total size of the data file.
599-
pub fn current_size(&self) -> io::Result<u64> {
600-
match self.storage.try_read().unwrap().deref() {
601-
BaoFileStorage::Complete(mem) => Ok(mem.data_size()),
602-
BaoFileStorage::IncompleteMem(mem) => Ok(mem.current_size()),
603-
BaoFileStorage::IncompleteFile(file) => file.current_size(),
604-
}
628+
pub fn current_size(&self) -> Result<u64, io::ErrorKind> {
629+
self.rx
630+
.borrow()
631+
.size
632+
.as_ref()
633+
.map_err(|e| e.kind())
634+
.copied()
605635
}
606636

607637
/// The outboard for the file.
608638
pub fn outboard(&self) -> io::Result<PreOrderOutboard<OutboardReader<T>>> {
609639
let root = self.hash.into();
610-
let tree = BaoTree::new(self.current_size()?, IROH_BLOCK_SIZE);
640+
let tree = BaoTree::new(
641+
self.current_size()
642+
.map_err(|kind| io::Error::new(kind, "an io error has occurred"))?,
643+
IROH_BLOCK_SIZE,
644+
);
611645
let outboard = self.outboard_reader();
612646
Ok(PreOrderOutboard {
613647
root,

0 commit comments

Comments
 (0)