Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ set(EXTENSION_SOURCES src/lance_extension.cpp src/lance_scan.cpp
src/lance_write.cpp
src/lance_truncate.cpp
src/lance_index.cpp
src/lance_arrow_compat.cpp
src/lance_resolver.cpp)

build_static_extension(${TARGET_NAME} ${EXTENSION_SOURCES})
Expand Down
43 changes: 43 additions & 0 deletions src/include/lance_arrow_compat.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#pragma once

#include "duckdb/common/arrow/arrow.hpp"

#include <string>
#include <vector>

namespace duckdb {

// Compatibility shim between the Arrow types Lance emits and the subset
// DuckDB's bundled Arrow consumer supports.
//
// Some Lance datasets carry Arrow types that DuckDB cannot represent natively
// (today: FloatingPoint(HALF); future additions slot in without touching call
// sites). The helpers below "coerce" such types to the nearest DuckDB-supported
// shape at the reader boundary:
//
// * `LanceCoerceArrowSchemaForDuckDB` — in-place rewrite of the Arrow schema
// so DuckDB's type mapper accepts it. Returns the top-level column names
// whose declared type had to be coerced (empty if none). Callers should
// store this list on the catalog entry so write paths can refuse to
// silently re-encode.
//
// * `LanceCoerceArrowArrayForDuckDB` — per-batch mirror: rewrite buffers of
// the same coerced fields so values DuckDB reads match the coerced schema.
// `schema` MUST still carry the original (un-coerced) formats — call this
// BEFORE `LanceCoerceArrowSchemaForDuckDB` on the same schema.
//
// Ownership: both helpers install wrapping release callbacks so their new
// allocations are freed when the producer's release fires. Pass the ROOT
// ArrowSchema / ArrowArray; children are released transitively.

// Umbrella helpers — call these at every Lance → DuckDB boundary.
std::vector<std::string> LanceCoerceArrowSchemaForDuckDB(ArrowSchema *schema);

void LanceCoerceArrowArrayForDuckDB(const ArrowSchema *schema,
ArrowArray *array);

// Quick predicate — true when the schema contains any type that the coercion
// helpers above would rewrite.
bool LanceArrowSchemaNeedsCoercion(const ArrowSchema *schema);

} // namespace duckdb
14 changes: 14 additions & 0 deletions src/include/lance_table_entry.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,23 @@ class LanceTableEntry final : public TableCatalogEntry {
return *namespace_config;
}

// Top-level catalog columns whose declared type was coerced to a
// DuckDB-compatible shape by the Arrow-compat reader-boundary layer
// (e.g. FloatingPoint(HALF) → FloatingPoint(SINGLE)). Writers must refuse
// to operate on such columns — DuckDB would hand back values in the
// coerced type and silently widen / otherwise corrupt the on-disk storage.
bool HasCoercedColumns() const { return !coerced_column_names.empty(); }
const vector<string> &CoercedColumnNames() const {
return coerced_column_names;
}
void SetCoercedColumnNames(vector<string> names) {
coerced_column_names = std::move(names);
}

private:
string dataset_uri;
unique_ptr<LanceNamespaceTableConfig> namespace_config;
vector<string> coerced_column_names;
};

} // namespace duckdb
297 changes: 297 additions & 0 deletions src/lance_arrow_compat.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,297 @@
#include "lance_arrow_compat.hpp"

#include <algorithm>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <new>
#include <string>
#include <vector>

