@@ -30,14 +30,18 @@ const COMPACT_CONFIG: CompactConfig = CompactConfig {
3030pub struct TurboKeyValueDatabase {
3131 db : Arc < TurboPersistence > ,
3232 compact_join_handle : Mutex < Option < JoinHandle < Result < ( ) > > > > ,
33+ is_ci : bool ,
34+ is_short_session : bool ,
3335}
3436
3537impl TurboKeyValueDatabase {
36- pub fn new ( versioned_path : PathBuf ) -> Result < Self > {
38+ pub fn new ( versioned_path : PathBuf , is_ci : bool , is_short_session : bool ) -> Result < Self > {
3739 let db = Arc :: new ( TurboPersistence :: open ( versioned_path) ?) ;
3840 let mut this = Self {
3941 db : db. clone ( ) ,
4042 compact_join_handle : Mutex :: new ( None ) ,
43+ is_ci,
44+ is_short_session,
4145 } ;
4246 // start compaction in background if the database is not empty
4347 if !db. is_empty ( ) {
@@ -98,8 +102,8 @@ impl KeyValueDatabase for TurboKeyValueDatabase {
98102 Ok ( WriteBatch :: concurrent ( TurboWriteBatch {
99103 batch : self . db . write_batch ( ) ?,
100104 db : & self . db ,
101- compact_join_handle : & self . compact_join_handle ,
102- initial_write : self . db . is_empty ( ) ,
105+ compact_join_handle : ( ! self . is_short_session && ! self . db . is_empty ( ) )
106+ . then_some ( & self . compact_join_handle ) ,
103107 } ) )
104108 }
105109
@@ -110,6 +114,16 @@ impl KeyValueDatabase for TurboKeyValueDatabase {
110114 if let Some ( join_handle) = self . compact_join_handle . lock ( ) . take ( ) {
111115 join_handle. join ( ) . unwrap ( ) ?;
112116 }
117+ // Compact the database on shutdown
118+ self . db . compact ( & CompactConfig {
119+ max_merge_segment_count : if self . is_ci {
120+ // Fully compact in CI to reduce cache size
121+ usize:: MAX
122+ } else {
123+ available_parallelism ( ) . map_or ( 4 , |c| max ( 4 , c. get ( ) ) )
124+ } ,
125+ ..COMPACT_CONFIG
126+ } ) ?;
113127 // Shutdown the database
114128 self . db . shutdown ( )
115129 }
@@ -118,8 +132,7 @@ impl KeyValueDatabase for TurboKeyValueDatabase {
118132pub struct TurboWriteBatch < ' a > {
119133 batch : turbo_persistence:: WriteBatch < WriteBuffer < ' static > , 5 > ,
120134 db : & ' a Arc < TurboPersistence > ,
121- compact_join_handle : & ' a Mutex < Option < JoinHandle < Result < ( ) > > > > ,
122- initial_write : bool ,
135+ compact_join_handle : Option < & ' a Mutex < Option < JoinHandle < Result < ( ) > > > > > ,
123136}
124137
125138impl < ' a > BaseWriteBatch < ' a > for TurboWriteBatch < ' a > {
@@ -140,7 +153,7 @@ impl<'a> BaseWriteBatch<'a> for TurboWriteBatch<'a> {
140153 // Commit the write batch
141154 self . db . commit_write_batch ( self . batch ) ?;
142155
143- if ! self . initial_write {
156+ if let Some ( compact_join_handle ) = self . compact_join_handle {
144157 // Start a new compaction in the background
145158 let db = self . db . clone ( ) ;
146159 let handle = spawn ( move || {
@@ -150,7 +163,7 @@ impl<'a> BaseWriteBatch<'a> for TurboWriteBatch<'a> {
150163 ..COMPACT_CONFIG
151164 } )
152165 } ) ;
153- self . compact_join_handle . lock ( ) . replace ( handle) ;
166+ compact_join_handle. lock ( ) . replace ( handle) ;
154167 }
155168
156169 Ok ( ( ) )
0 commit comments