Skip to content

Commit 2e042cc

Browse files
jbms authored and copybara-github committed
Refactor cache locking and eliminate queued_for_writeback_bytes_limit
The refactoring of cache locking is intended to facilitate later concurrency improvements. Non-transactional writes are now always committed immediately. To defer committing, a transaction can be used instead. PiperOrigin-RevId: 599336232 Change-Id: I9b88690e8106bf898bbadd25996a628b044228b2
1 parent fa3f251 commit 2e042cc

50 files changed

Lines changed: 1199 additions & 1736 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ storage and manipulation of large multi-dimensional arrays that:
2121
* Offers an asynchronous API to enable high-throughput access even to
2222
high-latency remote storage.
2323

24-
* Supports read/writeback caching and transactions, with strong atomicity,
24+
* Supports read caching and transactions, with strong atomicity,
2525
isolation, consistency, and durability (ACID) guarantees.
2626

2727
* Supports safe, efficient access from multiple processes and machines via

docs/context_schema.yml

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -61,14 +61,6 @@ definitions:
6161
least-recently used data that is not in use is evicted from the cache
6262
when this limit is reached.
6363
default: 0
64-
queued_for_writeback_bytes_limit:
65-
type: integer
66-
minimum: 0
67-
description: |-
68-
Soft limit on the total number of bytes of data pending writeback.
69-
Writeback is initated on the least-recently used data that is pending
70-
writeback when this limit is reached. Defaults to half of
71-
`.total_bytes_limit`.
7264
data_copy_concurrency:
7365
$id: Context.data_copy_concurrency
7466
description: |-

