Skip to content

Commit 0c1f156

Browse files
author
shuxu.li
committed
feat: transactional UpdateProperties support
1 parent 19fbc32 commit 0c1f156

14 files changed

+580
-88
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ set(ICEBERG_INCLUDES "$<BUILD_INTERFACE:${PROJECT_BINARY_DIR}/src>"
1919
"$<BUILD_INTERFACE:${PROJECT_SOURCE_DIR}/src>")
2020
set(ICEBERG_SOURCES
2121
arrow_c_data_guard_internal.cc
22+
base_transaction.cc
2223
catalog/memory/in_memory_catalog.cc
2324
expression/binder.cc
2425
expression/evaluator.cc
@@ -43,6 +44,7 @@ set(ICEBERG_SOURCES
4344
partition_field.cc
4445
partition_spec.cc
4546
partition_summary.cc
47+
pending_update.cc
4648
row/arrow_array_wrapper.cc
4749
row/manifest_wrapper.cc
4850
row/struct_like.cc

src/iceberg/base_transaction.cc

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/base_transaction.h"
21+
22+
#include <utility>
23+
24+
#include "iceberg/catalog.h"
25+
#include "iceberg/pending_update.h"
26+
#include "iceberg/table.h"
27+
#include "iceberg/table_metadata.h"
28+
#include "iceberg/table_requirements.h"
29+
#include "iceberg/table_update.h"
30+
31+
namespace iceberg {
32+
33+
BaseTransaction::BaseTransaction(std::shared_ptr<Table> table,
34+
std::shared_ptr<Catalog> catalog)
35+
: table_(std::move(table)), catalog_(std::move(catalog)) {
36+
ICEBERG_DCHECK(table_ != nullptr, "table must not be null");
37+
ICEBERG_DCHECK(catalog_ != nullptr, "catalog must not be null");
38+
}
39+
40+
const std::shared_ptr<Table>& BaseTransaction::table() const { return table_; }
41+
42+
std::shared_ptr<PropertiesUpdate> BaseTransaction::UpdateProperties() {
43+
return RegisterUpdate<PropertiesUpdate>();
44+
}
45+
46+
std::shared_ptr<AppendFiles> BaseTransaction::NewAppend() {
47+
throw NotImplemented("BaseTransaction::NewAppend not implemented");
48+
}
49+
50+
Status BaseTransaction::CommitTransaction() {
51+
const auto& metadata = table_->metadata();
52+
if (!metadata) {
53+
return InvalidArgument("Table metadata is null");
54+
}
55+
56+
auto builder = TableMetadataBuilder::BuildFrom(metadata.get());
57+
for (const auto& pending_update : pending_updates_) {
58+
if (!pending_update) {
59+
continue;
60+
}
61+
ICEBERG_RETURN_UNEXPECTED(pending_update->Apply(*builder));
62+
}
63+
64+
auto table_updates = builder->GetChanges();
65+
TableUpdateContext context(metadata.get(), /*is_replace=*/false);
66+
for (const auto& update : table_updates) {
67+
ICEBERG_RETURN_UNEXPECTED(update->GenerateRequirements(context));
68+
}
69+
ICEBERG_ASSIGN_OR_RAISE(auto table_requirements, context.Build());
70+
71+
ICEBERG_ASSIGN_OR_RAISE(
72+
auto updated_table,
73+
catalog_->UpdateTable(table_->name(), table_requirements, table_updates));
74+
75+
if (updated_table) {
76+
table_ = std::shared_ptr<Table>(std::move(updated_table));
77+
}
78+
79+
pending_updates_.clear();
80+
return {};
81+
}
82+
83+
} // namespace iceberg

src/iceberg/base_transaction.h

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
#include <vector>
23+
24+
#include "iceberg/transaction.h"
25+
#include "iceberg/type_fwd.h"
26+
27+
namespace iceberg {
28+
29+
/// \brief Base class for transaction implementations
30+
class BaseTransaction : public Transaction {
31+
public:
32+
BaseTransaction(std::shared_ptr<Table> table, std::shared_ptr<Catalog> catalog);
33+
~BaseTransaction() override = default;
34+
35+
const std::shared_ptr<Table>& table() const override;
36+
37+
std::shared_ptr<PropertiesUpdate> UpdateProperties() override;
38+
39+
std::shared_ptr<AppendFiles> NewAppend() override;
40+
41+
Status CommitTransaction() override;
42+
43+
protected:
44+
template <typename UpdateType, typename... Args>
45+
std::shared_ptr<UpdateType> RegisterUpdate(Args&&... args) {
46+
auto update = std::make_shared<UpdateType>(std::forward<Args>(args)...);
47+
pending_updates_.push_back(update);
48+
return update;
49+
}
50+
51+
std::shared_ptr<Table> table_;
52+
std::shared_ptr<Catalog> catalog_;
53+
std::vector<std::shared_ptr<PendingUpdate>> pending_updates_;
54+
};
55+
56+
} // namespace iceberg

src/iceberg/pending_update.cc

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/pending_update.h"
21+
22+
#include "iceberg/catalog.h"
23+
#include "iceberg/table.h"
24+
#include "iceberg/table_metadata.h"
25+
#include "iceberg/table_update.h"
26+
#include "iceberg/util/macros.h"
27+
28+
namespace iceberg {
29+
30+
// ============================================================================
31+
// UpdateProperties implementation
32+
// ============================================================================
33+
34+
PropertiesUpdate& PropertiesUpdate::Set(std::string const& key,
35+
std::string const& value) {
36+
updates_[key] = value;
37+
return *this;
38+
}
39+
40+
PropertiesUpdate& PropertiesUpdate::Remove(std::string const& key) {
41+
removals_.push_back(key);
42+
return *this;
43+
}
44+
45+
Result<PropertiesUpdateChanges> PropertiesUpdate::Apply() {
46+
return PropertiesUpdateChanges{updates_, removals_};
47+
}
48+
49+
Status PropertiesUpdate::ApplyResult(TableMetadataBuilder& builder,
50+
PropertiesUpdateChanges result) {
51+
if (!result.updates.empty()) {
52+
builder.SetProperties(result.updates);
53+
}
54+
if (!result.removals.empty()) {
55+
builder.RemoveProperties(result.removals);
56+
}
57+
return {};
58+
}
59+
60+
Status PropertiesUpdate::Commit() {
61+
return NotImplemented("UpdateProperties::Commit() not implemented");
62+
}
63+
64+
} // namespace iceberg

src/iceberg/pending_update.h

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,14 @@
2222
/// \file iceberg/pending_update.h
2323
/// API for table changes using builder pattern
2424

25+
#include <string>
26+
#include <unordered_map>
27+
#include <vector>
28+
2529
#include "iceberg/iceberg_export.h"
2630
#include "iceberg/result.h"
2731
#include "iceberg/type_fwd.h"
32+
#include "iceberg/util/macros.h"
2833

2934
namespace iceberg {
3035

@@ -60,6 +65,17 @@ class ICEBERG_EXPORT PendingUpdate {
6065

6166
protected:
6267
PendingUpdate() = default;
68+
69+
/// \brief Apply the pending changes to a TableMetadataBuilder
70+
///
71+
/// This method applies the changes by calling builder's specific methods.
72+
/// The builder will automatically record corresponding TableUpdate objects.
73+
///
74+
/// \param builder The TableMetadataBuilder to apply changes to
75+
/// \return Status::OK if the changes were applied successfully, or an error
76+
virtual Status Apply(TableMetadataBuilder& builder) = 0;
77+
78+
friend class BaseTransaction;
6379
};
6480

6581
/// \brief Template class for type-safe table metadata changes using builder pattern
@@ -89,6 +105,80 @@ class ICEBERG_EXPORT PendingUpdateTyped : public PendingUpdate {
89105

90106
protected:
91107
PendingUpdateTyped() = default;
108+
109+
/// \brief Apply the pending changes to a TableMetadataBuilder
110+
///
111+
/// Default implementation: calls Apply() to get the result, then applies it
112+
/// to the builder using ApplyResult().
113+
///
114+
/// \param builder The TableMetadataBuilder to apply changes to
115+
/// \return Status::OK if the changes were applied successfully, or an error
116+
Status Apply(TableMetadataBuilder& builder) override {
117+
auto result = Apply();
118+
ICEBERG_RETURN_UNEXPECTED(result);
119+
120+
return ApplyResult(builder, std::move(result.value()));
121+
}
122+
123+
/// \brief Apply the result to a TableMetadataBuilder
124+
///
125+
/// Subclasses must implement this method to apply the result of Apply()
126+
/// to the builder.
127+
///
128+
/// \param builder The TableMetadataBuilder to apply the result to
129+
/// \param result The result from Apply()
130+
/// \return Status::OK if the result was applied successfully, or an error
131+
virtual Status ApplyResult(TableMetadataBuilder& builder, T result) = 0;
132+
};
133+
134+
/// \brief Builder for updating (set/remove) table properties
135+
///
136+
/// This class provides a fluent API for setting or removing table properties within a
137+
/// transaction. Mutations are accumulated and applied atomically when the transaction
138+
/// is committed.
139+
struct ICEBERG_EXPORT PropertiesUpdateChanges {
140+
std::unordered_map<std::string, std::string> updates;
141+
std::vector<std::string> removals;
142+
};
143+
144+
class ICEBERG_EXPORT PropertiesUpdate
145+
: public PendingUpdateTyped<PropertiesUpdateChanges> {
146+
public:
147+
PropertiesUpdate() = default;
148+
~PropertiesUpdate() override = default;
149+
150+
PropertiesUpdate(const PropertiesUpdate&) = delete;
151+
PropertiesUpdate& operator=(const PropertiesUpdate&) = delete;
152+
153+
/// \brief Set a property key-value pair
154+
///
155+
/// \param key The property key
156+
/// \param value The property value
157+
/// \return Reference to this builder for method chaining
158+
PropertiesUpdate& Set(std::string const& key, std::string const& value);
159+
160+
/// \brief Remove a property key
161+
///
162+
/// \param key The property key to remove
163+
/// \return Reference to this builder for method chaining
164+
PropertiesUpdate& Remove(std::string const& key);
165+
166+
/// \brief Apply the pending changes and return the uncommitted result
167+
///
168+
/// \return The pending property updates/removals, or an error
169+
Result<PropertiesUpdateChanges> Apply() override;
170+
171+
/// \brief Apply and commit the pending changes to the table
172+
///
173+
/// \return Status::OK if the commit was successful, or an error
174+
Status Commit() override;
175+
176+
private:
177+
Status ApplyResult(TableMetadataBuilder& builder,
178+
PropertiesUpdateChanges result) override;
179+
180+
std::unordered_map<std::string, std::string> updates_;
181+
std::vector<std::string> removals_;
92182
};
93183

94184
} // namespace iceberg

src/iceberg/table.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,9 @@ class ICEBERG_EXPORT Table {
118118
/// \brief Returns a FileIO to read and write table data and metadata files
119119
const std::shared_ptr<FileIO>& io() const;
120120

121+
/// \brief Return the underlying table metadata
122+
const std::shared_ptr<TableMetadata>& metadata() const { return metadata_; }
123+
121124
private:
122125
const TableIdentifier identifier_;
123126
std::shared_ptr<TableMetadata> metadata_;

0 commit comments

Comments
 (0)