namespace duckdb {

namespace {

// ---------------------------------------------------------------------------
// Per-type coercion rules. Add new rules here; call sites never change.
// ---------------------------------------------------------------------------
//
// A rule describes a single Arrow type DuckDB cannot consume. Each rule
// provides:
// * `matches(format)`: does this rule apply to the given Arrow format string?
// * `coerced_format`: static storage Arrow format the schema is rewritten
// to.
// * `convert(src, dst, n)`: widen `n` elements from the on-disk buffer into
// a DuckDB-native buffer. `dst` is a new buffer
// of `coerced_element_size * n` bytes.
// * `src_element_size`: bytes per element on disk.
// * `coerced_element_size`: bytes per element after coercion.

struct CoercionRule {
bool (*matches)(const char *format);
const char *coerced_format;
size_t src_element_size;
size_t coerced_element_size;
void (*convert)(const void *src, void *dst, int64_t count);
};

// --- Float16 → Float32 ------------------------------------------------------

// IEEE 754 half-precision (binary16) → single-precision (binary32) widening.
// Matches lance-spark's Float16Utils.halfToFloat bit-for-bit so readers across
// the two engines see identical values.
inline float HalfToFloat(uint16_t h) {
uint32_t sign = static_cast<uint32_t>(h >> 15) & 0x1u;
uint32_t exponent = static_cast<uint32_t>(h >> 10) & 0x1Fu;
uint32_t mantissa = static_cast<uint32_t>(h) & 0x3FFu;
uint32_t bits;

if (exponent == 0) {
if (mantissa == 0) {
bits = sign << 31;
} else {
int e = 1;
while ((mantissa & 0x400u) == 0) {
mantissa <<= 1;
e--;
}
mantissa &= 0x3FFu;
uint32_t float_exp = static_cast<uint32_t>(e + (127 - 15));
bits = (sign << 31) | (float_exp << 23) | (mantissa << 13);
}
} else if (exponent == 0x1Fu) {
bits = (sign << 31) | 0x7F800000u | (mantissa << 13);
} else {
uint32_t float_exp = exponent + (127u - 15u);
bits = (sign << 31) | (float_exp << 23) | (mantissa << 13);
}

float out;
std::memcpy(&out, &bits, sizeof(out));
return out;
}

bool MatchesFloat16(const char *format) {
return format != nullptr && format[0] == 'e' && format[1] == '\0';
}

void ConvertFloat16ToFloat32(const void *src, void *dst, int64_t count) {
const auto *in = static_cast<const uint16_t *>(src);
auto *out = static_cast<float *>(dst);
for (int64_t i = 0; i < count; i++) {
out[i] = HalfToFloat(in[i]);
}
}

constexpr char kFloat32FormatLiteral[] = "f";

constexpr CoercionRule kRules[] = {
{MatchesFloat16, kFloat32FormatLiteral, sizeof(uint16_t), sizeof(float),
ConvertFloat16ToFloat32},
};

const CoercionRule *FindRule(const char *format) {
for (const auto &rule : kRules) {
if (rule.matches(format)) {
return &rule;
}
}
return nullptr;
}

// ---------------------------------------------------------------------------
// Generic recursion (unchanged by the set of coercion rules).
// ---------------------------------------------------------------------------

bool SchemaNeedsCoercion(const ArrowSchema *schema) {
if (!schema) {
return false;
}
if (FindRule(schema->format)) {
return true;
}
for (int64_t i = 0; i < schema->n_children; i++) {
if (SchemaNeedsCoercion(schema->children[i])) {
return true;
}
}
return SchemaNeedsCoercion(schema->dictionary);
}

// --- Array buffer rewrite ---------------------------------------------------

struct ArrayOverride {
struct Entry {
ArrowArray *array;
const void *original_buffer;
void *new_buffer;
};
std::vector<Entry> entries;
void (*original_release)(ArrowArray *) = nullptr;
void *original_private_data = nullptr;
};

void ArrayReleaseWrapper(ArrowArray *array) {
if (!array || !array->private_data) {
return;
}
auto *state = static_cast<ArrayOverride *>(array->private_data);
for (auto &e : state->entries) {
e.array->buffers[1] = e.original_buffer;
std::free(e.new_buffer);
}
auto *original_release = state->original_release;
array->release = original_release;
array->private_data = state->original_private_data;
delete state;
if (original_release) {
original_release(array);
}
}

void CoerceArrayRecursive(const ArrowSchema *schema, ArrowArray *array,
ArrayOverride &state) {
if (!schema || !array) {
return;
}
if (const auto *rule = FindRule(schema->format)) {
if (array->n_buffers >= 2 && array->buffers != nullptr &&
array->buffers[1] != nullptr) {
int64_t total = array->length + array->offset;
if (total > 0) {
size_t out_bytes =
static_cast<size_t>(total) * rule->coerced_element_size;
void *new_buf = std::malloc(out_bytes);
if (!new_buf) {
throw std::bad_alloc();
}
rule->convert(array->buffers[1], new_buf, total);
ArrayOverride::Entry entry;
entry.array = array;
entry.original_buffer = array->buffers[1];
entry.new_buffer = new_buf;
state.entries.push_back(entry);
array->buffers[1] = new_buf;
}
}
}
int64_t n = std::min<int64_t>(schema->n_children, array->n_children);
for (int64_t i = 0; i < n; i++) {
CoerceArrayRecursive(schema->children[i], array->children[i], state);
}
if (schema->dictionary && array->dictionary) {
CoerceArrayRecursive(schema->dictionary, array->dictionary, state);
}
}

// --- Schema format rewrite --------------------------------------------------

struct SchemaOverride {
struct Entry {
ArrowSchema *schema;
const char *original_format;
};
std::vector<Entry> entries;
void (*original_release)(ArrowSchema *) = nullptr;
void *original_private_data = nullptr;
};

void SchemaReleaseWrapper(ArrowSchema *schema) {
if (!schema || !schema->private_data) {
return;
}
auto *state = static_cast<SchemaOverride *>(schema->private_data);
for (auto &e : state->entries) {
e.schema->format = e.original_format;
}
auto *original_release = state->original_release;
schema->release = original_release;
schema->private_data = state->original_private_data;
delete state;
if (original_release) {
original_release(schema);
}
}

void CoerceSchemaRecursive(ArrowSchema *schema, SchemaOverride &state) {
if (!schema) {
return;
}
if (const auto *rule = FindRule(schema->format)) {
SchemaOverride::Entry entry;
entry.schema = schema;
entry.original_format = schema->format;
state.entries.push_back(entry);
schema->format = rule->coerced_format;
}
for (int64_t i = 0; i < schema->n_children; i++) {
CoerceSchemaRecursive(schema->children[i], state);
}
CoerceSchemaRecursive(schema->dictionary, state);
}

} // namespace

bool LanceArrowSchemaNeedsCoercion(const ArrowSchema *schema) {
return SchemaNeedsCoercion(schema);
}

std::vector<std::string> LanceCoerceArrowSchemaForDuckDB(ArrowSchema *schema) {
std::vector<std::string> coerced_top_level;
if (!schema || !SchemaNeedsCoercion(schema)) {
return coerced_top_level;
}

// Capture affected top-level column names BEFORE installing the release
// wrapper, so the list reflects user-facing catalog columns regardless of
// whether the coercion lives at the top or inside a nested type.
for (int64_t i = 0; i < schema->n_children; i++) {
auto *child = schema->children[i];
if (SchemaNeedsCoercion(child) && child && child->name) {
coerced_top_level.emplace_back(child->name);
}
}

auto *state = new SchemaOverride();
state->original_release = schema->release;
state->original_private_data = schema->private_data;
try {
CoerceSchemaRecursive(schema, *state);
} catch (...) {
for (auto &e : state->entries) {
e.schema->format = e.original_format;
}
delete state;
throw;
}
schema->private_data = state;
schema->release = SchemaReleaseWrapper;
return coerced_top_level;
}

void LanceCoerceArrowArrayForDuckDB(const ArrowSchema *schema,
ArrowArray *array) {
if (!schema || !array || array->release == nullptr) {
return;
}
if (!SchemaNeedsCoercion(schema)) {
return;
}
auto *state = new ArrayOverride();
state->original_release = array->release;
state->original_private_data = array->private_data;
try {
CoerceArrayRecursive(schema, array, *state);
} catch (...) {
for (auto &e : state->entries) {
e.array->buffers[1] = e.original_buffer;
std::free(e.new_buffer);
}
delete state;
throw;
}
array->private_data = state;
array->release = ArrayReleaseWrapper;
}

} // namespace duckdb
9 changes: 9 additions & 0 deletions src/lance_insert.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,15 @@ PhysicalOperator &PlanLanceInsertAppend(ClientContext &context,
if (!lance_table) {
throw InternalException("PlanLanceInsertAppend called for non-Lance table");
}
if (lance_table->HasCoercedColumns()) {
throw NotImplementedException(
"INSERT into Lance table '" + lance_table->name +
"' is not supported: column(s) [" +
StringUtil::Join(lance_table->CoercedColumnNames(), ", ") +
"] have Arrow types DuckDB cannot represent natively, so the "
"catalog exposes a coerced type. Writing in the coerced type would "
"silently change the on-disk storage.");
}

vector<string> column_names;
vector<LogicalType> column_types;
Expand Down
Loading
Loading