diff --git a/src/iceberg/catalog/memory/in_memory_catalog.cc b/src/iceberg/catalog/memory/in_memory_catalog.cc index d644c44fe..9e4a485a0 100644 --- a/src/iceberg/catalog/memory/in_memory_catalog.cc +++ b/src/iceberg/catalog/memory/in_memory_catalog.cc @@ -21,7 +21,6 @@ #include #include -#include #include "iceberg/table.h" #include "iceberg/table_metadata.h" @@ -337,42 +336,42 @@ std::string_view InMemoryCatalog::name() const { return catalog_name_; } Status InMemoryCatalog::CreateNamespace( const Namespace& ns, const std::unordered_map& properties) { - std::lock_guard guard(mutex_); + std::unique_lock lock(mutex_); return root_namespace_->CreateNamespace(ns, properties); } Result> InMemoryCatalog::GetNamespaceProperties(const Namespace& ns) const { - std::lock_guard guard(mutex_); + std::shared_lock lock(mutex_); return root_namespace_->GetProperties(ns); } Result> InMemoryCatalog::ListNamespaces( const Namespace& ns) const { - std::lock_guard guard(mutex_); + std::shared_lock lock(mutex_); return root_namespace_->ListNamespaces(ns); } Status InMemoryCatalog::DropNamespace(const Namespace& ns) { - std::lock_guard guard(mutex_); + std::unique_lock lock(mutex_); return root_namespace_->DropNamespace(ns); } Result InMemoryCatalog::NamespaceExists(const Namespace& ns) const { - std::lock_guard guard(mutex_); + std::shared_lock lock(mutex_); return root_namespace_->NamespaceExists(ns); } Status InMemoryCatalog::UpdateNamespaceProperties( const Namespace& ns, const std::unordered_map& updates, const std::unordered_set& removals) { - std::lock_guard guard(mutex_); + std::unique_lock lock(mutex_); return root_namespace_->UpdateNamespaceProperties(ns, updates, removals); } Result> InMemoryCatalog::ListTables( const Namespace& ns) const { - std::lock_guard guard(mutex_); + std::shared_lock lock(mutex_); const auto& table_names = root_namespace_->ListTables(ns); ICEBERG_RETURN_UNEXPECTED(table_names); std::vector table_idents; @@ -387,6 +386,7 @@ Result> InMemoryCatalog::CreateTable( const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec, const std::string& location, const std::unordered_map& properties) { + std::unique_lock lock(mutex_); return NotImplemented("create table"); } @@ -394,6 +394,7 @@ Result> InMemoryCatalog::UpdateTable( const TableIdentifier& identifier, const std::vector>& requirements, const std::vector>& updates) { + std::unique_lock lock(mutex_); return NotImplemented("update table"); } @@ -401,22 +402,24 @@ Result> InMemoryCatalog::StageCreateTable( const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec, const std::string& location, const std::unordered_map& properties) { + std::unique_lock lock(mutex_); return NotImplemented("stage create table"); } Result InMemoryCatalog::TableExists(const TableIdentifier& identifier) const { - std::lock_guard guard(mutex_); + std::shared_lock lock(mutex_); return root_namespace_->TableExists(identifier); } Status InMemoryCatalog::DropTable(const TableIdentifier& identifier, bool purge) { - std::lock_guard guard(mutex_); + std::unique_lock lock(mutex_); // TODO(Guotao): Delete all metadata files if purge is true. return root_namespace_->UnregisterTable(identifier); } Status InMemoryCatalog::RenameTable(const TableIdentifier& from, const TableIdentifier& to) { + std::unique_lock lock(mutex_); return NotImplemented("rename table"); } @@ -426,31 +429,40 @@ Result> InMemoryCatalog::LoadTable( return InvalidArgument("file_io is not set for catalog {}", catalog_name_); } - Result metadata_location; + std::string metadata_location; { - std::lock_guard guard(mutex_); + std::shared_lock lock(mutex_); ICEBERG_ASSIGN_OR_RAISE(metadata_location, root_namespace_->GetTableMetadataLocation(identifier)); } ICEBERG_ASSIGN_OR_RAISE(auto metadata, - TableMetadataUtil::Read(*file_io_, metadata_location.value())); + TableMetadataUtil::Read(*file_io_, metadata_location)); - return std::make_unique(identifier, std::move(metadata), - metadata_location.value(), file_io_, + return std::make_unique
(identifier, std::move(metadata), metadata_location, + file_io_, std::static_pointer_cast(shared_from_this())); } Result> InMemoryCatalog::RegisterTable( const TableIdentifier& identifier, const std::string& metadata_file_location) { - std::lock_guard guard(mutex_); + if (!file_io_) [[unlikely]] { + return InvalidArgument("file_io is not set for catalog {}", catalog_name_); + } + + ICEBERG_ASSIGN_OR_RAISE(auto metadata, + TableMetadataUtil::Read(*file_io_, metadata_file_location)); + + std::unique_lock lock(mutex_); if (!root_namespace_->NamespaceExists(identifier.ns)) { return NoSuchNamespace("table namespace does not exist."); } if (!root_namespace_->RegisterTable(identifier, metadata_file_location)) { return UnknownError("The registry failed."); } - return LoadTable(identifier); + return std::make_unique
(identifier, std::move(metadata), metadata_file_location, + file_io_, + std::static_pointer_cast(shared_from_this())); } } // namespace iceberg diff --git a/src/iceberg/catalog/memory/in_memory_catalog.h b/src/iceberg/catalog/memory/in_memory_catalog.h index 5d1f2e13c..e6a9acbce 100644 --- a/src/iceberg/catalog/memory/in_memory_catalog.h +++ b/src/iceberg/catalog/memory/in_memory_catalog.h @@ -19,7 +19,7 @@ #pragma once -#include +#include #include "iceberg/catalog.h" @@ -103,7 +103,7 @@ class ICEBERG_EXPORT InMemoryCatalog std::shared_ptr file_io_; std::string warehouse_location_; std::unique_ptr root_namespace_; - mutable std::recursive_mutex mutex_; + mutable std::shared_mutex mutex_; }; } // namespace iceberg