Skip to content

Commit 3af326d

Browse files
authored
feat: new fuse table option enable_parquet_dictionary (#17675)
* feat: new table option `FUSE_OPT_KEY_ENABLE_PARQUET_ENCODING` When this option is set to true, fuse table with parquet storage format will enable encoding and dictionary during serialization. The default encoding policy of parquet-rs will be used. * resolve rebase conflicts * cargo fmt * refactor: rename tbl option to enable_parquet_dictionary * refactor: make streaming block writer aware of table's dictionary options * chore: refine logic test * chore: refine code comments
1 parent d99a230 commit 3af326d

File tree

18 files changed

+216
-24
lines changed

18 files changed

+216
-24
lines changed

src/query/formats/src/output_format/parquet.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,15 @@ impl OutputFormat for ParquetOutputFormat {
5353
return Ok(vec![]);
5454
}
5555
let mut buf = Vec::with_capacity(DEFAULT_BLOCK_BUFFER_SIZE);
56-
let _ = blocks_to_parquet(&self.schema, blocks, &mut buf, TableCompression::Zstd, None)?;
56+
// When unloading data as parquet, enable dictionary encoding unconditionally; this usually leads to a smaller output size.
57+
let _ = blocks_to_parquet(
58+
&self.schema,
59+
blocks,
60+
&mut buf,
61+
TableCompression::Zstd,
62+
true,
63+
None,
64+
)?;
5765
Ok(buf)
5866
}
5967
}

src/query/service/src/interpreters/common/table_option_validation.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use databend_common_storages_fuse::FUSE_OPT_KEY_DATA_RETENTION_NUM_SNAPSHOTS_TO_
3232
use databend_common_storages_fuse::FUSE_OPT_KEY_DATA_RETENTION_PERIOD_IN_HOURS;
3333
use databend_common_storages_fuse::FUSE_OPT_KEY_ENABLE_AUTO_ANALYZE;
3434
use databend_common_storages_fuse::FUSE_OPT_KEY_ENABLE_AUTO_VACUUM;
35+
use databend_common_storages_fuse::FUSE_OPT_KEY_ENABLE_PARQUET_DICTIONARY;
3536
use databend_common_storages_fuse::FUSE_OPT_KEY_FILE_SIZE;
3637
use databend_common_storages_fuse::FUSE_OPT_KEY_ROW_AVG_DEPTH_THRESHOLD;
3738
use databend_common_storages_fuse::FUSE_OPT_KEY_ROW_PER_BLOCK;
@@ -89,6 +90,7 @@ pub static CREATE_FUSE_OPTIONS: LazyLock<HashSet<&'static str>> = LazyLock::new(
8990
r.insert(OPT_KEY_TEMP_PREFIX);
9091
r.insert(OPT_KEY_SEGMENT_FORMAT);
9192
r.insert(OPT_KEY_ENABLE_COPY_DEDUP_FULL_PATH);
93+
r.insert(FUSE_OPT_KEY_ENABLE_PARQUET_DICTIONARY);
9294
r
9395
});
9496

@@ -259,3 +261,19 @@ where
259261
}
260262
Ok(())
261263
}
264+
265+
pub fn is_valid_fuse_parquet_dictionary_opt(
266+
options: &BTreeMap<String, String>,
267+
) -> databend_common_exception::Result<()> {
268+
is_valid_bool_opt(FUSE_OPT_KEY_ENABLE_PARQUET_DICTIONARY, options)
269+
}
270+
271+
fn is_valid_bool_opt(
272+
key: &str,
273+
options: &BTreeMap<String, String>,
274+
) -> databend_common_exception::Result<()> {
275+
if let Some(value) = options.get(key) {
276+
value.parse::<bool>()?;
277+
}
278+
Ok(())
279+
}

