Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 95 additions & 86 deletions libs/client/LightEpoch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the MIT license.

using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
Expand All @@ -15,37 +16,9 @@ namespace Garnet.client
public sealed unsafe class LightEpoch
{
/// <summary>
/// Buffer to track information for LightEpoch instances. This is used:
/// (1) in AssignInstance, to assign a unique instanceId to each LightEpoch instance, and
/// (2) in Metadata, to track per-thread epoch table entries for each LightEpoch instance.
/// </summary>
[StructLayout(LayoutKind.Explicit, Size = MaxInstances * sizeof(int))]
private struct InstanceIndexBuffer
{
/// <summary>
/// Maximum number of concurrent instances of LightEpoch supported.
/// </summary>
internal const int MaxInstances = 1024;

/// <summary>
/// Anchor field for the buffer.
/// </summary>
[FieldOffset(0)]
int field0;

/// <summary>
/// Reference to the entry for the given instance ID.
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal ref int GetRef(int instanceId)
{
Debug.Assert(instanceId >= 0 && instanceId < MaxInstances);
return ref Unsafe.AsRef<int>((int*)Unsafe.AsPointer(ref field0) + instanceId);
}
}

/// <summary>
/// Store for thread-static metadata.
/// Store for thread-static metadata. Each thread lazily allocates a GC-pinned int[]
/// that maps instanceId to epoch table entry index. The pinned pointer enables
/// uniform access via pointer arithmetic with no branches on the hot path.
/// </summary>
private class Metadata
{
Expand All @@ -62,24 +35,31 @@ private class Metadata
internal static ushort startOffset1;

/// <summary>
/// Alternate start offset to reserve entry in the epoch table (to reduce probing if <see cref="startOffset1"/> slot is already filled)
/// Alternate start offset to reserve entry in the epoch table
/// </summary>
[ThreadStatic]
internal static ushort startOffset2;

/// <summary>
/// This is the thread-static index for fast access to the tableAligned index
/// that is obtained when each LightEpoch instance calls ReserveEntry.
/// The instanceId of the LightEpoch instance (assigned to the instance
/// at constructor time using InstanceTracker) is the lookup offset into
/// Entries.
///
/// Note that Entries effectively gives us ThreadLocal{T} semantics of
/// (instance, thread)-specific metadata, without the overhead of
/// ThreadLocal{T}.
/// Pointer to the per-thread entries array. Each LightEpoch instance's instanceId
/// is the lookup offset: <c>*(entriesPtr + instanceId)</c> gives the epoch table
/// entry index for this thread. Targets a GC-pinned int[] on the Pinned Object Heap.
/// Null until the thread's first <see cref="LightEpoch.Acquire"/> call.
/// </summary>
[ThreadStatic]
internal static int* entriesPtr;

/// <summary>
/// Keeps the pinned entries array rooted so GC does not collect it.
/// </summary>
[ThreadStatic]
internal static int[] entriesArray;

/// <summary>
/// Current capacity of <see cref="entriesArray"/>. Zero until initialized.
/// </summary>
[ThreadStatic]
internal static InstanceIndexBuffer Entries;
internal static int entriesCapacity;
}

/// <summary>
Expand All @@ -102,6 +82,11 @@ private class Metadata
/// </summary>
const int kDrainListSize = 16;

/// <summary>
/// Initial per-thread entries array capacity. Grows by doubling as needed.
/// </summary>
const int kInitialEntriesCapacity = 16;

/// <summary>
/// Thread protection status entries.
/// </summary>
Expand Down Expand Up @@ -152,12 +137,25 @@ private class Metadata
readonly int instanceId;

/// <summary>
/// This is the LightEpoch-level static buffer (array) of available instance slots.
/// On LightEpoch instance creation, it is used by SelectInstance() to find an
/// available slot in this array; this becomes the LightEpoch instance's instanceId,
/// which is the lookup index into the thread-static Metadata.Entries.
/// Set to 1 on first Dispose call to prevent double-dispose.
/// </summary>
int disposed;

/// <summary>
/// Next instance ID to allocate. Monotonically increasing.
/// </summary>
static int nextInstanceId;

/// <summary>
/// Pool of recycled instance IDs for reuse. Keeps per-thread arrays compact
/// when instances are frequently created and disposed.
/// </summary>
static readonly ConcurrentQueue<int> freeInstanceIds = new();

/// <summary>
/// Number of currently active LightEpoch instances.
/// </summary>
static InstanceIndexBuffer InstanceTracker;
static int activeInstanceCount;

/// <summary>
/// Instantiate the epoch table
Expand Down Expand Up @@ -186,14 +184,10 @@ public LightEpoch()

int SelectInstance()
{
for (var i = 0; i < InstanceIndexBuffer.MaxInstances; i++)
{
ref var entry = ref InstanceTracker.GetRef(i);
// Try to claim this instance ID (indicated as 1 in the entry)
if (kInvalidIndex == Interlocked.CompareExchange(ref entry, 1, kInvalidIndex))
return i;
}
throw new InvalidOperationException($"Exceeded maximum number of active LightEpoch instances {ActiveInstanceCount()} {InstanceIndexBuffer.MaxInstances}");
Interlocked.Increment(ref activeInstanceCount);
if (freeInstanceIds.TryDequeue(out var recycledId))
return recycledId;
return Interlocked.Increment(ref nextInstanceId) - 1;
}

Comment on lines +187 to 192
/// <summary>
Expand All @@ -202,31 +196,28 @@ int SelectInstance()
/// <returns></returns>
public static int ActiveInstanceCount()
{
int count = 0;
for (var i = 0; i < InstanceIndexBuffer.MaxInstances; i++)
{
if (kInvalidIndex != InstanceTracker.GetRef(i))
count++;
}
return count;
return Volatile.Read(ref activeInstanceCount);
}

/// <summary>
/// Reset all instances. Used for testing to reset static LightEpoch state for all instances.
/// </summary>
public static void ResetAllInstances()
{
for (var i = 0; i < InstanceIndexBuffer.MaxInstances; i++)
{
InstanceTracker.GetRef(i) = kInvalidIndex;
}
nextInstanceId = 0;
activeInstanceCount = 0;
while (freeInstanceIds.TryDequeue(out _)) { }
}

/// <summary>
/// Clean up epoch table
/// </summary>
public void Dispose()
{
// Guard against double-dispose
if (Interlocked.Exchange(ref disposed, 1) != 0)
return;

// Cancel any threads currently waiting on the semaphore so they
// unwind and decrement waiterCount.
cts.Cancel();
Expand All @@ -241,8 +232,8 @@ public void Dispose()

CurrentEpoch = 1;
SafeToReclaimEpoch = 0;
// Mark this instance ID as available
InstanceTracker.GetRef(instanceId) = kInvalidIndex;
freeInstanceIds.Enqueue(instanceId);
Interlocked.Decrement(ref activeInstanceCount);

cts.Dispose();
waiterSemaphore.Dispose();
Expand All @@ -252,15 +243,14 @@ public void Dispose()
/// Check whether current epoch instance is protected on this thread
/// </summary>
/// <returns>Result of the check</returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public bool ThisInstanceProtected()
{
ref var entry = ref Metadata.Entries.GetRef(instanceId);
if (kInvalidIndex != entry)
{
if ((*(tableAligned + entry)).threadId == Metadata.threadId)
return true;
}
return false;
var ptr = Metadata.entriesPtr;
if (ptr == null || (uint)instanceId >= (uint)Metadata.entriesCapacity)
return false;
var entry = *(ptr + instanceId);
return kInvalidIndex != entry && (*(tableAligned + entry)).threadId == Metadata.threadId;
}

/// <summary>
Expand All @@ -285,7 +275,8 @@ public bool TrySuspend()
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void ProtectAndDrain()
{
ref var entry = ref Metadata.Entries.GetRef(instanceId);
Debug.Assert(Metadata.entriesPtr != null, "ProtectAndDrain called before Resume on this thread");
ref var entry = ref *(Metadata.entriesPtr + instanceId);

Debug.Assert(entry > 0, "Trying to refresh unacquired epoch");
Comment on lines +278 to 281
Debug.Assert((*(tableAligned + entry)).threadId > 0, "Epoch table entry missing threadId");
Expand Down Expand Up @@ -501,12 +492,15 @@ void Drain(long nextEpoch)
[MethodImpl(MethodImplOptions.AggressiveInlining)]
void Acquire()
{
ref var entry = ref Metadata.Entries.GetRef(instanceId);
if (instanceId >= Metadata.entriesCapacity)
EnsureThreadInitialized();

ref var entry = ref *(Metadata.entriesPtr + instanceId);
Debug.Assert(entry == kInvalidIndex,
"Trying to acquire protected epoch. Make sure you do not re-enter Tsavorite from callbacks or IDevice implementations. If using tasks, use TaskCreationOptions.RunContinuationsAsynchronously.");

// Reserve an entry in the epoch table for this thread
ReserveEntryForThread(ref entry);
ReserveEntry(ref entry);

Debug.Assert((*(tableAligned + entry)).localCurrentEpoch == 0,
"Trying to acquire protected epoch. Make sure you do not re-enter Tsavorite from callbacks or IDevice implementations. If using tasks, use TaskCreationOptions.RunContinuationsAsynchronously.");
Expand All @@ -529,7 +523,8 @@ void Acquire()
[MethodImpl(MethodImplOptions.AggressiveInlining)]
void Release()
{
ref var entry = ref Metadata.Entries.GetRef(instanceId);
Debug.Assert(Metadata.entriesPtr != null, "Release called before Resume on this thread");
ref var entry = ref *(Metadata.entriesPtr + instanceId);

Debug.Assert((*(tableAligned + entry)).localCurrentEpoch != 0,
"Trying to release unprotected epoch. Make sure you do not re-enter Tsavorite from callbacks or IDevice implementations. If using tasks, use TaskCreationOptions.RunContinuationsAsynchronously.");
Expand Down Expand Up @@ -654,20 +649,34 @@ void ReserveEntryWait(ref int entry)
}

/// <summary>
/// Allocate a new entry in epoch table
/// Initialize per-thread metadata and/or grow the entries array. Called when
/// <c>instanceId >= Metadata.entriesCapacity</c>, which covers both first-time
/// init (capacity is 0) and growth (new instance with higher ID). This method
/// is NoInlining to keep the fast path in Acquire compact.
/// </summary>
/// <returns>Reserved entry</returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
void ReserveEntryForThread(ref int entry)
[MethodImpl(MethodImplOptions.NoInlining)]
void EnsureThreadInitialized()
{
if (Metadata.threadId == 0) // run once per thread for performance
if (Metadata.threadId == 0)
{
Metadata.threadId = Environment.CurrentManagedThreadId;
uint code = (uint)Utility.Murmur3(Metadata.threadId);
var code = (uint)Utility.Murmur3(Metadata.threadId);
Metadata.startOffset1 = (ushort)(1 + (code % kTableSize));
Metadata.startOffset2 = (ushort)(1 + ((code >> 16) % kTableSize));
}
ReserveEntry(ref entry);

// Compute required capacity (round up to power of 2)
var requiredCapacity = instanceId + 1;
var newCapacity = Math.Max(kInitialEntriesCapacity, Metadata.entriesCapacity);
while (newCapacity < requiredCapacity)
newCapacity *= 2;
Comment on lines +669 to +672

var newArray = GC.AllocateArray<int>(newCapacity, pinned: true);
if (Metadata.entriesArray != null)
Array.Copy(Metadata.entriesArray, newArray, Metadata.entriesArray.Length);
Metadata.entriesArray = newArray;
Metadata.entriesPtr = (int*)Unsafe.AsPointer(ref newArray[0]);
Metadata.entriesCapacity = newCapacity;
}

/// <inheritdoc/>
Expand Down
Loading
Loading