Skip to content

Commit 6179dff

Browse files
committed
Support smallest_decimal_enabled flag in pyarrow
1 parent d81cf13 commit 6179dff

File tree

8 files changed

+69
-18
lines changed

8 files changed

+69
-18
lines changed

cpp/src/arrow/dataset/file_parquet.cc

+3-1
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,8 @@ parquet::ArrowReaderProperties MakeArrowReaderProperties(
132132
parquet_scan_options.arrow_reader_properties->cache_options());
133133
arrow_properties.set_io_context(
134134
parquet_scan_options.arrow_reader_properties->io_context());
135+
arrow_properties.set_smallest_decimal_enabled(
136+
parquet_scan_options.arrow_reader_properties->smallest_decimal_enabled());
135137
arrow_properties.set_use_threads(options.use_threads);
136138
return arrow_properties;
137139
}
@@ -532,7 +534,7 @@ Future<std::shared_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReader
532534
metadata)
533535
.Then(
534536
[=](const std::unique_ptr<parquet::ParquetFileReader>& reader) mutable
535-
-> Result<std::shared_ptr<parquet::arrow::FileReader>> {
537+
-> Result<std::shared_ptr<parquet::arrow::FileReader>> {
536538
auto arrow_properties = MakeArrowReaderProperties(
537539
*self, *reader->metadata(), *options, *parquet_scan_options);
538540

cpp/src/parquet/arrow/writer.cc

+8-8
Original file line numberDiff line numberDiff line change
@@ -300,9 +300,9 @@ class FileWriterImpl : public FileWriter {
300300
}
301301
}
302302

303-
Status Init(const ArrowReaderProperties& schema_arrow_reader_properities) {
303+
Status Init(const ArrowReaderProperties& schema_arrow_reader_properties) {
304304
return SchemaManifest::Make(writer_->schema(), /*schema_metadata=*/nullptr,
305-
schema_arrow_reader_properities, &schema_manifest_);
305+
schema_arrow_reader_properties, &schema_manifest_);
306306
}
307307

308308
Status NewRowGroup() override {
@@ -516,10 +516,10 @@ Status FileWriter::Make(::arrow::MemoryPool* pool,
516516
std::shared_ptr<::arrow::Schema> schema,
517517
std::shared_ptr<ArrowWriterProperties> arrow_properties,
518518
std::unique_ptr<FileWriter>* out,
519-
const ArrowReaderProperties& schema_arrow_reader_properities) {
519+
const ArrowReaderProperties& schema_arrow_reader_properties) {
520520
std::unique_ptr<FileWriterImpl> impl(new FileWriterImpl(
521521
std::move(schema), pool, std::move(writer), std::move(arrow_properties)));
522-
RETURN_NOT_OK(impl->Init(schema_arrow_reader_properities));
522+
RETURN_NOT_OK(impl->Init(schema_arrow_reader_properties));
523523
*out = std::move(impl);
524524
return Status::OK();
525525
}
@@ -556,7 +556,7 @@ Result<std::unique_ptr<FileWriter>> FileWriter::Open(
556556
std::shared_ptr<::arrow::io::OutputStream> sink,
557557
std::shared_ptr<WriterProperties> properties,
558558
std::shared_ptr<ArrowWriterProperties> arrow_properties,
559-
const ArrowReaderProperties& schema_arrow_reader_properities) {
559+
const ArrowReaderProperties& schema_arrow_reader_properties) {
560560
std::shared_ptr<SchemaDescriptor> parquet_schema;
561561
RETURN_NOT_OK(
562562
ToParquetSchema(&schema, *properties, *arrow_properties, &parquet_schema));
@@ -575,7 +575,7 @@ Result<std::unique_ptr<FileWriter>> FileWriter::Open(
575575
auto schema_ptr = std::make_shared<::arrow::Schema>(schema);
576576
RETURN_NOT_OK(Make(pool, std::move(base_writer), std::move(schema_ptr),
577577
std::move(arrow_properties), &writer,
578-
schema_arrow_reader_properities));
578+
schema_arrow_reader_properties));
579579

580580
return writer;
581581
}
@@ -596,12 +596,12 @@ Status WriteTable(const ::arrow::Table& table, ::arrow::MemoryPool* pool,
596596
std::shared_ptr<::arrow::io::OutputStream> sink, int64_t chunk_size,
597597
std::shared_ptr<WriterProperties> properties,
598598
std::shared_ptr<ArrowWriterProperties> arrow_properties,
599-
const ArrowReaderProperties& schema_arrow_reader_properities) {
599+
const ArrowReaderProperties& schema_arrow_reader_properties) {
600600
std::unique_ptr<FileWriter> writer;
601601
ARROW_ASSIGN_OR_RAISE(
602602
writer,
603603
FileWriter::Open(*table.schema(), pool, std::move(sink), std::move(properties),
604-
std::move(arrow_properties), schema_arrow_reader_properities));
604+
std::move(arrow_properties), schema_arrow_reader_properties));
605605
RETURN_NOT_OK(writer->WriteTable(table, chunk_size));
606606
return writer->Close();
607607
}