src/query/service/src/interpreters/interpreter_table_create.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ use crate::interpreters::common::table_option_validation::is_valid_bloom_index_c
7171
use crate::interpreters::common::table_option_validation::is_valid_change_tracking;
7272
use crate::interpreters::common::table_option_validation::is_valid_create_opt;
7373
use crate::interpreters::common::table_option_validation::is_valid_data_retention_period;
74+
use crate::interpreters::common::table_option_validation::is_valid_fuse_parquet_dictionary_opt;
7475
use crate::interpreters::common::table_option_validation::is_valid_option_of_type;
7576
use crate::interpreters::common::table_option_validation::is_valid_random_seed;
7677
use crate::interpreters::common::table_option_validation::is_valid_row_per_block;
@@ -455,6 +456,8 @@ impl CreateTableInterpreter {
455456
is_valid_random_seed(&table_meta.options)?;
456457
// check table level data_retention_period_in_hours
457458
is_valid_data_retention_period(&table_meta.options)?;
459+
// check enable_parquet_dictionary
460+
is_valid_fuse_parquet_dictionary_opt(&table_meta.options)?;
458461

459462
// Same as settings of FUSE_OPT_KEY_ENABLE_AUTO_VACUUM, expect value type is unsigned integer
460463
is_valid_option_of_type::<u32>(&table_meta.options, FUSE_OPT_KEY_ENABLE_AUTO_VACUUM)?;

src/query/service/src/interpreters/interpreter_table_set_options.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ use crate::interpreters::common::table_option_validation::is_valid_block_per_seg
5656
use crate::interpreters::common::table_option_validation::is_valid_bloom_index_columns;
5757
use crate::interpreters::common::table_option_validation::is_valid_create_opt;
5858
use crate::interpreters::common::table_option_validation::is_valid_data_retention_period;
59+
use crate::interpreters::common::table_option_validation::is_valid_fuse_parquet_dictionary_opt;
5960
use crate::interpreters::common::table_option_validation::is_valid_option_of_type;
6061
use crate::interpreters::common::table_option_validation::is_valid_row_per_block;
6162
use crate::interpreters::Interpreter;
@@ -95,6 +96,8 @@ impl Interpreter for SetOptionsInterpreter {
9596
is_valid_row_per_block(&self.plan.set_options)?;
9697
// check data_retention_period
9798
is_valid_data_retention_period(&self.plan.set_options)?;
99+
// check enable_parquet_dictionary
100+
is_valid_fuse_parquet_dictionary_opt(&self.plan.set_options)?;
98101

99102
// check storage_format
100103
let error_str = "invalid opt for fuse table in alter table statement";

src/query/service/src/test_kits/block_writer.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@ impl<'a> BlockWriter<'a> {
157157
vec![index_block],
158158
&mut data,
159159
TableCompression::None,
160+
false,
160161
None,
161162
)?;
162163
let size = data.len() as u64;

src/query/storages/basic/src/result_cache/write/writer.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ impl ResultCacheWriter {
7777
self.blocks.clone(),
7878
&mut buf,
7979
TableCompression::None,
80+
false,
8081
None,
8182
)?;
8283

src/query/storages/common/blocks/src/parquet_rs.rs

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ use parquet::basic::Encoding;
2323
use parquet::file::metadata::KeyValue;
2424
use parquet::file::properties::EnabledStatistics;
2525
use parquet::file::properties::WriterProperties;
26+
use parquet::file::properties::WriterPropertiesBuilder;
27+
use parquet::file::properties::WriterVersion;
2628
use parquet::format::FileMetaData;
2729

2830
/// Serialize data blocks to parquet format.
@@ -31,19 +33,13 @@ pub fn blocks_to_parquet(
3133
blocks: Vec<DataBlock>,
3234
write_buffer: &mut Vec<u8>,
3335
compression: TableCompression,
36+
enable_dictionary: bool,
3437
metadata: Option<Vec<KeyValue>>,
3538
) -> Result<FileMetaData> {
3639
assert!(!blocks.is_empty());
37-
let props = WriterProperties::builder()
38-
.set_compression(compression.into())
39-
// use `usize::MAX` to effectively limit the number of row groups to 1
40-
.set_max_row_group_size(usize::MAX)
41-
.set_encoding(Encoding::PLAIN)
42-
.set_dictionary_enabled(false)
43-
.set_statistics_enabled(EnabledStatistics::None)
44-
.set_bloom_filter_enabled(false)
45-
.set_key_value_metadata(metadata)
46-
.build();
40+
let builder = parquet_writer_properties_builder(compression, enable_dictionary, metadata);
41+
42+
let props = builder.build();
4743
let batches = blocks
4844
.into_iter()
4945
.map(|block| block.to_record_batch(table_schema))
@@ -56,3 +52,26 @@ pub fn blocks_to_parquet(
5652
let file_meta = writer.close()?;
5753
Ok(file_meta)
5854
}
55+
56+
pub fn parquet_writer_properties_builder(
57+
compression: TableCompression,
58+
enable_dictionary: bool,
59+
metadata: Option<Vec<KeyValue>>,
60+
) -> WriterPropertiesBuilder {
61+
let builder = WriterProperties::builder()
62+
.set_compression(compression.into())
63+
// use `usize::MAX` to effectively limit the number of row groups to 1
64+
.set_max_row_group_size(usize::MAX)
65+
.set_encoding(Encoding::PLAIN)
66+
.set_statistics_enabled(EnabledStatistics::None)
67+
.set_bloom_filter_enabled(false)
68+
.set_key_value_metadata(metadata);
69+
70+
if enable_dictionary {
71+
builder
72+
.set_writer_version(WriterVersion::PARQUET_2_0)
73+
.set_dictionary_enabled(true)
74+
} else {
75+
builder.set_dictionary_enabled(false)
76+
}
77+
}

src/query/storages/fuse/benches/bench.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,11 +141,13 @@ mod dummy {
141141
let max_page_size = 8192;
142142
let block_per_seg = 1000;
143143

144+
let enable_parquet_dictionary = false;
144145
let write_settings = WriteSettings {
145146
storage_format,
146147
table_compression: compression,
147148
max_page_size,
148149
block_per_seg,
150+
enable_parquet_dictionary,
149151
};
150152
let schema = Arc::new(schema);
151153
let mut buffer = Vec::new();

src/query/storages/fuse/src/constants.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,13 @@ pub const FUSE_OPT_KEY_ROW_PER_BLOCK: &str = "row_per_block";
1818
pub const FUSE_OPT_KEY_ROW_PER_PAGE: &str = "row_per_page";
1919
pub const FUSE_OPT_KEY_ROW_AVG_DEPTH_THRESHOLD: &str = "row_avg_depth_threshold";
2020
pub const FUSE_OPT_KEY_FILE_SIZE: &str = "file_size";
21-
2221
pub const FUSE_OPT_KEY_DATA_RETENTION_PERIOD_IN_HOURS: &str = "data_retention_period_in_hours";
2322
pub const FUSE_OPT_KEY_DATA_RETENTION_NUM_SNAPSHOTS_TO_KEEP: &str =
2423
"data_retention_num_snapshots_to_keep";
2524
pub const FUSE_OPT_KEY_ENABLE_AUTO_VACUUM: &str = "enable_auto_vacuum";
2625
pub const FUSE_OPT_KEY_ENABLE_AUTO_ANALYZE: &str = "enable_auto_analyze";
2726
pub const FUSE_OPT_KEY_ATTACH_COLUMN_IDS: &str = "attach_column_ids";
27+
pub const FUSE_OPT_KEY_ENABLE_PARQUET_DICTIONARY: &str = "enable_parquet_dictionary";
2828

2929
pub const FUSE_TBL_BLOCK_PREFIX: &str = "_b";
3030
pub const FUSE_TBL_BLOCK_INDEX_PREFIX: &str = "_i";

src/query/storages/fuse/src/fuse_table.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ use crate::FUSE_OPT_KEY_BLOCK_IN_MEM_SIZE_THRESHOLD;
133133
use crate::FUSE_OPT_KEY_BLOCK_PER_SEGMENT;
134134
use crate::FUSE_OPT_KEY_DATA_RETENTION_NUM_SNAPSHOTS_TO_KEEP;
135135
use crate::FUSE_OPT_KEY_DATA_RETENTION_PERIOD_IN_HOURS;
136+
use crate::FUSE_OPT_KEY_ENABLE_PARQUET_DICTIONARY;
136137
use crate::FUSE_OPT_KEY_FILE_SIZE;
137138
use crate::FUSE_OPT_KEY_ROW_PER_BLOCK;
138139
use crate::FUSE_OPT_KEY_ROW_PER_PAGE;
@@ -321,11 +322,15 @@ impl FuseTable {
321322
let block_per_seg =
322323
self.get_option(FUSE_OPT_KEY_BLOCK_PER_SEGMENT, DEFAULT_BLOCK_PER_SEGMENT);
323324

325+
let enable_parquet_dictionary_encoding =
326+
self.get_option(FUSE_OPT_KEY_ENABLE_PARQUET_DICTIONARY, false);
327+
324328
WriteSettings {
325329
storage_format: self.storage_format,
326330
table_compression: self.table_compression,
327331
max_page_size,
328332
block_per_seg,
333+
enable_parquet_dictionary: enable_parquet_dictionary_encoding,
329334
}
330335
}
331336

0 commit comments

Comments
 (0)