5 changes: 4 additions & 1 deletion parquet/src/arrow/arrow_writer/mod.rs
@@ -4433,7 +4433,10 @@ mod tests {
.unwrap();

// check that the read metadata is also correct
let options = ReadOptionsBuilder::new().with_page_index().build();
let options = ReadOptionsBuilder::new()
.with_page_index()
.with_encoding_stats_as_mask(false)
.build();
let reader = SerializedFileReader::new_with_options(file, options).unwrap();

let rowgroup = reader.get_row_group(0).expect("row group missing");
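For context, the test above opts out of the new default so that full per-page encoding stats survive footer decoding. A minimal standalone sketch of the same pattern (the input path, `main` wrapper, and error handling are illustrative, not from this PR):

```rust
use std::fs::File;

use parquet::file::reader::FileReader;
use parquet::file::serialized_reader::{ReadOptionsBuilder, SerializedFileReader};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // "example.parquet" is a placeholder path.
    let file = File::open("example.parquet")?;
    // Opt out of the new default so full per-page encoding stats are kept
    // rather than being reduced to a bitmask while the footer is decoded.
    let options = ReadOptionsBuilder::new()
        .with_encoding_stats_as_mask(false)
        .build();
    let reader = SerializedFileReader::new_with_options(file, options)?;
    let col0 = reader.metadata().row_group(0).column(0);
    println!("full stats present: {}", col0.page_encoding_stats().is_some());
    Ok(())
}
```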
6 changes: 6 additions & 0 deletions parquet/src/arrow/mod.rs
@@ -512,12 +512,14 @@ mod test {

// read the metadata from the file WITHOUT the page index structures
let original_metadata = ParquetMetaDataReader::new()
.with_encoding_stats_as_mask(false)
.parse_and_finish(&parquet_bytes)
.unwrap();

// this should error because the page indexes are not present, but have offsets specified
let metadata_bytes = metadata_to_bytes(&original_metadata);
let err = ParquetMetaDataReader::new()
.with_encoding_stats_as_mask(false)
.with_page_indexes(true) // there are no page indexes in the metadata
.parse_and_finish(&metadata_bytes)
.err()
@@ -534,6 +536,7 @@

// read the metadata from the file
let original_metadata = ParquetMetaDataReader::new()
.with_encoding_stats_as_mask(false)
.parse_and_finish(&parquet_bytes)
.unwrap();

@@ -546,6 +549,7 @@
);

let roundtrip_metadata = ParquetMetaDataReader::new()
.with_encoding_stats_as_mask(false)
.parse_and_finish(&metadata_bytes)
.unwrap();

@@ -560,13 +564,15 @@
// read the metadata from the file including the page index structures
// (which are stored elsewhere in the footer)
let original_metadata = ParquetMetaDataReader::new()
.with_encoding_stats_as_mask(false)
.with_page_indexes(true)
.parse_and_finish(&parquet_bytes)
.unwrap();

// read metadata back from the serialized bytes and ensure it is the same
let metadata_bytes = metadata_to_bytes(&original_metadata);
let roundtrip_metadata = ParquetMetaDataReader::new()
.with_encoding_stats_as_mask(false)
.with_page_indexes(true)
.parse_and_finish(&metadata_bytes)
.unwrap();
77 changes: 75 additions & 2 deletions parquet/src/file/metadata/mod.rs
@@ -1062,6 +1062,10 @@ impl ColumnChunkMetaData {

/// Returns the page encoding statistics, or `None` if no page encoding statistics
/// are available (or they were converted to a mask).
///
/// Note: By default, this crate converts page encoding statistics to a mask for performance
/// reasons. To get the full statistics, you must set [`ParquetMetaDataOptions::with_encoding_stats_as_mask`]
/// to `false`.
pub fn page_encoding_stats(&self) -> Option<&Vec<PageEncodingStats>> {
match self.encoding_stats.as_ref() {
Some(ParquetPageEncodingStats::Full(stats)) => Some(stats),
@@ -1072,6 +1076,8 @@
/// Returns the page encoding statistics reduced to a bitmask, or `None` if statistics are
/// not available (or they were left in their original form).
///
/// Note: This is the default behavior for this crate.
///
/// The [`PageEncodingStats`] struct was added to the Parquet specification specifically to
/// enable fast determination of whether all pages in a column chunk are dictionary encoded
/// (see <https://github.com/apache/parquet-format/pull/16>).
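Since the two accessors documented above are mutually exclusive views of the same thrift field, here is a hedged sketch of how a caller can distinguish the three possible cases (accessor names are from this diff; the helper function and its printing are illustrative):

```rust
use parquet::file::metadata::ParquetMetaData;

// Illustrative helper: report which form of encoding stats survived decoding.
fn inspect_encoding_stats(meta: &ParquetMetaData) {
    let col = meta.row_group(0).column(0);
    match (col.page_encoding_stats(), col.page_encoding_stats_mask()) {
        // Default read path: the full stats were reduced to a bitmask.
        (None, Some(_mask)) => println!("bitmask form only"),
        // Read with `with_encoding_stats_as_mask(false)`: full form kept.
        (Some(stats), None) => println!("{} full stats entries", stats.len()),
        // The writer recorded no encoding stats at all.
        _ => println!("no encoding stats"),
    }
}
```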
@@ -1667,7 +1673,9 @@ impl OffsetIndexBuilder {
mod tests {
use super::*;
use crate::basic::{PageType, SortOrder};
use crate::file::metadata::thrift::tests::{read_column_chunk, read_row_group};
use crate::file::metadata::thrift::tests::{
read_column_chunk, read_column_chunk_with_options, read_row_group,
};

#[test]
fn test_row_group_metadata_thrift_conversion() {
@@ -1822,7 +1830,72 @@ mod tests {
let mut buf = Vec::new();
let mut writer = ThriftCompactOutputProtocol::new(&mut buf);
col_metadata.write_thrift(&mut writer).unwrap();
let col_chunk_res = read_column_chunk(&mut buf, column_descr).unwrap();
let col_chunk_res = read_column_chunk(&mut buf, column_descr.clone()).unwrap();

let expected_metadata = ColumnChunkMetaData::builder(column_descr)
.set_encodings_mask(EncodingMask::new_from_encodings(
[Encoding::PLAIN, Encoding::RLE].iter(),
))
.set_file_path("file_path".to_owned())
.set_num_values(1000)
.set_compression(Compression::SNAPPY)
.set_total_compressed_size(2000)
.set_total_uncompressed_size(3000)
.set_data_page_offset(4000)
.set_dictionary_page_offset(Some(5000))
.set_page_encoding_stats_mask(EncodingMask::new_from_encodings(
[Encoding::PLAIN, Encoding::RLE].iter(),
))
.set_bloom_filter_offset(Some(6000))
.set_bloom_filter_length(Some(25))
.set_offset_index_offset(Some(7000))
.set_offset_index_length(Some(25))
.set_column_index_offset(Some(8000))
.set_column_index_length(Some(25))
.set_unencoded_byte_array_data_bytes(Some(2000))
.set_repetition_level_histogram(Some(LevelHistogram::from(vec![100, 100])))
.set_definition_level_histogram(Some(LevelHistogram::from(vec![0, 200])))
.build()
.unwrap();

assert_eq!(col_chunk_res, expected_metadata);
}

#[test]
fn test_column_chunk_metadata_thrift_conversion_full_stats() {
let column_descr = get_test_schema_descr().column(0);
let stats = vec![
PageEncodingStats {
page_type: PageType::DATA_PAGE,
encoding: Encoding::PLAIN,
count: 3,
},
PageEncodingStats {
page_type: PageType::DATA_PAGE,
encoding: Encoding::RLE,
count: 5,
},
];
let col_metadata = ColumnChunkMetaData::builder(column_descr.clone())
.set_encodings_mask(EncodingMask::new_from_encodings(
[Encoding::PLAIN, Encoding::RLE].iter(),
))
.set_num_values(1000)
.set_compression(Compression::SNAPPY)
.set_total_compressed_size(2000)
.set_total_uncompressed_size(3000)
.set_data_page_offset(4000)
.set_page_encoding_stats(stats)
.build()
.unwrap();

let mut buf = Vec::new();
let mut writer = ThriftCompactOutputProtocol::new(&mut buf);
col_metadata.write_thrift(&mut writer).unwrap();

let options = ParquetMetaDataOptions::new().with_encoding_stats_as_mask(false);
let col_chunk_res =
read_column_chunk_with_options(&mut buf, column_descr, Some(&options)).unwrap();

assert_eq!(col_chunk_res, col_metadata);
}
20 changes: 18 additions & 2 deletions parquet/src/file/metadata/options.rs
@@ -87,13 +87,23 @@ impl ParquetStatisticsPolicy {
/// [`ParquetMetaData`]: crate::file::metadata::ParquetMetaData
/// [`ParquetMetaDataReader`]: crate::file::metadata::ParquetMetaDataReader
/// [`ParquetMetaDataPushDecoder`]: crate::file::metadata::ParquetMetaDataPushDecoder
#[derive(Default, Debug, Clone)]
#[derive(Debug, Clone)]
pub struct ParquetMetaDataOptions {
schema_descr: Option<SchemaDescPtr>,
encoding_stats_as_mask: bool,
encoding_stats_policy: ParquetStatisticsPolicy,
}

impl Default for ParquetMetaDataOptions {
fn default() -> Self {
Self {
schema_descr: None,
encoding_stats_as_mask: true,
encoding_stats_policy: ParquetStatisticsPolicy::KeepAll,
}
}
}

impl ParquetMetaDataOptions {
/// Return a new default [`ParquetMetaDataOptions`].
pub fn new() -> Self {
@@ -118,7 +128,7 @@ impl ParquetMetaDataOptions {
}

/// Returns whether to present the [`encoding_stats`] field of the Parquet `ColumnMetaData`
/// as a bitmask (defaults to `false`).
/// as a bitmask (defaults to `true`).
///
/// See [`ColumnChunkMetaData::page_encoding_stats_mask`] for an explanation of why this
/// might be desirable.
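Given the flipped default above, a small sketch of how the option surface now behaves (the constructor, setter, and getter names are from this diff; the `main` wrapper is illustrative):

```rust
use parquet::file::metadata::ParquetMetaDataOptions;

fn main() {
    // The default now masks encoding stats; callers must opt out explicitly.
    let defaults = ParquetMetaDataOptions::new();
    assert!(defaults.encoding_stats_as_mask());

    let keep_full = ParquetMetaDataOptions::new().with_encoding_stats_as_mask(false);
    assert!(!keep_full.encoding_stats_as_mask());
}
```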
@@ -193,6 +203,12 @@ mod tests {
};
use std::{io::Read, sync::Arc};

#[test]
fn test_options_default() {
let options = ParquetMetaDataOptions::default();
assert!(options.encoding_stats_as_mask());
}

#[test]
fn test_provide_schema() {
let mut buf: Vec<u8> = Vec::new();
14 changes: 14 additions & 0 deletions parquet/src/file/metadata/reader.rs
@@ -152,6 +152,20 @@ impl ParquetMetaDataReader {
.with_offset_index_policy(policy)
}

/// Sets whether to read [`PageEncodingStats`] as a bitmask.
///
/// See [`ParquetMetaDataOptions::with_encoding_stats_as_mask`] for more details.
pub fn with_encoding_stats_as_mask(mut self, val: bool) -> Self {
let mut options = self
.metadata_options
.as_deref()
.cloned()
.unwrap_or_default();
options = options.with_encoding_stats_as_mask(val);
self.metadata_options = Some(Arc::new(options));
self
}

/// Sets the [`PageIndexPolicy`] for the column index
pub fn with_column_index_policy(mut self, policy: PageIndexPolicy) -> Self {
self.column_index = policy;
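Note that the new setter clones any existing shared options before mutating them (copy-on-write over the `Arc`), so it chains freely with the other builder methods. A sketch combining it with the page-index flag, assuming `parquet_bytes` holds a complete footer:

```rust
use bytes::Bytes;
use parquet::errors::Result;
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};

fn read_with_full_stats(parquet_bytes: &Bytes) -> Result<ParquetMetaData> {
    // `with_page_indexes` sets the page-index policies, while the mask flag
    // lives in the shared metadata options; the two compose independently.
    ParquetMetaDataReader::new()
        .with_page_indexes(true)
        .with_encoding_stats_as_mask(false)
        .parse_and_finish(parquet_bytes)
}
```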
14 changes: 11 additions & 3 deletions parquet/src/file/metadata/thrift/mod.rs
@@ -410,7 +410,7 @@ fn read_column_metadata<'a>(
let mut seen_mask = 0u16;

let mut skip_pes = false;
let mut pes_mask = false;
let mut pes_mask = true;

if let Some(opts) = options {
skip_pes = opts.skip_encoding_stats(col_index);
@@ -1704,7 +1704,7 @@ write_thrift_field!(RustBoundingBox, FieldType::Struct);
pub(crate) mod tests {
use crate::errors::Result;
use crate::file::metadata::thrift::{BoundingBox, SchemaElement, write_schema};
use crate::file::metadata::{ColumnChunkMetaData, RowGroupMetaData};
use crate::file::metadata::{ColumnChunkMetaData, ParquetMetaDataOptions, RowGroupMetaData};
use crate::parquet_thrift::tests::test_roundtrip;
use crate::parquet_thrift::{
ElementType, ThriftCompactOutputProtocol, ThriftSliceInputProtocol, read_thrift_vec,
@@ -1726,9 +1726,17 @@ pub(crate) mod tests {
pub(crate) fn read_column_chunk(
buf: &mut [u8],
column_descr: Arc<ColumnDescriptor>,
) -> Result<ColumnChunkMetaData> {
read_column_chunk_with_options(buf, column_descr, None)
}

pub(crate) fn read_column_chunk_with_options(
buf: &mut [u8],
column_descr: Arc<ColumnDescriptor>,
options: Option<&ParquetMetaDataOptions>,
) -> Result<ColumnChunkMetaData> {
let mut reader = ThriftSliceInputProtocol::new(buf);
crate::file::metadata::thrift::read_column_chunk(&mut reader, &column_descr, 0, None)
crate::file::metadata::thrift::read_column_chunk(&mut reader, &column_descr, 0, options)
}

pub(crate) fn roundtrip_schema(schema: TypePtr) -> Result<TypePtr> {
5 changes: 4 additions & 1 deletion parquet/src/file/serialized_reader.rs
@@ -1855,7 +1855,10 @@ mod tests {
fn test_file_reader_optional_metadata() {
// file with optional metadata: bloom filters, encoding stats, column index and offset index.
let file = get_test_file("data_index_bloom_encoding_stats.parquet");
let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
let options = ReadOptionsBuilder::new()
.with_encoding_stats_as_mask(false)
.build();
let file_reader = Arc::new(SerializedFileReader::new_with_options(file, options).unwrap());

let row_group_metadata = file_reader.metadata.row_group(0);
let col0_metadata = row_group_metadata.column(0);