cpp/src/parquet/arrow/writer.h

+3-3
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ class PARQUET_EXPORT FileWriter {
5858
std::shared_ptr<::arrow::Schema> schema,
5959
std::shared_ptr<ArrowWriterProperties> arrow_properties,
6060
std::unique_ptr<FileWriter>* out,
61-
const ArrowReaderProperties& schema_arrow_reader_properities =
61+
const ArrowReaderProperties& schema_arrow_reader_properties =
6262
default_arrow_reader_properties());
6363

6464
/// \brief Try to create an Arrow to Parquet file writer.
@@ -76,7 +76,7 @@ class PARQUET_EXPORT FileWriter {
7676
std::shared_ptr<WriterProperties> properties = default_writer_properties(),
7777
std::shared_ptr<ArrowWriterProperties> arrow_properties =
7878
default_arrow_writer_properties(),
79-
const ArrowReaderProperties& schema_arrow_reader_properities =
79+
const ArrowReaderProperties& schema_arrow_reader_properties =
8080
default_arrow_reader_properties());
8181

8282
/// Return the Arrow schema to be written to.
@@ -183,7 +183,7 @@ WriteTable(const ::arrow::Table& table, MemoryPool* pool,
183183
std::shared_ptr<WriterProperties> properties = default_writer_properties(),
184184
std::shared_ptr<ArrowWriterProperties> arrow_properties =
185185
default_arrow_writer_properties(),
186-
const ArrowReaderProperties& schema_arrow_reader_properities =
186+
const ArrowReaderProperties& schema_arrow_reader_properties =
187187
default_arrow_reader_properties());
188188

189189
} // namespace arrow

python/pyarrow/_dataset_parquet.pyx

+17-2
Original file line numberDiff line numberDiff line change
@@ -703,7 +703,7 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
703703
cache_options : pyarrow.CacheOptions, default None
704704
Cache options used when pre_buffer is enabled. The default values should
705705
be good for most use cases. You may want to adjust these for example if
706-
you have exceptionally high latency to the file system.
706+
you have exceptionally high latency to the file system.
707707
thrift_string_size_limit : int, default None
708708
If not None, override the maximum total string size allocated
709709
when decoding Thrift structures. The default limit should be
@@ -720,6 +720,11 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
720720
Parquet file.
721721
page_checksum_verification : bool, default False
722722
If True, verify the page checksum for each page read from the file.
723+
smallest_decimal_enabled : bool, default False
724+
If True, always convert to the smallest arrow decimal type based
725+
on precision.
723728
"""
724729

725730
# Avoid mistakenly creating attributes
@@ -733,7 +738,8 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
733738
thrift_container_size_limit=None,
734739
decryption_config=None,
735740
decryption_properties=None,
736-
bint page_checksum_verification=False):
741+
bint page_checksum_verification=False,
742+
bint smallest_decimal_enabled=False):
737743
self.init(shared_ptr[CFragmentScanOptions](
738744
new CParquetFragmentScanOptions()))
739745
self.use_buffered_stream = use_buffered_stream
@@ -752,6 +758,7 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
752758
if decryption_properties is not None:
753759
self.decryption_properties = decryption_properties
754760
self.page_checksum_verification = page_checksum_verification
761+
self.smallest_decimal_enabled = smallest_decimal_enabled
755762

756763
cdef void init(self, const shared_ptr[CFragmentScanOptions]& sp):
757764
FragmentScanOptions.init(self, sp)
@@ -868,6 +875,14 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
868875
def page_checksum_verification(self, bint page_checksum_verification):
869876
self.reader_properties().set_page_checksum_verification(page_checksum_verification)
870877

878+
@property
879+
def smallest_decimal_enabled(self):
880+
return self.arrow_reader_properties().smallest_decimal_enabled()
881+
882+
@smallest_decimal_enabled.setter
883+
def smallest_decimal_enabled(self, bint smallest_decimal_enabled):
884+
self.arrow_reader_properties().set_smallest_decimal_enabled(smallest_decimal_enabled)
885+
871886
def equals(self, ParquetFragmentScanOptions other):
872887
"""
873888
Parameters

