
Commit 8849a5f

badrishc and Copilot committed
Fix two rare CI failures: ListPushPopStressTest host crash and VectorManager cleanup vs Reset() AVE
Two independent rare CI failures, both surfacing as `Test host process crashed` and aborting the whole test run.

## 1. `ClusterVectorSetTests.MigrateVectorSetWhileModifyingAsync` — fatal `AccessViolationException` in `VectorManager` cleanup task

### Symptom

```
Passed Garnet.test.cluster.ClusterVectorSetTests.MigrateVectorSetWhileModifyingAsync [12 s]
Fatal error. System.AccessViolationException: Attempted to read or write protected memory.
   at Tsavorite.core.LogRecord.get_Info()
   at Tsavorite.core.LogRecord.get_AllocatedSize()
   at Tsavorite.core.ObjectScanIterator`2[...].GetPhysicalAddressAndAllocatedSize(...)
   at Tsavorite.core.ObjectScanIterator`2[...].GetNext()
   at Tsavorite.core.TsavoriteKVIterator`6[...].PushNext[...](...)
   at Tsavorite.core.TsavoriteKV`2[...].Iterate[...](MainSessionFunctions, ...)
   at Garnet.server.VectorManager+<RunCleanupTaskAsync>d__24.MoveNext()
The active test run was aborted. Reason: Test host process crashed
```

The AVE is a Corrupted-State Exception — `catch (Exception)` in `RunCleanupTaskAsync` cannot suppress it; the runtime fails fast and the test host crashes.

### Root cause

`Recovery.Reset()` → `hlogBase.Reset()` (in `AllocatorBase` and the per-allocator overrides `SpanByte` / `Object` / `TsavoriteLog`) frees pages by synchronously invoking `OnPagesClosed(...)` and a `for (i in BufferSize) FreePage(i)` loop. Both paths ultimately call `ReturnPage(index)`, which sets:

```csharp
pageArrays[index] = default;
pagePointers[index] = default; // ★ becomes 0
```

`Reset()`'s docstring promised *"WARNING: assumes that threads have drained out at this point."* But Garnet's cluster re-attach paths invoke it on a running store:

* `libs/cluster/Server/Replication/ReplicaOps/ReplicaDisklessSync.cs:100`
* `libs/cluster/Server/Replication/ReplicaOps/ReplicaDiskbasedSync.cs:136`

In both files `storeWrapper.Reset()` is called **before** `SuspendPrimaryOnlyTasksAsync()`, and even that suspend only drains `TaskManager` tasks — `VectorManager.cleanupTask` is independent and never drained.

Once `pagePointers[i] = 0`, the iterator's `GetPhysicalAddress` returns `0 + offset` — a tiny kernel-page address — and dereferencing it in `*(RecordInfo*)physicalAddress` raises a fatal AVE.

### The exact interleaving

Production scenario in `MigrateVectorSetWhileModifyingAsync`:

1. Source primary migrates a slot containing a vector set → drops the index → `CleanupDroppedIndex` queues a cleanup-task scan on the source primary.
2. The drop AOF entry replicates to the source's replica, which replays it and **also** queues a cleanup-task scan on the replica.
3. Cluster topology change (post-migration, gossip, or any reason) triggers a replica re-attach → `ReplicaDisklessSync.ReplicateAttachAsync` / `ReplicaDiskbasedSync.ReplicateAttachAsync` calls `storeWrapper.Reset()`.
4. The replica's cleanup task is still mid-iterate over the main store → AVE.

Thread-level interleaving:

```
Thread A: VectorManager cleanup task                 Thread B: storeWrapper.Reset()
─────────────────────────────────────────            ─────────────────────────────────
loop session.Iterate(callbacks)
  PushNext → ObjectScanIterator.GetNext()
    epoch.Resume()            ◄── enter at epoch E
    headAddress = HeadAddress (still old value)
    LoadPageIfNeeded(...)     (cur >= head → in-mem)
    physicalAddress = pagePointers[pageIdx] + offset
                                                     Recovery.Reset()
                                                       hlogBase.Reset()
                                                         HeadAddress ← TailAddress
                                                         OnPagesClosed(...)
                                                           FreePage(p)
                                                             ReturnPage(p)
                                                               pagePointers[p] = 0 ◄── ★
                                                         // override loop:
                                                         for i in BufferSize:
                                                           FreePage(i)
                                                             ReturnPage(i)
                                                               pagePointers[i] = 0
    *(RecordInfo*)physicalAddress ◄── ☠ AVE
    (LogRecord.GetInfo / LogRecord.AllocatedSize)
```

### Why epoch protection didn't catch this

Tsavorite's normal eviction path defers page-freeing through:

```csharp
epoch.BumpCurrentEpoch(() => OnPagesClosed(newAddr));
```

`BumpCurrentEpoch` queues the action and only fires it after `SafeToReclaimEpoch` has advanced past the prior epoch — i.e. after every thread that was holding the prior epoch has either suspended or moved on. That's why scan iterators are safe against normal eviction.

`Reset()` skipped that mechanism in two places:

1. `AllocatorBase.Reset()` invoked `OnPagesClosed(newBeginAddress)` directly.
2. The per-allocator overrides had a `for (i in BufferSize) FreePage(i)` loop that ran **after** `base.Reset()` returned — also without epoch protection.

**This second loop is the actual point of failure**: even if `OnPagesClosed` were deferred, the leftover (tail) page is freed by the override loop while a reader could still be reading it.
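To make the deferral semantics concrete, the following is a toy, self-contained sketch of epoch-deferred reclamation. It is an illustration only, not Tsavorite's `LightEpoch`: every name in it is invented, and a production implementation is far more careful about overheads and races.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Toy model of epoch-deferred reclamation: an action queued by
// BumpCurrentEpoch runs only after every thread that was protected
// under an older epoch has suspended.
sealed class ToyEpoch
{
    long currentEpoch = 1;
    readonly ConcurrentDictionary<int, long> protectedThreads = new();
    readonly ConcurrentQueue<(long priorEpoch, Action action)> drainList = new();

    // Enter epoch protection (analogous to epoch.Resume()).
    public void Resume() =>
        protectedThreads[Environment.CurrentManagedThreadId] = Interlocked.Read(ref currentEpoch);

    // Leave epoch protection (analogous to epoch.Suspend()); may unblock deferred actions.
    public void Suspend()
    {
        protectedThreads.TryRemove(Environment.CurrentManagedThreadId, out _);
        TryDrain();
    }

    // Advance the epoch and defer 'action' until the prior epoch is safe to reclaim.
    public void BumpCurrentEpoch(Action action)
    {
        var prior = Interlocked.Increment(ref currentEpoch) - 1;
        drainList.Enqueue((prior, action));
        TryDrain();
    }

    void TryDrain()
    {
        // Fire queued actions in order, but only once no thread still holds
        // an epoch <= the action's prior epoch ("SafeToReclaimEpoch" advanced).
        while (drainList.TryPeek(out var entry) && SafeToReclaim(entry.priorEpoch))
            if (drainList.TryDequeue(out entry))
                entry.action();
    }

    bool SafeToReclaim(long priorEpoch)
    {
        foreach (var held in protectedThreads.Values)
            if (held <= priorEpoch)
                return false; // a reader entered under an old epoch and hasn't left
        return true;
    }
}
```

A reader that called `Resume()` before the bump pins the queued action in place; the moment it calls `Suspend()`, the drain runs and only then may pages be freed. This is the property the fixed `Reset()` relies on instead of assuming quiescence.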
### The fix (Tsavorite layer)

`AllocatorBase.Reset()` defers ALL page-close + page-free work through `BumpCurrentEpoch` and waits on a `ManualResetEventSlim` signalled by the deferred action — no polling:

```csharp
using var resetComplete = new ManualResetEventSlim(initialState: false);

// If caller was already epoch-protected, our prior epoch is what the action
// will be waiting on — release it before waiting and re-acquire after.
var wasProtected = epoch.ThisInstanceProtected();
if (!wasProtected)
    epoch.Resume(); // BumpCurrentEpoch requires a protected caller

try
{
    epoch.BumpCurrentEpoch(() =>
    {
        try
        {
            if (headShifted)
                OnPagesClosed(newBeginAddress);
            FreeAllAllocatedPages();
        }
        finally
        {
            resetComplete.Set(); // never deadlock if the action throws
        }
    });
}
finally
{
    epoch.Suspend(); // unconditionally so the action can fire
}

resetComplete.Wait();
if (wasProtected)
    epoch.Resume();
```

Each per-allocator override (`SpanByte` / `Object` / `TsavoriteLog`) moves its `FreePage(i)` loop into a new `FreeAllAllocatedPages()` virtual so the loop runs inside the deferred action:

```csharp
public override void Reset()
{
    base.Reset();
    Initialize();
}

protected override void FreeAllAllocatedPages()
{
    for (int index = 0; index < BufferSize; index++)
        if (IsAllocated(index))
            FreePage(index);
}
```

### Why this is safe

* The deferred action runs only after `SafeToReclaimEpoch ≥ priorEpoch`, i.e. after every iterator that was inside `GetNext` at the moment `Reset()` was called has either suspended or advanced. By the time `pagePointers[i] = 0` executes, no thread is reading `pagePointers[i]`.
* Iterators that re-enter `GetNext` after `HeadAddress` was shifted see `currentAddress < headAddress` and route through the buffered disk frame instead of `pagePointers` — so they don't touch the cleared array.
* `Reset()` blocks until the deferred work has actually run, preserving its synchronous contract (the override's `Initialize()` after `Reset()` observes a fully freed page set).

### Test vs. product

Strictly, `Reset()`'s docstring put the burden on callers. The cluster re-attach paths violate that — they call `Reset()` before draining the `VectorManager` cleanup task, and `SuspendPrimaryOnlyTasksAsync()` doesn't cover it.

The alternative would be to drain every background reader at every `Reset()` callsite, but we chose to make `Reset()` itself epoch-safe because the contract was implicit, callsites are scattered, and Tsavorite already has the right primitive (`epoch.BumpCurrentEpoch`) — the normal eviction path uses it. This makes the safety property **enforced** rather than **assumed**, and protects any future caller / background reader.

### Repro

`test/Garnet.test/VectorCleanupVsResetRaceTests.cs` — adds 4 000 vectors, drops the set (queues a full-keyspace cleanup scan), then spams `storeWrapper.Reset()` for 5 s.

* **Without the fix:** crashes the host on every iteration with the exact production stack (`LogRecord.get_Info` → `ObjectScanIterator.GetNext` → `VectorManager.RunCleanupTaskAsync`).
* **With the fix:** all 5 `[Repeat]` iterations pass (~2 700 resets per iteration concurrent with the cleanup iterator), no AVE.

## 2. `RespListTests.ListPushPopStressTest` — host crash on rare `RedisTimeoutException`

### Symptom

```
Unhandled exception. StackExchange.Redis.RedisTimeoutException: Timeout performing LPUSH (30000ms)
   at StackExchange.Redis.ConnectionMultiplexer.ExecuteSyncImpl[T](...)
   at StackExchange.Redis.RedisDatabase.ListLeftPush(...)
   at Garnet.test.RespListTests.<>c__DisplayClass39_1.<ListPushPopStressTest>b__0()
The active test run was aborted. Reason: Test host process crashed
```

### Root cause (two compounding issues)

1. **Worker threads created via `new Thread(() => ...)` had no try/catch.** In modern .NET an unhandled exception in a manually-created `Thread` terminates the process, so a single transient `RedisTimeoutException` aborted the entire test run (a minimal demonstration follows this list).
2. **All 20 sync workers shared a single `ConnectionMultiplexer`.** Every command went through one socket and one background writer. Under CI load + lowMemory eviction overhead the writer falls behind and accumulates queued messages until SyncTimeout (30s) trips. The failure diagnostics confirmed this: `mc: 1/1, qs: 20, bw: SpinningDown`.
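Root cause (1) is easy to reproduce in isolation. A minimal console sketch (hypothetical, not part of this commit) showing that an exception escaping a manually created `Thread` takes down the whole process:

```csharp
using System;
using System.Threading;

class ThreadCrashDemo
{
    static void Main()
    {
        // No try/catch inside the thread body: when the exception escapes,
        // the CLR treats it as unhandled and terminates the entire process.
        // This is exactly how one transient RedisTimeoutException in a worker
        // aborted the whole test host.
        var worker = new Thread(() => throw new TimeoutException("transient failure"));
        worker.Start();
        worker.Join();

        // Never reached: the process has already crashed with a non-zero exit code.
        Console.WriteLine("unreachable");
    }
}
```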
### Fix

* Pre-create one `ConnectionMultiplexer` per worker on the main thread. Each thread now owns its own socket, eliminating the single-writer bottleneck. Pre-creating also avoids a 20-way connect storm racing `ConnectTimeout`.
* Wrap each worker body in try/catch; capture exceptions into a `ConcurrentBag`, signal stop, exit cleanly. No more host crash.
* Throw the aggregate **before** the post-checks so a real timeout isn't masked by secondary "list not empty" assertion noise.
* Route the deadline-exceeded path through the failure bag too.

## Files

```
libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs             |  76 +++++++++++++++++++++++--
libs/storage/Tsavorite/cs/src/core/Allocator/ObjectAllocatorImpl.cs       |   7 ++-
libs/storage/Tsavorite/cs/src/core/Allocator/SpanByteAllocatorImpl.cs     |   7 ++-
libs/storage/Tsavorite/cs/src/core/Allocator/TsavoriteLogAllocatorImpl.cs |   7 ++-
test/Garnet.test/RespListTests.cs                                         | 124 +++++++++++++++++++++++--------------
test/Garnet.test/VectorCleanupVsResetRaceTests.cs                         | new
```

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 950d976 · commit 8849a5f

6 files changed

Lines changed: 310 additions & 53 deletions


libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs

Lines changed: 64 additions & 12 deletions

```diff
@@ -243,7 +243,14 @@ protected abstract void ReadAsync<TContext>(ulong alignedSourceAddress, IntPtr d
         /// <summary>Write page to device (async)</summary>
         protected abstract void WriteAsync<TContext>(long flushPage, DeviceIOCompletionCallback callback, PageAsyncFlushResult<TContext> asyncResult);
 
-        /// <summary>Reset the hybrid log. WARNING: assumes that threads have drained out at this point.</summary>
+        /// <summary>
+        /// Reset the hybrid log. Safe against concurrent iterators / readers: ALL page-close
+        /// and page-free work is deferred via <see cref="LightEpoch.BumpCurrentEpoch(Action)"/>
+        /// so it runs only after all in-flight epoch holders have moved past the current epoch.
+        /// Without this guard a concurrent iterator inside
+        /// <c>SpanByte/ObjectScanIterator.GetNext</c> could observe a freed
+        /// <c>pagePointers[index] == 0</c> and AVE on dereference.
+        /// </summary>
         [MethodImpl(MethodImplOptions.NoInlining)]
         public virtual void Reset()
         {
@@ -253,21 +260,59 @@ public virtual void Reset()
             _ = MonotonicUpdate(ref ReadOnlyAddress, newBeginAddress, out _);
             _ = MonotonicUpdate(ref SafeReadOnlyAddress, newBeginAddress, out _);
 
-            // Shift head address to tail
-            if (MonotonicUpdate(ref HeadAddress, newBeginAddress, out _))
-            {
-                // Close addresses
-                OnPagesClosed(newBeginAddress);
+            // Shift head address to tail. Once HeadAddress is at the new value, the iterator
+            // path checks `currentAddress < headAddress` in LoadPageIfNeeded and falls back to
+            // disk frame buffering instead of using pagePointers — so subsequent epoch entries
+            // by an iterator no longer touch the in-memory page pointers we are about to clear.
+            var headShifted = MonotonicUpdate(ref HeadAddress, newBeginAddress, out _);
+
+            // Defer ALL page closure / freeing through BumpCurrentEpoch so the work runs only
+            // AFTER current epoch holders (e.g. an iterator mid-GetNext) have suspended/advanced.
+            // This is the key safety property: the iterator's GetNext acquires the epoch around
+            // its read of pagePointers[index]; deferring the free past the current epoch ensures
+            // that read completes before the pointer is cleared.
+            //
+            // The action both (a) closes the partially-closed range (so OnPagesClosedWorker
+            // updates ClosedUntilAddress) and (b) frees any remaining allocated pages — which is
+            // what the per-allocator overrides used to do unsafely after base.Reset returned.
+            using var resetComplete = new ManualResetEventSlim(initialState: false);
+
+            // Remember whether the caller was already epoch-protected: we have to release that
+            // protection before waiting (otherwise the action — queued for our prior epoch —
+            // would never fire because we'd still be holding the prior epoch ourselves).
+            var wasProtected = epoch.ThisInstanceProtected();
+            if (!wasProtected)
+                epoch.Resume(); // BumpCurrentEpoch requires a protected caller
 
-                // Wait for pages to get closed
-                while (ClosedUntilAddress < newBeginAddress)
+            try
+            {
+                epoch.BumpCurrentEpoch(() =>
                 {
-                    _ = Thread.Yield();
-                    if (epoch.ThisInstanceProtected())
-                        epoch.ProtectAndDrain();
-                }
+                    try
+                    {
+                        if (headShifted)
+                            OnPagesClosed(newBeginAddress);
+                        FreeAllAllocatedPages();
+                    }
+                    finally
+                    {
+                        // Always signal so Reset() can never deadlock if the action throws.
+                        resetComplete.Set();
+                    }
+                });
+            }
+            finally
+            {
+                // Release the epoch unconditionally so the deferred action can fire (no-spin).
+                epoch.Suspend();
             }
 
+            resetComplete.Wait();
+
+            // Restore the caller's epoch state if they were protected on entry.
+            if (wasProtected)
+                epoch.Resume();
+
             // Update begin address to tail
             _ = MonotonicUpdate(ref BeginAddress, newBeginAddress, out _);
 
@@ -281,6 +326,13 @@ public virtual void Reset()
             device.Reset();
         }
 
+        /// <summary>
+        /// Free any pages still allocated after <see cref="OnPagesClosed"/> has run. Subclasses
+        /// override to call their per-allocator FreePage. Invoked from inside Reset's
+        /// epoch.BumpCurrentEpoch action so it is safe against concurrent iterators.
+        /// </summary>
+        protected virtual void FreeAllAllocatedPages() { }
+
         /// <summary>Asynchronously wraps <see cref="TruncateUntilAddressBlocking(long)"/>.</summary>
         internal void TruncateUntilAddress(long toAddress) => _ = Task.Run(() => TruncateUntilAddressBlocking(toAddress));
 
```
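One subtlety in the diff above is the `wasProtected` bookkeeping. Reusing the toy epoch sketched earlier, the deadlock it avoids can be shown directly (illustrative only; `ToyEpoch` is the invented class from that sketch, not a Tsavorite type):

```csharp
using System.Threading;

var epoch = new ToyEpoch();
using var freed = new ManualResetEventSlim(false);

epoch.Resume();                              // we are now an epoch holder
epoch.BumpCurrentEpoch(() => freed.Set());   // queued against OUR prior epoch

// BUG if we waited here: the action cannot become safe to run until every
// holder of the prior epoch leaves, and that includes us. Blocking on
// freed.Wait() while still protected would therefore hang forever. This is
// exactly why Reset() calls epoch.Suspend() before resetComplete.Wait().

epoch.Suspend();                             // release protection → action fires
freed.Wait();                                // returns immediately
```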
libs/storage/Tsavorite/cs/src/core/Allocator/ObjectAllocatorImpl.cs

Lines changed: 6 additions & 1 deletion

```diff
@@ -117,12 +117,17 @@ protected internal override void Initialize()
         public override void Reset()
         {
             base.Reset();
+            Initialize();
+        }
+
+        /// <inheritdoc />
+        protected override void FreeAllAllocatedPages()
+        {
             for (var index = 0; index < BufferSize; index++)
             {
                 if (IsAllocated(index))
                     FreePage(index);
             }
-            Initialize();
         }
 
         /// <summary>Allocate memory page, pinned in memory, and in sector aligned form, if possible</summary>
```
libs/storage/Tsavorite/cs/src/core/Allocator/SpanByteAllocatorImpl.cs

Lines changed: 6 additions & 1 deletion

```diff
@@ -26,12 +26,17 @@ public SpanByteAllocatorImpl(AllocatorSettings settings, TStoreFunctions storeFu
         public override void Reset()
         {
             base.Reset();
+            Initialize();
+        }
+
+        /// <inheritdoc />
+        protected override void FreeAllAllocatedPages()
+        {
             for (int index = 0; index < BufferSize; index++)
             {
                 if (IsAllocated(index))
                     FreePage(index);
             }
-            Initialize();
         }
 
         /// <summary>Allocate memory page, pinned in memory, and in sector aligned form, if possible</summary>
```
libs/storage/Tsavorite/cs/src/core/Allocator/TsavoriteLogAllocatorImpl.cs

Lines changed: 6 additions & 1 deletion

```diff
@@ -27,12 +27,17 @@ public TsavoriteLogAllocatorImpl(AllocatorSettings settings)
         public override void Reset()
         {
             base.Reset();
+            Initialize();
+        }
+
+        /// <inheritdoc />
+        protected override void FreeAllAllocatedPages()
+        {
             for (var index = 0; index < BufferSize; index++)
             {
                 if (IsAllocated(index))
                     FreePage(index);
             }
-            Initialize();
         }
 
         /// <summary>
```
test/Garnet.test/RespListTests.cs

Lines changed: 86 additions & 38 deletions

```diff
@@ -2,6 +2,7 @@
 // Licensed under the MIT license.
 
 using System;
+using System.Collections.Concurrent;
 using System.Collections.Generic;
 using System.Linq;
 using System.Text;
@@ -1091,59 +1092,106 @@ public void ListPushPopStressTest()
 
             var keyArray = keys.ToArray();
             var stop = new ManualResetEventSlim(false);
+            var failures = new ConcurrentBag<Exception>();
+
+            // Pre-create one ConnectionMultiplexer per worker thread so each "client" has its
+            // own socket. A single shared multiplexer serializes all sync writes through one
+            // background writer; under CI load + lowMemory eviction that writer can fall behind
+            // and accumulate enough queued commands to exceed SyncTimeout. Connecting up front
+            // (instead of inside each thread) also avoids a 20-way connect storm racing
+            // against ConfigurationOptions.ConnectTimeout.
+            var workerCount = keyCount * 2;
+            var workerMuxes = new ConnectionMultiplexer[workerCount];
+            for (int i = 0; i < workerCount; i++)
+                workerMuxes[i] = ConnectionMultiplexer.Connect(TestUtils.GetConfig());
 
-            // Use dedicated threads to avoid threadpool starvation on small CI runners.
-            var threads = new Thread[keyCount * 2];
-            for (int i = 0; i < keyCount; i++)
+            try
             {
-                var key = keyArray[i];
-
-                threads[i * 2] = new Thread(() =>
+                // Use dedicated threads to avoid threadpool starvation on small CI runners.
+                var threads = new Thread[workerCount];
+                for (int i = 0; i < keyCount; i++)
                 {
-                    for (int j = 0; j < ppCount && !stop.IsSet; j++)
-                        db.ListLeftPush(key, j);
-                })
-                { IsBackground = true };
+                    var key = keyArray[i];
+                    var pushDb = workerMuxes[i * 2].GetDatabase(0);
+                    var popDb = workerMuxes[i * 2 + 1].GetDatabase(0);
+
+                    threads[i * 2] = new Thread(() =>
+                    {
+                        try
+                        {
+                            for (int j = 0; j < ppCount && !stop.IsSet; j++)
+                                pushDb.ListLeftPush(key, j);
+                        }
+                        catch (Exception ex)
+                        {
+                            failures.Add(ex);
+                            stop.Set();
+                        }
+                    })
+                    { IsBackground = true };
+
+                    threads[i * 2 + 1] = new Thread(() =>
+                    {
+                        try
+                        {
+                            for (int j = 0; j < ppCount && !stop.IsSet; j++)
+                            {
+                                var value = popDb.ListRightPop(key);
+                                while (value.IsNull && !stop.IsSet)
+                                {
+                                    Thread.Sleep(1);
+                                    value = popDb.ListRightPop(key);
+                                }
+                                if (!stop.IsSet)
+                                    ClassicAssert.IsTrue((int)value >= 0 && (int)value < ppCount, "Pop value inconsistency");
+                            }
+                        }
+                        catch (Exception ex)
+                        {
+                            failures.Add(ex);
+                            stop.Set();
+                        }
+                    })
+                    { IsBackground = true };
+                }
 
-                threads[i * 2 + 1] = new Thread(() =>
+                foreach (var t in threads) t.Start();
+                try
                 {
-                    for (int j = 0; j < ppCount && !stop.IsSet; j++)
+                    var deadline = DateTime.UtcNow.AddMinutes(5);
+                    foreach (var t in threads)
                     {
-                        var value = db.ListRightPop(key);
-                        while (value.IsNull && !stop.IsSet)
+                        var remaining = deadline - DateTime.UtcNow;
+                        if (remaining <= TimeSpan.Zero || !t.Join(remaining))
                         {
-                            Thread.Sleep(1);
-                            value = db.ListRightPop(key);
+                            stop.Set();
+                            failures.Add(new TimeoutException("ListPushPopStressTest timed out after 5 minutes"));
+                            break;
                         }
-                        if (!stop.IsSet)
-                            ClassicAssert.IsTrue((int)value >= 0 && (int)value < ppCount, "Pop value inconsistency");
                     }
-                })
-                { IsBackground = true };
-            }
+                }
+                finally
+                {
+                    stop.Set();
+                    foreach (var t in threads)
+                        t.Join(TimeSpan.FromSeconds(30));
+                }
 
-            foreach (var t in threads) t.Start();
-            try
-            {
-                var deadline = DateTime.UtcNow.AddMinutes(5);
-                foreach (var t in threads)
+                // Surface worker failures BEFORE the post-checks so a transient timeout isn't
+                // masked by secondary "list not empty" assertion noise.
+                if (!failures.IsEmpty)
+                    throw new AggregateException("ListPushPopStressTest worker(s) failed", failures);
+
+                foreach (var key in keyArray)
                 {
-                    var remaining = deadline - DateTime.UtcNow;
-                    if (remaining <= TimeSpan.Zero || !t.Join(remaining))
-                        ClassicAssert.Fail("ListPushPopStressTest timed out after 5 minutes");
+                    var count = db.ListLength(key);
+                    ClassicAssert.AreEqual(0, count);
                 }
             }
             finally
             {
-                stop.Set();
-                foreach (var t in threads)
-                    t.Join(TimeSpan.FromSeconds(30));
-            }
-
-            foreach (var key in keyArray)
-            {
-                var count = db.ListLength(key);
-                ClassicAssert.AreEqual(0, count);
+                foreach (var mux in workerMuxes)
+                    mux?.Dispose();
             }
         }
 
```
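Distilled from the diff above, the hardened worker shape looks like this (a sketch with invented names, not the test's actual code):

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Illustrative shape of the hardened stress-test loop: no exception can
// escape a worker thread, and failures surface as one AggregateException.
class HardenedStressLoop
{
    readonly ConcurrentBag<Exception> failures = new();
    readonly ManualResetEventSlim stop = new(false);

    // Workers should poll this in their loops, e.g. while (!StopRequested && moreWork) { ... }
    public bool StopRequested => stop.IsSet;

    // Wrap a worker body so a transient failure is recorded and signals the
    // other workers to wind down, instead of crashing the test host.
    Thread MakeWorker(Action body) => new(() =>
    {
        try
        {
            body();
        }
        catch (Exception ex)
        {
            failures.Add(ex);
            stop.Set();
        }
    })
    { IsBackground = true };

    public void Run(Action[] bodies, TimeSpan timeout)
    {
        var threads = Array.ConvertAll(bodies, MakeWorker);
        foreach (var t in threads) t.Start();

        foreach (var t in threads)
            if (!t.Join(timeout))
            {
                stop.Set();
                failures.Add(new TimeoutException("worker did not finish in time"));
                break;
            }

        // Throw the aggregate BEFORE any post-checks, so the root cause
        // (e.g. a RedisTimeoutException) isn't masked by follow-on asserts.
        if (!failures.IsEmpty)
            throw new AggregateException("worker(s) failed", failures);
    }
}
```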
