1
1
use std:: {
2
2
borrow:: Cow ,
3
- cmp:: min,
3
+ cmp:: { max , min} ,
4
4
fs:: File ,
5
5
io:: { self , BufWriter , Seek , Write } ,
6
6
path:: Path ,
@@ -45,6 +45,8 @@ const MIN_VALUE_COMPRESSION_SAMPLES_SIZE: usize = 1024;
45
45
const MIN_KEY_COMPRESSION_SAMPLES_SIZE : usize = 1024 ;
46
46
/// The maximum number of bytes that are taken from a single key/value entry for a sample.
47
47
const COMPRESSION_DICTIONARY_SAMPLE_PER_ENTRY : usize = 100 ;
48
+ /// The minimum number of bytes a key/value entry must have to be used for a sample.
49
+ const MIN_COMPRESSION_DICTIONARY_SAMPLE_PER_ENTRY : usize = 16 ;
48
50
49
51
/// Trait for entries from which SST files can be created
50
52
pub trait Entry {
@@ -153,58 +155,75 @@ impl<'a> StaticSortedFileBuilder<'a> {
153
155
{
154
156
return Ok ( ( ) ) ;
155
157
}
156
- let key_compression_samples_size = min ( KEY_COMPRESSION_SAMPLES_SIZE , total_key_size / 10 ) ;
158
+ let key_compression_samples_size = min ( KEY_COMPRESSION_SAMPLES_SIZE , total_key_size / 16 ) ;
157
159
let value_compression_samples_size =
158
- min ( VALUE_COMPRESSION_SAMPLES_SIZE , total_value_size / 10 ) ;
160
+ min ( VALUE_COMPRESSION_SAMPLES_SIZE , total_value_size / 16 ) ;
159
161
let mut value_samples = Vec :: with_capacity ( value_compression_samples_size) ;
160
162
let mut value_sample_sizes = Vec :: new ( ) ;
161
163
let mut key_samples = Vec :: with_capacity ( key_compression_samples_size) ;
162
164
let mut key_sample_sizes = Vec :: new ( ) ;
163
- let mut i = 12345678 % entries. len ( ) ;
164
- let mut j = 0 ;
165
- loop {
166
- let entry = & entries[ i] ;
165
+
166
+ // Limit the number of iterations to avoid infinite loops
167
+ let max_iterations =
168
+ max ( total_key_size, total_value_size) / COMPRESSION_DICTIONARY_SAMPLE_PER_ENTRY * 2 ;
169
+ for i in 0 ..max_iterations {
170
+ let entry = & entries[ i % entries. len ( ) ] ;
167
171
let value_remaining = value_compression_samples_size - value_samples. len ( ) ;
168
- let key_remaining = key_compression_samples_size - key_samples. len ( ) ;
169
- if value_remaining > 0
170
- && let EntryValue :: Small { value } | EntryValue :: Medium { value } = entry. value ( )
171
- {
172
- let value = if value. len ( ) <= COMPRESSION_DICTIONARY_SAMPLE_PER_ENTRY {
173
- value
174
- } else {
175
- j = ( j + 12345678 ) % ( value. len ( ) - COMPRESSION_DICTIONARY_SAMPLE_PER_ENTRY ) ;
176
- & value[ j..j + COMPRESSION_DICTIONARY_SAMPLE_PER_ENTRY ]
177
- } ;
178
- if value. len ( ) <= value_remaining {
179
- value_sample_sizes. push ( value. len ( ) ) ;
180
- value_samples. extend_from_slice ( value) ;
181
- } else {
182
- value_sample_sizes. push ( value_remaining) ;
183
- value_samples. extend_from_slice ( & value[ ..value_remaining] ) ;
172
+ if value_remaining < MIN_COMPRESSION_DICTIONARY_SAMPLE_PER_ENTRY {
173
+ break ;
174
+ }
175
+ if let EntryValue :: Small { value } | EntryValue :: Medium { value } = entry. value ( ) {
176
+ let len = value. len ( ) ;
177
+ if len >= MIN_COMPRESSION_DICTIONARY_SAMPLE_PER_ENTRY {
178
+ let used_len = min ( value_remaining, COMPRESSION_DICTIONARY_SAMPLE_PER_ENTRY ) ;
179
+ if len <= used_len {
180
+ value_sample_sizes. push ( len) ;
181
+ value_samples. extend_from_slice ( value) ;
182
+ } else {
183
+ value_sample_sizes. push ( used_len) ;
184
+ let p = value_samples. len ( ) % ( len - used_len) ;
185
+ value_samples. extend_from_slice ( & value[ p..p + used_len] ) ;
186
+ } ;
184
187
}
185
188
}
186
- if key_remaining > 0 {
189
+ }
190
+ assert ! ( value_samples. len( ) == value_sample_sizes. iter( ) . sum:: <usize >( ) ) ;
191
+ if value_samples. len ( ) > MIN_VALUE_COMPRESSION_SAMPLES_SIZE && value_sample_sizes. len ( ) > 5
192
+ {
193
+ self . value_compression_dictionary = zstd:: dict:: from_continuous (
194
+ & value_samples,
195
+ & value_sample_sizes,
196
+ VALUE_COMPRESSION_DICTIONARY_SIZE ,
197
+ )
198
+ . context ( "Value dictionary creation failed" ) ?;
199
+ } else {
200
+ self . value_compression_dictionary = Vec :: new ( ) ;
201
+ }
202
+
203
+ for i in 0 ..max_iterations {
204
+ let entry = & entries[ i % entries. len ( ) ] ;
205
+ let key_remaining = key_compression_samples_size - key_samples. len ( ) ;
206
+ if key_remaining < MIN_COMPRESSION_DICTIONARY_SAMPLE_PER_ENTRY {
207
+ break ;
208
+ }
209
+ let len = entry. key_len ( ) ;
210
+ if len >= MIN_COMPRESSION_DICTIONARY_SAMPLE_PER_ENTRY {
187
211
let used_len = min ( key_remaining, COMPRESSION_DICTIONARY_SAMPLE_PER_ENTRY ) ;
188
- if entry . key_len ( ) <= used_len {
189
- key_sample_sizes. push ( entry . key_len ( ) ) ;
212
+ if len <= used_len {
213
+ key_sample_sizes. push ( len ) ;
190
214
entry. write_key_to ( & mut key_samples) ;
191
215
} else {
192
- let mut temp = Vec :: with_capacity ( entry . key_len ( ) ) ;
216
+ let mut temp = Vec :: with_capacity ( len ) ;
193
217
entry. write_key_to ( & mut temp) ;
194
- debug_assert ! ( temp. len( ) == entry . key_len ( ) ) ;
218
+ debug_assert ! ( temp. len( ) == len ) ;
195
219
196
- j = ( j + 12345678 ) % ( temp . len ( ) - used_len) ;
220
+ let p = key_samples . len ( ) % ( len - used_len) ;
197
221
key_sample_sizes. push ( used_len) ;
198
- key_samples. extend_from_slice ( & temp[ j..j + used_len] ) ;
222
+ key_samples. extend_from_slice ( & temp[ p..p + used_len] ) ;
199
223
}
200
224
}
201
- if key_remaining == 0 && value_remaining == 0 {
202
- break ;
203
- }
204
- i = ( i + 12345678 ) % entries. len ( ) ;
205
225
}
206
226
assert ! ( key_samples. len( ) == key_sample_sizes. iter( ) . sum:: <usize >( ) ) ;
207
- assert ! ( value_samples. len( ) == value_sample_sizes. iter( ) . sum:: <usize >( ) ) ;
208
227
if key_samples. len ( ) > MIN_KEY_COMPRESSION_SAMPLES_SIZE && key_sample_sizes. len ( ) > 5 {
209
228
self . key_compression_dictionary = zstd:: dict:: from_continuous (
210
229
& key_samples,
@@ -213,15 +232,6 @@ impl<'a> StaticSortedFileBuilder<'a> {
213
232
)
214
233
. context ( "Key dictionary creation failed" ) ?;
215
234
}
216
- if value_samples. len ( ) > MIN_VALUE_COMPRESSION_SAMPLES_SIZE && value_sample_sizes. len ( ) > 5
217
- {
218
- self . value_compression_dictionary = zstd:: dict:: from_continuous (
219
- & value_samples,
220
- & value_sample_sizes,
221
- VALUE_COMPRESSION_DICTIONARY_SIZE ,
222
- )
223
- . context ( "Value dictionary creation failed" ) ?;
224
- }
225
235
Ok ( ( ) )
226
236
}
227
237
0 commit comments