python/pyarrow/_parquet.pxd

+2
Original file line numberDiff line numberDiff line change
@@ -405,6 +405,8 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
405405
CCacheOptions cache_options() const
406406
void set_coerce_int96_timestamp_unit(TimeUnit unit)
407407
TimeUnit coerce_int96_timestamp_unit() const
408+
void set_smallest_decimal_enabled(c_bool smallest_decimal_enabled)
409+
c_bool smallest_decimal_enabled() const
408410

409411
ArrowReaderProperties default_arrow_reader_properties()
410412

python/pyarrow/_parquet.pyx

+4-1
Original file line numberDiff line numberDiff line change
@@ -1441,7 +1441,8 @@ cdef class ParquetReader(_Weakrefable):
14411441
FileDecryptionProperties decryption_properties=None,
14421442
thrift_string_size_limit=None,
14431443
thrift_container_size_limit=None,
1444-
page_checksum_verification=False):
1444+
page_checksum_verification=False,
1445+
smallest_decimal_enabled=False):
14451446
"""
14461447
Open a parquet file for reading.
14471448
@@ -1458,6 +1459,7 @@ cdef class ParquetReader(_Weakrefable):
14581459
thrift_string_size_limit : int, optional
14591460
thrift_container_size_limit : int, optional
14601461
page_checksum_verification : bool, default False
1462+
smallest_decimal_enabled : bool, default False
14611463
"""
14621464
cdef:
14631465
shared_ptr[CFileMetaData] c_metadata
@@ -1497,6 +1499,7 @@ cdef class ParquetReader(_Weakrefable):
14971499
decryption_properties.unwrap())
14981500

14991501
arrow_props.set_pre_buffer(pre_buffer)
1502+
arrow_props.set_smallest_decimal_enabled(smallest_decimal_enabled)
15001503

15011504
properties.set_page_checksum_verification(page_checksum_verification)
15021505

python/pyarrow/parquet/core.py

+18-3
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,9 @@ class ParquetFile:
255255
it will be parsed as an URI to determine the filesystem.
256256
page_checksum_verification : bool, default False
257257
If True, verify the checksum for each page read from the file.
258+
smallest_decimal_enabled : bool, default False
259+
If True, always convert to the smallest arrow decimal type based
260+
on precision.
258261
259262
Examples
260263
--------
@@ -303,7 +306,7 @@ def __init__(self, source, *, metadata=None, common_metadata=None,
303306
pre_buffer=False, coerce_int96_timestamp_unit=None,
304307
decryption_properties=None, thrift_string_size_limit=None,
305308
thrift_container_size_limit=None, filesystem=None,
306-
page_checksum_verification=False):
309+
page_checksum_verification=False, smallest_decimal_enabled=False):
307310

308311
self._close_source = getattr(source, 'closed', True)
309312

@@ -323,6 +326,7 @@ def __init__(self, source, *, metadata=None, common_metadata=None,
323326
thrift_string_size_limit=thrift_string_size_limit,
324327
thrift_container_size_limit=thrift_container_size_limit,
325328
page_checksum_verification=page_checksum_verification,
329+
smallest_decimal_enabled=smallest_decimal_enabled,
326330
)
327331
self.common_metadata = common_metadata
328332
self._nested_paths_by_prefix = self._build_nested_paths()
@@ -1267,6 +1271,9 @@ class ParquetDataset:
12671271
If True, verify the page checksum for each page read from the file.
12681272
use_legacy_dataset : bool, optional
12691273
Deprecated and has no effect from PyArrow version 15.0.0.
1274+
smallest_decimal_enabled : bool, default False
1275+
If True, always convert to the smallest arrow decimal type based
1276+
on precision.
12701277
12711278
Examples
12721279
--------
@@ -1280,7 +1287,8 @@ def __init__(self, path_or_paths, filesystem=None, schema=None, *, filters=None,
12801287
decryption_properties=None, thrift_string_size_limit=None,
12811288
thrift_container_size_limit=None,
12821289
page_checksum_verification=False,
1283-
use_legacy_dataset=None):
1290+
use_legacy_dataset=None,
1291+
smallest_decimal_enabled=False):
12841292

