Skip to content

Commit

Permalink
fix: Parquet udfs take in PgRelation (#209)
Browse files Browse the repository at this point in the history
* parquet udfs take in pgrelation

* tests

* upgrade script

* checkout origin/dev

* 0.3.3

* fix update script name

* cargo.lock

* drop function in upgrade script

* Adapt to 0.3.3

* lock

---------

Co-authored-by: Philippe Noël <[email protected]>
  • Loading branch information
rebasedming and philippemnoel authored Feb 8, 2025
1 parent 4934ae8 commit 020bf09
Show file tree
Hide file tree
Showing 3 changed files with 201 additions and 32 deletions.
41 changes: 40 additions & 1 deletion sql/pg_analytics--0.3.2--0.3.3.sql
Original file line number Diff line number Diff line change
@@ -1 +1,40 @@
\echo Use "ALTER EXTENSION pg_analytics UPDATE TO '0.3.3'" to load this file. \quit
-- src/api/parquet.rs:52
-- pg_analytics::api::parquet::parquet_describe
DROP FUNCTION "parquet_describe"(TEXT);
CREATE OR REPLACE FUNCTION "parquet_describe"(
"relation" regclass /* pgrx::rel::PgRelation */
) RETURNS TABLE (
"column_name" TEXT, /* core::option::Option<alloc::string::String> */
"column_type" TEXT, /* core::option::Option<alloc::string::String> */
"null" TEXT, /* core::option::Option<alloc::string::String> */
"key" TEXT, /* core::option::Option<alloc::string::String> */
"default" TEXT, /* core::option::Option<alloc::string::String> */
"extra" TEXT /* core::option::Option<alloc::string::String> */
)
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'parquet_describe_wrapper';
/* </end connected objects> */

-- src/api/parquet.rs:73
-- pg_analytics::api::parquet::parquet_schema
DROP FUNCTION "parquet_schema"(TEXT);
CREATE OR REPLACE FUNCTION "parquet_schema"(
"relation" regclass /* pgrx::rel::PgRelation */
) RETURNS TABLE (
"file_name" TEXT, /* core::option::Option<alloc::string::String> */
"name" TEXT, /* core::option::Option<alloc::string::String> */
"type" TEXT, /* core::option::Option<alloc::string::String> */
"type_length" TEXT, /* core::option::Option<alloc::string::String> */
"repetition_type" TEXT, /* core::option::Option<alloc::string::String> */
"num_children" bigint, /* core::option::Option<i64> */
"converted_type" TEXT, /* core::option::Option<alloc::string::String> */
"scale" bigint, /* core::option::Option<i64> */
"precision" bigint, /* core::option::Option<i64> */
"field_id" bigint, /* core::option::Option<i64> */
"logical_type" TEXT /* core::option::Option<alloc::string::String> */
)
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'parquet_schema_wrapper';
/* </end connected objects> */
117 changes: 86 additions & 31 deletions src/api/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@

use anyhow::Result;
use pgrx::*;
use supabase_wrappers::prelude::{options_to_hashmap, user_mapping_options};

use crate::duckdb::connection;
use crate::duckdb::parquet::ParquetOption;
use crate::duckdb::utils;
use crate::fdw::base::register_duckdb_view;
use crate::fdw::handler::FdwHandler;

type ParquetSchemaRow = (
Option<String>,
Expand Down Expand Up @@ -47,16 +51,19 @@ type ParquetDescribeRow = (
#[allow(clippy::type_complexity)]
#[pg_extern]
pub fn parquet_describe(
files: &str,
) -> iter::TableIterator<(
name!(column_name, Option<String>),
name!(column_type, Option<String>),
name!(null, Option<String>),
name!(key, Option<String>),
name!(default, Option<String>),
name!(extra, Option<String>),
)> {
let rows = parquet_describe_impl(files).unwrap_or_else(|e| {
relation: PgRelation,
) -> iter::TableIterator<
'static,
(
name!(column_name, Option<String>),
name!(column_type, Option<String>),
name!(null, Option<String>),
name!(key, Option<String>),
name!(default, Option<String>),
name!(extra, Option<String>),
),
> {
let rows = parquet_describe_impl(relation).unwrap_or_else(|e| {
panic!("{}", e);
});
iter::TableIterator::new(rows)
Expand All @@ -65,31 +72,57 @@ pub fn parquet_describe(
#[allow(clippy::type_complexity)]
#[pg_extern]
pub fn parquet_schema(
files: &str,
) -> iter::TableIterator<(
name!(file_name, Option<String>),
name!(name, Option<String>),
name!(type, Option<String>),
name!(type_length, Option<String>),
name!(repetition_type, Option<String>),
name!(num_children, Option<i64>),
name!(converted_type, Option<String>),
name!(scale, Option<i64>),
name!(precision, Option<i64>),
name!(field_id, Option<i64>),
name!(logical_type, Option<String>),
)> {
let rows = parquet_schema_impl(files).unwrap_or_else(|e| {
relation: PgRelation,
) -> iter::TableIterator<
'static,
(
name!(file_name, Option<String>),
name!(name, Option<String>),
name!(type, Option<String>),
name!(type_length, Option<String>),
name!(repetition_type, Option<String>),
name!(num_children, Option<i64>),
name!(converted_type, Option<String>),
name!(scale, Option<i64>),
name!(precision, Option<i64>),
name!(field_id, Option<i64>),
name!(logical_type, Option<String>),
),
> {
let rows = parquet_schema_impl(relation).unwrap_or_else(|e| {
panic!("{}", e);
});
iter::TableIterator::new(rows)
}

#[inline]
fn parquet_schema_impl(files: &str) -> Result<Vec<ParquetSchemaRow>> {
let schema_str = utils::format_csv(files);
fn parquet_schema_impl(relation: PgRelation) -> Result<Vec<ParquetSchemaRow>> {
let foreign_table = unsafe { pg_sys::GetForeignTable(relation.oid()) };
let handler = FdwHandler::from(foreign_table);
if FdwHandler::from(foreign_table) != FdwHandler::Parquet {
panic!("relation is not a parquet table");
}

let foreign_server = unsafe { pg_sys::GetForeignServer((*foreign_table).serverid) };
let user_mapping_options = unsafe { user_mapping_options(foreign_server) };
let table_options = unsafe { options_to_hashmap((*foreign_table).options)? };

register_duckdb_view(
relation.name(),
relation.namespace(),
table_options.clone(),
user_mapping_options,
handler,
)?;

let files = utils::format_csv(
table_options
.get(ParquetOption::Files.as_ref())
.expect("table should have files option"),
);

let conn = unsafe { &*connection::get_global_connection().get() };
let query = format!("SELECT * FROM parquet_schema({schema_str})");
let query = format!("SELECT * FROM parquet_schema({files})");
let mut stmt = conn.prepare(&query)?;

Ok(stmt
Expand All @@ -113,10 +146,32 @@ fn parquet_schema_impl(files: &str) -> Result<Vec<ParquetSchemaRow>> {
}

#[inline]
fn parquet_describe_impl(files: &str) -> Result<Vec<ParquetDescribeRow>> {
let schema_str = utils::format_csv(files);
fn parquet_describe_impl(relation: PgRelation) -> Result<Vec<ParquetDescribeRow>> {
let foreign_table = unsafe { pg_sys::GetForeignTable(relation.oid()) };
let handler = FdwHandler::from(foreign_table);
if FdwHandler::from(foreign_table) != FdwHandler::Parquet {
panic!("relation is not a parquet table");
}

let foreign_server = unsafe { pg_sys::GetForeignServer((*foreign_table).serverid) };
let user_mapping_options = unsafe { user_mapping_options(foreign_server) };
let table_options = unsafe { options_to_hashmap((*foreign_table).options)? };

register_duckdb_view(
relation.name(),
relation.namespace(),
table_options.clone(),
user_mapping_options,
handler,
)?;

let files = utils::format_csv(
table_options
.get(ParquetOption::Files.as_ref())
.expect("table should have files option"),
);
let conn = unsafe { &*connection::get_global_connection().get() };
let query = format!("DESCRIBE SELECT * FROM {schema_str}");
let query = format!("DESCRIBE SELECT * FROM {files}");
let mut stmt = conn.prepare(&query)?;

Ok(stmt
Expand Down
75 changes: 75 additions & 0 deletions tests/tests/parquet.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright (c) 2023-2025 Retake, Inc.
//
// This file is part of ParadeDB - Postgres for Search and Analytics
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

mod fixtures;

use crate::fixtures::db::Query;
use crate::fixtures::{conn, s3, S3};
use anyhow::Result;
use rstest::*;
use sqlx::PgConnection;

use crate::fixtures::tables::nyc_trips::NycTripsTable;

const S3_TRIPS_BUCKET: &str = "test-trip-setup";
const S3_TRIPS_KEY: &str = "test_trip_setup.parquet";

#[rstest]
async fn test_parquet_describe(#[future(awt)] s3: S3, mut conn: PgConnection) -> Result<()> {
NycTripsTable::setup().execute(&mut conn);
let rows: Vec<NycTripsTable> = "SELECT * FROM nyc_trips".fetch(&mut conn);
s3.client
.create_bucket()
.bucket(S3_TRIPS_BUCKET)
.send()
.await?;
s3.create_bucket(S3_TRIPS_BUCKET).await?;
s3.put_rows(S3_TRIPS_BUCKET, S3_TRIPS_KEY, &rows).await?;

NycTripsTable::setup_s3_listing_fdw(
&s3.url.clone(),
&format!("s3://{S3_TRIPS_BUCKET}/{S3_TRIPS_KEY}"),
)
.execute(&mut conn);

let count: (i64,) = "SELECT COUNT(*) FROM parquet_describe('trips')".fetch_one(&mut conn);
assert_eq!(count.0, 15);
Ok(())
}

#[rstest]
async fn test_parquet_schema(#[future(awt)] s3: S3, mut conn: PgConnection) -> Result<()> {
NycTripsTable::setup().execute(&mut conn);
let rows: Vec<NycTripsTable> = "SELECT * FROM nyc_trips".fetch(&mut conn);
s3.client
.create_bucket()
.bucket(S3_TRIPS_BUCKET)
.send()
.await?;
s3.create_bucket(S3_TRIPS_BUCKET).await?;
s3.put_rows(S3_TRIPS_BUCKET, S3_TRIPS_KEY, &rows).await?;

NycTripsTable::setup_s3_listing_fdw(
&s3.url.clone(),
&format!("s3://{S3_TRIPS_BUCKET}/{S3_TRIPS_KEY}"),
)
.execute(&mut conn);

let count: (i64,) = "SELECT COUNT(*) FROM parquet_schema('trips')".fetch_one(&mut conn);
assert_eq!(count.0, 16);
Ok(())
}

0 comments on commit 020bf09

Please sign in to comment.