Skip to content

Commit

Permalink
feat(c/driver/postgresql): customize numeric conversion
Browse files Browse the repository at this point in the history
- introduces statement-level option `adbc.postgresql.numeric_conversion`
- the option is used to tell result reader what strategy to use when converting numeric values to Arrow data; since this cannot be done 1-1, the reader has to convert to other data type
  - clients can use this option to specify the strategy
- value can be either `to_string` or `to_double`
  - when not specified defaults to `to_string`
  - `to_string` -> numerics converted loss-less to string representation
  - when `to_double` -> numeric converted to double (with possible loss of precision)
  • Loading branch information
lupko committed Feb 6, 2024
1 parent 4091e6f commit a7dbe82
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 12 deletions.
111 changes: 109 additions & 2 deletions c/driver/postgresql/copy/postgres_copy_reader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

#include <gtest/gtest.h>
#include <cmath>
#include <nanoarrow/nanoarrow.hpp>

#include "postgres_copy_test_common.h"
Expand All @@ -25,8 +26,11 @@ namespace adbcpq {

class PostgresCopyStreamTester {
public:
ArrowErrorCode Init(const PostgresType& root_type, ArrowError* error = nullptr) {
NANOARROW_RETURN_NOT_OK(reader_.Init(root_type));
ArrowErrorCode Init(
const PostgresType& root_type,
NumericConversionStrategy numeric_conversion = NumericConversionStrategy::kToString,
ArrowError* error = nullptr) {
NANOARROW_RETURN_NOT_OK(reader_.Init(root_type, numeric_conversion));
NANOARROW_RETURN_NOT_OK(reader_.InferOutputSchema(error));
NANOARROW_RETURN_NOT_OK(reader_.InitFieldReaders(error));
return NANOARROW_OK;
Expand Down Expand Up @@ -373,6 +377,59 @@ TEST(PostgresCopyUtilsTest, PostgresCopyReadNumeric) {
EXPECT_EQ(std::string(item.data, item.size_bytes), "inf");
}

TEST(PostgresCopyUtilsTest, PostgresCopyReadNumericToDouble) {
ArrowBufferView data;
data.data.as_uint8 = kTestPgCopyNumeric;
data.size_bytes = sizeof(kTestPgCopyNumeric);

auto col_type = PostgresType(PostgresTypeId::kNumeric);
PostgresType input_type(PostgresTypeId::kRecord);
input_type.AppendChild("col", col_type);

PostgresCopyStreamTester tester;
ASSERT_EQ(tester.Init(input_type, NumericConversionStrategy::kToDouble), NANOARROW_OK);
ASSERT_EQ(tester.ReadAll(&data), ENODATA);
ASSERT_EQ(data.data.as_uint8 - kTestPgCopyNumeric, sizeof(kTestPgCopyNumeric));
ASSERT_EQ(data.size_bytes, 0);

nanoarrow::UniqueArray array;
ASSERT_EQ(tester.GetArray(array.get()), NANOARROW_OK);
ASSERT_EQ(array->length, 9);
ASSERT_EQ(array->n_children, 1);

nanoarrow::UniqueSchema schema;
tester.GetSchema(schema.get());

nanoarrow::UniqueArrayView array_view;
ASSERT_EQ(ArrowArrayViewInitFromSchema(array_view.get(), schema.get(), nullptr),
NANOARROW_OK);
ASSERT_EQ(array_view->children[0]->storage_type, NANOARROW_TYPE_DOUBLE);
ASSERT_EQ(ArrowArrayViewSetArray(array_view.get(), array.get(), nullptr), NANOARROW_OK);

auto validity = reinterpret_cast<const uint8_t*>(array->children[0]->buffers[0]);
auto data_buffer = reinterpret_cast<const double*>(array->children[0]->buffers[1]);
ASSERT_NE(validity, nullptr);
ASSERT_NE(data_buffer, nullptr);
ASSERT_TRUE(ArrowBitGet(validity, 0));
ASSERT_TRUE(ArrowBitGet(validity, 1));
ASSERT_TRUE(ArrowBitGet(validity, 2));
ASSERT_TRUE(ArrowBitGet(validity, 3));
ASSERT_TRUE(ArrowBitGet(validity, 4));
ASSERT_TRUE(ArrowBitGet(validity, 5));
ASSERT_TRUE(ArrowBitGet(validity, 6));
ASSERT_TRUE(ArrowBitGet(validity, 7));
ASSERT_FALSE(ArrowBitGet(validity, 8));

ASSERT_DOUBLE_EQ(data_buffer[0], 1000000);
ASSERT_DOUBLE_EQ(data_buffer[1], 0.00001234);
ASSERT_DOUBLE_EQ(data_buffer[2], 1.0);
ASSERT_DOUBLE_EQ(data_buffer[3], -123.456);
ASSERT_DOUBLE_EQ(data_buffer[4], 123.456);
ASSERT_TRUE(std::isnan(data_buffer[5]));
ASSERT_TRUE(data_buffer[6] == -std::numeric_limits<double>::infinity());
ASSERT_TRUE(data_buffer[7] == std::numeric_limits<double>::infinity());
}

TEST(PostgresCopyUtilsTest, PostgresCopyReadNumeric16_10) {
ArrowBufferView data;
data.data.as_uint8 = kTestPgCopyNumeric16_10;
Expand Down Expand Up @@ -427,6 +484,56 @@ TEST(PostgresCopyUtilsTest, PostgresCopyReadNumeric16_10) {
EXPECT_EQ(std::string(item.data, item.size_bytes), "nan");
}

TEST(PostgresCopyUtilsTest, PostgresCopyReadNumeric16_10ToDouble) {
ArrowBufferView data;
data.data.as_uint8 = kTestPgCopyNumeric16_10;
data.size_bytes = sizeof(kTestPgCopyNumeric16_10);

auto col_type = PostgresType(PostgresTypeId::kNumeric);
PostgresType input_type(PostgresTypeId::kRecord);
input_type.AppendChild("col", col_type);

PostgresCopyStreamTester tester;
ASSERT_EQ(tester.Init(input_type, NumericConversionStrategy::kToDouble), NANOARROW_OK);
ASSERT_EQ(tester.ReadAll(&data), ENODATA);
ASSERT_EQ(data.data.as_uint8 - kTestPgCopyNumeric16_10,
sizeof(kTestPgCopyNumeric16_10));
ASSERT_EQ(data.size_bytes, 0);

nanoarrow::UniqueArray array;
ASSERT_EQ(tester.GetArray(array.get()), NANOARROW_OK);
ASSERT_EQ(array->length, 7);
ASSERT_EQ(array->n_children, 1);

nanoarrow::UniqueSchema schema;
tester.GetSchema(schema.get());

nanoarrow::UniqueArrayView array_view;
ASSERT_EQ(ArrowArrayViewInitFromSchema(array_view.get(), schema.get(), nullptr),
NANOARROW_OK);
ASSERT_EQ(array_view->children[0]->storage_type, NANOARROW_TYPE_DOUBLE);
ASSERT_EQ(ArrowArrayViewSetArray(array_view.get(), array.get(), nullptr), NANOARROW_OK);

auto validity = reinterpret_cast<const uint8_t*>(array->children[0]->buffers[0]);
auto data_buffer = reinterpret_cast<const double*>(array->children[0]->buffers[1]);
ASSERT_NE(validity, nullptr);
ASSERT_NE(data_buffer, nullptr);
ASSERT_TRUE(ArrowBitGet(validity, 0));
ASSERT_TRUE(ArrowBitGet(validity, 1));
ASSERT_TRUE(ArrowBitGet(validity, 2));
ASSERT_TRUE(ArrowBitGet(validity, 3));
ASSERT_TRUE(ArrowBitGet(validity, 4));
ASSERT_TRUE(ArrowBitGet(validity, 5));
ASSERT_FALSE(ArrowBitGet(validity, 6));

ASSERT_DOUBLE_EQ(data_buffer[0], 0.0);
ASSERT_DOUBLE_EQ(data_buffer[1], 1.01234);
ASSERT_DOUBLE_EQ(data_buffer[2], 1.0123456789);
ASSERT_DOUBLE_EQ(data_buffer[3], -1.0123400000);
ASSERT_DOUBLE_EQ(data_buffer[4], -1.0123456789);
ASSERT_TRUE(std::isnan(data_buffer[5]));
}

TEST(PostgresCopyUtilsTest, PostgresCopyReadTimestamp) {
ArrowBufferView data;
data.data.as_uint8 = kTestPgCopyTimestamp;
Expand Down
14 changes: 9 additions & 5 deletions c/driver/postgresql/copy/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#pragma once

#include <algorithm>
#include <charconv>
#include <cinttypes>
#include <limits>
#include <memory>
Expand Down Expand Up @@ -518,9 +519,7 @@ class PostgresCopyNumericToDoubleFieldReader : public PostgresCopyNumericFieldRe
int64_t max_chars_required = max_chars_required_();
char* target = new char[max_chars_required];
int64_t actual_characters_required = DigitsToString(&target);
target[actual_characters_required + 1] = '\0';

value = strtod(target, NULL);
std::from_chars(target, target + actual_characters_required, value);
delete[] target;

NANOARROW_RETURN_NOT_OK(ArrowBufferAppend(data_, &value, sizeof(double)));
Expand Down Expand Up @@ -1004,14 +1003,17 @@ static inline ArrowErrorCode MakeCopyFieldReader(

class PostgresCopyStreamReader {
public:
ArrowErrorCode Init(PostgresType pg_type) {
ArrowErrorCode Init(PostgresType pg_type, NumericConversionStrategy numeric_conversion =
NumericConversionStrategy::kToString) {
if (pg_type.type_id() != PostgresTypeId::kRecord) {
return EINVAL;
}

pg_type_ = std::move(pg_type);
root_reader_.Init(pg_type_);
array_size_approx_bytes_ = 0;
numeric_conversion_ = numeric_conversion;

return NANOARROW_OK;
}

Expand Down Expand Up @@ -1043,7 +1045,8 @@ class PostgresCopyStreamReader {
ArrowErrorCode InferOutputSchema(ArrowError* error) {
schema_.reset();
ArrowSchemaInit(schema_.get());
NANOARROW_RETURN_NOT_OK(root_reader_.InputType().SetSchema(schema_.get()));
NANOARROW_RETURN_NOT_OK(
root_reader_.InputType().SetSchema(schema_.get(), numeric_conversion_));
return NANOARROW_OK;
}

Expand Down Expand Up @@ -1142,6 +1145,7 @@ class PostgresCopyStreamReader {
nanoarrow::UniqueSchema schema_;
nanoarrow::UniqueArray array_;
int64_t array_size_approx_bytes_;
NumericConversionStrategy numeric_conversion_ = NumericConversionStrategy::kToString;
};

} // namespace adbcpq
20 changes: 16 additions & 4 deletions c/driver/postgresql/postgres_type.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ static inline std::vector<PostgresTypeId> PostgresTypeIdAll(bool nested = true);

class PostgresTypeResolver;

/// \brief Strategy to use when converting received NUMERIC values.
enum class NumericConversionStrategy { kToString, kToDouble };

// An abstraction of a (potentially nested and/or parameterized) Postgres
// data type. This class is where default type conversion to/from Arrow
// is defined. It is intentionally copyable.
Expand Down Expand Up @@ -191,7 +194,9 @@ class PostgresType {
// do not have a corresponding Arrow type are returned as Binary with field
// metadata ADBC:posgresql:typname. These types can be represented as their
// binary COPY representation in the output.
ArrowErrorCode SetSchema(ArrowSchema* schema) const {
ArrowErrorCode SetSchema(ArrowSchema* schema,
NumericConversionStrategy numeric_conversion =
NumericConversionStrategy::kToString) const {
switch (type_id_) {
// ---- Primitive types --------------------
case PostgresTypeId::kBool:
Expand All @@ -217,7 +222,12 @@ class PostgresType {

// ---- Numeric/Decimal-------------------
case PostgresTypeId::kNumeric:
NANOARROW_RETURN_NOT_OK(ArrowSchemaSetType(schema, NANOARROW_TYPE_STRING));
if (numeric_conversion == NumericConversionStrategy::kToDouble) {
NANOARROW_RETURN_NOT_OK(ArrowSchemaSetType(schema, NANOARROW_TYPE_DOUBLE));
} else {
NANOARROW_RETURN_NOT_OK(ArrowSchemaSetType(schema, NANOARROW_TYPE_STRING));
}

NANOARROW_RETURN_NOT_OK(AddPostgresTypeMetadata(schema));

break;
Expand Down Expand Up @@ -271,13 +281,15 @@ class PostgresType {
case PostgresTypeId::kRecord:
NANOARROW_RETURN_NOT_OK(ArrowSchemaSetTypeStruct(schema, n_children()));
for (int64_t i = 0; i < n_children(); i++) {
NANOARROW_RETURN_NOT_OK(children_[i].SetSchema(schema->children[i]));
NANOARROW_RETURN_NOT_OK(
children_[i].SetSchema(schema->children[i], numeric_conversion));
}
break;

case PostgresTypeId::kArray:
NANOARROW_RETURN_NOT_OK(ArrowSchemaSetType(schema, NANOARROW_TYPE_LIST));
NANOARROW_RETURN_NOT_OK(children_[0].SetSchema(schema->children[0]));
NANOARROW_RETURN_NOT_OK(
children_[0].SetSchema(schema->children[0], numeric_conversion));
break;

case PostgresTypeId::kUserDefined:
Expand Down
15 changes: 14 additions & 1 deletion c/driver/postgresql/statement.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1361,6 +1361,10 @@ AdbcStatusCode PostgresStatement::GetOption(const char* key, char* value, size_t
}
} else if (std::strcmp(key, ADBC_POSTGRESQL_OPTION_BATCH_SIZE_HINT_BYTES) == 0) {
result = std::to_string(reader_.batch_size_hint_bytes_);
} else if (std::strcmp(key, ADBC_POSTGRESQL_NUMERIC_CONVERSION) == 0) {
result = numeric_conversion_ == NumericConversionStrategy::kToDouble
? ADBC_POSTGRESQL_NC_OPTION_TO_DOUBLE
: ADBC_POSTGRESQL_NC_OPTION_TO_STRING;
} else {
SetError(error, "[libpq] Unknown statement option '%s'", key);
return ADBC_STATUS_NOT_FOUND;
Expand Down Expand Up @@ -1480,6 +1484,15 @@ AdbcStatusCode PostgresStatement::SetOption(const char* key, const char* value,
}

this->reader_.batch_size_hint_bytes_ = int_value;
} else if (std::strcmp(key, ADBC_POSTGRESQL_NUMERIC_CONVERSION) == 0) {
if (std::strcmp(value, ADBC_POSTGRESQL_NC_OPTION_TO_STRING) == 0) {
numeric_conversion_ = NumericConversionStrategy::kToString;
} else if (std::strcmp(value, ADBC_POSTGRESQL_NC_OPTION_TO_DOUBLE) == 0) {
numeric_conversion_ = NumericConversionStrategy::kToDouble;
} else {
SetError(error, "[libpq] Invalid value '%s' for option '%s'", value, key);
return ADBC_STATUS_INVALID_ARGUMENT;
}
} else {
SetError(error, "[libpq] Unknown statement option '%s'", key);
return ADBC_STATUS_NOT_IMPLEMENTED;
Expand Down Expand Up @@ -1548,7 +1561,7 @@ AdbcStatusCode PostgresStatement::SetupReader(struct AdbcError* error) {
// Initialize the copy reader and infer the output schema (i.e., error for
// unsupported types before issuing the COPY query)
reader_.copy_reader_.reset(new PostgresCopyStreamReader());
reader_.copy_reader_->Init(root_type);
reader_.copy_reader_->Init(root_type, numeric_conversion_);
struct ArrowError na_error;
int na_res = reader_.copy_reader_->InferOutputSchema(&na_error);
if (na_res != NANOARROW_OK) {
Expand Down
7 changes: 7 additions & 0 deletions c/driver/postgresql/statement.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@
#define ADBC_POSTGRESQL_OPTION_BATCH_SIZE_HINT_BYTES \
"adbc.postgresql.batch_size_hint_bytes"

#define ADBC_POSTGRESQL_NUMERIC_CONVERSION "adbc.postgresql.numeric_conversion"

#define ADBC_POSTGRESQL_NC_OPTION_TO_STRING "to_string"

#define ADBC_POSTGRESQL_NC_OPTION_TO_DOUBLE "to_double"

namespace adbcpq {
class PostgresConnection;
class PostgresStatement;
Expand Down Expand Up @@ -162,5 +168,6 @@ class PostgresStatement {
} ingest_;

TupleReader reader_;
NumericConversionStrategy numeric_conversion_;
};
} // namespace adbcpq

0 comments on commit a7dbe82

Please sign in to comment.