tensorstore/driver/image/driver_impl.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -347,7 +347,7 @@ Future<internal::DriverHandle> ImageDriverSpec<Specialization>::Open(
347347
internal::EncodeCacheKey(&cache_identifier, store.driver,
348348
data_copy_concurrency, store.path);
349349
auto cache = internal::GetOrCreateAsyncInitializedCache<CacheType>(
350-
**cache_pool, cache_identifier,
350+
cache_pool->get(), cache_identifier,
351351
[&] {
352352
auto cache = std::make_unique<CacheType>();
353353
cache->data_copy_concurrency_ = data_copy_concurrency;

tensorstore/driver/json/driver.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -357,7 +357,7 @@ Future<internal::Driver::Handle> JsonDriverSpec::Open(
357357
internal::EncodeCacheKey(&cache_identifier, store.driver,
358358
data_copy_concurrency);
359359
auto cache = internal::GetOrCreateAsyncInitializedCache<JsonCache>(
360-
**cache_pool, cache_identifier,
360+
cache_pool->get(), cache_identifier,
361361
[&] {
362362
auto cache = std::make_unique<JsonCache>();
363363
cache->data_copy_concurrency_ = data_copy_concurrency;

tensorstore/driver/json/driver_test.cc

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,7 @@ TEST(JsonDriverTest, ReadError) {
249249
{
250250
auto write_future =
251251
tensorstore::Write(MakeScalarArray<::nlohmann::json>(42), store);
252+
write_future.Force();
252253
TENSORSTORE_EXPECT_OK(write_future.copy_future);
253254
mock_key_value_store->read_requests.pop().promise.SetResult(
254255
absl::UnknownError("read error2"));
@@ -274,6 +275,7 @@ TEST(JsonDriverTest, ConditionalWriteback) {
274275
{
275276
auto write_future =
276277
tensorstore::Write(MakeScalarArray<::nlohmann::json>(42), store);
278+
write_future.Force();
277279
TENSORSTORE_EXPECT_OK(write_future.copy_future);
278280
mock_key_value_store->read_requests.pop()(memory_store);
279281
mock_key_value_store->write_requests.pop()(memory_store);
@@ -284,6 +286,7 @@ TEST(JsonDriverTest, ConditionalWriteback) {
284286
{
285287
auto write_future =
286288
tensorstore::Write(MakeScalarArray<::nlohmann::json>(42), store);
289+
write_future.Force();
287290
TENSORSTORE_EXPECT_OK(write_future.copy_future);
288291
mock_key_value_store->read_requests.pop()(memory_store);
289292
// No write request, since value is unchanged.
@@ -304,6 +307,7 @@ TEST(JsonDriverTest, UnconditionalWriteback) {
304307
tensorstore::Open(spec, context).result());
305308
auto write_future =
306309
tensorstore::Write(MakeScalarArray<::nlohmann::json>(42), store);
310+
write_future.Force();
307311
{
308312
auto write_req = mock_key_value_store->write_requests.pop();
309313
EXPECT_EQ(tensorstore::StorageGeneration::Unknown(),
@@ -315,29 +319,21 @@ TEST(JsonDriverTest, UnconditionalWriteback) {
315319

316320
TEST(JsonDriverTest, ZeroElementWrite) {
317321
auto json_spec = GetSpec("");
318-
json_spec["cache_pool"] = {{"total_bytes_limit", 10000000}};
319-
TENSORSTORE_ASSERT_OK_AND_ASSIGN(auto store,
320-
tensorstore::Open(json_spec).result());
321-
// Confirm that a one-element write is not immediately committed due to cache.
322-
{
323-
auto write_future =
324-
tensorstore::Write(MakeScalarArray<::nlohmann::json>(42), store);
325-
TENSORSTORE_EXPECT_OK(write_future.copy_future);
326-
absl::SleepFor(absl::Milliseconds(10));
327-
EXPECT_FALSE(write_future.commit_future.ready());
328-
// When forced, future becomes ready.
329-
TENSORSTORE_EXPECT_OK(write_future.commit_future);
330-
}
331-
322+
json_spec["kvstore"] = {{"driver", "mock_key_value_store"},
323+
{"path", GetPath()}};
324+
auto context = tensorstore::Context::Default();
325+
TENSORSTORE_ASSERT_OK_AND_ASSIGN(
326+
auto mock_key_value_store_resource,
327+
context.GetResource<tensorstore::internal::MockKeyValueStoreResource>());
328+
auto mock_key_value_store = *mock_key_value_store_resource;
329+
TENSORSTORE_ASSERT_OK_AND_ASSIGN(
330+
auto store, tensorstore::Open(json_spec, context).result());
332331
// Test that a write to zero elements is detected as a non-modification, and
333332
// leads to an immediately-ready future.
334333
{
335334
auto write_future = tensorstore::Write(
336335
tensorstore::AllocateArray<::nlohmann::json>({0}),
337336
store | tensorstore::Dims(0).AddNew().SizedInterval(0, 0));
338-
TENSORSTORE_EXPECT_OK(write_future.copy_future);
339-
absl::SleepFor(absl::Milliseconds(10));
340-
EXPECT_TRUE(write_future.commit_future.ready());
341337
TENSORSTORE_EXPECT_OK(write_future.commit_future);
342338
}
343339
}

tensorstore/driver/kvs_backed_chunk_driver.cc

Lines changed: 17 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -825,25 +825,22 @@ Result<internal::Driver::Handle> CreateTensorStoreFromMetadata(
825825
}
826826
absl::Status data_key_value_store_status;
827827
const auto& state_ref = *state;
828-
auto data_cache =
829-
(*state->cache_pool())
830-
->GetCache<DataCacheBase>(
831-
typeid(state_ref), chunk_cache_identifier,
832-
[&]() -> std::unique_ptr<DataCacheBase> {
833-
auto store_result = state->GetDataKeyValueStore(
834-
GetOwningCache(*base.metadata_cache_entry_).base_store_,
835-
metadata.get());
836-
if (!store_result) {
837-
data_key_value_store_status =
838-
std::move(store_result).status();
839-
return nullptr;
840-
}
841-
DataCacheInitializer initializer;
842-
initializer.store = std::move(*store_result);
843-
initializer.metadata_cache_entry = base.metadata_cache_entry_;
844-
initializer.metadata = metadata;
845-
return state->GetDataCache(std::move(initializer));
846-
});
828+
auto data_cache = internal::GetCacheWithExplicitTypeInfo<DataCacheBase>(
829+
state->cache_pool()->get(), typeid(state_ref), chunk_cache_identifier,
830+
[&]() -> std::unique_ptr<DataCacheBase> {
831+
auto store_result = state->GetDataKeyValueStore(
832+
GetOwningCache(*base.metadata_cache_entry_).base_store_,
833+
metadata.get());
834+
if (!store_result) {
835+
data_key_value_store_status = std::move(store_result).status();
836+
return nullptr;
837+
}
838+
DataCacheInitializer initializer;
839+
initializer.store = std::move(*store_result);
840+
initializer.metadata_cache_entry = base.metadata_cache_entry_;
841+
initializer.metadata = metadata;
842+
return state->GetDataCache(std::move(initializer));
843+
});
847844
TENSORSTORE_RETURN_IF_ERROR(data_key_value_store_status);
848845
TENSORSTORE_ASSIGN_OR_RETURN(
849846
auto new_transform,
@@ -1225,7 +1222,7 @@ internal::CachePtr<MetadataCache> GetOrCreateMetadataCache(
12251222
internal::EncodeCacheKey(&base.metadata_cache_key_, spec.store.driver,
12261223
typeid(*state), state->GetMetadataCacheKey());
12271224
return internal::GetOrCreateAsyncInitializedCache<MetadataCache>(
1228-
**state->cache_pool(), base.metadata_cache_key_,
1225+
state->cache_pool()->get(), base.metadata_cache_key_,
12291226
[&] {
12301227
ABSL_LOG_IF(INFO, TENSORSTORE_KVS_DRIVER_DEBUG)
12311228
<< "Creating metadata cache: open_state=" << state;

tensorstore/driver/neuroglancer_precomputed/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ tensorstore_cc_test(
8282
"//tensorstore:staleness_bound",
8383
"//tensorstore:static_cast",
8484
"//tensorstore:strided_layout",
85+
"//tensorstore:transaction",
8586
"//tensorstore/driver:driver_testutil",
8687
"//tensorstore/driver/zarr",
8788
"//tensorstore/index_space:dim_expression",

tensorstore/driver/neuroglancer_precomputed/driver_test.cc

Lines changed: 34 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
#include "tensorstore/static_cast.h"
6262
#include "tensorstore/strided_layout.h"
6363
#include "tensorstore/tensorstore.h"
64+
#include "tensorstore/transaction.h"
6465
#include "tensorstore/util/dimension_set.h"
6566
#include "tensorstore/util/result.h"
6667
#include "tensorstore/util/status.h"
@@ -1501,9 +1502,7 @@ TEST(ShardedWriteTest, Basic) {
15011502
EXPECT_THAT(tensorstore::Read(store).result(), ::testing::Optional(array));
15021503
}
15031504

1504-
// Disable due to race condition whereby writeback of a shard may start while
1505-
// some chunks that have been modified are still being written back to it.
1506-
TEST(FullShardWriteTest, Basic) {
1505+
TEST(FullShardWriteTest, WithTransaction) {
15071506
auto context = Context::Default();
15081507

15091508
TENSORSTORE_ASSERT_OK_AND_ASSIGN(
@@ -1512,8 +1511,6 @@ TEST(FullShardWriteTest, Basic) {
15121511
auto mock_key_value_store = *mock_key_value_store_resource;
15131512

15141513
::nlohmann::json json_spec{
1515-
// Use a cache to avoid early writeback of partial shard.
1516-
{"context", {{"cache_pool", {{"total_bytes_limit", 10'000'000}}}}},
15171514
{"driver", "neuroglancer_precomputed"},
15181515
{"kvstore",
15191516
{
@@ -1556,51 +1553,46 @@ TEST(FullShardWriteTest, Basic) {
15561553
// Shard 4 origin: {0, 0, 8}
15571554
// Shard 5 origin: {0, 4, 8}
15581555

1559-
// Repeat the test to try to detect errors due to possible timing-dependent
1560-
// behavior differences.
1561-
for (int i = 0; i < 100; ++i) {
1562-
auto store_future = tensorstore::Open(json_spec, context);
1563-
store_future.Force();
1556+
tensorstore::Transaction txn(tensorstore::isolated);
1557+
auto store_future = tensorstore::Open(json_spec, context);
1558+
store_future.Force();
15641559

1565-
{
1566-
auto req = mock_key_value_store->read_requests.pop();
1567-
EXPECT_EQ("prefix/info", req.key);
1568-
req.promise.SetResult(kvstore::ReadResult::Missing(absl::Now()));
1569-
}
1570-
1571-
{
1572-
auto req = mock_key_value_store->write_requests.pop();
1573-
EXPECT_EQ("prefix/info", req.key);
1574-
EXPECT_EQ(StorageGeneration::NoValue(), req.options.if_equal);
1575-
req.promise.SetResult(TimestampedStorageGeneration{
1576-
StorageGeneration::FromString("g0"), absl::Now()});
1577-
}
1560+
{
1561+
auto req = mock_key_value_store->read_requests.pop();
1562+
EXPECT_EQ("prefix/info", req.key);
1563+
req.promise.SetResult(kvstore::ReadResult::Missing(absl::Now()));
1564+
}
15781565

1579-
TENSORSTORE_ASSERT_OK_AND_ASSIGN(auto store, store_future.result());
1566+
{
1567+
auto req = mock_key_value_store->write_requests.pop();
1568+
EXPECT_EQ("prefix/info", req.key);
1569+
EXPECT_EQ(StorageGeneration::NoValue(), req.options.if_equal);
1570+
req.promise.SetResult(TimestampedStorageGeneration{
1571+
StorageGeneration::FromString("g0"), absl::Now()});
1572+
}
15801573

1581-
auto future = tensorstore::Write(
1582-
tensorstore::MakeScalarArray<uint16_t>(42),
1583-
tensorstore::ChainResult(
1584-
store,
1585-
tensorstore::Dims(0, 1, 2).SizedInterval({0, 4, 8}, {4, 2, 2})));
1574+
TENSORSTORE_ASSERT_OK_AND_ASSIGN(auto store, store_future.result());
15861575

1587-
// Ensure copying finishes before writeback starts.
1588-
TENSORSTORE_ASSERT_OK(future.copy_future.result());
1589-
ASSERT_FALSE(future.commit_future.ready());
1576+
auto future = tensorstore::Write(
1577+
tensorstore::MakeScalarArray<uint16_t>(42),
1578+
store | txn |
1579+
tensorstore::Dims(0, 1, 2).SizedInterval({0, 4, 8}, {4, 2, 2}));
15901580

1591-
future.Force();
1581+
// Ensure copying finishes before writeback starts.
1582+
TENSORSTORE_ASSERT_OK(future);
15921583

1593-
{
1594-
auto req = mock_key_value_store->write_requests.pop();
1595-
ASSERT_EQ("prefix/1_1_1/5.shard", req.key);
1596-
// Writeback is unconditional because the entire shard is being written.
1597-
ASSERT_EQ(StorageGeneration::Unknown(), req.options.if_equal);
1598-
req.promise.SetResult(TimestampedStorageGeneration{
1599-
StorageGeneration::FromString("g0"), absl::Now()});
1600-
}
1584+
txn.CommitAsync().IgnoreFuture();
16011585

1602-
TENSORSTORE_ASSERT_OK(future.result());
1586+
{
1587+
auto req = mock_key_value_store->write_requests.pop();
1588+
ASSERT_EQ("prefix/1_1_1/5.shard", req.key);
1589+
// Writeback is unconditional because the entire shard is being written.
1590+
ASSERT_EQ(StorageGeneration::Unknown(), req.options.if_equal);
1591+
req.promise.SetResult(TimestampedStorageGeneration{
1592+
StorageGeneration::FromString("g0"), absl::Now()});
16031593
}
1594+
1595+
TENSORSTORE_ASSERT_OK(txn.future());
16041596
}
16051597

16061598
// Tests that an empty path is handled correctly.

tensorstore/driver/stack/driver_test.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ ::nlohmann::json GetRank1Length4N5Driver(int inclusive_min,
8585
if (exclusive_max == -1) {
8686
exclusive_max = inclusive_min + 4;
8787
}
88-
auto path = absl::StrCat("p", inclusive_min, exclusive_max, "/");
88+
auto path = absl::StrCat("p", inclusive_min, "_", exclusive_max, "/");
8989
auto result = ::nlohmann::json{
9090
{"driver", "n5"},
9191
{"kvstore",

tensorstore/driver/virtual_chunked/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ tensorstore_cc_library(
2424
"//tensorstore/driver:chunk_cache_driver",
2525
"//tensorstore/index_space:index_transform",
2626
"//tensorstore/internal:data_copy_concurrency_resource",
27+
"//tensorstore/internal/cache",
2728
"//tensorstore/internal/cache:cache_pool_resource",
2829
"//tensorstore/internal/cache:chunk_cache",
2930
"//tensorstore/kvstore:generation",

0 commit comments

Comments
 (0)