67 changes: 67 additions & 0 deletions .github/copilot-instructions.md
@@ -205,6 +205,73 @@ To add a new Garnet server setting:
- `LightEpoch` instances track ownership — only dispose if owned
- In parallel tests, share a `LightEpoch` instance across `GarnetClient` instances

### Epoch Protection and Log Address Invariants

Tsavorite uses **epoch-based memory reclamation** (`LightEpoch`) so writers can publish new values and reclaim old memory only after every reader has moved past it. Any change to the allocator, recovery, scan iterators, transient locking, or callbacks fired from the drain list must respect the rules below.

**Key files**: `libs/storage/Tsavorite/cs/src/core/Epochs/LightEpoch.cs`, `libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs`, `libs/storage/Tsavorite/cs/src/core/ClientSession/ClientSession.cs`.

#### Epoch protection model (`LightEpoch`)

- Each thread acquires a per-instance entry via `epoch.Resume()` (calls `Acquire`) and releases it via `epoch.Suspend()` (calls `Release`). Inside the protected region, the thread's `localCurrentEpoch` is advanced on every `ProtectAndDrain()` call and on entry.
- `Resume()` is **non-reentrant** — `Acquire` asserts if the thread is already protected on this instance. Use `ResumeIfNotProtected()` (returns `true` if it acquired) when code may be entered under an existing hold; pair with a matching `Suspend()` only on the path that took it.
- `BasicContext.{RMW, Upsert, Read, Delete}` wrap the call in `UnsafeResumeThread()` / `UnsafeSuspendThread()` (in `ClientSession`) via try/finally. Custom code that calls `epoch.ProtectAndDrain()` (e.g., spin-waiters in `EpochOperations.SpinWaitUntilClosed`/`SpinWaitUntilRecordIsClosed`, `TransientLocking.LockForScan`) **must already hold the epoch** — the `Debug.Assert(entry > 0, "Trying to refresh unacquired epoch")` in `LightEpoch.ProtectAndDrain` fires otherwise. A minimal sketch of this contract appears after this list.
- `BumpCurrentEpoch(Action)` increments the global epoch and queues `Action` against the *prior* epoch. The action fires on whatever thread next observes that epoch as safe-to-reclaim — typically inside `ProtectAndDrain` → `Drain`. Therefore actions must be **thread-agnostic** (no thread-affine state) and **safe to fire synchronously** from the bumping thread itself: `BumpCurrentEpoch(Action)` calls `ProtectAndDrain` internally and may execute the action it just queued.
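
A minimal sketch of the contract above, using only the `LightEpoch` members named in these bullets. The wrapper method itself is hypothetical; the real entry points are `UnsafeResumeThread()` / `UnsafeSuspendThread()` in `ClientSession`:

```csharp
// Hypothetical wrapper, for illustration only.
void DoProtectedWork(LightEpoch epoch)
{
    epoch.Resume();              // non-reentrant: asserts if this thread already holds the epoch
    try
    {
        // Any address observed >= HeadAddress from here on cannot be evicted
        // until the matching Suspend() below.
        epoch.ProtectAndDrain(); // legal only while protected; may synchronously run
                                 // drain-list actions queued by BumpCurrentEpoch(Action)
    }
    finally
    {
        epoch.Suspend();         // must balance Resume() on every code path
    }
}
```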

#### Log address layout and invariants

The eight log addresses on `AllocatorBase` advance monotonically and obey:

```
BeginAddress <= ClosedUntilAddress <= SafeHeadAddress <= HeadAddress
<= FlushedUntilAddress
<= SafeReadOnlyAddress <= ReadOnlyAddress <= TailAddress
```

| Address | Meaning |
|---------|---------|
| `BeginAddress` | Lowest valid address. Advancing it logically retires older addresses but **does not delete on-disk files** by itself; physical truncation only happens when a `ShiftBeginAddress` caller passes `truncateLog: true` (typically a checkpoint commit), and even then the device may defer file removal. |
| `ClosedUntilAddress` | Highest address whose page buffer has been freed (`pagePointers[idx] = 0`). |
| `SafeHeadAddress` | High-water mark set by `OnPagesClosed` *before* pages are freed — readers may observe it leading `ClosedUntilAddress`. |
| `HeadAddress` | Lowest in-memory address. May advance while you hold the epoch, **but any address that was `>= HeadAddress` at any point during your protected region cannot be evicted until you `Suspend`**. Capped at `FlushedUntilAddress` — eviction never gets ahead of disk durability. |
| `FlushedUntilAddress` | All bytes below have been written to disk. Updated by flush completion callbacks invoked from `AsyncFlushPagesForReadOnly`. Lags `SafeReadOnlyAddress` (a page is only flushed once it has become safely read-only). |
| `SafeReadOnlyAddress` | Below this, no writer can be in-place mutating. Set by `OnPagesMarkedReadOnly` after writers have drained; same call also kicks off the flush that will later advance `FlushedUntilAddress`. |
| `ReadOnlyAddress` | Maximum address of the immutable region. Records below it are either flushed or have a flush in flight. |
| `TailAddress` | Next address to allocate; published via the `PageOffset` CAS in `HandlePageOverflow`. |
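
One consequence of monotonic advancement: the chain can be validated concurrently by sampling the fields left to right, since each later read can only return a value at least as large as the one it held when the earlier field was read. A hypothetical debug helper (not part of the codebase) relying on this:

```csharp
// Hypothetical invariant check over the AllocatorBase address fields.
// Sampling left to right keeps each pairwise comparison valid even while
// other threads advance the addresses, because they only ever grow.
void AssertAddressInvariants()
{
    long begin = BeginAddress, closed = ClosedUntilAddress;
    long safeHead = SafeHeadAddress, head = HeadAddress;
    long flushed = FlushedUntilAddress, safeRO = SafeReadOnlyAddress;
    long readOnly = ReadOnlyAddress, tail = TailAddress;

    System.Diagnostics.Debug.Assert(begin <= closed && closed <= safeHead);
    System.Diagnostics.Debug.Assert(safeHead <= head && head <= flushed);
    System.Diagnostics.Debug.Assert(flushed <= safeRO && safeRO <= readOnly);
    System.Diagnostics.Debug.Assert(readOnly <= tail);
}
```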

#### Cascade pattern: publish → epoch barrier → post-drain action

Address advancement uses a **publish → bump → action** cascade so that the post-barrier work runs only after every prior holder has observed the new value:

1. **Publish** the new address into the visible field via `MonotonicUpdate`.
2. **`BumpCurrentEpoch(Action)`** queues the post-barrier work against the prior epoch; it fires once every thread that observed the old value has either `Suspend`ed or `ProtectAndDrain`ed.
3. The **action** does the work that requires "all prior holders have moved past": flush pages, advance the `Safe*` companion, close pages, free buffers, truncate disk segments.

The two cascades you encounter on the runtime hot path:

- **Read-only / flush cycle** (sketched in code after this list) — `ShiftReadOnlyAddress(newRO)` publishes `ReadOnlyAddress`, then `BumpCurrentEpoch(OnPagesMarkedReadOnly)`. The action advances `SafeReadOnlyAddress` and issues `AsyncFlushPagesForReadOnly`; flush completion later advances `FlushedUntilAddress` via `FlushCallback`. Triggered by `PageAlignedShiftReadOnlyAddress` whenever the tail moves far enough past the read-only region.
- **Eviction / close cycle** — `ShiftHeadAddress(desiredHA)` publishes `HeadAddress`, then `BumpCurrentEpoch(OnPagesClosed)`. The action advances `SafeHeadAddress` and `ClosedUntilAddress`, and frees page buffers via the per-allocator `FreePage` (defined in `SpanByteAllocatorImpl` / `ObjectAllocatorImpl`). Triggered when `FlushedUntilAddress` moves past `HeadAddress + (some delta)`, or explicitly via `ShiftHeadAddressToBlocking`.
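
A condensed sketch of the first cascade, modeled on the `ShiftReadOnlyAddress` flow just described (signatures simplified, error handling omitted; the caller is assumed to already hold the epoch, which `BumpCurrentEpoch` requires):

```csharp
// Sketch only: mirrors the publish -> bump -> action steps for the read-only cycle.
void ShiftReadOnlySketch(long newReadOnlyAddress)
{
    // 1. Publish: every thread can now observe the new boundary.
    if (MonotonicUpdate(ref ReadOnlyAddress, newReadOnlyAddress, out _))
    {
        // 2. Barrier: the action fires only after every thread that may have
        //    cached the old ReadOnlyAddress has Suspend()ed or ProtectAndDrain()ed.
        epoch.BumpCurrentEpoch(() =>
        {
            // 3. Post-drain: no writer can still be mutating in place below the
            //    boundary, so advance SafeReadOnlyAddress and kick off the flush.
            OnPagesMarkedReadOnly(newReadOnlyAddress);
        });
    }
}
```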

Other cascades:

- **`ShiftBeginAddress(newBA, truncateLog)`** — publishes `BeginAddress` (and cascades through `ShiftReadOnlyAddress` + `ShiftHeadAddress` if needed). When `truncateLog: true`, also bumps with `TruncateUntilAddress` to drop on-disk segments below the new begin; when `false` (the common case) on-disk segments are left in place to be reclaimed at the next checkpoint commit. Disk file removal itself is asynchronous — even after `TruncateUntilAddress` returns, the device may defer the actual unlink.
- **`ShiftReadOnlyAddressWithWait(newRO, wait)`** — convenience wrapper that uses `ResumeIfNotProtected`/`Suspend` to launch the shift and (optionally) blocks the caller while `FlushedUntilAddress < newRO`.

#### Rules when changing allocator/iterator/callback code

1. **Holding the epoch implies stability**: an address observed `>= HeadAddress` during the protected region cannot be evicted before `Suspend()`. Re-acquire after suspend and re-validate.
2. **`Suspend` and `Resume` must be balanced** on every code path. The only suspend inside the basic op path is the `ALLOCATE_FAILED` retry in `HandleRetryStatus`, balanced via try/finally.
3. **Drain-list actions run on arbitrary threads** that hold the epoch. Do not capture thread-static state; do not call code that asserts on a specific thread.
4. **Multi-phase mutations** that need to advance several addresses with barriers between them should use one `BumpCurrentEpoch(Action)` per phase with a `ManualResetEventSlim` to wait. **Drop the prior epoch before waiting** on the MRE — otherwise the drain list cannot make progress (the action you queued cannot fire while you hold the epoch it is gating on). Re-acquire to issue the next bump. `AllocatorBase.Reset` is an example: phase 1 publishes `ReadOnlyAddress` and waits for writers to drain before advancing `SafeReadOnlyAddress`/`FlushedUntilAddress`; phase 2 publishes `HeadAddress` and waits for readers to drain before closing/freeing pages. A condensed sketch of one such phase appears after this list.
5. **Address publication ordering**: when one operation advances multiple addresses, advance the more permissive ones (`HeadAddress`, `ReadOnlyAddress`) before the more restrictive ones (`BeginAddress`). The full invariant `BeginAddress <= ClosedUntilAddress <= SafeHeadAddress <= HeadAddress <= FlushedUntilAddress <= SafeReadOnlyAddress <= ReadOnlyAddress <= TailAddress` must hold throughout, and stale readers caching the older value will route through safer paths (e.g., disk-frame branch in `LoadPageIfNeeded` rather than dereferencing freed `pagePointers`). `AllocatorBase.Reset` publishes `BeginAddress` last for this reason — an iterator with a stale `nextAddress` then routes through the disk-frame path instead of the in-memory page that has just been freed.
6. **Page pointers**: after `OnPagesClosed` → `FreePage`, `pagePointers[idx] = 0`. Iterators must not dereference a page pointer outside the epoch protection that observed `addr >= HeadAddress`.
7. **Scan iterators and `BufferAndLoad`**: `ScanIteratorBase.BufferAndLoad` may internally call `BumpCurrentEpoch`, `ProtectAndDrain`, or `Suspend`+`Resume` on IO, any of which advances the iterator thread's `localCurrentEpoch` and may synchronously fire deferred drain-list actions. Reads stay safe because the IO frame is iterator-owned (allocated in the iterator's constructor) and `headAddress` advances monotonically — `LoadPageIfNeeded` only routes a record to the in-log path when it was `>= HeadAddress` at the time of sampling, so the snapshot's routing decision is always conservative.
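
Rule 4 in miniature. This follows the shape of the rewritten `AllocatorBase.Reset` shown later in this PR; the placeholder comment stands in for the phase's post-drain work:

```csharp
using (var phaseDone = new ManualResetEventSlim(initialState: false))
{
    epoch.Resume();                       // must hold the epoch to bump it
    try
    {
        epoch.BumpCurrentEpoch(() =>
        {
            try { /* post-drain work for this phase */ }
            finally { phaseDone.Set(); }  // signal even if the work throws
        });
    }
    finally
    {
        epoch.Suspend();                  // drop the epoch BEFORE waiting: the queued
                                          // action cannot fire while we gate its epoch
    }
    phaseDone.Wait();                     // safe to block now; the drain can proceed
}
```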

#### Tests that exercise these paths

- `BasicLockTests.FunctionsLockTest` (in `libs/storage/Tsavorite/cs/test/BasicLockTests.cs`) — multi-threaded RMW/Upsert under contention; exercises Resume/Suspend balance and `ProtectAndDrain`.
- Cluster checkpoint/flush tests under `test/Garnet.test.cluster/` — exercise the full address cascade with live clients.

### Scratch Buffer Conventions

`StorageSession` has two scratch buffer types — use the right one:
108 changes: 95 additions & 13 deletions libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs
@@ -243,32 +243,107 @@ protected abstract void ReadAsync<TContext>(ulong alignedSourceAddress, IntPtr d
/// <summary>Write page to device (async)</summary>
protected abstract void WriteAsync<TContext>(long flushPage, DeviceIOCompletionCallback callback, PageAsyncFlushResult<TContext> asyncResult);

/// <summary>
/// Reset the hybrid log. Safe against concurrent iterators / readers / writers via a
/// two-phase epoch cascade that mirrors the normal flush + close paths:
///
/// Phase 1: publish new ReadOnlyAddress synchronously, then under
///          BumpCurrentEpoch — i.e. after writers caching the OLD ReadOnlyAddress
///          have drained — publish SafeReadOnlyAddress and FlushedUntilAddress.
///          Mirrors OnPagesMarkedReadOnly's invariant that "by the time
///          SafeReadOnlyAddress advances, no thread is mutating below it".
///
/// Phase 2: publish new HeadAddress synchronously (now safe — writers have observed
///          the new ReadOnlyAddress, so no writer holds a cached old ReadOnlyAddress
///          that would leave HeadAddress > cached ReadOnlyAddress). Then under
///          BumpCurrentEpoch — i.e. after readers caching the OLD HeadAddress have
///          drained — close pages (advancing SafeHeadAddress and ClosedUntilAddress)
///          and free pages. Mirrors OnPagesClosed's invariant.
///
/// Final:   publish new BeginAddress synchronously. Publishing it last (rather than
///          up front) means an iterator with a stale nextAddress sees
///          currentAddress &gt; OLD BeginAddress and does not snap forward into the
///          just-freed in-memory range — instead the currentAddress &lt; NEW HeadAddress
///          check routes it through LoadPageIfNeeded's disk-frame branch (frame is
///          iterator-owned, disk segment is intact). The invariant
///          BeginAddress &lt;= HeadAddress holds throughout.
/// </summary>
[MethodImpl(MethodImplOptions.NoInlining)]
public virtual void Reset()
{
    var newBeginAddress = GetTailAddress();

    // To use BumpCurrentEpoch we must be epoch-protected; conversely to wait for the
    // queued action to drain we must NOT be holding the prior epoch. We toggle the
    // protection per phase. If the caller arrived already protected, restore at the end.
    var wasProtected = epoch.ThisInstanceProtected();
    if (wasProtected)
        epoch.Suspend();

    // -------- Phase 1: ReadOnly -> wait for writer drain -> SafeReadOnly + FlushedUntil --------
    _ = MonotonicUpdate(ref ReadOnlyAddress, newBeginAddress, out _);

    using (var phase1Done = new ManualResetEventSlim(initialState: false))
    {
        epoch.Resume();
        try
        {
            epoch.BumpCurrentEpoch(() =>
            {
                try
                {
                    _ = MonotonicUpdate(ref SafeReadOnlyAddress, newBeginAddress, out _);
                    _ = MonotonicUpdate(ref FlushedUntilAddress, newBeginAddress, out _);
                }
                finally { phase1Done.Set(); }
            });
        }
        finally { epoch.Suspend(); }
        phase1Done.Wait();
    }

    // -------- Phase 2: HeadAddress -> wait for reader drain -> OnPagesClosed + FreeAllPages --------
    var headShifted = MonotonicUpdate(ref HeadAddress, newBeginAddress, out _);

    using (var phase2Done = new ManualResetEventSlim(initialState: false))
    {
        epoch.Resume();
        try
        {
            epoch.BumpCurrentEpoch(() =>
            {
                try
                {
                    if (headShifted)
                        OnPagesClosed(newBeginAddress);

                    // Wait for ClosedUntilAddress to catch up to newBeginAddress before
                    // freeing remaining pages. Two scenarios make this necessary:
                    //   (a) headShifted==true: OnPagesClosed may have returned immediately
                    //       because another thread already owned OnPagesClosedWorker for our
                    //       range — that worker is still freeing pages on the other thread.
                    //   (b) headShifted==false: a concurrent Reset (or other ShiftHeadAddress
                    //       caller) already advanced HeadAddress past newBeginAddress and its
                    //       OnPagesClosedWorker may still be running.
                    // In both cases, calling FreeAllAllocatedPages while the worker is mid-flight
                    // would race with its FreePage calls and corrupt page state.
                    while (ClosedUntilAddress < newBeginAddress)
                        _ = Thread.Yield();

                    FreeAllAllocatedPages();
                }
                finally { phase2Done.Set(); }
            });
        }
        finally { epoch.Suspend(); }
        phase2Done.Wait();
    }

    // Restore caller's epoch state if they were protected on entry.
    if (wasProtected)
        epoch.Resume();

    // -------- Final: publish BeginAddress (see XML doc on Reset for why this happens last) --------
    _ = MonotonicUpdate(ref BeginAddress, newBeginAddress, out _);

    flushEvent.Initialize();
@@ -281,6 +356,13 @@ public virtual void Reset()
    device.Reset();
}

/// <summary>
/// Free any pages still allocated after <see cref="OnPagesClosed"/> has run. Subclasses
/// override to call their per-allocator FreePage. Invoked from inside Reset's
/// epoch.BumpCurrentEpoch action so it is safe against concurrent iterators.
/// </summary>
protected virtual void FreeAllAllocatedPages() { }

/// <summary>Asynchronously wraps <see cref="TruncateUntilAddressBlocking(long)"/>.</summary>
internal void TruncateUntilAddress(long toAddress) => _ = Task.Run(() => TruncateUntilAddressBlocking(toAddress));

@@ -117,12 +117,17 @@ protected internal override void Initialize()
public override void Reset()
{
    base.Reset();
    Initialize();
}

/// <inheritdoc />
protected override void FreeAllAllocatedPages()
{
    for (var index = 0; index < BufferSize; index++)
    {
        if (IsAllocated(index))
            FreePage(index);
    }
}

/// <summary>Allocate memory page, pinned in memory, and in sector aligned form, if possible</summary>
@@ -26,12 +26,17 @@ public SpanByteAllocatorImpl(AllocatorSettings settings, TStoreFunctions storeFu
public override void Reset()
{
    base.Reset();
    Initialize();
}

/// <inheritdoc />
protected override void FreeAllAllocatedPages()
{
    for (int index = 0; index < BufferSize; index++)
    {
        if (IsAllocated(index))
            FreePage(index);
    }
}

/// <summary>Allocate memory page, pinned in memory, and in sector aligned form, if possible</summary>
@@ -27,12 +27,17 @@ public TsavoriteLogAllocatorImpl(AllocatorSettings settings)
public override void Reset()
{
    base.Reset();
    Initialize();
}

/// <inheritdoc />
protected override void FreeAllAllocatedPages()
{
    for (var index = 0; index < BufferSize; index++)
    {
        if (IsAllocated(index))
            FreePage(index);
    }
}

/// <summary>