diff --git a/libs/client/LightEpoch.cs b/libs/client/LightEpoch.cs index 4a13d7aad49..7575da73ba5 100644 --- a/libs/client/LightEpoch.cs +++ b/libs/client/LightEpoch.cs @@ -2,6 +2,7 @@ // Licensed under the MIT license. using System; +using System.Collections.Concurrent; using System.Diagnostics; using System.Runtime.CompilerServices; using System.Runtime.InteropServices; @@ -15,37 +16,9 @@ namespace Garnet.client public sealed unsafe class LightEpoch { /// - /// Buffer to track information for LightEpoch instances. This is used: - /// (1) in AssignInstance, to assign a unique instanceId to each LightEpoch instance, and - /// (2) in Metadata, to track per-thread epoch table entries for each LightEpoch instance. - /// - [StructLayout(LayoutKind.Explicit, Size = MaxInstances * sizeof(int))] - private struct InstanceIndexBuffer - { - /// - /// Maximum number of concurrent instances of LightEpoch supported. - /// - internal const int MaxInstances = 1024; - - /// - /// Anchor field for the buffer. - /// - [FieldOffset(0)] - int field0; - - /// - /// Reference to the entry for the given instance ID. - /// - [MethodImpl(MethodImplOptions.AggressiveInlining)] - internal ref int GetRef(int instanceId) - { - Debug.Assert(instanceId >= 0 && instanceId < MaxInstances); - return ref Unsafe.AsRef((int*)Unsafe.AsPointer(ref field0) + instanceId); - } - } - - /// - /// Store for thread-static metadata. + /// Store for thread-static metadata. Each thread lazily allocates a GC-pinned int[] + /// that maps instanceId to epoch table entry index. The pinned pointer enables + /// uniform access via pointer arithmetic with no branches on the hot path. /// private class Metadata { @@ -62,24 +35,31 @@ private class Metadata internal static ushort startOffset1; /// - /// Alternate start offset to reserve entry in the epoch table (to reduce probing if slot is already filled) + /// Alternate start offset to reserve entry in the epoch table /// [ThreadStatic] internal static ushort startOffset2; /// - /// This is the thread-static index for fast access to the tableAligned index - /// that is obtained when each LightEpoch instance calls ReserveEntry. - /// The instanceId of the LightEpoch instance (assigned to the instance - /// at constructor time using InstanceTracker) is the lookup offset into - /// Entries. - /// - /// Note that Entries effectively gives us ThreadLocal{T} semantics of - /// (instance, thread)-specific metadata, without the overhead of - /// ThreadLocal{T}. + /// Pointer to the per-thread entries array. Each LightEpoch instance's instanceId + /// is the lookup offset: *(entriesPtr + instanceId) gives the epoch table + /// entry index for this thread. Targets a GC-pinned int[] on the Pinned Object Heap. + /// Null until the thread's first call. + /// + [ThreadStatic] + internal static int* entriesPtr; + + /// + /// Keeps the pinned entries array rooted so GC does not collect it. + /// + [ThreadStatic] + internal static int[] entriesArray; + + /// + /// Current capacity of . Zero until initialized. /// [ThreadStatic] - internal static InstanceIndexBuffer Entries; + internal static int entriesCapacity; } /// @@ -102,6 +82,11 @@ private class Metadata /// const int kDrainListSize = 16; + /// + /// Initial per-thread entries array capacity. Grows by doubling as needed. + /// + const int kInitialEntriesCapacity = 16; + /// /// Thread protection status entries. /// @@ -152,12 +137,25 @@ private class Metadata readonly int instanceId; /// - /// This is the LightEpoch-level static buffer (array) of available instance slots. - /// On LightEpoch instance creation, it is used by SelectInstance() to find an - /// available slot in this array; this becomes the LightEpoch instance's instanceId, - /// which is the lookup index into the thread-static Metadata.Entries. + /// Set to 1 on first Dispose call to prevent double-dispose. + /// + int disposed; + + /// + /// Next instance ID to allocate. Monotonically increasing. + /// + static int nextInstanceId; + + /// + /// Pool of recycled instance IDs for reuse. Keeps per-thread arrays compact + /// when instances are frequently created and disposed. + /// + static readonly ConcurrentQueue freeInstanceIds = new(); + + /// + /// Number of currently active LightEpoch instances. /// - static InstanceIndexBuffer InstanceTracker; + static int activeInstanceCount; /// /// Instantiate the epoch table @@ -186,14 +184,10 @@ public LightEpoch() int SelectInstance() { - for (var i = 0; i < InstanceIndexBuffer.MaxInstances; i++) - { - ref var entry = ref InstanceTracker.GetRef(i); - // Try to claim this instance ID (indicated as 1 in the entry) - if (kInvalidIndex == Interlocked.CompareExchange(ref entry, 1, kInvalidIndex)) - return i; - } - throw new InvalidOperationException($"Exceeded maximum number of active LightEpoch instances {ActiveInstanceCount()} {InstanceIndexBuffer.MaxInstances}"); + Interlocked.Increment(ref activeInstanceCount); + if (freeInstanceIds.TryDequeue(out var recycledId)) + return recycledId; + return Interlocked.Increment(ref nextInstanceId) - 1; } /// @@ -202,13 +196,7 @@ int SelectInstance() /// public static int ActiveInstanceCount() { - int count = 0; - for (var i = 0; i < InstanceIndexBuffer.MaxInstances; i++) - { - if (kInvalidIndex != InstanceTracker.GetRef(i)) - count++; - } - return count; + return Volatile.Read(ref activeInstanceCount); } /// @@ -216,10 +204,9 @@ public static int ActiveInstanceCount() /// public static void ResetAllInstances() { - for (var i = 0; i < InstanceIndexBuffer.MaxInstances; i++) - { - InstanceTracker.GetRef(i) = kInvalidIndex; - } + nextInstanceId = 0; + activeInstanceCount = 0; + while (freeInstanceIds.TryDequeue(out _)) { } } /// @@ -227,6 +214,10 @@ public static void ResetAllInstances() /// public void Dispose() { + // Guard against double-dispose + if (Interlocked.Exchange(ref disposed, 1) != 0) + return; + // Cancel any threads currently waiting on the semaphore so they // unwind and decrement waiterCount. cts.Cancel(); @@ -241,8 +232,8 @@ public void Dispose() CurrentEpoch = 1; SafeToReclaimEpoch = 0; - // Mark this instance ID as available - InstanceTracker.GetRef(instanceId) = kInvalidIndex; + freeInstanceIds.Enqueue(instanceId); + Interlocked.Decrement(ref activeInstanceCount); cts.Dispose(); waiterSemaphore.Dispose(); @@ -252,15 +243,14 @@ public void Dispose() /// Check whether current epoch instance is protected on this thread /// /// Result of the check + [MethodImpl(MethodImplOptions.AggressiveInlining)] public bool ThisInstanceProtected() { - ref var entry = ref Metadata.Entries.GetRef(instanceId); - if (kInvalidIndex != entry) - { - if ((*(tableAligned + entry)).threadId == Metadata.threadId) - return true; - } - return false; + var ptr = Metadata.entriesPtr; + if (ptr == null || (uint)instanceId >= (uint)Metadata.entriesCapacity) + return false; + var entry = *(ptr + instanceId); + return kInvalidIndex != entry && (*(tableAligned + entry)).threadId == Metadata.threadId; } /// @@ -285,7 +275,8 @@ public bool TrySuspend() [MethodImpl(MethodImplOptions.AggressiveInlining)] public void ProtectAndDrain() { - ref var entry = ref Metadata.Entries.GetRef(instanceId); + Debug.Assert(Metadata.entriesPtr != null, "ProtectAndDrain called before Resume on this thread"); + ref var entry = ref *(Metadata.entriesPtr + instanceId); Debug.Assert(entry > 0, "Trying to refresh unacquired epoch"); Debug.Assert((*(tableAligned + entry)).threadId > 0, "Epoch table entry missing threadId"); @@ -501,12 +492,15 @@ void Drain(long nextEpoch) [MethodImpl(MethodImplOptions.AggressiveInlining)] void Acquire() { - ref var entry = ref Metadata.Entries.GetRef(instanceId); + if (instanceId >= Metadata.entriesCapacity) + EnsureThreadInitialized(); + + ref var entry = ref *(Metadata.entriesPtr + instanceId); Debug.Assert(entry == kInvalidIndex, "Trying to acquire protected epoch. Make sure you do not re-enter Tsavorite from callbacks or IDevice implementations. If using tasks, use TaskCreationOptions.RunContinuationsAsynchronously."); // Reserve an entry in the epoch table for this thread - ReserveEntryForThread(ref entry); + ReserveEntry(ref entry); Debug.Assert((*(tableAligned + entry)).localCurrentEpoch == 0, "Trying to acquire protected epoch. Make sure you do not re-enter Tsavorite from callbacks or IDevice implementations. If using tasks, use TaskCreationOptions.RunContinuationsAsynchronously."); @@ -529,7 +523,8 @@ void Acquire() [MethodImpl(MethodImplOptions.AggressiveInlining)] void Release() { - ref var entry = ref Metadata.Entries.GetRef(instanceId); + Debug.Assert(Metadata.entriesPtr != null, "Release called before Resume on this thread"); + ref var entry = ref *(Metadata.entriesPtr + instanceId); Debug.Assert((*(tableAligned + entry)).localCurrentEpoch != 0, "Trying to release unprotected epoch. Make sure you do not re-enter Tsavorite from callbacks or IDevice implementations. If using tasks, use TaskCreationOptions.RunContinuationsAsynchronously."); @@ -654,20 +649,34 @@ void ReserveEntryWait(ref int entry) } /// - /// Allocate a new entry in epoch table + /// Initialize per-thread metadata and/or grow the entries array. Called when + /// instanceId >= Metadata.entriesCapacity, which covers both first-time + /// init (capacity is 0) and growth (new instance with higher ID). This method + /// is NoInlining to keep the fast path in Acquire compact. /// - /// Reserved entry - [MethodImpl(MethodImplOptions.AggressiveInlining)] - void ReserveEntryForThread(ref int entry) + [MethodImpl(MethodImplOptions.NoInlining)] + void EnsureThreadInitialized() { - if (Metadata.threadId == 0) // run once per thread for performance + if (Metadata.threadId == 0) { Metadata.threadId = Environment.CurrentManagedThreadId; - uint code = (uint)Utility.Murmur3(Metadata.threadId); + var code = (uint)Utility.Murmur3(Metadata.threadId); Metadata.startOffset1 = (ushort)(1 + (code % kTableSize)); Metadata.startOffset2 = (ushort)(1 + ((code >> 16) % kTableSize)); } - ReserveEntry(ref entry); + + // Compute required capacity (round up to power of 2) + var requiredCapacity = instanceId + 1; + var newCapacity = Math.Max(kInitialEntriesCapacity, Metadata.entriesCapacity); + while (newCapacity < requiredCapacity) + newCapacity *= 2; + + var newArray = GC.AllocateArray(newCapacity, pinned: true); + if (Metadata.entriesArray != null) + Array.Copy(Metadata.entriesArray, newArray, Metadata.entriesArray.Length); + Metadata.entriesArray = newArray; + Metadata.entriesPtr = (int*)Unsafe.AsPointer(ref newArray[0]); + Metadata.entriesCapacity = newCapacity; } /// diff --git a/libs/storage/Tsavorite/cs/src/core/Epochs/LightEpoch.cs b/libs/storage/Tsavorite/cs/src/core/Epochs/LightEpoch.cs index c286bd28672..8f2a8b1f573 100644 --- a/libs/storage/Tsavorite/cs/src/core/Epochs/LightEpoch.cs +++ b/libs/storage/Tsavorite/cs/src/core/Epochs/LightEpoch.cs @@ -2,6 +2,7 @@ // Licensed under the MIT license. using System; +using System.Collections.Concurrent; using System.Diagnostics; using System.Runtime.CompilerServices; using System.Runtime.InteropServices; @@ -15,37 +16,9 @@ namespace Tsavorite.core public sealed unsafe class LightEpoch : IEpochAccessor { /// - /// Buffer to track information for LightEpoch instances. This is used: - /// (1) in AssignInstance, to assign a unique instanceId to each LightEpoch instance, and - /// (2) in Metadata, to track per-thread epoch table entries for each LightEpoch instance. - /// - [StructLayout(LayoutKind.Explicit, Size = MaxInstances * sizeof(int))] - private struct InstanceIndexBuffer - { - /// - /// Maximum number of concurrent instances of LightEpoch supported. - /// - internal const int MaxInstances = 1024; - - /// - /// Anchor field for the buffer. - /// - [FieldOffset(0)] - int field0; - - /// - /// Reference to the entry for the given instance ID. - /// - [MethodImpl(MethodImplOptions.AggressiveInlining)] - internal ref int GetRef(int instanceId) - { - Debug.Assert(instanceId >= 0 && instanceId < MaxInstances); - return ref Unsafe.AsRef((int*)Unsafe.AsPointer(ref field0) + instanceId); - } - } - - /// - /// Store for thread-static metadata. + /// Store for thread-static metadata. Each thread lazily allocates a GC-pinned int[] + /// that maps instanceId to epoch table entry index. The pinned pointer enables + /// uniform access via pointer arithmetic with no branches on the hot path. /// private class Metadata { @@ -62,24 +35,31 @@ private class Metadata internal static ushort startOffset1; /// - /// Alternate start offset to reserve entry in the epoch table (to reduce probing if slot is already filled) + /// Alternate start offset to reserve entry in the epoch table /// [ThreadStatic] internal static ushort startOffset2; /// - /// This is the thread-static index for fast access to the tableAligned index - /// that is obtained when each LightEpoch instance calls ReserveEntry. - /// The instanceId of the LightEpoch instance (assigned to the instance - /// at constructor time using InstanceTracker) is the lookup offset into - /// Entries. - /// - /// Note that Entries effectively gives us ThreadLocal{T} semantics of - /// (instance, thread)-specific metadata, without the overhead of - /// ThreadLocal{T}. + /// Pointer to the per-thread entries array. Each LightEpoch instance's instanceId + /// is the lookup offset: *(entriesPtr + instanceId) gives the epoch table + /// entry index for this thread. Targets a GC-pinned int[] on the Pinned Object Heap. + /// Null until the thread's first call. + /// + [ThreadStatic] + internal static int* entriesPtr; + + /// + /// Keeps the pinned entries array rooted so GC does not collect it. /// [ThreadStatic] - internal static InstanceIndexBuffer Entries; + internal static int[] entriesArray; + + /// + /// Current capacity of . Zero until initialized. + /// + [ThreadStatic] + internal static int entriesCapacity; } /// @@ -102,6 +82,11 @@ private class Metadata /// const int kDrainListSize = 16; + /// + /// Initial per-thread entries array capacity. Grows by doubling as needed. + /// + const int kInitialEntriesCapacity = 16; + /// /// Thread protection status entries. /// @@ -152,12 +137,25 @@ private class Metadata readonly int instanceId; /// - /// This is the LightEpoch-level static buffer (array) of available instance slots. - /// On LightEpoch instance creation, it is used by SelectInstance() to find an - /// available slot in this array; this becomes the LightEpoch instance's instanceId, - /// which is the lookup index into the thread-static Metadata.Entries. + /// Set to 1 on first Dispose call to prevent double-dispose. + /// + int disposed; + + /// + /// Next instance ID to allocate. Monotonically increasing. + /// + static int nextInstanceId; + + /// + /// Pool of recycled instance IDs for reuse. Keeps per-thread arrays compact + /// when instances are frequently created and disposed. /// - static InstanceIndexBuffer InstanceTracker; + static readonly ConcurrentQueue freeInstanceIds = new(); + + /// + /// Number of currently active LightEpoch instances. + /// + static int activeInstanceCount; /// /// Instantiate the epoch table @@ -186,14 +184,10 @@ public LightEpoch() int SelectInstance() { - for (var i = 0; i < InstanceIndexBuffer.MaxInstances; i++) - { - ref var entry = ref InstanceTracker.GetRef(i); - // Try to claim this instance ID (indicated as 1 in the entry) - if (kInvalidIndex == Interlocked.CompareExchange(ref entry, 1, kInvalidIndex)) - return i; - } - throw new InvalidOperationException($"Exceeded maximum number of active LightEpoch instances {ActiveInstanceCount()} {InstanceIndexBuffer.MaxInstances}"); + Interlocked.Increment(ref activeInstanceCount); + if (freeInstanceIds.TryDequeue(out var recycledId)) + return recycledId; + return Interlocked.Increment(ref nextInstanceId) - 1; } /// @@ -202,13 +196,7 @@ int SelectInstance() /// public static int ActiveInstanceCount() { - int count = 0; - for (var i = 0; i < InstanceIndexBuffer.MaxInstances; i++) - { - if (kInvalidIndex != InstanceTracker.GetRef(i)) - count++; - } - return count; + return Volatile.Read(ref activeInstanceCount); } /// @@ -216,10 +204,9 @@ public static int ActiveInstanceCount() /// public static void ResetAllInstances() { - for (var i = 0; i < InstanceIndexBuffer.MaxInstances; i++) - { - InstanceTracker.GetRef(i) = kInvalidIndex; - } + nextInstanceId = 0; + activeInstanceCount = 0; + while (freeInstanceIds.TryDequeue(out _)) { } } /// @@ -227,6 +214,10 @@ public static void ResetAllInstances() /// public void Dispose() { + // Guard against double-dispose + if (Interlocked.Exchange(ref disposed, 1) != 0) + return; + // Cancel any threads currently waiting on the semaphore so they // unwind and decrement waiterCount. cts.Cancel(); @@ -241,8 +232,8 @@ public void Dispose() CurrentEpoch = 1; SafeToReclaimEpoch = 0; - // Mark this instance ID as available - InstanceTracker.GetRef(instanceId) = kInvalidIndex; + freeInstanceIds.Enqueue(instanceId); + Interlocked.Decrement(ref activeInstanceCount); cts.Dispose(); waiterSemaphore.Dispose(); @@ -255,7 +246,10 @@ public void Dispose() [MethodImpl(MethodImplOptions.AggressiveInlining)] public bool ThisInstanceProtected() { - ref var entry = ref Metadata.Entries.GetRef(instanceId); + var ptr = Metadata.entriesPtr; + if (ptr == null || (uint)instanceId >= (uint)Metadata.entriesCapacity) + return false; + var entry = *(ptr + instanceId); return kInvalidIndex != entry && (*(tableAligned + entry)).threadId == Metadata.threadId; } @@ -281,7 +275,8 @@ public bool TrySuspend() [MethodImpl(MethodImplOptions.AggressiveInlining)] public void ProtectAndDrain() { - ref var entry = ref Metadata.Entries.GetRef(instanceId); + Debug.Assert(Metadata.entriesPtr != null, "ProtectAndDrain called before Resume on this thread"); + ref var entry = ref *(Metadata.entriesPtr + instanceId); Debug.Assert(entry > 0, "Trying to refresh unacquired epoch"); Debug.Assert((*(tableAligned + entry)).threadId > 0, "Epoch table entry missing threadId"); @@ -498,12 +493,15 @@ void Drain(long nextEpoch) [MethodImpl(MethodImplOptions.AggressiveInlining)] void Acquire() { - ref var entry = ref Metadata.Entries.GetRef(instanceId); + if (instanceId >= Metadata.entriesCapacity) + EnsureThreadInitialized(); + + ref var entry = ref *(Metadata.entriesPtr + instanceId); Debug.Assert(entry == kInvalidIndex, "Trying to acquire protected epoch. Make sure you do not re-enter Tsavorite from callbacks or IDevice implementations. If using tasks, use TaskCreationOptions.RunContinuationsAsynchronously."); // Reserve an entry in the epoch table for this thread - ReserveEntryForThread(ref entry); + ReserveEntry(ref entry); Debug.Assert((*(tableAligned + entry)).localCurrentEpoch == 0, "Trying to acquire protected epoch. Make sure you do not re-enter Tsavorite from callbacks or IDevice implementations. If using tasks, use TaskCreationOptions.RunContinuationsAsynchronously."); @@ -526,7 +524,8 @@ void Acquire() [MethodImpl(MethodImplOptions.AggressiveInlining)] void Release() { - ref var entry = ref Metadata.Entries.GetRef(instanceId); + Debug.Assert(Metadata.entriesPtr != null, "Release called before Resume on this thread"); + ref var entry = ref *(Metadata.entriesPtr + instanceId); Debug.Assert((*(tableAligned + entry)).localCurrentEpoch != 0, "Trying to release unprotected epoch. Make sure you do not re-enter Tsavorite from callbacks or IDevice implementations. If using tasks, use TaskCreationOptions.RunContinuationsAsynchronously."); @@ -651,20 +650,34 @@ void ReserveEntryWait(ref int entry) } /// - /// Allocate a new entry in epoch table + /// Initialize per-thread metadata and/or grow the entries array. Called when + /// instanceId >= Metadata.entriesCapacity, which covers both first-time + /// init (capacity is 0) and growth (new instance with higher ID). This method + /// is NoInlining to keep the fast path in Acquire compact. /// - /// Reserved entry - [MethodImpl(MethodImplOptions.AggressiveInlining)] - void ReserveEntryForThread(ref int entry) + [MethodImpl(MethodImplOptions.NoInlining)] + void EnsureThreadInitialized() { - if (Metadata.threadId == 0) // run once per thread for performance + if (Metadata.threadId == 0) { Metadata.threadId = Environment.CurrentManagedThreadId; var code = (uint)Utility.Murmur3(Metadata.threadId); Metadata.startOffset1 = (ushort)(1 + (code % kTableSize)); Metadata.startOffset2 = (ushort)(1 + ((code >> 16) % kTableSize)); } - ReserveEntry(ref entry); + + // Compute required capacity (round up to power of 2) + var requiredCapacity = instanceId + 1; + var newCapacity = Math.Max(kInitialEntriesCapacity, Metadata.entriesCapacity); + while (newCapacity < requiredCapacity) + newCapacity *= 2; + + var newArray = GC.AllocateArray(newCapacity, pinned: true); + if (Metadata.entriesArray != null) + Array.Copy(Metadata.entriesArray, newArray, Metadata.entriesArray.Length); + Metadata.entriesArray = newArray; + Metadata.entriesPtr = (int*)Unsafe.AsPointer(ref newArray[0]); + Metadata.entriesCapacity = newCapacity; } ///