@@ -30,14 +30,18 @@ const COMPACT_CONFIG: CompactConfig = CompactConfig {
30
30
pub struct TurboKeyValueDatabase {
31
31
db : Arc < TurboPersistence > ,
32
32
compact_join_handle : Mutex < Option < JoinHandle < Result < ( ) > > > > ,
33
+ is_ci : bool ,
34
+ is_short_session : bool ,
33
35
}
34
36
35
37
impl TurboKeyValueDatabase {
36
- pub fn new ( versioned_path : PathBuf ) -> Result < Self > {
38
+ pub fn new ( versioned_path : PathBuf , is_ci : bool , is_short_session : bool ) -> Result < Self > {
37
39
let db = Arc :: new ( TurboPersistence :: open ( versioned_path) ?) ;
38
40
let mut this = Self {
39
41
db : db. clone ( ) ,
40
42
compact_join_handle : Mutex :: new ( None ) ,
43
+ is_ci,
44
+ is_short_session,
41
45
} ;
42
46
// start compaction in background if the database is not empty
43
47
if !db. is_empty ( ) {
@@ -98,8 +102,8 @@ impl KeyValueDatabase for TurboKeyValueDatabase {
98
102
Ok ( WriteBatch :: concurrent ( TurboWriteBatch {
99
103
batch : self . db . write_batch ( ) ?,
100
104
db : & self . db ,
101
- compact_join_handle : & self . compact_join_handle ,
102
- initial_write : self . db . is_empty ( ) ,
105
+ compact_join_handle : ( ! self . is_short_session && ! self . db . is_empty ( ) )
106
+ . then_some ( & self . compact_join_handle ) ,
103
107
} ) )
104
108
}
105
109
@@ -110,6 +114,16 @@ impl KeyValueDatabase for TurboKeyValueDatabase {
110
114
if let Some ( join_handle) = self . compact_join_handle . lock ( ) . take ( ) {
111
115
join_handle. join ( ) . unwrap ( ) ?;
112
116
}
117
+ // Compact the database on shutdown
118
+ self . db . compact ( & CompactConfig {
119
+ max_merge_segment_count : if self . is_ci {
120
+ // Fully compact in CI to reduce cache size
121
+ usize:: MAX
122
+ } else {
123
+ available_parallelism ( ) . map_or ( 4 , |c| max ( 4 , c. get ( ) ) )
124
+ } ,
125
+ ..COMPACT_CONFIG
126
+ } ) ?;
113
127
// Shutdown the database
114
128
self . db . shutdown ( )
115
129
}
@@ -118,8 +132,7 @@ impl KeyValueDatabase for TurboKeyValueDatabase {
118
132
pub struct TurboWriteBatch < ' a > {
119
133
batch : turbo_persistence:: WriteBatch < WriteBuffer < ' static > , 5 > ,
120
134
db : & ' a Arc < TurboPersistence > ,
121
- compact_join_handle : & ' a Mutex < Option < JoinHandle < Result < ( ) > > > > ,
122
- initial_write : bool ,
135
+ compact_join_handle : Option < & ' a Mutex < Option < JoinHandle < Result < ( ) > > > > > ,
123
136
}
124
137
125
138
impl < ' a > BaseWriteBatch < ' a > for TurboWriteBatch < ' a > {
@@ -140,7 +153,7 @@ impl<'a> BaseWriteBatch<'a> for TurboWriteBatch<'a> {
140
153
// Commit the write batch
141
154
self . db . commit_write_batch ( self . batch ) ?;
142
155
143
- if ! self . initial_write {
156
+ if let Some ( compact_join_handle ) = self . compact_join_handle {
144
157
// Start a new compaction in the background
145
158
let db = self . db . clone ( ) ;
146
159
let handle = spawn ( move || {
@@ -150,7 +163,7 @@ impl<'a> BaseWriteBatch<'a> for TurboWriteBatch<'a> {
150
163
..COMPACT_CONFIG
151
164
} )
152
165
} ) ;
153
- self . compact_join_handle . lock ( ) . replace ( handle) ;
166
+ compact_join_handle. lock ( ) . replace ( handle) ;
154
167
}
155
168
156
169
Ok ( ( ) )
0 commit comments