Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ set(ICEBERG_SOURCES
table_requirements.cc
table_scan.cc
table_update.cc
transaction.cc
transaction_catalog.cc
transform.cc
transform_function.cc
type.cc
Expand Down
9 changes: 7 additions & 2 deletions src/iceberg/catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ class ICEBERG_EXPORT Catalog {
/// \return a Table instance or ErrorKind::kAlreadyExists if the table already exists
virtual Result<std::unique_ptr<Table>> UpdateTable(
const TableIdentifier& identifier,
const std::vector<std::unique_ptr<TableRequirement>>& requirements,
const std::vector<std::unique_ptr<TableUpdate>>& updates) = 0;
const std::vector<std::shared_ptr<const TableRequirement>>& requirements,
const std::vector<std::shared_ptr<const TableUpdate>>& updates) = 0;

/// \brief Start a transaction to create a table
///
Expand Down Expand Up @@ -184,6 +184,11 @@ class ICEBERG_EXPORT Catalog {
/// \return a Table instance or ErrorKind::kAlreadyExists if the table already exists
virtual Result<std::shared_ptr<Table>> RegisterTable(
const TableIdentifier& identifier, const std::string& metadata_file_location) = 0;

/// \brief Set whether the last operation in a transaction has been committed
///
/// \param committed true if the last operation has been committed, false otherwise
virtual void SetLastOperationCommitted(bool committed) = 0;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function is only used by transaction catalog so it should not appear here.

};

} // namespace iceberg
5 changes: 2 additions & 3 deletions src/iceberg/catalog/memory/in_memory_catalog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -392,9 +392,8 @@ Result<std::unique_ptr<Table>> InMemoryCatalog::CreateTable(

Result<std::unique_ptr<Table>> InMemoryCatalog::UpdateTable(
const TableIdentifier& identifier,
const std::vector<std::unique_ptr<TableRequirement>>& requirements,
const std::vector<std::unique_ptr<TableUpdate>>& updates) {
std::unique_lock lock(mutex_);
const std::vector<std::shared_ptr<const TableRequirement>>& requirements,
const std::vector<std::shared_ptr<const TableUpdate>>& updates) {
return NotImplemented("update table");
}

Expand Down
6 changes: 4 additions & 2 deletions src/iceberg/catalog/memory/in_memory_catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ class ICEBERG_EXPORT InMemoryCatalog

Result<std::unique_ptr<Table>> UpdateTable(
const TableIdentifier& identifier,
const std::vector<std::unique_ptr<TableRequirement>>& requirements,
const std::vector<std::unique_ptr<TableUpdate>>& updates) override;
const std::vector<std::shared_ptr<const TableRequirement>>& requirements,
const std::vector<std::shared_ptr<const TableUpdate>>& updates) override;

Result<std::shared_ptr<Transaction>> StageCreateTable(
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
Expand All @@ -97,6 +97,8 @@ class ICEBERG_EXPORT InMemoryCatalog
const TableIdentifier& identifier,
const std::string& metadata_file_location) override;

/// \brief No-op override: the body is empty, so this catalog ignores the flag.
///
/// \param committed ignored; marked [[maybe_unused]] so the empty body does not
/// trigger unused-parameter warnings.
void SetLastOperationCommitted([[maybe_unused]] bool committed) override {}