12851293
if use_legacy_dataset is not None:
12861294
warnings.warn(
@@ -1297,6 +1305,7 @@ def __init__(self, path_or_paths, filesystem=None, schema=None, *, filters=None,
12971305
"thrift_string_size_limit": thrift_string_size_limit,
12981306
"thrift_container_size_limit": thrift_container_size_limit,
12991307
"page_checksum_verification": page_checksum_verification,
1308+
"smallest_decimal_enabled": smallest_decimal_enabled,
13001309
}
13011310
if buffer_size:
13021311
read_options.update(use_buffered_stream=True,
@@ -1686,6 +1695,9 @@ def partitioning(self):
16861695
sufficient for most Parquet files.
16871696
page_checksum_verification : bool, default False
16881697
If True, verify the checksum for each page read from the file.
1698+
smallest_decimal_enabled : bool, default False
1699+
If True, always convert to the smallest arrow decimal type based
1700+
on precision.
16891701
16901702
Returns
16911703
-------
@@ -1781,7 +1793,8 @@ def read_table(source, *, columns=None, use_threads=True,
17811793
coerce_int96_timestamp_unit=None,
17821794
decryption_properties=None, thrift_string_size_limit=None,
17831795
thrift_container_size_limit=None,
1784-
page_checksum_verification=False):
1796+
page_checksum_verification=False,
1797+
smallest_decimal_enabled=False):
17851798

17861799
if use_legacy_dataset is not None:
17871800
warnings.warn(
@@ -1806,6 +1819,7 @@ def read_table(source, *, columns=None, use_threads=True,
18061819
thrift_string_size_limit=thrift_string_size_limit,
18071820
thrift_container_size_limit=thrift_container_size_limit,
18081821
page_checksum_verification=page_checksum_verification,
1822+
smallest_decimal_enabled=smallest_decimal_enabled,
18091823
)
18101824
except ImportError:
18111825
# fall back on ParquetFile for simple cases when pyarrow.dataset
@@ -1838,6 +1852,7 @@ def read_table(source, *, columns=None, use_threads=True,
18381852
thrift_string_size_limit=thrift_string_size_limit,
18391853
thrift_container_size_limit=thrift_container_size_limit,
18401854
page_checksum_verification=page_checksum_verification,
1855+
smallest_decimal_enabled=smallest_decimal_enabled,
18411856
)
18421857

18431858
return dataset.read(columns=columns, use_threads=use_threads,

python/pyarrow/tests/parquet/test_basic.py

+14
Original file line numberDiff line numberDiff line change
@@ -362,6 +362,20 @@ def test_byte_stream_split():
362362
use_dictionary=False)
363363

364364

365+
def test_smallest_decimal_enabled(tempdir):
366+
arr1 = pa.array(list(map(Decimal, range(100))), type=pa.decimal32(5, 2))
367+
arr2 = pa.array(list(map(Decimal, range(100))), type=pa.decimal64(16, 9))
368+
arr3 = pa.array(list(map(Decimal, range(100))), type=pa.decimal128(22, 2))
369+
arr4 = pa.array(list(map(Decimal, range(100))), type=pa.decimal256(48, 2))
370+
data_decimal = [arr1, arr2, arr3, arr4]
371+
table = pa.Table.from_arrays(data_decimal, names=['a', 'b', 'c', 'd'])
372+
373+
# Check with smallest_decimal_enabled
374+
_check_roundtrip(table,
375+
expected=table,
376+
read_table_kwargs={"smallest_decimal_enabled": True})
377+
378+
365379
def test_store_decimal_as_integer(tempdir):
366380
arr_decimal_1_9 = pa.array(list(map(Decimal, range(100))),
367381
type=pa.decimal128(5, 2))

0 commit comments

Comments
 (0)