diff --git a/.gitmodules b/.gitmodules index aac2855e..ac349e8f 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,3 +1,6 @@ [submodule "third_party/duckdb"] path = third_party/duckdb url = https://github.com/duckdb/duckdb.git +[submodule "third_party/ducklake"] + path = third_party/ducklake + url = https://github.com/duckdb/ducklake.git diff --git a/Makefile b/Makefile index c16cac76..2daeb829 100644 --- a/Makefile +++ b/Makefile @@ -5,6 +5,8 @@ EXTENSION = pg_duckdb DATA = pg_duckdb.control $(wildcard sql/pg_duckdb--*.sql) SRCS = $(wildcard src/*.cpp src/*/*.cpp) +DUCKLAKE_SRCS_DIR = third_party/ducklake/src +SRCS += $(wildcard $(DUCKLAKE_SRCS_DIR)/common/*.cpp $(DUCKLAKE_SRCS_DIR)/functions/*.cpp $(DUCKLAKE_SRCS_DIR)/storage/*.cpp $(DUCKLAKE_SRCS_DIR)/metadata_manager/*.cpp) OBJS = $(subst .cpp,.o, $(SRCS)) C_SRCS = $(wildcard src/*.c src/*/*.c) @@ -60,7 +62,7 @@ endif COMPILER_FLAGS=-Wno-sign-compare -Wshadow -Wswitch -Wunused-parameter -Wunreachable-code -Wno-unknown-pragmas -Wall -Wextra ${ERROR_ON_WARNING} -override PG_CPPFLAGS += -Iinclude -isystem third_party/duckdb/src/include -isystem third_party/duckdb/third_party/re2 -isystem $(INCLUDEDIR_SERVER) ${COMPILER_FLAGS} +override PG_CPPFLAGS += -Iinclude -I$(DUCKLAKE_SRCS_DIR)/include -isystem third_party/duckdb/src/include -isystem third_party/duckdb/third_party/re2 -isystem $(INCLUDEDIR_SERVER) ${COMPILER_FLAGS} override PG_CXXFLAGS += -std=c++17 ${DUCKDB_BUILD_CXX_FLAGS} ${COMPILER_FLAGS} -Wno-register -Weffc++ # Ignore declaration-after-statement warnings in our code. Postgres enforces # this because their ancient style guide requires it, but we don't care. 
It @@ -112,7 +114,7 @@ schedulecheck: duckdb: $(FULL_DUCKDB_LIB) .git/modules/third_party/duckdb/HEAD: - git submodule update --init --recursive + git submodule update --init --recursive --depth 1 $(FULL_DUCKDB_LIB): .git/modules/third_party/duckdb/HEAD third_party/pg_duckdb_extensions.cmake OVERRIDE_GIT_DESCRIBE=$(DUCKDB_VERSION) \ diff --git a/include/pgduckdb/pgduckdb_ruleutils.h b/include/pgduckdb/pgduckdb_ruleutils.h index ed45f067..bf2f68a0 100644 --- a/include/pgduckdb/pgduckdb_ruleutils.h +++ b/include/pgduckdb/pgduckdb_ruleutils.h @@ -15,8 +15,9 @@ char *pgduckdb_get_tabledef(Oid relation_id); char *pgduckdb_get_alter_tabledef(Oid relation_oid, AlterTableStmt *alter_stmt); char *pgduckdb_get_rename_tabledef(Oid relation_oid, RenameStmt *rename_stmt); bool pgduckdb_is_not_default_expr(Node *node, void *context); -List *pgduckdb_db_and_schema(const char *postgres_schema_name, bool is_duckdb_table); -const char *pgduckdb_db_and_schema_string(const char *postgres_schema_name, bool is_duckdb_table); +List *pgduckdb_db_and_schema(const char *postgres_schema_name, bool is_duckdb_table, bool is_ducklake_table); +const char *pgduckdb_db_and_schema_string(const char *postgres_schema_name, bool is_duckdb_table, + bool is_ducklake_table); bool pgduckdb_is_duckdb_row(Oid type_oid); bool pgduckdb_is_unresolved_type(Oid type_oid); bool pgduckdb_is_fake_type(Oid type_oid); diff --git a/include/pgducklake/pgducklake_catalog.hpp b/include/pgducklake/pgducklake_catalog.hpp new file mode 100644 index 00000000..8bc2acae --- /dev/null +++ b/include/pgducklake/pgducklake_catalog.hpp @@ -0,0 +1,27 @@ +#pragma once + +#include "storage/ducklake_catalog.hpp" + +#include "pgduckdb/utility/cpp_only_file.hpp" // Must be last include. 
+ +namespace pgduckdb { + +class PgDuckLakeCatalog : public duckdb::DuckLakeCatalog { +public: + PgDuckLakeCatalog(duckdb::AttachedDatabase &db_p, duckdb::DuckLakeOptions options_p) + : duckdb::DuckLakeCatalog(db_p, std::move(options_p)) { + } + + void Initialize(duckdb::optional_ptr context, bool load_builtin) override; + + duckdb::optional_ptr LookupSchema(duckdb::CatalogTransaction transaction, + const duckdb::EntryLookupInfo &schema_lookup, + duckdb::OnEntryNotFound if_not_found) override; + +private: + std::mutex schemas_lock; + //! Map of schema index -> schema + std::unordered_map> schemas; +}; + +} // namespace pgduckdb diff --git a/include/pgducklake/pgducklake_handler.hpp b/include/pgducklake/pgducklake_handler.hpp new file mode 100644 index 00000000..9a075e27 --- /dev/null +++ b/include/pgducklake/pgducklake_handler.hpp @@ -0,0 +1,10 @@ +#pragma once + +#include "pgduckdb/pg/declarations.hpp" + +namespace pgduckdb { + +bool IsDuckLakeTable(Relation relation); +bool IsDuckLakeTable(Oid oid); + +} // namespace pgduckdb diff --git a/include/pgducklake/pgducklake_metadata_manager.hpp b/include/pgducklake/pgducklake_metadata_manager.hpp new file mode 100644 index 00000000..6c759293 --- /dev/null +++ b/include/pgducklake/pgducklake_metadata_manager.hpp @@ -0,0 +1,46 @@ +#pragma once + +#include "pgduckdb/pg/declarations.hpp" + +#include "storage/ducklake_metadata_info.hpp" +#include "storage/ducklake_metadata_manager.hpp" + +namespace pgduckdb { + +class PgDuckLakeMetadataManager : public duckdb::DuckLakeMetadataManager { +public: + PgDuckLakeMetadataManager(duckdb::DuckLakeTransaction &transaction); + + duckdb::DuckLakeCatalogInfo GetCatalogForSnapshot(duckdb::DuckLakeSnapshot snapshot) override; + duckdb::vector GetGlobalTableStats(duckdb::DuckLakeSnapshot snapshot) override; + duckdb::vector GetFilesForTable(duckdb::DuckLakeTableEntry &table_entry, + duckdb::DuckLakeSnapshot snapshot, + const duckdb::string &filter) override; + duckdb::unique_ptr 
GetSnapshot() override; + duckdb::unique_ptr GetSnapshot(duckdb::BoundAtClause &at_clause) override; + + void WriteNewSchemas(duckdb::DuckLakeSnapshot commit_snapshot, + const duckdb::vector &new_schemas); + void WriteNewTables(duckdb::DuckLakeSnapshot commit_snapshot, + const duckdb::vector &new_tables) override; + + void InsertSnapshot(duckdb::DuckLakeSnapshot commit_snapshot) override; + void WriteSnapshotChanges(duckdb::DuckLakeSnapshot commit_snapshot, + const duckdb::SnapshotChangeInfo &change_info) override; + void WriteNewDataFiles(duckdb::DuckLakeSnapshot commit_snapshot, + const duckdb::vector &new_files) override; + void UpdateGlobalTableStats(const duckdb::DuckLakeGlobalStatsInfo &stats) override; + + bool GetDuckLakeTableInfo(const duckdb::DuckLakeSnapshot &snapshot, duckdb::DuckLakeSchemaEntry &schema, + duckdb::DuckLakeTableInfo &table_info); + +private: + void WriteNewSchema(duckdb::DuckLakeSnapshot commit_snapshot, const duckdb::DuckLakeSchemaInfo &schema_info); + void WriteNewTable(duckdb::DuckLakeSnapshot commit_snapshot, const duckdb::DuckLakeTableInfo &table_info); + int GetDuckLakeSchemas(const duckdb::DuckLakeSnapshot &snapshot, duckdb::DuckLakeCatalogInfo &catalog_info); + int GetDuckLakeTables(const duckdb::DuckLakeSnapshot &snapshot, duckdb::DuckLakeCatalogInfo &catalog); + + Snapshot snapshot; +}; + +} // namespace pgduckdb diff --git a/include/pgducklake/pgducklake_schema_entry.hpp b/include/pgducklake/pgducklake_schema_entry.hpp new file mode 100644 index 00000000..65b3feb1 --- /dev/null +++ b/include/pgducklake/pgducklake_schema_entry.hpp @@ -0,0 +1,22 @@ +#pragma once + +#include "storage/ducklake_schema_entry.hpp" + +namespace pgduckdb { + +class PgDuckLakeSchemaEntry : public duckdb::DuckLakeSchemaEntry { +public: + PgDuckLakeSchemaEntry(duckdb::Catalog &catalog, duckdb::CreateSchemaInfo &info, duckdb::SchemaIndex schema_id, + duckdb::string schema_uuid, duckdb::string data_path); + + duckdb::optional_ptr 
LookupEntry(duckdb::CatalogTransaction transaction, + const duckdb::EntryLookupInfo &lookup_info) override; + +private: + duckdb::optional_ptr LoadTableEntry(duckdb::CatalogTransaction transaction, + const duckdb::EntryLookupInfo &lookup_info); + + duckdb::DuckLakeCatalogSet tables; +}; + +} // namespace pgduckdb diff --git a/include/pgducklake/pgducklake_storage.hpp b/include/pgducklake/pgducklake_storage.hpp new file mode 100644 index 00000000..47007be9 --- /dev/null +++ b/include/pgducklake/pgducklake_storage.hpp @@ -0,0 +1,14 @@ +#pragma once + +#include "duckdb/storage/storage_extension.hpp" + +#include "pgduckdb/utility/cpp_only_file.hpp" // Must be last include. + +namespace pgduckdb { + +class PgDuckLakeStorageExtension : public duckdb::StorageExtension { +public: + PgDuckLakeStorageExtension(); +}; + +} // namespace pgduckdb diff --git a/include/pgducklake/pgducklake_table.hpp b/include/pgducklake/pgducklake_table.hpp new file mode 100644 index 00000000..c9adf80f --- /dev/null +++ b/include/pgducklake/pgducklake_table.hpp @@ -0,0 +1,12 @@ +#pragma once + +#include "pgduckdb/pg/declarations.hpp" + +namespace pgduckdb { + +class PgDuckLakeTable { +public: + static void CreateTable(Relation rel); +}; + +} // namespace pgduckdb diff --git a/include/pgducklake/pgducklake_transaction.hpp b/include/pgducklake/pgducklake_transaction.hpp new file mode 100644 index 00000000..71b9d194 --- /dev/null +++ b/include/pgducklake/pgducklake_transaction.hpp @@ -0,0 +1,17 @@ +#pragma once + +#include "storage/ducklake_transaction.hpp" + +namespace pgduckdb { + +class PgDuckLakeTransaction : public duckdb::DuckLakeTransaction, + public std::enable_shared_from_this { +public: + PgDuckLakeTransaction(duckdb::DuckLakeCatalog &ducklake_catalog, duckdb::TransactionManager &manager, + duckdb::ClientContext &context); + ~PgDuckLakeTransaction() override {}; + + void Start() override; +}; + +} // namespace pgduckdb diff --git a/include/pgducklake/pgducklake_transaction_manager.hpp 
b/include/pgducklake/pgducklake_transaction_manager.hpp new file mode 100644 index 00000000..8c4fb6ff --- /dev/null +++ b/include/pgducklake/pgducklake_transaction_manager.hpp @@ -0,0 +1,23 @@ +#pragma once + +#include "storage/ducklake_transaction_manager.hpp" + +#include "pgducklake/pgducklake_transaction.hpp" + +namespace pgduckdb { + +class PgDuckLakeTransactionManager : public duckdb::DuckLakeTransactionManager { +public: + PgDuckLakeTransactionManager(duckdb::AttachedDatabase &db_p, duckdb::DuckLakeCatalog &ducklake_catalog); + + duckdb::Transaction &StartTransaction(duckdb::ClientContext &context) override; + duckdb::ErrorData CommitTransaction(duckdb::ClientContext &context, duckdb::Transaction &transaction) override; + void RollbackTransaction(duckdb::Transaction &transaction) override; + +private: + duckdb::DuckLakeCatalog &ducklake_catalog; + std::mutex transaction_lock; + duckdb::reference_map_t> transactions; +}; + +} // namespace pgduckdb diff --git a/sql/pg_duckdb--0.3.0--1.0.0.sql b/sql/pg_duckdb--0.3.0--1.0.0.sql index 6ea60029..53932331 100644 --- a/sql/pg_duckdb--0.3.0--1.0.0.sql +++ b/sql/pg_duckdb--0.3.0--1.0.0.sql @@ -771,3 +771,202 @@ LANGUAGE C AS 'MODULE_PATHNAME', 'pgduckdb_create_azure_secret'; ALTER TABLE duckdb.extensions ADD COLUMN repository TEXT NOT NULL DEFAULT 'core'; ALTER TABLE duckdb.extensions RENAME COLUMN enabled TO autoload; ALTER TABLE duckdb.extensions ALTER COLUMN autoload SET NOT NULL; + +CREATE TABLE duckdb.ducklake_metadata(key VARCHAR NOT NULL, value VARCHAR NOT NULL, scope VARCHAR, scope_id BIGINT); +INSERT INTO duckdb.ducklake_metadata VALUES + ('version', '0.1'), + ('created_by', 'pg_ducklake'), + ('data_path', '/tmp/ducklake'), + ('encrypted', 'false'); + +CREATE TABLE duckdb.ducklake_snapshot( + snapshot_id BIGINT PRIMARY KEY, + snapshot_time TIMESTAMPTZ, + schema_version BIGINT, + next_catalog_id BIGINT, + next_file_id BIGINT +); +INSERT INTO duckdb.ducklake_snapshot VALUES (0, NOW(), 0, 1, 0); + +CREATE TABLE 
duckdb.ducklake_snapshot_changes( + snapshot_id BIGINT PRIMARY KEY, + changes_made VARCHAR +); +INSERT INTO duckdb.ducklake_snapshot_changes VALUES (0, 'created_schema:"main"'); + +CREATE TABLE duckdb.ducklake_schema( + schema_id BIGINT PRIMARY KEY, + schema_uuid UUID, + begin_snapshot BIGINT, + end_snapshot BIGINT, + schema_name VARCHAR, + path VARCHAR, + path_is_relative BOOLEAN +); +INSERT INTO duckdb.ducklake_schema VALUES (0, gen_random_uuid(), 0, NULL, 'main', 'main/', true); + +CREATE TABLE duckdb.ducklake_table( + table_id BIGINT, + table_uuid UUID, + begin_snapshot BIGINT, + end_snapshot BIGINT, + schema_id BIGINT, + table_name VARCHAR, + path VARCHAR, + path_is_relative BOOLEAN +); + +CREATE TABLE duckdb.ducklake_view( + view_id BIGINT, + view_uuid UUID, + begin_snapshot BIGINT, + end_snapshot BIGINT, + schema_id BIGINT, + view_name VARCHAR, + dialect VARCHAR, + sql VARCHAR, + column_aliases VARCHAR +); + +CREATE TABLE duckdb.ducklake_tag( + object_id BIGINT, + begin_snapshot BIGINT, + end_snapshot BIGINT, + key VARCHAR, + value VARCHAR +); + +CREATE TABLE duckdb.ducklake_column_tag( + table_id BIGINT, + column_id BIGINT, + begin_snapshot BIGINT, + end_snapshot BIGINT, + key VARCHAR, + value VARCHAR +); + +CREATE TABLE duckdb.ducklake_data_file( + data_file_id BIGINT PRIMARY KEY, + table_id BIGINT, + begin_snapshot BIGINT, + end_snapshot BIGINT, + file_order BIGINT, + path VARCHAR, + path_is_relative BOOLEAN, + file_format VARCHAR, + record_count BIGINT, + file_size_bytes BIGINT, + footer_size BIGINT, + row_id_start BIGINT, + partition_id BIGINT, + encryption_key VARCHAR, + partial_file_info VARCHAR, + mapping_id BIGINT +); + +CREATE TABLE duckdb.ducklake_file_column_statistics( + data_file_id BIGINT, + table_id BIGINT, + column_id BIGINT, + column_size_bytes BIGINT, + value_count BIGINT, + null_count BIGINT, + min_value VARCHAR, + max_value VARCHAR, + contains_nan BOOLEAN +); + +CREATE TABLE duckdb.ducklake_delete_file( + delete_file_id BIGINT PRIMARY 
KEY,
+    table_id BIGINT,
+    begin_snapshot BIGINT,
+    end_snapshot BIGINT,
+    data_file_id BIGINT,
+    path VARCHAR,
+    path_is_relative BOOLEAN,
+    format VARCHAR,
+    delete_count BIGINT,
+    file_size_bytes BIGINT,
+    footer_size BIGINT,
+    encryption_key VARCHAR
+);
+
+CREATE TABLE duckdb.ducklake_column(
+    column_id BIGINT,
+    begin_snapshot BIGINT,
+    end_snapshot BIGINT,
+    table_id BIGINT,
+    column_order BIGINT,
+    column_name VARCHAR,
+    column_type VARCHAR,
+    initial_default VARCHAR,
+    default_value VARCHAR,
+    nulls_allowed BOOLEAN,
+    parent_column BIGINT
+);
+
+CREATE TABLE duckdb.ducklake_table_stats(
+    table_id BIGINT,
+    record_count BIGINT,
+    next_row_id BIGINT,
+    file_size_bytes BIGINT
+);
+
+CREATE TABLE duckdb.ducklake_table_column_stats(
+    table_id BIGINT,
+    column_id BIGINT,
+    contains_null BOOLEAN,
+    contains_nan BOOLEAN,
+    min_value VARCHAR,
+    max_value VARCHAR
+);
+
+CREATE TABLE duckdb.ducklake_partition_info(
+    partition_id BIGINT,
+    table_id BIGINT,
+    begin_snapshot BIGINT,
+    end_snapshot BIGINT
+);
+
+CREATE TABLE duckdb.ducklake_partition_column(
+    partition_id BIGINT,
+    table_id BIGINT,
+    partition_key_index BIGINT,
+    column_id BIGINT,
+    transform VARCHAR
+);
+
+CREATE TABLE duckdb.ducklake_file_partition_value(
+    data_file_id BIGINT, -- deliberately not a PRIMARY KEY: rows are per (data_file_id, partition_key_index), so a file can appear more than once
+    table_id BIGINT,
+    partition_key_index BIGINT,
+    partition_value VARCHAR
+);
+
+CREATE TABLE duckdb.ducklake_files_scheduled_for_deletion(
+    data_file_id BIGINT,
+    path VARCHAR,
+    path_is_relative BOOLEAN,
+    schedule_start TIMESTAMPTZ
+);
+
+CREATE TABLE duckdb.ducklake_inlined_data_tables(
+    table_id BIGINT,
+    table_name VARCHAR,
+    schema_version BIGINT
+);
+
+CREATE TABLE duckdb.ducklake_column_mapping(mapping_id BIGINT, table_id BIGINT, type VARCHAR);
+
+CREATE TABLE duckdb.ducklake_name_mapping(
+    mapping_id BIGINT,
+    column_id BIGINT,
+    source_name VARCHAR,
+    target_field_id BIGINT,
+    parent_column BIGINT
+);
+
+CREATE FUNCTION ducklake_handler(internal) RETURNS table_am_handler
+    AS 'MODULE_PATHNAME'
LANGUAGE C STRICT; + +CREATE ACCESS METHOD ducklake TYPE TABLE HANDLER ducklake_handler; diff --git a/src/pgduckdb_ddl.cpp b/src/pgduckdb_ddl.cpp index 00dae7d0..7c50526f 100644 --- a/src/pgduckdb_ddl.cpp +++ b/src/pgduckdb_ddl.cpp @@ -703,7 +703,7 @@ DECLARE_PG_FUNCTION(duckdb_create_table_trigger) { Oid saved_userid; int sec_context; const char *postgres_schema_name = get_namespace_name_or_temp(get_rel_namespace(relid)); - const char *duckdb_db = (const char *)linitial(pgduckdb_db_and_schema(postgres_schema_name, true)); + const char *duckdb_db = (const char *)linitial(pgduckdb_db_and_schema(postgres_schema_name, true, false)); auto default_db = pgduckdb::DuckDBManager::Get().GetDefaultDBName(); Oid arg_types[] = {OIDOID, TEXTOID, TEXTOID, TEXTOID}; @@ -995,7 +995,7 @@ DECLARE_PG_FUNCTION(duckdb_drop_trigger) { char *postgres_schema_name = SPI_getvalue(tuple, SPI_tuptable->tupdesc, 1); char *table_name = SPI_getvalue(tuple, SPI_tuptable->tupdesc, 2); char *drop_query = - psprintf("DROP TABLE IF EXISTS %s.%s", pgduckdb_db_and_schema_string(postgres_schema_name, true), + psprintf("DROP TABLE IF EXISTS %s.%s", pgduckdb_db_and_schema_string(postgres_schema_name, true, false), quote_identifier(table_name)); pgduckdb::DuckDBQueryOrThrow(*connection, drop_query); diff --git a/src/pgduckdb_duckdb.cpp b/src/pgduckdb_duckdb.cpp index 1b7efd90..d2073de0 100644 --- a/src/pgduckdb_duckdb.cpp +++ b/src/pgduckdb_duckdb.cpp @@ -23,6 +23,8 @@ #include "pgduckdb/pgduckdb_xact.hpp" #include "pgduckdb/scan/postgres_scan.hpp" +#include "pgducklake/pgducklake_storage.hpp" + #include "pgduckdb/utility/cpp_wrapper.hpp" #include "pgduckdb/vendor/pg_list.hpp" @@ -167,8 +169,10 @@ DuckDBManager::Initialize() { auto &dbconfig = duckdb::DBConfig::GetConfig(*database->instance); dbconfig.storage_extensions["pgduckdb"] = duckdb::make_uniq(); + dbconfig.storage_extensions["pgducklake"] = duckdb::make_uniq(); duckdb::ExtensionInstallInfo extension_install_info; 
database->instance->SetExtensionLoaded("pgduckdb", extension_install_info); + database->instance->SetExtensionLoaded("pgducklake", extension_install_info); connection = duckdb::make_uniq(*database); @@ -180,6 +184,7 @@ DuckDBManager::Initialize() { pgduckdb::DuckDBQueryOrThrow(context, "SET default_collation =" + duckdb::KeywordHelper::WriteQuoted(duckdb_default_collation)); pgduckdb::DuckDBQueryOrThrow(context, "ATTACH DATABASE 'pgduckdb' (TYPE pgduckdb)"); + pgduckdb::DuckDBQueryOrThrow(context, "ATTACH DATABASE 'pgducklake' (TYPE pgducklake)"); pgduckdb::DuckDBQueryOrThrow(context, "ATTACH DATABASE ':memory:' AS pg_temp;"); if (pgduckdb::IsMotherDuckEnabled()) { diff --git a/src/pgduckdb_hooks.cpp b/src/pgduckdb_hooks.cpp index 3ca21e78..4f369f14 100644 --- a/src/pgduckdb_hooks.cpp +++ b/src/pgduckdb_hooks.cpp @@ -35,6 +35,8 @@ extern "C" { #include "pgduckdb/pgduckdb_node.hpp" #include "pgduckdb/utility/cpp_wrapper.hpp" +#include "pgducklake/pgducklake_handler.hpp" + static planner_hook_type prev_planner_hook = NULL; static ExecutorStart_hook_type prev_executor_start_hook = NULL; static ExecutorFinish_hook_type prev_executor_finish_hook = NULL; @@ -85,6 +87,16 @@ ContainsDuckdbTables(List *rte_list) { return false; } +static bool +ContainsDuckLakeTables(List *rte_list) { + foreach_node(RangeTblEntry, rte, rte_list) { + if (pgduckdb::IsDuckLakeTable(rte->relid)) { + return true; + } + } + return false; +} + static bool ContainsDuckdbItems(Node *node, void *context) { if (node == NULL) @@ -92,7 +104,7 @@ ContainsDuckdbItems(Node *node, void *context) { if (IsA(node, Query)) { Query *query = (Query *)node; - if (ContainsDuckdbTables(query->rtable)) { + if (ContainsDuckdbTables(query->rtable) || ContainsDuckLakeTables(query->rtable)) { return true; } #if PG_VERSION_NUM >= 160000 @@ -198,7 +210,9 @@ IsAllowedStatement(Query *query, bool throw_error) { if (query->commandType != CMD_SELECT) { if (query->rtable != NULL) { RangeTblEntry *resultRte = 
list_nth_node(RangeTblEntry, query->rtable, query->resultRelation - 1); - if (!::IsDuckdbTable(resultRte->relid)) { + bool is_ducklake_table = pgduckdb::IsDuckLakeTable(resultRte->relid); + + if (!::IsDuckdbTable(resultRte->relid) && !is_ducklake_table) { elog(elevel, "DuckDB does not support modififying Postgres tables"); return false; } diff --git a/src/pgduckdb_node.cpp b/src/pgduckdb_node.cpp index 2eb0103d..3617d6c0 100644 --- a/src/pgduckdb_node.cpp +++ b/src/pgduckdb_node.cpp @@ -35,6 +35,7 @@ typedef struct DuckdbScanState { CustomScanState css; /* must be first field */ const CustomScan *custom_scan; const Query *query; + EState *estate; ParamListInfo params; duckdb::Connection *duckdb_connection; duckdb::PreparedStatement *prepared_statement; @@ -145,6 +146,7 @@ Duckdb_BeginCustomScan_Cpp(CustomScanState *cscanstate, EState *estate, int /*ef duckdb_scan_state->duckdb_connection = pgduckdb::DuckDBManager::GetConnection(); duckdb_scan_state->prepared_statement = prepared_query.release(); + duckdb_scan_state->estate = estate; duckdb_scan_state->params = estate->es_param_list_info; duckdb_scan_state->is_executed = false; duckdb_scan_state->fetch_next = true; @@ -271,6 +273,11 @@ Duckdb_ExecCustomScan_Cpp(CustomScanState *node) { } } + if (duckdb_scan_state->query_results->properties.return_type == duckdb::StatementReturnType::CHANGED_ROWS) { + duckdb_scan_state->estate->es_processed = + duckdb_scan_state->current_data_chunk->GetValue(0, 0).GetValue(); + } + MemoryContextReset(duckdb_scan_state->css.ss.ps.ps_ExprContext->ecxt_per_tuple_memory); ExecClearTuple(slot); diff --git a/src/pgduckdb_ruleutils.cpp b/src/pgduckdb_ruleutils.cpp index 51316cbc..4aa8b474 100644 --- a/src/pgduckdb_ruleutils.cpp +++ b/src/pgduckdb_ruleutils.cpp @@ -42,6 +42,8 @@ extern "C" { #include "pgduckdb/pgduckdb_metadata_cache.hpp" #include "pgduckdb/pgduckdb_userdata_cache.hpp" +#include "pgducklake/pgducklake_handler.hpp" + extern "C" { bool outermost_query = true; @@ -442,7 +444,11 
@@ pgduckdb_write_row_refname(StringInfo buf, char *refname, bool is_top_level) { * are not escaped yet. */ List * -pgduckdb_db_and_schema(const char *postgres_schema_name, bool is_duckdb_table) { +pgduckdb_db_and_schema(const char *postgres_schema_name, bool is_duckdb_table, bool is_ducklake_table) { + if (is_ducklake_table) { + return list_make2((void *)"pgducklake", (void *)postgres_schema_name); + } + if (!is_duckdb_table) { return list_make2((void *)"pgduckdb", (void *)postgres_schema_name); } @@ -508,8 +514,8 @@ pgduckdb_db_and_schema(const char *postgres_schema_name, bool is_duckdb_table) { * database are quoted if necessary. */ const char * -pgduckdb_db_and_schema_string(const char *postgres_schema_name, bool is_duckdb_table) { - List *db_and_schema = pgduckdb_db_and_schema(postgres_schema_name, is_duckdb_table); +pgduckdb_db_and_schema_string(const char *postgres_schema_name, bool is_duckdb_table, bool is_ducklake_table) { + List *db_and_schema = pgduckdb_db_and_schema(postgres_schema_name, is_duckdb_table, is_ducklake_table); const char *db_name = (const char *)linitial(db_and_schema); const char *schema_name = (const char *)lsecond(db_and_schema); return psprintf("%s.%s", quote_identifier(db_name), quote_identifier(schema_name)); @@ -529,8 +535,9 @@ pgduckdb_relation_name(Oid relation_oid) { const char *relname = NameStr(relation->relname); const char *postgres_schema_name = get_namespace_name_or_temp(relation->relnamespace); bool is_duckdb_table = pgduckdb::IsDuckdbTable(relation); + bool is_ducklake_table = pgduckdb::IsDuckLakeTable(relation_oid); - const char *db_and_schema = pgduckdb_db_and_schema_string(postgres_schema_name, is_duckdb_table); + const char *db_and_schema = pgduckdb_db_and_schema_string(postgres_schema_name, is_duckdb_table, is_ducklake_table); char *result = psprintf("%s.%s", db_and_schema, quote_identifier(relname)); @@ -581,7 +588,8 @@ pgduckdb_get_tabledef(Oid relation_oid) { Relation relation = relation_open(relation_oid, 
AccessShareLock); const char *relation_name = pgduckdb_relation_name(relation_oid); const char *postgres_schema_name = get_namespace_name_or_temp(relation->rd_rel->relnamespace); - const char *db_and_schema = pgduckdb_db_and_schema_string(postgres_schema_name, pgduckdb::IsDuckdbTable(relation)); + const char *db_and_schema = pgduckdb_db_and_schema_string(postgres_schema_name, pgduckdb::IsDuckdbTable(relation), + pgduckdb::IsDuckLakeTable(relation)); StringInfoData buffer; initStringInfo(&buffer); @@ -813,7 +821,8 @@ pgduckdb_get_rename_tabledef(Oid relation_oid, RenameStmt *rename_stmt) { Assert(pgduckdb::IsDuckdbTable(relation)); const char *postgres_schema_name = get_namespace_name_or_temp(relation->rd_rel->relnamespace); - const char *db_and_schema = pgduckdb_db_and_schema_string(postgres_schema_name, true); + const char *db_and_schema = + pgduckdb_db_and_schema_string(postgres_schema_name, true, pgduckdb::IsDuckLakeTable(relation_oid)); const char *old_table_name = psprintf("%s.%s", db_and_schema, quote_identifier(rename_stmt->relation->relname)); StringInfoData buffer; diff --git a/src/pgducklake/pgducklake_catalog.cpp b/src/pgducklake/pgducklake_catalog.cpp new file mode 100644 index 00000000..71a11a29 --- /dev/null +++ b/src/pgducklake/pgducklake_catalog.cpp @@ -0,0 +1,52 @@ +#include "pgducklake/pgducklake_catalog.hpp" + +#include "pgducklake/pgducklake_schema_entry.hpp" +#include "pgducklake/pgducklake_transaction.hpp" + +#include "duckdb/parser/parsed_data/create_schema_info.hpp" + +namespace pgduckdb { + +void +PgDuckLakeCatalog::Initialize(duckdb::optional_ptr /*context*/, bool /*load_builtin*/) { + // do nothing, extension already created metadata tables for us. 
+} + +duckdb::optional_ptr +PgDuckLakeCatalog::LookupSchema(duckdb::CatalogTransaction transaction, const duckdb::EntryLookupInfo &schema_lookup, + duckdb::OnEntryNotFound if_not_found) { + return DuckLakeCatalog::LookupSchema(transaction, schema_lookup, if_not_found); + // Lazy metadata loading. Commented out as it requires upstream ducklake changes. +#if 0 + auto &schema_name = schema_lookup.GetEntryName(); + auto at_clause = schema_lookup.GetAtClause(); + auto result = DuckLakeCatalog::LookupSchema(transaction, schema_lookup, if_not_found); + if (!result) { + return nullptr; + } + + // TODO handle transaction local schema + + // wrap the schema entry in a PgDuckLakeSchemaEntry + auto &duck_transaction = transaction.transaction->Cast(); + auto snapshot = duck_transaction.GetSnapshot(at_clause); + std::lock_guard guard(schemas_lock); + if (schemas.find(snapshot.schema_version) == schemas.end()) { + schemas.insert( + duckdb::make_pair(snapshot.schema_version, std::move(duckdb::make_uniq()))); + } + auto &entry = *schemas.find(snapshot.schema_version)->second; + + duckdb::DuckLakeSchemaEntry &ducklake_schema_entry = result->Cast(); + auto info = result->GetInfo(); + duckdb::CreateSchemaInfo schema_info; + schema_info.schema = info->schema; + auto schema_entry = duckdb::make_uniq( + ducklake_schema_entry.ParentCatalog(), schema_info, ducklake_schema_entry.GetSchemaId(), + ducklake_schema_entry.GetSchemaUUID(), ducklake_schema_entry.DataPath()); + entry.CreateEntry(std::move(schema_entry)); + return entry.GetEntry(schema_name); +#endif +} + +} // namespace pgduckdb diff --git a/src/pgducklake/pgducklake_handler.cpp b/src/pgducklake/pgducklake_handler.cpp new file mode 100644 index 00000000..93eb0a17 --- /dev/null +++ b/src/pgducklake/pgducklake_handler.cpp @@ -0,0 +1,384 @@ +#include "pgduckdb/pgduckdb_types.hpp" + +extern "C" { +#include "postgres.h" + +#include "access/tableam.h" +#include "fmgr.h" +#include "utils/syscache.h" +} + +#include 
"pgduckdb/utility/cpp_wrapper.hpp" +#include "pgducklake/pgducklake_table.hpp" + +struct DuckLakeScanDescData { + TableScanDescData rs_base; +}; +using DuckLakeScanDesc = DuckLakeScanDescData *; + +const TupleTableSlotOps * +ducklake_slot_callbacks(Relation /* rel */) { + return &TTSOpsMinimalTuple; +} + +TableScanDesc +ducklake_scan_begin(Relation rel, Snapshot snapshot, int nkeys, struct ScanKeyData *key, ParallelTableScanDesc pscan, + uint32 flags) { + DuckLakeScanDesc scan = static_cast(palloc(sizeof(DuckLakeScanDescData))); + scan->rs_base.rs_rd = rel; + scan->rs_base.rs_snapshot = snapshot; + scan->rs_base.rs_nkeys = nkeys; + scan->rs_base.rs_key = key; + scan->rs_base.rs_flags = flags; + scan->rs_base.rs_parallel = pscan; + return reinterpret_cast(scan); +} + +void +ducklake_scan_end(TableScanDesc scan) { + DuckLakeScanDesc cscan = reinterpret_cast(scan); + pfree(cscan); +} + +void +ducklake_scan_rescan(TableScanDesc /* scan */, struct ScanKeyData * /* key */, bool /* set_params */, + bool /* allow_strat */, bool /* allow_sync */, bool /* allow_pagemode */) { + elog(ERROR, "ducklake_scan_rescan not implemented"); +} + +bool +ducklake_scan_getnextslot(TableScanDesc /* scan */, ScanDirection /* direction */, TupleTableSlot * /* slot */) { + elog(ERROR, "ducklake_scan_getnextslot not implemented"); +} + +Size +ducklake_parallelscan_estimate(Relation /* rel */) { + elog(ERROR, "ducklake_parallelscan_estimate not implemented"); +} + +Size +ducklake_parallelscan_initialize(Relation /* rel */, ParallelTableScanDesc /* pscan */) { + elog(ERROR, "ducklake_parallelscan_initialize not implemented"); +} + +void +ducklake_parallelscan_reinitialize(Relation /* rel */, ParallelTableScanDesc /* pscan */) { + elog(ERROR, "ducklake_parallelscan_reinitialize not implemented"); +} + +struct IndexFetchTableData * +ducklake_index_fetch_begin(Relation /* rel */) { + elog(ERROR, "ducklake_index_fetch_begin not implemented"); +} + +void +ducklake_index_fetch_reset(struct 
IndexFetchTableData * /* data */) { + elog(ERROR, "ducklake_index_fetch_reset not implemented"); +} + +void +ducklake_index_fetch_end(struct IndexFetchTableData * /* data */) { + elog(ERROR, "ducklake_index_fetch_end not implemented"); +} + +bool +ducklake_index_fetch_tuple(struct IndexFetchTableData * /* scan */, ItemPointer /* tid */, Snapshot /* snapshot */, + TupleTableSlot * /* slot */, bool * /* call_again */, bool * /* all_dead */) { + elog(ERROR, "ducklake_index_fetch_tuple not implemented"); +} + +bool +ducklake_tuple_fetch_row_version(Relation /* rel */, ItemPointer /* tid */, Snapshot /* snapshot */, + TupleTableSlot * /* slot */) { + elog(ERROR, "ducklake_tuple_fetch_row_version not implemented"); +} + +bool +ducklake_tuple_tid_valid(TableScanDesc /* scan */, ItemPointer /* tid */) { + elog(ERROR, "ducklake_tuple_tid_valid not implemented"); +} + +void +ducklake_tuple_get_latest_tid(TableScanDesc /* scan */, ItemPointer /* tid */) { + elog(ERROR, "ducklake_tuple_get_latest_tid not implemented"); +} + +bool +ducklake_tuple_satisfies_snapshot(Relation /* rel */, TupleTableSlot * /* slot */, Snapshot /* snapshot */) { + elog(ERROR, "ducklake_tuple_satisfies_snapshot not implemented"); +} + +TransactionId +ducklake_index_delete_tuples(Relation /* rel */, TM_IndexDeleteOp * /* delstate */) { + elog(ERROR, "ducklake_index_delete_tuples not implemented"); +} + +void +ducklake_tuple_insert(Relation /* rel */, TupleTableSlot * /* slot */, CommandId /* cid */, int /* options */, + struct BulkInsertStateData * /* bistate */) { + elog(ERROR, "ducklake_tuple_insert not implemented"); +} + +void +ducklake_tuple_insert_speculative(Relation /* rel */, TupleTableSlot * /* slot */, CommandId /* cid */, + int /* options */, struct BulkInsertStateData * /* bistate */, + uint32 /* specToken */) { + elog(ERROR, "ducklake_tuple_insert_speculative not implemented"); +} + +void +ducklake_tuple_complete_speculative(Relation /* rel */, TupleTableSlot * /* slot */, uint32 /* 
specToken */, + bool /* succeeded */) { + elog(ERROR, "ducklake_tuple_complete_speculative not implemented"); +} + +void +ducklake_multi_insert(Relation /* rel */, TupleTableSlot ** /* slots */, int /* nslots */, CommandId /* cid */, + int /* options */, struct BulkInsertStateData * /* bistate */) { + elog(ERROR, "ducklake_multi_insert not implemented"); +} + +TM_Result +ducklake_tuple_delete(Relation /* rel */, ItemPointer /* tid */, CommandId /* cid */, Snapshot /* snapshot */, + Snapshot /* crosscheck */, bool /* wait */, TM_FailureData * /* tmfd */, + bool /* changingPart */) { + elog(ERROR, "ducklake_tuple_delete not implemented"); +} + +#if PG_VERSION_NUM >= 160000 +TM_Result +ducklake_tuple_update(Relation /* rel */, ItemPointer /* otid */, TupleTableSlot * /* slot */, CommandId /* cid */, + Snapshot /* snapshot */, Snapshot /* crosscheck */, bool /* wait */, TM_FailureData * /* tmfd */, + LockTupleMode * /* lockmode */, TU_UpdateIndexes * /* update_indexes */) { +#else +TM_Result +ducklake_tuple_update(Relation /* rel */, ItemPointer /* otid */, TupleTableSlot * /* slot */, CommandId /* cid */, + Snapshot /* snapshot */, Snapshot /* crosscheck */, bool /* wait */, TM_FailureData * /* tmfd */, + LockTupleMode * /* lockmode */, bool * /* update_indexes */) { +#endif + elog(ERROR, "ducklake_tuple_update not implemented"); +} + +TM_Result +ducklake_tuple_lock(Relation /* rel */, ItemPointer /* tid */, Snapshot /* snapshot */, TupleTableSlot * /* slot */, + CommandId /* cid */, LockTupleMode /* mode */, LockWaitPolicy /* wait_policy */, uint8 /* flags */, + TM_FailureData * /* tmfd */) { + elog(ERROR, "ducklake_tuple_lock not implemented"); +} + +#if PG_VERSION_NUM >= 160000 +void +ducklake_relation_set_new_filelocator(Relation rel, const RelFileLocator * /* newrlocator */, char /* persistence */, + TransactionId * /* freezeXid */, MultiXactId * /* minmulti */) { +#else +void +ducklake_relation_set_new_filenode(Relation rel, const RelFileNode * /* newrnode */, 
char /* persistence */, + TransactionId * /* freezeXid */, MultiXactId * /* minmulti */) { +#endif + HeapTuple tp = SearchSysCache1(RELOID, ObjectIdGetDatum(rel->rd_id)); + if (!HeapTupleIsValid(tp)) { + TupleDesc desc = RelationGetDescr(rel); + for (int i = 0; i < desc->natts; i++) { + Form_pg_attribute attr = &desc->attrs[i]; + auto duck_type = pgduckdb::ConvertPostgresToDuckColumnType(attr); + if (duck_type.id() == duckdb::LogicalTypeId::USER) { + elog(ERROR, "column \"%s\" has unsupported type", NameStr(attr->attname)); + } + if (attr->attgenerated) { + elog(ERROR, "unsupported generated column \"%s\"", NameStr(attr->attname)); + } + } + + InvokeCPPFunc(pgduckdb::PgDuckLakeTable::CreateTable, rel); + } else { + ReleaseSysCache(tp); + elog(ERROR, "ducklake_relation_set_new_filenode not implemented"); + } +} + +void +ducklake_relation_nontransactional_truncate(Relation /* rel */) { + elog(ERROR, "ducklake_relation_nontransactional_truncate not implemented"); +} + +#if PG_VERSION_NUM >= 160000 +void +ducklake_relation_copy_data(Relation /* rel */, const RelFileLocator * /* newrlocator */) { +#else +void +ducklake_relation_copy_data(Relation /* rel */, const RelFileNode * /* newrnode */) { +#endif + elog(ERROR, "ducklake_relation_copy_data not implemented"); +} + +void +ducklake_relation_copy_for_cluster(Relation /* OldTable */, Relation /* NewTable */, Relation /* OldIndex */, + bool /* use_sort */, TransactionId /* OldestXmin */, + TransactionId * /* xid_cutoff */, MultiXactId * /* multi_cutoff */, + double * /* num_tuples */, double * /* tups_vacuumed */, + double * /* tups_recently_dead */) { + elog(ERROR, "ducklake_relation_copy_for_cluster not implemented"); +} + +void +ducklake_relation_vacuum(Relation /* rel */, struct VacuumParams * /* params */, BufferAccessStrategy /* bstrategy */) { + elog(ERROR, "ducklake_relation_vacuum not implemented"); +} + +#if PG_VERSION_NUM >= 170000 +bool +ducklake_scan_analyze_next_block(TableScanDesc /* scan */, ReadStream * 
/* stream */) { +#else +bool +ducklake_scan_analyze_next_block(TableScanDesc /* scan */, BlockNumber /* blockno */, + BufferAccessStrategy /* bstrategy */) { +#endif + return false; +} + +bool +ducklake_scan_analyze_next_tuple(TableScanDesc /* scan */, TransactionId /* OldestXmin */, double * /* liverows */, + double * /* deadrows */, TupleTableSlot * /* slot */) { + elog(ERROR, "ducklake_scan_analyze_next_tuple not implemented"); +} + +static double +ducklake_index_build_range_scan(Relation /* table_rel */, Relation /* index_rel */, struct IndexInfo * /* index_info */, + bool /* allow_sync */, bool /* anyvisible */, bool /* progress */, + BlockNumber /* start_blockno */, BlockNumber /* numblocks */, + IndexBuildCallback /* callback */, void * /* callback_state */, + TableScanDesc /* scan */) { + elog(ERROR, "ducklake_index_build_range_scan not implemented"); +} + +static void +ducklake_index_validate_scan(Relation /* table_rel */, Relation /* index_rel */, struct IndexInfo * /* index_info */, + Snapshot /* snapshot */, struct ValidateIndexState * /* state */) { + elog(ERROR, "ducklake_index_validate_scan not implemented"); +} + +static uint64 +ducklake_relation_size(Relation /* rel */, ForkNumber /* forkNumber */) { + return 0; +} + +static bool +ducklake_relation_needs_toast_table(Relation /* rel */) { + return false; +} + +static void +ducklake_relation_estimate_size(Relation /* rel */, int32 *attr_widths, BlockNumber *pages, double *tuples, + double *allvisfrac) { + /* no data available */ + if (attr_widths) + *attr_widths = 0; + if (pages) + *pages = 0; + if (tuples) + *tuples = 0; + if (allvisfrac) + *allvisfrac = 0; +} + +bool +ducklake_scan_sample_next_block(TableScanDesc /* scan */, struct SampleScanState * /* scanstate */) { + elog(ERROR, "ducklake_scan_sample_next_block not implemented"); +} + +bool +ducklake_scan_sample_next_tuple(TableScanDesc /* scan */, struct SampleScanState * /* scanstate */, + TupleTableSlot * /* slot */) { + elog(ERROR, 
"ducklake_scan_sample_next_tuple not implemented"); +} + +const TableAmRoutine ducklake_routine = {T_TableAmRoutine, + + ducklake_slot_callbacks, + + ducklake_scan_begin, + ducklake_scan_end, + ducklake_scan_rescan, + ducklake_scan_getnextslot, + + NULL /*scan_set_tidrange*/, + NULL /*scan_getnextslot_tidrange*/, + + ducklake_parallelscan_estimate, + ducklake_parallelscan_initialize, + ducklake_parallelscan_reinitialize, + + ducklake_index_fetch_begin, + ducklake_index_fetch_reset, + ducklake_index_fetch_end, + ducklake_index_fetch_tuple, + + ducklake_tuple_fetch_row_version, + ducklake_tuple_tid_valid, + ducklake_tuple_get_latest_tid, + ducklake_tuple_satisfies_snapshot, + ducklake_index_delete_tuples, + + ducklake_tuple_insert, + ducklake_tuple_insert_speculative, + ducklake_tuple_complete_speculative, + ducklake_multi_insert, + ducklake_tuple_delete, + ducklake_tuple_update, + ducklake_tuple_lock, + NULL /*finish_bulk_insert*/, + +#if PG_VERSION_NUM >= 160000 + ducklake_relation_set_new_filelocator, +#else + ducklake_relation_set_new_filenode, +#endif + ducklake_relation_nontransactional_truncate, + ducklake_relation_copy_data, + ducklake_relation_copy_for_cluster, + ducklake_relation_vacuum, + ducklake_scan_analyze_next_block, + ducklake_scan_analyze_next_tuple, + ducklake_index_build_range_scan, + ducklake_index_validate_scan, + + ducklake_relation_size, + ducklake_relation_needs_toast_table, + NULL /*relation_toast_am*/, + NULL /*relation_fetch_toast_slice*/, + + ducklake_relation_estimate_size, + + NULL /*scan_bitmap_next_block*/, + NULL /*scan_bitmap_next_tuple*/, + ducklake_scan_sample_next_block, + ducklake_scan_sample_next_tuple}; + +extern "C" { +PG_FUNCTION_INFO_V1(ducklake_handler); +Datum +ducklake_handler(PG_FUNCTION_ARGS) { + PG_RETURN_POINTER(&ducklake_routine); +} +} + +namespace pgduckdb { + +bool +IsDuckLakeTable(Relation rel) { + return rel->rd_tableam == &ducklake_routine; +} + +bool +IsDuckLakeTable(Oid oid) { + if (oid == InvalidOid) { + 
return false; + } + + Relation rel = RelationIdGetRelation(oid); + bool result = IsDuckLakeTable(rel); + RelationClose(rel); + return result; +} + +} // namespace pgduckdb diff --git a/src/pgducklake/pgducklake_metadata_manager.cpp b/src/pgducklake/pgducklake_metadata_manager.cpp new file mode 100644 index 00000000..a091bfbc --- /dev/null +++ b/src/pgducklake/pgducklake_metadata_manager.cpp @@ -0,0 +1,576 @@ +#include "pgducklake/pgducklake_metadata_manager.hpp" + +#include "pgduckdb/logger.hpp" + +#include "storage/ducklake_table_entry.hpp" +#include "storage/ducklake_transaction.hpp" +#include "storage/ducklake_catalog.hpp" +#include "storage/ducklake_schema_entry.hpp" + +extern "C" { +#include "postgres.h" + +#include "access/genam.h" +#include "access/htup_details.h" +#include "access/table.h" +#include "catalog/namespace.h" +#include "catalog/indexing.h" +#include "utils/builtins.h" +#include "utils/fmgroids.h" +#include "utils/lsyscache.h" +#include "utils/rel.h" +} + +namespace pgduckdb { + +namespace { + +Oid +DuckDbNamespace(bool missing_ok = true) { + return get_namespace_oid("duckdb", missing_ok); +} + +Oid +DuckLakeSnapshotOid() { + return get_relname_relid("ducklake_snapshot", DuckDbNamespace()); +} + +Oid +DuckLakeSchemaOid() { + return get_relname_relid("ducklake_schema", DuckDbNamespace()); +} + +Oid +DuckLakeSnapshotIdOid() { + return get_relname_relid("ducklake_snapshot_pkey", DuckDbNamespace()); +} + +Oid +DuckLakeSnapshotChangesOid() { + return get_relname_relid("ducklake_snapshot_changes", DuckDbNamespace()); +} + +Oid +DuckLakeTableOid() { + return get_relname_relid("ducklake_table", DuckDbNamespace()); +} + +Oid +DuckLakeColumnOid() { + return get_relname_relid("ducklake_column", DuckDbNamespace()); +} + +Oid +DuckLakeDataFileOid() { + return get_relname_relid("ducklake_data_file", DuckDbNamespace()); +} +#if 0 +Oid +DuckLakeDeleteFileOid() { + return get_relname_relid("ducklake_delete_file", DuckDbNamespace()); +} +#endif +} // namespace + 
+PgDuckLakeMetadataManager::PgDuckLakeMetadataManager(duckdb::DuckLakeTransaction &transaction) + : duckdb::DuckLakeMetadataManager(transaction) { +} + +int +PgDuckLakeMetadataManager::GetDuckLakeSchemas(const duckdb::DuckLakeSnapshot &snapshot, + duckdb::DuckLakeCatalogInfo &catalog_info) { + ::Relation table = table_open(DuckLakeSchemaOid(), AccessShareLock); + TupleDesc desc = RelationGetDescr(table); + ScanKeyData key[1]; + ScanKeyInit(&key[0], 3 /*begin_snapshot*/, BTLessEqualStrategyNumber, F_INT8LE, + Int64GetDatum(snapshot.snapshot_id)); + SysScanDesc scan = systable_beginscan(table, InvalidOid, false, NULL, 1, key); + int count = 0; + + HeapTuple tuple; + while (HeapTupleIsValid(tuple = systable_getnext(scan))) { + bool isnull; + duckdb::DuckLakeSchemaInfo schema; + int64_t end_snapshot = DatumGetInt64(heap_getattr(tuple, 4 /*end_snapshot*/, desc, &isnull)); + if (isnull || snapshot.snapshot_id < end_snapshot) { + schema.id = duckdb::SchemaIndex(DatumGetInt64(heap_getattr(tuple, 1 /*schema_id*/, desc, &isnull))); + Datum uuid = heap_getattr(tuple, 2 /*schema_uuid*/, desc, &isnull); + schema.uuid = DatumGetCString(DirectFunctionCall1(uuid_out, uuid)); + schema.name = TextDatumGetCString(heap_getattr(tuple, 5 /*schema_name*/, desc, &isnull)); + Datum path = heap_getattr(tuple, 6 /*path*/, desc, &isnull); + if (isnull) { + schema.path = transaction.GetCatalog().DataPath(); + } else { + duckdb::DuckLakePath path_info; + path_info.path = TextDatumGetCString(path); + path_info.path_is_relative = DatumGetBool(heap_getattr(tuple, 7 /*path_is_relative*/, desc, &isnull)); + D_ASSERT(!isnull); + schema.path = FromRelativePath(path_info); + } + catalog_info.schemas.push_back(std::move(schema)); + count++; + } + } + systable_endscan(scan); + table_close(table, AccessShareLock); + return count; +} + +int +PgDuckLakeMetadataManager::GetDuckLakeTables(const duckdb::DuckLakeSnapshot &snapshot, + duckdb::DuckLakeCatalogInfo &catalog) { + ::Relation table = 
table_open(DuckLakeTableOid(), AccessShareLock); + TupleDesc desc = RelationGetDescr(table); + ScanKeyData key[1]; + ScanKeyInit(&key[0], 3 /*begin_snapshot*/, BTLessEqualStrategyNumber, F_INT8LE, + Int64GetDatum(snapshot.snapshot_id)); + SysScanDesc scan = systable_beginscan(table, InvalidOid, false, NULL, 1, key); + int count = 0; + + duckdb::map schema_map; + for (idx_t i = 0; i < catalog.schemas.size(); i++) { + schema_map[catalog.schemas[i].id] = i; + } + + HeapTuple tuple; + while (HeapTupleIsValid(tuple = systable_getnext(scan))) { + bool isnull; + duckdb::DuckLakeTableInfo table_info; + int64_t end_snapshot = DatumGetInt64(heap_getattr(tuple, 4 /*end_snapshot*/, desc, &isnull)); + if (isnull || snapshot.snapshot_id < end_snapshot) { + table_info.id = duckdb::TableIndex(DatumGetInt64(heap_getattr(tuple, 1 /*table_id*/, desc, &isnull))); + table_info.schema_id = + duckdb::SchemaIndex(DatumGetInt64(heap_getattr(tuple, 5 /*schema_id*/, desc, &isnull))); + table_info.uuid = + DatumGetCString(DirectFunctionCall1(uuid_out, heap_getattr(tuple, 2 /*table_uuid*/, desc, &isnull))); + table_info.name = TextDatumGetCString(heap_getattr(tuple, 6 /*table_name*/, desc, &isnull)); + + auto schema_entry = schema_map.find(table_info.schema_id); + D_ASSERT(schema_entry != schema_map.end()); + auto &schema = catalog.schemas[schema_entry->second]; + Datum path = heap_getattr(tuple, 7 /*path*/, desc, &isnull); + if (isnull) { + table_info.path = transaction.GetCatalog().DataPath(); + } else { + duckdb::DuckLakePath path_info; + path_info.path = TextDatumGetCString(path); + path_info.path_is_relative = DatumGetBool(heap_getattr(tuple, 8 /*path_is_relative*/, desc, &isnull)); + D_ASSERT(!isnull); + table_info.path = FromRelativePath(path_info, schema.path); + } + catalog.tables.push_back(std::move(table_info)); + count++; + } + } + + systable_endscan(scan); + table_close(table, AccessShareLock); + return count; +} + +static int +GetDuckLakeTableDetails(const 
duckdb::DuckLakeSnapshot &snapshot, duckdb::DuckLakeTableInfo &table_info) { + ::Relation table = table_open(DuckLakeColumnOid(), AccessShareLock); + TupleDesc desc = RelationGetDescr(table); + ScanKeyData key[1]; + ScanKeyInit(&key[0], 4 /*table_id*/, BTGreaterEqualStrategyNumber, F_INT8EQ, Int64GetDatum(table_info.id.index)); + SysScanDesc scan = systable_beginscan(table, InvalidOid, false, NULL, 1, key); + + HeapTuple tuple; + std::map column_map; + while (HeapTupleIsValid(tuple = systable_getnext(scan))) { + bool isnull, isnulls[3]; + int64_t column_id = DatumGetInt64(heap_getattr(tuple, 1 /*column_id*/, desc, &isnulls[0])); + int64_t begin_snapshot = DatumGetInt64(heap_getattr(tuple, 2 /*begin_snapshot*/, desc, &isnulls[1])); + int64_t end_snapshot = DatumGetInt64(heap_getattr(tuple, 3 /*end_snapshot*/, desc, &isnulls[2])); + if (isnulls[0] || + (snapshot.snapshot_id >= begin_snapshot && (isnulls[2] || snapshot.snapshot_id < end_snapshot))) { + duckdb::DuckLakeColumnInfo column_info; + column_info.id = duckdb::FieldIndex(column_id); + int64_t column_order = DatumGetInt64(heap_getattr(tuple, 5 /*column_order*/, desc, &isnull)); + column_info.name = TextDatumGetCString(heap_getattr(tuple, 6 /*column_name*/, desc, &isnull)); + column_info.type = TextDatumGetCString(heap_getattr(tuple, 7 /*column_type*/, desc, &isnull)); + Datum initial_default = heap_getattr(tuple, 8 /*initial_default*/, desc, &isnull); + if (!isnull) { + column_info.initial_default = duckdb::Value(TextDatumGetCString(initial_default)); + } + Datum default_val = heap_getattr(tuple, 9 /*default_value*/, desc, &isnull); + if (!isnull) { + column_info.default_value = duckdb::Value(TextDatumGetCString(default_val)); + } + column_info.nulls_allowed = DatumGetBool(heap_getattr(tuple, 10 /*nulls_allowed*/, desc, &isnull)); + // TODO: tags + // TODO: parent_columns + // table_info.columns.push_back(std::move(column_info)); + column_map[column_order] = std::move(column_info); + } + } + + for (auto &column 
: column_map) { + table_info.columns.push_back(std::move(column.second)); + } + + systable_endscan(scan); + table_close(table, AccessShareLock); + return column_map.size(); +} + +bool +PgDuckLakeMetadataManager::GetDuckLakeTableInfo(const duckdb::DuckLakeSnapshot &snapshot, + duckdb::DuckLakeSchemaEntry &schema_entry, + duckdb::DuckLakeTableInfo &table_info) { + ::Relation table = table_open(DuckLakeTableOid(), AccessShareLock); + TupleDesc desc = RelationGetDescr(table); + ScanKeyData key[3]; + ScanKeyInit(&key[0], 3 /*begin_snapshot*/, BTLessEqualStrategyNumber, F_INT8LE, + Int64GetDatum(snapshot.snapshot_id)); + ScanKeyInit(&key[1], 5 /*schema_id*/, BTLessEqualStrategyNumber, F_INT8EQ, + Int64GetDatum(schema_entry.GetSchemaId().index)); + ScanKeyInit(&key[2], 6 /*table_name*/, BTLessEqualStrategyNumber, F_TEXTEQ, + CStringGetTextDatum(table_info.name.c_str())); + SysScanDesc scan = systable_beginscan(table, InvalidOid, false, NULL, 3, key); + int count = 0; + + HeapTuple tuple; + while (HeapTupleIsValid(tuple = systable_getnext(scan))) { + bool isnull; + int64_t end_snapshot = DatumGetInt64(heap_getattr(tuple, 4 /*end_snapshot*/, desc, &isnull)); + if (isnull || snapshot.snapshot_id < end_snapshot) { + table_info.id = duckdb::TableIndex(DatumGetInt64(heap_getattr(tuple, 1 /*table_id*/, desc, &isnull))); + table_info.schema_id = + duckdb::SchemaIndex(DatumGetInt64(heap_getattr(tuple, 5 /*schema_id*/, desc, &isnull))); + table_info.uuid = + DatumGetCString(DirectFunctionCall1(uuid_out, heap_getattr(tuple, 2 /*table_uuid*/, desc, &isnull))); + Datum path = heap_getattr(tuple, 7 /*path*/, desc, &isnull); + if (isnull) { + table_info.path = schema_entry.DataPath(); + } else { + duckdb::DuckLakePath path_info; + path_info.path = TextDatumGetCString(path); + path_info.path_is_relative = DatumGetBool(heap_getattr(tuple, 8 /*path_is_relative*/, desc, &isnull)); + D_ASSERT(!isnull); + table_info.path = FromRelativePath(path_info, schema_entry.DataPath()); + } + ++count; + 
} + } + + if (count > 1) { + throw std::runtime_error("Multiple tables found for schema " + + std::to_string(schema_entry.GetSchemaId().index) + " and table " + table_info.name); + } + + if (count > 0) { + GetDuckLakeTableDetails(snapshot, table_info); + } + + systable_endscan(scan); + table_close(table, AccessShareLock); + return count > 0; +} + +duckdb::DuckLakeCatalogInfo +PgDuckLakeMetadataManager::GetCatalogForSnapshot(duckdb::DuckLakeSnapshot ducklake_snapshot) { + duckdb::DuckLakeCatalogInfo catalog_info; + + // --- 1. Schemas --- + int nschemas = GetDuckLakeSchemas(ducklake_snapshot, catalog_info); + pd_log(DEBUG2, "Read %d schemas", nschemas); + + // --- 2. Tables --- + int ntables = GetDuckLakeTables(ducklake_snapshot, catalog_info); + pd_log(DEBUG2, "Read %d tables", ntables); + for (auto &table : catalog_info.tables) { + int ncolumns = GetDuckLakeTableDetails(ducklake_snapshot, table); + pd_log(DEBUG2, "Read %d columns for table %s", ncolumns, table.name.c_str()); + } + + // TODO: Views, Partitions (repeat similar pattern as above) + + return catalog_info; +} + +duckdb::unique_ptr +PgDuckLakeMetadataManager::GetSnapshot() { + ::Relation table = table_open(DuckLakeSnapshotOid(), AccessShareLock); + ::Relation index = index_open(DuckLakeSnapshotIdOid(), AccessShareLock); + TupleDesc desc = RelationGetDescr(table); + + ScanKeyData key[1]; + ScanKeyInit(&key[0], 1 /*snapshot_id*/, BTGreaterEqualStrategyNumber, F_INT8GE, Int64GetDatum(0)); + SysScanDesc scan = systable_beginscan_ordered(table, index, NULL, 1, key); + + HeapTuple tuple; + bool isnulls[4]; + duckdb::unique_ptr ret = nullptr; + + // Get the first tuple (max snapshot_id due to BackwardScanDirection) + if (HeapTupleIsValid(tuple = systable_getnext_ordered(scan, BackwardScanDirection))) { + auto snapshot_id = DatumGetInt64(heap_getattr(tuple, 1, desc, &isnulls[0])); + auto schema_version = DatumGetInt64(heap_getattr(tuple, 3, desc, &isnulls[1])); + auto next_catalog_id = 
DatumGetInt64(heap_getattr(tuple, 4, desc, &isnulls[2])); + auto next_file_id = DatumGetInt64(heap_getattr(tuple, 5, desc, &isnulls[3])); + + D_ASSERT(!isnulls[0] && !isnulls[1] && !isnulls[2] && !isnulls[3]); + ret = duckdb::make_uniq(snapshot_id, schema_version, next_catalog_id, next_file_id); + } + + systable_endscan_ordered(scan); + table_close(table, AccessShareLock); + index_close(index, AccessShareLock); + return ret; +} + +duckdb::unique_ptr +PgDuckLakeMetadataManager::GetSnapshot(duckdb::BoundAtClause & /* at_clause */) { + // TODO + return nullptr; +} + +duckdb::vector +PgDuckLakeMetadataManager::GetFilesForTable(duckdb::DuckLakeTableEntry &table_entry, + duckdb::DuckLakeSnapshot ducklake_snapshot, + const duckdb::string & /*filter*/) { + duckdb::vector files; + ::Relation table = table_open(DuckLakeDataFileOid(), AccessShareLock); + TupleDesc desc = RelationGetDescr(table); + ScanKeyData key[2]; + ScanKeyInit(&key[0], 2 /*table_id*/, BTGreaterEqualStrategyNumber, F_INT8EQ, + Int64GetDatum(table_entry.GetTableId().index)); + ScanKeyInit(&key[1], 3 /*begin_snapshot*/, BTLessEqualStrategyNumber, F_INT8LE, + Int64GetDatum(ducklake_snapshot.snapshot_id)); + SysScanDesc scan = systable_beginscan(table, InvalidOid, false, NULL, 1, key); + HeapTuple tuple; + + while (HeapTupleIsValid(tuple = systable_getnext(scan))) { + bool isnull; + int64_t end_snapshot = DatumGetInt64(heap_getattr(tuple, 4 /*end_snapshot*/, desc, &isnull)); + if (isnull || ducklake_snapshot.snapshot_id < end_snapshot) { + duckdb::DuckLakeFileListEntry file_entry; + duckdb::DuckLakePath path; + path.path = TextDatumGetCString(heap_getattr(tuple, 6 /*path*/, desc, &isnull)); + path.path_is_relative = DatumGetBool(heap_getattr(tuple, 7 /*path_is_relative*/, desc, &isnull)); + file_entry.file.path = FromRelativePath(path); + file_entry.file.file_size_bytes = DatumGetInt64(heap_getattr(tuple, 10 /*file_size_bytes*/, desc, &isnull)); + file_entry.file.footer_size = DatumGetInt64(heap_getattr(tuple, 
11 /*footer_size*/, desc, &isnull)); + file_entry.row_id_start = DatumGetInt64(heap_getattr(tuple, 12 /*row_id_start*/, desc, &isnull)); + file_entry.snapshot_id = DatumGetInt64(heap_getattr(tuple, 3 /*begin_snapshot*/, desc, &isnull)); + Datum partial_file_info = heap_getattr(tuple, 13 /*partial_file_info*/, desc, &isnull); + if (!isnull) { + (void)partial_file_info; + // TODO handle partial file info + } + + // TODO handle delete files + files.push_back(std::move(file_entry)); + } + } + + systable_endscan(scan); + table_close(table, AccessShareLock); + return files; +} + +duckdb::vector +PgDuckLakeMetadataManager::GetGlobalTableStats(duckdb::DuckLakeSnapshot /* snapshot */) { + duckdb::vector global_stats; + + // TODO: Implement + + return global_stats; +} + +void +PgDuckLakeMetadataManager::WriteNewSchema(duckdb::DuckLakeSnapshot commit_snapshot, + const duckdb::DuckLakeSchemaInfo &schema_info) { + ::Relation table = table_open(DuckLakeSchemaOid(), RowExclusiveLock); + TupleDesc desc = RelationGetDescr(table); + + Datum values[7]; + bool nulls[7]; + memset(nulls, 0, sizeof(nulls)); + values[0] = Int64GetDatum(schema_info.id.index); + values[1] = DirectFunctionCall1(uuid_in, CStringGetDatum(schema_info.uuid.c_str())); + values[2] = Int64GetDatum(commit_snapshot.snapshot_id); + nulls[3] = true; + values[4] = CStringGetTextDatum(schema_info.name.c_str()); + auto path = GetRelativePath(schema_info.path); + values[5] = CStringGetTextDatum(path.path.c_str()); + values[6] = BoolGetDatum(path.path_is_relative); + + HeapTuple tuple = heap_form_tuple(desc, values, nulls); + CatalogTupleInsert(table, tuple); + table_close(table, NoLock); +} + +void +PgDuckLakeMetadataManager::WriteNewSchemas(duckdb::DuckLakeSnapshot commit_snapshot, + const duckdb::vector &new_schemas) { + for (auto &schema : new_schemas) { + WriteNewSchema(commit_snapshot, schema); + } +} + +void +PgDuckLakeMetadataManager::WriteNewTable(duckdb::DuckLakeSnapshot commit_snapshot, + const 
duckdb::DuckLakeTableInfo &table_info) { + ::Relation table = table_open(DuckLakeTableOid(), RowExclusiveLock); + TupleDesc desc = RelationGetDescr(table); + + Datum values[8]; + bool nulls[8]; + memset(nulls, 0, sizeof(nulls)); + values[0] = Int64GetDatum(table_info.id.index); + values[1] = DirectFunctionCall1(uuid_in, CStringGetDatum(table_info.uuid.c_str())); + values[2] = Int64GetDatum(commit_snapshot.snapshot_id); + nulls[3] = true; + values[4] = Int64GetDatum(table_info.schema_id.index); + values[5] = CStringGetTextDatum(table_info.name.c_str()); + auto path = GetRelativePath(table_info.schema_id, table_info.path); + values[6] = CStringGetTextDatum(path.path.c_str()); + values[7] = BoolGetDatum(path.path_is_relative); + + HeapTuple tuple = heap_form_tuple(desc, values, nulls); + CatalogTupleInsert(table, tuple); + table_close(table, NoLock); + + // Write table columns + ::Relation col_table = table_open(DuckLakeColumnOid(), RowExclusiveLock); + TupleDesc col_desc = RelationGetDescr(col_table); + for (auto &column : table_info.columns) { + Datum col_values[11]; + bool col_nulls[11]; + memset(col_nulls, 0, sizeof(col_nulls)); + + col_values[0] = Int64GetDatum(column.id.index); // column_id + col_values[1] = Int64GetDatum(commit_snapshot.snapshot_id); // begin_snapshot + col_nulls[2] = true; // end_snapshot + col_values[3] = Int64GetDatum(table_info.id.index); // table_id + col_values[4] = Int64GetDatum(column.id.index); // column_order + col_values[5] = CStringGetTextDatum(column.name.c_str()); // column_name + col_values[6] = CStringGetTextDatum(column.type.c_str()); // column_type + col_nulls[7] = true; // initial_default + col_nulls[8] = true; // default_value + col_values[9] = BoolGetDatum(column.nulls_allowed); // nulls_allowed + col_nulls[10] = true; // parent_column + + HeapTuple col_tuple = heap_form_tuple(col_desc, col_values, col_nulls); + CatalogTupleInsert(col_table, col_tuple); + } + + table_close(col_table, NoLock); +} + +void 
+PgDuckLakeMetadataManager::WriteNewTables(duckdb::DuckLakeSnapshot commit_snapshot, + const duckdb::vector &new_tables) { + for (auto &table : new_tables) { + WriteNewTable(commit_snapshot, table); + } +} + +void +PgDuckLakeMetadataManager::InsertSnapshot(duckdb::DuckLakeSnapshot commit_snapshot) { + ::Relation table = table_open(DuckLakeSnapshotOid(), RowExclusiveLock); + TupleDesc desc = RelationGetDescr(table); + + Datum values[5]; + bool nulls[5]; + memset(nulls, 0, sizeof(nulls)); + values[0] = Int64GetDatum(commit_snapshot.snapshot_id); + values[1] = TimestampTzGetDatum(GetCurrentTimestamp()); + values[2] = Int64GetDatum(commit_snapshot.schema_version); + values[3] = Int64GetDatum(commit_snapshot.next_catalog_id); + values[4] = Int64GetDatum(commit_snapshot.next_file_id); + + HeapTuple tuple = heap_form_tuple(desc, values, nulls); + CatalogTupleInsert(table, tuple); + table_close(table, NoLock); +} + +void +PgDuckLakeMetadataManager::WriteSnapshotChanges(duckdb::DuckLakeSnapshot commit_snapshot, + const duckdb::SnapshotChangeInfo &change_info) { + ::Relation table = table_open(DuckLakeSnapshotChangesOid(), RowExclusiveLock); + TupleDesc desc = RelationGetDescr(table); + + Datum values[2]; + bool nulls[2]; + memset(nulls, 0, sizeof(nulls)); + values[0] = Int64GetDatum(commit_snapshot.snapshot_id); + if (change_info.changes_made.empty()) { + nulls[1] = true; + } else { + values[1] = CStringGetTextDatum(change_info.changes_made.c_str()); + } + + HeapTuple tuple = heap_form_tuple(desc, values, nulls); + CatalogTupleInsert(table, tuple); + table_close(table, NoLock); +} + +void +PgDuckLakeMetadataManager::WriteNewDataFiles(duckdb::DuckLakeSnapshot commit_snapshot, + const duckdb::vector &new_files) { + if (new_files.empty()) { + return; + } + + ::Relation table = table_open(DuckLakeDataFileOid(), RowExclusiveLock); + TupleDesc desc = RelationGetDescr(table); + + for (auto &file : new_files) { + Datum values[15]; + bool nulls[15]; + memset(nulls, 0, 
sizeof(nulls)); + values[0] = Int64GetDatum(file.id.index); + values[1] = Int64GetDatum(file.table_id.index); + values[2] = + Int64GetDatum(file.begin_snapshot.IsValid() ? file.begin_snapshot.GetIndex() : commit_snapshot.snapshot_id); + nulls[3] = true; + nulls[4] = true; + auto path = GetRelativePath(file.file_name); + values[5] = CStringGetTextDatum(path.path.c_str()); + values[6] = BoolGetDatum(path.path_is_relative); + values[7] = CStringGetTextDatum("parquet"); + values[8] = Int64GetDatum(file.row_count); + values[9] = Int64GetDatum(file.file_size_bytes); + if (file.footer_size.IsValid()) { + values[10] = Int64GetDatum(file.footer_size.GetIndex()); + } else { + nulls[10] = true; + } + if (file.row_id_start.IsValid()) { + values[11] = Int64GetDatum(file.row_id_start.GetIndex()); + } else { + nulls[11] = true; + } + if (file.partition_id.IsValid()) { + values[12] = Int64GetDatum(file.partition_id.GetIndex()); + } else { + nulls[12] = true; + } + if (file.encryption_key.empty()) { + nulls[13] = true; + } else { + // TODO handle encryption key + } + if (file.partial_file_info.empty()) { + nulls[14] = true; + } else { + // TODO handle partial file info + } + + HeapTuple tuple = heap_form_tuple(desc, values, nulls); + CatalogTupleInsert(table, tuple); + } + + table_close(table, NoLock); +} + +void +PgDuckLakeMetadataManager::UpdateGlobalTableStats(const duckdb::DuckLakeGlobalStatsInfo & /* stats */) { + // TODO: Implement +} + +} // namespace pgduckdb diff --git a/src/pgducklake/pgducklake_schema_entry.cpp b/src/pgducklake/pgducklake_schema_entry.cpp new file mode 100644 index 00000000..de4fef16 --- /dev/null +++ b/src/pgducklake/pgducklake_schema_entry.cpp @@ -0,0 +1,61 @@ +#include "pgducklake/pgducklake_schema_entry.hpp" + +#include "storage/ducklake_field_data.hpp" +#include "storage/ducklake_metadata_info.hpp" +#include "storage/ducklake_table_entry.hpp" +#include "storage/ducklake_transaction.hpp" + +#include "pgducklake/pgducklake_metadata_manager.hpp" 
+#include "pgducklake/pgducklake_catalog.hpp" + +#include "duckdb/parser/parsed_data/create_table_info.hpp" + +namespace pgduckdb { + +PgDuckLakeSchemaEntry::PgDuckLakeSchemaEntry(duckdb::Catalog &catalog, duckdb::CreateSchemaInfo &info, + duckdb::SchemaIndex schema_id, duckdb::string schema_uuid, + duckdb::string data_path) + : DuckLakeSchemaEntry(catalog, info, schema_id, schema_uuid, data_path) { +} + +duckdb::optional_ptr +PgDuckLakeSchemaEntry::LookupEntry(duckdb::CatalogTransaction transaction, const duckdb::EntryLookupInfo &lookup_info) { + if (!duckdb::DuckLakeSchemaEntry::CatalogTypeIsSupported(lookup_info.GetCatalogType())) { + throw duckdb::NotImplementedException("Unsupported catalog type: %s", lookup_info.GetCatalogType()); + } + + auto entry = tables.GetEntry(lookup_info.GetEntryName()); + if (entry) { + return entry; + } + + if (lookup_info.GetCatalogType() == duckdb::CatalogType::TABLE_ENTRY) { + return LoadTableEntry(transaction, lookup_info); + } + + return nullptr; +} + +duckdb::optional_ptr +PgDuckLakeSchemaEntry::LoadTableEntry(duckdb::CatalogTransaction transaction, + const duckdb::EntryLookupInfo &lookup_info) { + duckdb::DuckLakeTableInfo table_info; + auto &txn = transaction.transaction->Cast(); + auto &metadata_manager = reinterpret_cast(txn.GetMetadataManager()); + table_info.name = lookup_info.GetEntryName(); + + bool found = metadata_manager.GetDuckLakeTableInfo(txn.GetSnapshot(), *this, table_info); + if (!found) { + return nullptr; + } + // Lazy metadata loading. Commented out as it requires upstream ducklake changes. 
+#if 0 + auto table_entry = + txn.GetCatalog().Cast().TableEntryFromTableInfo(transaction, *this, table_info); + tables.AddEntry(*this, table_info.id, std::move(table_entry)); + return tables.GetEntryById(table_info.id); +#endif + return nullptr; +} + +} // namespace pgduckdb diff --git a/src/pgducklake/pgducklake_storage.cpp b/src/pgducklake/pgducklake_storage.cpp new file mode 100644 index 00000000..6f76f374 --- /dev/null +++ b/src/pgducklake/pgducklake_storage.cpp @@ -0,0 +1,33 @@ +#include "pgducklake/pgducklake_storage.hpp" + +#include "pgducklake/pgducklake_catalog.hpp" +#include "pgducklake/pgducklake_transaction_manager.hpp" + +namespace pgduckdb { + +static duckdb::unique_ptr +DuckLakeAttach(duckdb::StorageExtensionInfo * /* storage_info */, duckdb::ClientContext & /* context */, + duckdb::AttachedDatabase &db, const duckdb::string & /* name */, duckdb::AttachInfo &info, + duckdb::AccessMode access_mode) { + duckdb::DuckLakeOptions options; + options.metadata_path = info.path; + options.data_path = "/tmp/ducklake/"; + options.metadata_schema = "duckdb"; + options.encryption = duckdb::DuckLakeEncryption::UNENCRYPTED; + options.access_mode = access_mode; + return duckdb::make_uniq(db, std::move(options)); +} + +static duckdb::unique_ptr +DuckLakeCreateTransactionManager(duckdb::StorageExtensionInfo * /* storage_info */, duckdb::AttachedDatabase &db, + duckdb::Catalog &catalog) { + auto &ducklake_catalog = catalog.Cast(); + return duckdb::make_uniq(db, ducklake_catalog); +} + +PgDuckLakeStorageExtension::PgDuckLakeStorageExtension() { + attach = DuckLakeAttach; + create_transaction_manager = DuckLakeCreateTransactionManager; +} + +} // namespace pgduckdb diff --git a/src/pgducklake/pgducklake_table.cpp b/src/pgducklake/pgducklake_table.cpp new file mode 100644 index 00000000..3cfbf585 --- /dev/null +++ b/src/pgducklake/pgducklake_table.cpp @@ -0,0 +1,72 @@ +#include "pgducklake/pgducklake_table.hpp" + +#include "pgduckdb/pgduckdb_duckdb.hpp" +#include 
"pgduckdb/pgduckdb_types.hpp" +#include "pgducklake/pgducklake_transaction.hpp" +#include "storage/ducklake_catalog.hpp" + +#include "duckdb/catalog/entry_lookup_info.hpp" +#include "duckdb/parser/parsed_data/create_schema_info.hpp" +#include "duckdb/planner/parsed_data/bound_create_table_info.hpp" + +#include + +extern "C" { +#include "postgres.h" + +#include "utils/lsyscache.h" +#include "utils/rel.h" +} + +namespace pgduckdb { + +void +PgDuckLakeTable::CreateTable(Relation rel) { + auto conn = DuckDBManager::Get().GetConnection(true); + auto context = conn->context; + + auto &db_manager = duckdb::DatabaseInstance::GetDatabase(*context).GetDatabaseManager(); + auto ducklake_db = db_manager.GetDatabase(*context, "pgducklake"); + if (!ducklake_db) { + throw std::runtime_error("PgDuckLakeTable::CreateTable: pgducklake database not found"); + } + auto &ducklake_txn = duckdb::Transaction::Get(*context, *ducklake_db).Cast(); + auto &ducklake_catalog = ducklake_txn.GetCatalog(); + auto catalog_txn = ducklake_catalog.GetCatalogTransaction(*context); + + // Get schema entry + char *nspname = get_namespace_name(rel->rd_rel->relnamespace); + duckdb::EntryLookupInfo schema_lookup(duckdb::CatalogType::SCHEMA_ENTRY, nspname); + auto schema_entry = ducklake_catalog.LookupSchema(catalog_txn, schema_lookup, duckdb::OnEntryNotFound::RETURN_NULL); + // Try create schema if not found + if (!schema_entry) { + duckdb::CreateSchemaInfo create_schema_info; + create_schema_info.schema = nspname; + create_schema_info.on_conflict = duckdb::OnCreateConflict::IGNORE_ON_CONFLICT; + auto entry = ducklake_catalog.CreateSchema(catalog_txn, create_schema_info); + schema_entry = entry ? 
&entry->Cast() : nullptr; + } + + if (!schema_entry) { + throw std::runtime_error("PgDuckLakeTable::CreateTable: schema '" + std::string(nspname) + + "' creation failed, try again later"); + } + + // Create table + duckdb::CreateTableInfo info(*schema_entry, NameStr(rel->rd_rel->relname)); + TupleDesc desc = RelationGetDescr(rel); + for (int i = 0; i < desc->natts; i++) { + auto attr = &desc->attrs[i]; + auto duck_type = pgduckdb::ConvertPostgresToDuckColumnType(attr); + duckdb::ColumnDefinition column(NameStr(attr->attname), duck_type); + info.columns.AddColumn(std::move(column)); + if (attr->atthasdef) { + throw std::runtime_error("PgDuckLakeTable::CreateTable: default value not supported"); + } + } + + duckdb::BoundCreateTableInfo table_info(*schema_entry, std::make_unique(std::move(info))); + schema_entry->CreateTable(catalog_txn, table_info); +} + +} // namespace pgduckdb diff --git a/src/pgducklake/pgducklake_transaction.cpp b/src/pgducklake/pgducklake_transaction.cpp new file mode 100644 index 00000000..dc72945d --- /dev/null +++ b/src/pgducklake/pgducklake_transaction.cpp @@ -0,0 +1,19 @@ +#include "pgducklake/pgducklake_transaction.hpp" + +#include "pgducklake/pgducklake_metadata_manager.hpp" + +namespace pgduckdb { + +PgDuckLakeTransaction::PgDuckLakeTransaction(duckdb::DuckLakeCatalog &ducklake_catalog, + duckdb::TransactionManager &manager, duckdb::ClientContext &context) + : duckdb::DuckLakeTransaction(ducklake_catalog, manager, context) { + SetMetadataManager(duckdb::make_uniq(*this)); +} + +void +PgDuckLakeTransaction::Start() { + // Manually call GetConnection() to ensure that the connection is created + (void)GetConnection(); +} + +} // namespace pgduckdb diff --git a/src/pgducklake/pgducklake_transaction_manager.cpp b/src/pgducklake/pgducklake_transaction_manager.cpp new file mode 100644 index 00000000..6743a927 --- /dev/null +++ b/src/pgducklake/pgducklake_transaction_manager.cpp @@ -0,0 +1,43 @@ +#include 
"pgducklake/pgducklake_transaction_manager.hpp" + +#include "pgducklake/pgducklake_transaction.hpp" + +namespace pgduckdb { + +PgDuckLakeTransactionManager::PgDuckLakeTransactionManager(duckdb::AttachedDatabase &db_p, + duckdb::DuckLakeCatalog &ducklake_catalog) + : duckdb::DuckLakeTransactionManager(db_p, ducklake_catalog), ducklake_catalog(ducklake_catalog) { +} + +duckdb::Transaction & +PgDuckLakeTransactionManager::StartTransaction(duckdb::ClientContext &context) { + auto transaction = duckdb::make_shared_ptr(ducklake_catalog, *this, context); + transaction->Start(); + auto &result = *transaction; + std::lock_guard l(transaction_lock); + transactions[result] = std::move(transaction); + return result; +} + +duckdb::ErrorData +PgDuckLakeTransactionManager::CommitTransaction(duckdb::ClientContext &context, duckdb::Transaction &transaction) { + auto &ducklake_transaction = transaction.Cast(); + try { + ducklake_transaction.Commit(); + } catch (std::exception &ex) { + return duckdb::ErrorData(ex); + } + std::lock_guard l(transaction_lock); + transactions.erase(transaction); + return duckdb::ErrorData(); +} + +void +PgDuckLakeTransactionManager::RollbackTransaction(duckdb::Transaction &transaction) { + auto &ducklake_transaction = transaction.Cast(); + ducklake_transaction.Rollback(); + std::lock_guard l(transaction_lock); + transactions.erase(transaction); +} + +} // namespace pgduckdb diff --git a/test/regression/expected/ducklake/basic.out b/test/regression/expected/ducklake/basic.out new file mode 100644 index 00000000..19b7aada --- /dev/null +++ b/test/regression/expected/ducklake/basic.out @@ -0,0 +1,38 @@ +CREATE TABLE t(a INT) USING ducklake; +INSERT INTO t SELECT g % 10 FROM generate_series(1, 1000) g; +SELECT COUNT(*) FROM t; + count +------- + 1000 +(1 row) + +SELECT a, COUNT(*) FROM t WHERE a > 5 GROUP BY a ORDER BY a; + a | count +---+------- + 6 | 100 + 7 | 100 + 8 | 100 + 9 | 100 +(4 rows) + +CREATE TABLE h(a INT); +INSERT INTO h SELECT g % 10 FROM 
generate_series(1, 1000) g; +-- insert into ducklake from an heap table +INSERT INTO t SELECT * FROM h; +SELECT COUNT(*) FROM t; + count +------- + 2000 +(1 row) + +DROP TABLE t; +DROP TABLE h; +-- empty table +CREATE TABLE empty(a INT) USING ducklake; +SELECT COUNT(*) FROM empty; + count +------- + 0 +(1 row) + +DROP TABLE empty; diff --git a/test/regression/schedule b/test/regression/schedule index df739ea4..d05931cc 100644 --- a/test/regression/schedule +++ b/test/regression/schedule @@ -53,3 +53,4 @@ test: type_support test: union_functions test: unresolved_type test: views +test: ducklake/basic diff --git a/test/regression/sql/ducklake/basic.sql b/test/regression/sql/ducklake/basic.sql new file mode 100644 index 00000000..70085ee9 --- /dev/null +++ b/test/regression/sql/ducklake/basic.sql @@ -0,0 +1,21 @@ +CREATE TABLE t(a INT) USING ducklake; +INSERT INTO t SELECT g % 10 FROM generate_series(1, 1000) g; + +SELECT COUNT(*) FROM t; +SELECT a, COUNT(*) FROM t WHERE a > 5 GROUP BY a ORDER BY a; + + +CREATE TABLE h(a INT); +INSERT INTO h SELECT g % 10 FROM generate_series(1, 1000) g; + +-- insert into ducklake from an heap table +INSERT INTO t SELECT * FROM h; +SELECT COUNT(*) FROM t; + +DROP TABLE t; +DROP TABLE h; + +-- empty table +CREATE TABLE empty(a INT) USING ducklake; +SELECT COUNT(*) FROM empty; +DROP TABLE empty; diff --git a/third_party/duckdb b/third_party/duckdb index 71c5c07c..a427ccce 160000 --- a/third_party/duckdb +++ b/third_party/duckdb @@ -1 +1 @@ -Subproject commit 71c5c07cdd295e9409c0505885033ae9eb6b5ddd +Subproject commit a427cccee92c4024e458b8505467253f3434e729 diff --git a/third_party/ducklake b/third_party/ducklake new file mode 160000 index 00000000..479cab79 --- /dev/null +++ b/third_party/ducklake @@ -0,0 +1 @@ +Subproject commit 479cab796542816ba40af9147f7a1667e50bdd97