@@ -340,12 +340,18 @@ impl<T> BaoFileHandleWeak<T> {
340
340
}
341
341
}
342
342
343
+ /// a type alias which represents the callback which is executed after
344
+ /// the write guard is dropped
345
+ type AfterLockWriteCb < T > = Box < dyn Fn ( & BaoFileStorage < T > ) + Send + Sync + ' static > ;
346
+
343
347
/// The inner part of a bao file handle.
344
348
#[ derive( Debug ) ]
345
349
pub struct BaoFileHandleInner < T > {
346
- pub ( crate ) storage : tokio:: sync:: RwLock < BaoFileStorage < T > > ,
350
+ pub ( crate ) storage :
351
+ crate :: util:: callback_lock:: CallbackLock < BaoFileStorage < T > , AfterLockWriteCb < T > > ,
347
352
config : Arc < BaoFileConfig > ,
348
353
hash : Hash ,
354
+ rx : tokio:: sync:: watch:: Receiver < StorageMeta > ,
349
355
}
350
356
351
357
/// A cheaply cloneable handle to a bao file, including the hash and the configuration.
@@ -511,21 +517,55 @@ enum HandleChange {
511
517
// later: size verified
512
518
}
513
519
520
+ /// struct which stores simple metadata about the [BaoFileHandle] in a way that is
521
+ /// accessible in synchronous function calls
522
+ #[ derive( Debug ) ]
523
+ struct StorageMeta {
524
+ complete : bool ,
525
+ size : Result < u64 , io:: Error > ,
526
+ }
527
+
528
+ impl StorageMeta {
529
+ fn new < T : bao_tree:: io:: sync:: ReadAt > ( storage : & BaoFileStorage < T > ) -> Self {
530
+ let size = match storage {
531
+ BaoFileStorage :: Complete ( mem) => Ok ( mem. data_size ( ) ) ,
532
+ BaoFileStorage :: IncompleteMem ( mem) => Ok ( mem. current_size ( ) ) ,
533
+ BaoFileStorage :: IncompleteFile ( file) => file. current_size ( ) ,
534
+ } ;
535
+ StorageMeta {
536
+ complete : matches ! ( storage, BaoFileStorage :: Complete ( _) ) ,
537
+ size,
538
+ }
539
+ }
540
+ }
541
+
514
542
impl < T > BaoFileHandle < T >
515
543
where
516
544
T : bao_tree:: io:: sync:: ReadAt ,
517
545
{
546
+ /// internal helper function to initialize a new instance of self
547
+ fn new_inner ( storage : BaoFileStorage < T > , config : Arc < BaoFileConfig > , hash : Hash ) -> Self {
548
+ let ( tx, rx) = tokio:: sync:: watch:: channel ( StorageMeta :: new ( & storage) ) ;
549
+ Self ( Arc :: new ( BaoFileHandleInner {
550
+ storage : crate :: util:: callback_lock:: CallbackLock :: new (
551
+ storage,
552
+ Box :: new ( move |storage : & BaoFileStorage < T > | {
553
+ let _ = tx. send ( StorageMeta :: new ( storage) ) ;
554
+ } ) ,
555
+ ) ,
556
+ config,
557
+ hash,
558
+ rx,
559
+ } ) )
560
+ }
561
+
518
562
/// Create a new bao file handle.
519
563
///
520
564
/// This will create a new file handle with an empty memory storage.
521
565
/// Since there are very likely to be many of these, we use an arc rwlock
522
566
pub fn incomplete_mem ( config : Arc < BaoFileConfig > , hash : Hash ) -> Self {
523
567
let storage = BaoFileStorage :: incomplete_mem ( ) ;
524
- Self ( Arc :: new ( BaoFileHandleInner {
525
- storage : tokio:: sync:: RwLock :: new ( storage) ,
526
- config,
527
- hash,
528
- } ) )
568
+ Self :: new_inner ( storage, config, hash)
529
569
}
530
570
531
571
/// Create a new bao file handle with a partial file.
@@ -536,11 +576,7 @@ where
536
576
outboard : create_read_write ( & paths. outboard ) ?,
537
577
sizes : create_read_write ( & paths. sizes ) ?,
538
578
} ) ;
539
- Ok ( Self ( Arc :: new ( BaoFileHandleInner {
540
- storage : tokio:: sync:: RwLock :: new ( storage) ,
541
- config,
542
- hash,
543
- } ) ) )
579
+ Ok ( Self :: new_inner ( storage, config, hash) )
544
580
}
545
581
546
582
/// Create a new complete bao file handle.
@@ -551,11 +587,7 @@ where
551
587
outboard : MemOrFile < Bytes , FileAndSize < T > > ,
552
588
) -> Self {
553
589
let storage = BaoFileStorage :: Complete ( CompleteStorage { data, outboard } ) ;
554
- Self ( Arc :: new ( BaoFileHandleInner {
555
- storage : tokio:: sync:: RwLock :: new ( storage) ,
556
- config,
557
- hash,
558
- } ) )
590
+ Self :: new_inner ( storage, config, hash)
559
591
}
560
592
561
593
/// Transform the storage in place. If the transform fails, the storage will
@@ -573,10 +605,7 @@ where
573
605
574
606
/// True if the file is complete.
575
607
pub fn is_complete ( & self ) -> bool {
576
- matches ! (
577
- self . storage. try_read( ) . unwrap( ) . deref( ) ,
578
- BaoFileStorage :: Complete ( _)
579
- )
608
+ self . rx . borrow ( ) . deref ( ) . complete
580
609
}
581
610
582
611
/// An AsyncSliceReader for the data file.
@@ -596,18 +625,25 @@ where
596
625
}
597
626
598
627
/// The most precise known total size of the data file.
599
- pub fn current_size ( & self ) -> io:: Result < u64 > {
600
- match self . storage . try_read ( ) . unwrap ( ) . deref ( ) {
601
- BaoFileStorage :: Complete ( mem) => Ok ( mem. data_size ( ) ) ,
602
- BaoFileStorage :: IncompleteMem ( mem) => Ok ( mem. current_size ( ) ) ,
603
- BaoFileStorage :: IncompleteFile ( file) => file. current_size ( ) ,
604
- }
628
+ pub fn current_size ( & self ) -> Result < u64 , io:: ErrorKind > {
629
+ self . rx
630
+ . borrow ( )
631
+ . size
632
+ . as_ref ( )
633
+ // NB: we return the io::ErrorKind here
634
+ // because io::Error is !Clone
635
+ . map_err ( |e| e. kind ( ) )
636
+ . copied ( )
605
637
}
606
638
607
639
/// The outboard for the file.
608
640
pub fn outboard ( & self ) -> io:: Result < PreOrderOutboard < OutboardReader < T > > > {
609
641
let root = self . hash . into ( ) ;
610
- let tree = BaoTree :: new ( self . current_size ( ) ?, IROH_BLOCK_SIZE ) ;
642
+ let tree = BaoTree :: new (
643
+ self . current_size ( )
644
+ . map_err ( |kind| io:: Error :: new ( kind, "an io error has occurred" ) ) ?,
645
+ IROH_BLOCK_SIZE ,
646
+ ) ;
611
647
let outboard = self . outboard_reader ( ) ;
612
648
Ok ( PreOrderOutboard {
613
649
root,
0 commit comments