private:
std::string catalog_name_;
std::unordered_map<std::string, std::string> properties_;
Expand Down
6 changes: 3 additions & 3 deletions src/iceberg/catalog/rest/rest_catalog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
#include "iceberg/catalog/rest/http_client.h"
#include "iceberg/catalog/rest/json_internal.h"
#include "iceberg/catalog/rest/resource_paths.h"
#include "iceberg/catalog/rest/rest_catalog.h"
#include "iceberg/catalog/rest/rest_util.h"
#include "iceberg/json_internal.h"
#include "iceberg/partition_spec.h"
Expand Down Expand Up @@ -197,8 +196,9 @@ Result<std::unique_ptr<Table>> RestCatalog::CreateTable(

Result<std::unique_ptr<Table>> RestCatalog::UpdateTable(
[[maybe_unused]] const TableIdentifier& identifier,
[[maybe_unused]] const std::vector<std::unique_ptr<TableRequirement>>& requirements,
[[maybe_unused]] const std::vector<std::unique_ptr<TableUpdate>>& updates) {
[[maybe_unused]] const std::vector<std::shared_ptr<const TableRequirement>>&
requirements,
[[maybe_unused]] const std::vector<std::shared_ptr<const TableUpdate>>& updates) {
return NotImplemented("Not implemented");
}

Expand Down
6 changes: 4 additions & 2 deletions src/iceberg/catalog/rest/rest_catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog {

Result<std::unique_ptr<Table>> UpdateTable(
const TableIdentifier& identifier,
const std::vector<std::unique_ptr<TableRequirement>>& requirements,
const std::vector<std::unique_ptr<TableUpdate>>& updates) override;
const std::vector<std::shared_ptr<const TableRequirement>>& requirements,
const std::vector<std::shared_ptr<const TableUpdate>>& updates) override;

Result<std::shared_ptr<Transaction>> StageCreateTable(
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
Expand All @@ -96,6 +96,8 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog {
const TableIdentifier& identifier,
const std::string& metadata_file_location) override;

/// \brief No-op override: the body is empty, so this catalog ignores the flag.
///
/// \param committed ignored; marked [[maybe_unused]] so the empty body does not
/// trigger unused-parameter warnings.
void SetLastOperationCommitted([[maybe_unused]] bool committed) override {}

private:
RestCatalog(std::unique_ptr<RestCatalogProperties> config,
std::unique_ptr<ResourcePaths> paths);
Expand Down
2 changes: 2 additions & 0 deletions src/iceberg/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ iceberg_sources = files(
'table_requirements.cc',
'table_scan.cc',
'table_update.cc',
'transaction.cc',
'transaction_catalog.cc',
'transform.cc',
'transform_function.cc',
'type.cc',
Expand Down
2 changes: 2 additions & 0 deletions src/iceberg/result.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ enum class ErrorKind {
kInvalidManifest,
kInvalidManifestList,
kInvalidSchema,
kInvalidState,
kIOError,
kJsonParseError,
kNamespaceNotEmpty,
Expand Down Expand Up @@ -104,6 +105,7 @@ DEFINE_ERROR_FUNCTION(InvalidExpression)
DEFINE_ERROR_FUNCTION(InvalidManifest)
DEFINE_ERROR_FUNCTION(InvalidManifestList)
DEFINE_ERROR_FUNCTION(InvalidSchema)
DEFINE_ERROR_FUNCTION(InvalidState)
DEFINE_ERROR_FUNCTION(IOError)
DEFINE_ERROR_FUNCTION(JsonParseError)
DEFINE_ERROR_FUNCTION(NamespaceNotEmpty)
Expand Down
5 changes: 3 additions & 2 deletions src/iceberg/table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "iceberg/table_metadata.h"
#include "iceberg/table_properties.h"
#include "iceberg/table_scan.h"
#include "iceberg/transaction.h"
#include "iceberg/update/update_properties.h"
#include "iceberg/util/macros.h"

Expand Down Expand Up @@ -113,8 +114,8 @@ std::unique_ptr<UpdateProperties> Table::UpdateProperties() const {
return std::make_unique<iceberg::UpdateProperties>(identifier_, catalog_, metadata_);
}

std::unique_ptr<Transaction> Table::NewTransaction() const {
throw NotImplemented("Table::NewTransaction is not implemented");
// Creates a transaction for this table by forwarding a shared handle to this
// table and the table's catalog to Transaction::Make, and returns its result.
//
// NOTE(review): shared_from_this() requires that this Table is already owned by
// a std::shared_ptr; since C++17 it throws std::bad_weak_ptr otherwise. Other
// catalog APIs in this change return std::unique_ptr<Table> -- confirm that
// callers of NewTransaction() always hold the Table in a shared_ptr.
Result<std::unique_ptr<Transaction>> Table::NewTransaction() const {
return Transaction::Make(shared_from_this(), catalog_);
}

const std::shared_ptr<FileIO>& Table::io() const { return io_; }
Expand Down
13 changes: 10 additions & 3 deletions src/iceberg/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,15 @@
#include <vector>

#include "iceberg/iceberg_export.h"
#include "iceberg/result.h"
#include "iceberg/snapshot.h"
#include "iceberg/table_identifier.h"
#include "iceberg/type_fwd.h"

namespace iceberg {

/// \brief Represents an Iceberg table
class ICEBERG_EXPORT Table {
class ICEBERG_EXPORT Table : public std::enable_shared_from_this<Table> {
public:
~Table();

Expand Down Expand Up @@ -87,6 +88,9 @@ class ICEBERG_EXPORT Table {
/// \brief Return the table's base location
const std::string& location() const;

/// \brief Return the table's metadata file location
const std::string& metadata_location() const { return metadata_location_; }

/// \brief Return the table's current snapshot, return NotFoundError if not found
Result<std::shared_ptr<Snapshot>> current_snapshot() const;

Expand Down Expand Up @@ -118,12 +122,15 @@ class ICEBERG_EXPORT Table {

/// \brief Create a new transaction for this table
///
/// \return a pointer to the new Transaction
virtual std::unique_ptr<Transaction> NewTransaction() const;
/// \return a new Transaction or an error if the transaction cannot be created
virtual Result<std::unique_ptr<Transaction>> NewTransaction() const;

/// \brief Returns a FileIO to read and write table data and metadata files
const std::shared_ptr<FileIO>& io() const;

/// \brief Return the underlying table metadata
const std::shared_ptr<TableMetadata>& metadata() const { return metadata_; }

private:
const TableIdentifier identifier_;
std::shared_ptr<TableMetadata> metadata_;
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ add_iceberg_test(schema_test

add_iceberg_test(table_test
SOURCES
base_transaction_test.cc
json_internal_test.cc
metrics_config_test.cc
schema_json_test.cc
Expand Down
174 changes: 174 additions & 0 deletions src/iceberg/test/base_transaction_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include <unordered_map>

#include <gtest/gtest.h>

#include "iceberg/partition_spec.h"
#include "iceberg/result.h"
#include "iceberg/table.h"
#include "iceberg/table_metadata.h"
#include "iceberg/table_update.h"
#include "iceberg/test/matchers.h"
#include "iceberg/test/mock_catalog.h"
#include "iceberg/transaction.h"
#include "iceberg/update/update_properties.h"

namespace iceberg {

class BaseTransactionTest : public ::testing::Test {
protected:
void SetUp() override {
// Create catalog and table identifier
catalog_ = std::make_shared<::testing::NiceMock<MockCatalog>>();

identifier_ = TableIdentifier(Namespace({"test"}), "test_table");
auto metadata = std::make_shared<TableMetadata>();
table_ =
std::make_shared<Table>(identifier_, std::move(metadata),
"s3://bucket/table/metadata.json", nullptr, catalog_);
}

std::unique_ptr<Transaction> NewTransaction() {
auto transaction_result = BaseTransaction::Make(table_, catalog_);
if (!transaction_result.has_value()) {
ADD_FAILURE() << "Failed to create transaction: "
<< transaction_result.error().message;
}
return std::move(transaction_result).value();
}

TableIdentifier identifier_;
std::shared_ptr<MockCatalog> catalog_;
std::shared_ptr<Table> table_;
};

/// Committing a transaction that holds one "set property" update must forward
/// exactly one SetProperties update to Catalog::UpdateTable.
TEST_F(BaseTransactionTest, CommitSetPropertiesUsesCatalog) {
  auto transaction = NewTransaction();
  // Fatal checks: everything below dereferences these handles.
  ASSERT_TRUE(transaction != nullptr);
  auto update_properties = transaction->NewUpdateProperties();
  ASSERT_TRUE(update_properties.has_value());
  update_properties.value()->Set("new-key", "new-value");
  EXPECT_THAT(update_properties.value()->Commit(), IsOk());

  EXPECT_CALL(*catalog_,
              UpdateTable(::testing::Eq(identifier_), ::testing::_, ::testing::_))
      .WillOnce(
          [](const TableIdentifier& id,
             const std::vector<std::shared_ptr<const TableRequirement>>& /*requirements*/,
             const std::vector<std::shared_ptr<const TableUpdate>>& updates)
              -> Result<std::unique_ptr<Table>> {
            EXPECT_EQ("test_table", id.name);
            EXPECT_EQ(1u, updates.size());
            // ASSERT_* cannot be used in a value-returning lambda, so guard each
            // dereference explicitly after recording the expectation.
            if (updates.empty()) {
              return {std::unique_ptr<Table>()};
            }
            const auto* set_update =
                dynamic_cast<const table::SetProperties*>(updates.front().get());
            EXPECT_NE(set_update, nullptr);
            if (set_update != nullptr) {
              const auto& updated = set_update->updated();
              auto it = updated.find("new-key");
              EXPECT_NE(it, updated.end());
              if (it != updated.end()) {
                EXPECT_EQ("new-value", it->second);
              }
            }
            return {std::unique_ptr<Table>()};
          });

  EXPECT_THAT(transaction->CommitTransaction(), IsOk());
}

/// Removing several keys in one update must reach the catalog as a single
/// RemoveProperties update listing all of the requested keys.
TEST_F(BaseTransactionTest, RemovePropertiesSkipsMissingKeys) {
  auto transaction = NewTransaction();
  // Fatal checks: everything below dereferences these handles.
  ASSERT_TRUE(transaction != nullptr);
  auto update_properties = transaction->NewUpdateProperties();
  ASSERT_TRUE(update_properties.has_value());
  update_properties.value()->Remove("missing").Remove("existing");
  EXPECT_THAT(update_properties.value()->Commit(), IsOk());

  EXPECT_CALL(*catalog_,
              UpdateTable(::testing::Eq(identifier_), ::testing::_, ::testing::_))
      .WillOnce(
          [](const TableIdentifier&,
             const std::vector<std::shared_ptr<const TableRequirement>>& /*requirements*/,
             const std::vector<std::shared_ptr<const TableUpdate>>& updates)
              -> Result<std::unique_ptr<Table>> {
            EXPECT_EQ(1u, updates.size());
            // ASSERT_* cannot be used in a value-returning lambda, so guard each
            // dereference explicitly after recording the expectation.
            if (updates.empty()) {
              return {std::unique_ptr<Table>()};
            }
            const auto* remove_update =
                dynamic_cast<const table::RemoveProperties*>(updates.front().get());
            EXPECT_NE(remove_update, nullptr);
            if (remove_update != nullptr) {
              EXPECT_THAT(remove_update->removed(),
                          ::testing::UnorderedElementsAre("missing", "existing"));
            }
            return {std::unique_ptr<Table>()};
          });

  EXPECT_THAT(transaction->CommitTransaction(), IsOk());
}

/// Two committed pending updates (a set followed by a remove) must be sent to
/// the catalog together, in the order they were committed.
TEST_F(BaseTransactionTest, AggregatesMultiplePendingUpdates) {
  auto transaction = NewTransaction();
  // Fatal checks: everything below dereferences these handles.
  ASSERT_TRUE(transaction != nullptr);
  auto update_properties = transaction->NewUpdateProperties();
  ASSERT_TRUE(update_properties.has_value());
  update_properties.value()->Set("new-key", "new-value");
  EXPECT_THAT(update_properties.value()->Commit(), IsOk());
  auto remove_properties = transaction->NewUpdateProperties();
  ASSERT_TRUE(remove_properties.has_value());
  remove_properties.value()->Remove("existing");
  EXPECT_THAT(remove_properties.value()->Commit(), IsOk());

  EXPECT_CALL(*catalog_,
              UpdateTable(::testing::Eq(identifier_), ::testing::_, ::testing::_))
      .WillOnce(
          [](const TableIdentifier&,
             const std::vector<std::shared_ptr<const TableRequirement>>& /*requirements*/,
             const std::vector<std::shared_ptr<const TableUpdate>>& updates)
              -> Result<std::unique_ptr<Table>> {
            EXPECT_EQ(2u, updates.size());
            // ASSERT_* cannot be used in a value-returning lambda, so guard the
            // indexed accesses explicitly after recording the expectation.
            if (updates.size() < 2u) {
              return {std::unique_ptr<Table>()};
            }

            const auto* set_update =
                dynamic_cast<const table::SetProperties*>(updates[0].get());
            EXPECT_NE(set_update, nullptr);
            if (set_update != nullptr) {
              const auto& updated = set_update->updated();
              auto it = updated.find("new-key");
              EXPECT_NE(it, updated.end());
              if (it != updated.end()) {
                EXPECT_EQ("new-value", it->second);
              }
            }

            const auto* remove_update =
                dynamic_cast<const table::RemoveProperties*>(updates[1].get());
            EXPECT_NE(remove_update, nullptr);
            if (remove_update != nullptr) {
              EXPECT_THAT(remove_update->removed(), ::testing::ElementsAre("existing"));
            }

            return {std::unique_ptr<Table>()};
          });

  EXPECT_THAT(transaction->CommitTransaction(), IsOk());
}

/// Committing the transaction while a pending update has not been committed
/// must be rejected with ErrorKind::kInvalidState.
TEST_F(BaseTransactionTest, FailsIfUpdateNotCommitted) {
  auto transaction = NewTransaction();
  // Fatal checks: everything below dereferences these handles.
  ASSERT_TRUE(transaction != nullptr);
  auto update_properties = transaction->NewUpdateProperties();
  ASSERT_TRUE(update_properties.has_value());
  update_properties.value()->Set("new-key", "new-value");
  EXPECT_THAT(transaction->CommitTransaction(), IsError(ErrorKind::kInvalidState));
}

/// A table constructed with a null catalog must refuse to open a transaction
/// and report ErrorKind::kInvalidArgument.
TEST_F(BaseTransactionTest, NewTransactionFailsWithoutCatalog) {
  auto orphan_table = std::make_shared<Table>(
      identifier_, std::make_shared<TableMetadata>(),
      "s3://bucket/table/metadata.json", nullptr, nullptr);

  auto outcome = orphan_table->NewTransaction();
  EXPECT_THAT(outcome, IsError(ErrorKind::kInvalidArgument));
}

} // namespace iceberg
6 changes: 4 additions & 2 deletions src/iceberg/test/mock_catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ class MockCatalog : public Catalog {

MOCK_METHOD((Result<std::unique_ptr<Table>>), UpdateTable,
(const TableIdentifier&,
(const std::vector<std::unique_ptr<TableRequirement>>&),
(const std::vector<std::unique_ptr<TableUpdate>>&)),
const std::vector<std::shared_ptr<const TableRequirement>>&,
const std::vector<std::shared_ptr<const TableUpdate>>&),
(override));

MOCK_METHOD((Result<std::shared_ptr<Transaction>>), StageCreateTable,
Expand All @@ -83,6 +83,8 @@ class MockCatalog : public Catalog {

MOCK_METHOD((Result<std::shared_ptr<Table>>), RegisterTable,
(const TableIdentifier&, const std::string&), (override));

MOCK_METHOD(void, SetLastOperationCommitted, (bool), (override));
};

} // namespace iceberg
Loading
Loading