diff --git a/libs/client/ClientSession/GarnetClientSessionIncremental.cs b/libs/client/ClientSession/GarnetClientSessionIncremental.cs index 6b23ed4d0fd..2002487098c 100644 --- a/libs/client/ClientSession/GarnetClientSessionIncremental.cs +++ b/libs/client/ClientSession/GarnetClientSessionIncremental.cs @@ -42,6 +42,12 @@ public enum MigrationRecordSpanType : byte /// Bespoke encoding for Vector Set indexes. /// VectorSetIndex = 3, + + /// + /// Chunked serialization stream for a RangeIndex key during migration. + /// The receiver uses a state machine to track the in-progress stream. + /// + SerializedRangeIndexStream = 4, } public sealed unsafe partial class GarnetClientSession : IServerHook, IMessageConsumer diff --git a/libs/cluster/Server/Migration/MigrateOperation.cs b/libs/cluster/Server/Migration/MigrateOperation.cs index 1637c0afad8..9755d83c40e 100644 --- a/libs/cluster/Server/Migration/MigrateOperation.cs +++ b/libs/cluster/Server/Migration/MigrateOperation.cs @@ -21,6 +21,7 @@ internal sealed partial class MigrateOperation public StoreScan storeScan; private readonly ConcurrentDictionary vectorSetsIndexKeysToMigrate; + private readonly ConcurrentDictionary rangeIndexKeysToMigrate; readonly MigrateSession session; readonly GarnetClientSession gcs; @@ -28,8 +29,12 @@ internal sealed partial class MigrateOperation public GarnetClientSession Client => gcs; + public LocalServerSession LocalSession => localServerSession; + public IEnumerable> VectorSets => vectorSetsIndexKeysToMigrate; + public IEnumerable> RangeIndexes => rangeIndexKeysToMigrate; + public void ThrowIfCancelled() => session._cts.Token.ThrowIfCancellationRequested(); public bool Contains(int slot) => session._sslots.Contains(slot); @@ -45,6 +50,8 @@ public bool ContainsNamespace(ReadOnlySpan namespaceBytes) public void EncounteredVectorSet(byte[] key, byte[] value) => vectorSetsIndexKeysToMigrate.TryAdd(key, value); + public void EncounteredRangeIndex(byte[] key, byte[] value) => 
rangeIndexKeysToMigrate.TryAdd(key, value); + public MigrateOperation(MigrateSession session, Sketch sketch = null, int batchSize = 1 << 18) { this.session = session; @@ -54,6 +61,7 @@ public MigrateOperation(MigrateSession session, Sketch sketch = null, int batchS storeScan = new StoreScan(this); keysToDelete = []; vectorSetsIndexKeysToMigrate = new(ByteArrayComparer.Instance); + rangeIndexKeysToMigrate = new(ByteArrayComparer.Instance); } public bool Initialize() @@ -126,7 +134,7 @@ public bool TransmitSlots() return true; } - public bool TransmitKeys(Dictionary vectorSetKeysToIgnore) + public bool TransmitKeys(Dictionary vectorSetKeysToIgnore, Dictionary rangeIndexKeysToIgnore) { // Use this for both stores; main store will just use the SpanByteAndMemory directly. We want it to be outside iterations // so we can reuse the SpanByteAndMemory.Memory across iterations. @@ -135,7 +143,8 @@ public bool TransmitKeys(Dictionary vectorSetKeysToIgnore) var output = new UnifiedOutput(); #if NET9_0_OR_GREATER - var ignoreLookup = vectorSetKeysToIgnore.GetAlternateLookup>(); + var vectorSetIgnoreLookup = vectorSetKeysToIgnore.GetAlternateLookup>(); + var rangeIndexIgnoreLookup = rangeIndexKeysToIgnore.GetAlternateLookup>(); #endif try @@ -153,19 +162,23 @@ public bool TransmitKeys(Dictionary vectorSetKeysToIgnore) var spanByte = keys[i].Item1; - // Don't transmit if a Vector Set - var isVectorSet = - vectorSetKeysToIgnore.Count > 0 && + // Don't transmit VectorSet or RangeIndex keys — these are handled out-of-band + var shouldSkip = + (vectorSetKeysToIgnore.Count > 0 && #if NET9_0_OR_GREATER - ignoreLookup.ContainsKey(spanByte.ReadOnlySpan); + vectorSetIgnoreLookup.ContainsKey(spanByte.ReadOnlySpan)) || #else - vectorSetKeysToIgnore.ContainsKey(spanByte.ToArray()); + vectorSetKeysToIgnore.ContainsKey(spanByte.ToArray())) || +#endif + (rangeIndexKeysToIgnore.Count > 0 && +#if NET9_0_OR_GREATER + rangeIndexIgnoreLookup.ContainsKey(spanByte.ReadOnlySpan)); +#else + 
rangeIndexKeysToIgnore.ContainsKey(spanByte.ToArray())); #endif - if (isVectorSet) - { + if (shouldSkip) continue; - } if (!session.WriteOrSendRecord(gcs, localServerSession, keys[i].Item1, ref input, ref output, out var status)) return false; @@ -284,6 +297,19 @@ public void DeleteVectorSet(PinnedSpanByte key) session.logger?.LogDebug("Deleting Vector Set {key} after migration: {delRes}", System.Text.Encoding.UTF8.GetString(key), delRes); } + + /// + /// Delete a RangeIndex after migration if _copyOption is not set. + /// + public void DeleteRangeIndex(PinnedSpanByte key) + { + if (session._copyOption) + return; + + var delRes = localServerSession.BasicGarnetApi.DELETE(key); + + session.logger?.LogDebug("Deleting RangeIndex {key} after migration: {delRes}", System.Text.Encoding.UTF8.GetString(key), delRes); + } } } } \ No newline at end of file diff --git a/libs/cluster/Server/Migration/MigrateScanFunctions.cs b/libs/cluster/Server/Migration/MigrateScanFunctions.cs index c8c43da7e0a..791c287d5b7 100644 --- a/libs/cluster/Server/Migration/MigrateScanFunctions.cs +++ b/libs/cluster/Server/Migration/MigrateScanFunctions.cs @@ -50,7 +50,12 @@ public bool Reader(in TSourceLogRecord srcLogRecord, RecordMet // Check if key belongs to slot that is being migrated and if it can be added to our buffer if (migrateOperation.Contains(slot)) { - if (srcLogRecord.RecordType == VectorManager.RecordType) + if (srcLogRecord.RecordType == RangeIndexManager.RangeIndexRecordType) + { + // RangeIndex keys need out-of-band migration (snapshot + chunks) + migrateOperation.EncounteredRangeIndex(key.ToArray(), srcLogRecord.ValueSpan.ToArray()); + } + else if (srcLogRecord.RecordType == VectorManager.RecordType) { // We can't delete the vector set _yet_ nor can we migrate it, // we just need to remember it to migrate once the associated namespaces are all moved over diff --git a/libs/cluster/Server/Migration/MigrateSession.RangeIndex.cs 
b/libs/cluster/Server/Migration/MigrateSession.RangeIndex.cs new file mode 100644 index 00000000000..770e8c90854 --- /dev/null +++ b/libs/cluster/Server/Migration/MigrateSession.RangeIndex.cs @@ -0,0 +1,78 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +using System; +using System.Buffers; +using System.Threading.Tasks; +using Garnet.client; +using Garnet.server; +using Microsoft.Extensions.Logging; + +namespace Garnet.cluster +{ + /// + /// RangeIndex migration support: source-side transmit driver. + /// + internal sealed partial class MigrateSession : IDisposable + { + /// + /// Transmit a single RangeIndex key to the destination node. + /// Uses to obtain an async + /// migration reader that snapshots and streams the BfTree data. + /// Forces a flush and awaits ACK. Does not delete the source key — the caller + /// is responsible for deletion in the appropriate sketch status phase. + /// + private async Task TransmitRangeIndexAsync(MigrateOperation mo, byte[] keyBytes, byte[] stubBytes, int chunkSize = RangeIndexManager.DefaultMigrationChunkSize) + { + var rangeIndexManager = clusterProvider.storeWrapper.DefaultDatabase.RangeIndexManager; + var gcs = mo.Client; + + RangeIndexMigrationReader reader; + try + { + reader = rangeIndexManager.SnapshotRangeIndexAndCreateReader(mo.LocalSession, keyBytes, stubBytes, chunkSize); + } + catch (Exception ex) + { + logger?.LogError(ex, "TransmitRangeIndex: failed to snapshot BfTree for key"); + return false; + } + + var buffer = ArrayPool.Shared.Rent(chunkSize); + try + { + using (reader) + { + while (!reader.IsComplete) + { + var payloadLen = await reader.ReadNextChunkAsync(buffer).ConfigureAwait(false); + if (payloadLen == 0) + { + logger?.LogError("TransmitRangeIndex: reader returned zero-length payload with a {Size}-byte buffer", chunkSize); + return false; + } + + if (!WriteOrSendRecordSpan(gcs, MigrationRecordSpanType.SerializedRangeIndexStream, buffer.AsSpan(0, payloadLen))) + { + 
logger?.LogError("TransmitRangeIndex: failed to write chunk"); + return false; + } + } + + // Force flush and await ACK + if (!HandleMigrateTaskResponse(gcs.SendAndResetIterationBuffer())) + { + logger?.LogError("TransmitRangeIndex: flush failed"); + return false; + } + + return true; + } + } + finally + { + ArrayPool.Shared.Return(buffer); + } + } + } +} \ No newline at end of file diff --git a/libs/cluster/Server/Migration/MigrateSessionKeys.cs b/libs/cluster/Server/Migration/MigrateSessionKeys.cs index 05e2c4ada35..287ae31e12d 100644 --- a/libs/cluster/Server/Migration/MigrateSessionKeys.cs +++ b/libs/cluster/Server/Migration/MigrateSessionKeys.cs @@ -34,8 +34,13 @@ private bool MigrateKeysFromStore() WaitForConfigPropagation(); // Discover Vector Sets linked namespaces + var allKeys = migrateTask.sketch.Keys.Select(t => t.Item1.ToArray()); var indexesToMigrate = new Dictionary(ByteArrayComparer.Instance); - _namespaces = clusterProvider.storeWrapper.DefaultDatabase.VectorManager.GetNamespacesForKeys(clusterProvider.storeWrapper, migrateTask.sketch.Keys.Select(t => t.Item1.ToArray()), indexesToMigrate); + _namespaces = clusterProvider.storeWrapper.DefaultDatabase.VectorManager.GetNamespacesForKeys(clusterProvider.storeWrapper, allKeys, indexesToMigrate); + + // Discover RangeIndex keys upfront + var rangeIndexKeysToMigrate = clusterProvider.storeWrapper.DefaultDatabase.RangeIndexManager?.GetRangeIndexKeysForMigration(clusterProvider.storeWrapper, allKeys) + ?? 
new Dictionary(ByteArrayComparer.Instance); // If we have any namespaces, that implies Vector Sets, and if we have any of THOSE // we need to reserve destination sets on the other side @@ -45,8 +50,8 @@ private bool MigrateKeysFromStore() return false; } - // Transmit keys from store - if (!migrateTask.TransmitKeys(indexesToMigrate)) + // Transmit keys from store (skipping VectorSet and RangeIndex keys, which are handled out-of-band) + if (!migrateTask.TransmitKeys(indexesToMigrate, rangeIndexKeysToMigrate)) { logger?.LogError("Failed transmitting keys from store"); return false; @@ -125,6 +130,32 @@ private bool MigrateKeysFromStore() } } + // Migrate RangeIndex keys (snapshot + chunk stream). + // Keys are already in the sketch (added by caller during key enumeration), + // so they're protected by the TRANSMITTING status. Mark for deletion so + // DeleteKeys() handles them in the DELETING sketch status sequence. + if (rangeIndexKeysToMigrate.Count > 0) + { + logger?.LogWarning("Migrating {count} RangeIndex keys via KEYS path", rangeIndexKeysToMigrate.Count); + + foreach (var (key, stubBytes) in rangeIndexKeysToMigrate) + { + if (!TransmitRangeIndexAsync(migrateTask, key, stubBytes).GetAwaiter().GetResult()) + { + logger?.LogError("Failed to migrate RangeIndex key via KEYS path"); + return false; + } + } + + // Mark all transmitted RI keys in the sketch for deletion by DeleteKeys() + var keys = migrateTask.sketch.Keys; + for (var i = 0; i < keys.Count; i++) + { + if (rangeIndexKeysToMigrate.ContainsKey(keys[i].Item1.ToArray())) + keys[i] = (keys[i].Item1, true); + } + } + DeleteKeys(); } finally diff --git a/libs/cluster/Server/Migration/MigrateSessionSlots.cs b/libs/cluster/Server/Migration/MigrateSessionSlots.cs index 292866a80e1..5940be24175 100644 --- a/libs/cluster/Server/Migration/MigrateSessionSlots.cs +++ b/libs/cluster/Server/Migration/MigrateSessionSlots.cs @@ -113,6 +113,47 @@ async Task CreateAndRunMigrateTasks(long beginAddress, long tailAddress, i 
return false; } + // Handle migration of discovered RangeIndex keys with sketch protection. + // Each RI key gets its own sketch cycle so concurrent operations are properly + // blocked during transmit (writes blocked) and delete (reads+writes blocked). + var rangeIndexKeys = migrateOperation.SelectMany(static mo => mo.RangeIndexes).GroupBy(static g => g.Key, ByteArrayComparer.Instance).ToDictionary(static g => g.Key, g => g.First().Value, ByteArrayComparer.Instance); + + if (rangeIndexKeys.Count > 0) + { + logger?.LogWarning("Migrating {count} RangeIndex keys", rangeIndexKeys.Count); + var mo = migrateOperation[0]; + + foreach (var (key, stubBytes) in rangeIndexKeys) + { + // Add key to sketch so Probe() gates concurrent operations + mo.sketch.Clear(); + mo.sketch.SetStatus(SketchStatus.INITIALIZING); + mo.sketch.TryHashAndStore(key); + + // Block writes during snapshot + transmit + mo.sketch.SetStatus(SketchStatus.TRANSMITTING); + WaitForConfigPropagation(); + + if (!await TransmitRangeIndexAsync(mo, key, stubBytes).ConfigureAwait(false)) + { + logger?.LogError("Failed to migrate RangeIndex key"); + return false; + } + + // Block reads + writes during delete + mo.sketch.SetStatus(SketchStatus.DELETING); + WaitForConfigPropagation(); + + var pinnedKey = PinnedSpanByte.FromPinnedSpan(key); + mo.DeleteRangeIndex(pinnedKey); + + // Release — concurrent operations will get ASK redirection + mo.sketch.SetStatus(SketchStatus.MIGRATED); + WaitForConfigPropagation(); + mo.sketch.Clear(); + } + } + // Handle migration of discovered Vector Set keys now that they're namespaces have been moved var vectorSets = migrateOperation.SelectMany(static mo => mo.VectorSets).GroupBy(static g => g.Key, ByteArrayComparer.Instance).ToDictionary(static g => g.Key, g => g.First().Value, ByteArrayComparer.Instance); diff --git a/libs/cluster/Session/ClusterSession.cs b/libs/cluster/Session/ClusterSession.cs index b7e9c244b8b..30be8590e4a 100644 --- a/libs/cluster/Session/ClusterSession.cs 
+++ b/libs/cluster/Session/ClusterSession.cs @@ -51,6 +51,7 @@ internal sealed unsafe partial class ClusterSession : IClusterSession private StringBasicContext stringBasicContext; private VectorBasicContext vectorBasicContext; + private readonly RangeIndexMigrationReceiveState rangeIndexMigrationState; public ClusterSession(ClusterProvider clusterProvider, TransactionManager txnManager, IGarnetAuthenticator authenticator, UserHandle userHandle, GarnetSessionMetrics sessionMetrics, BasicGarnetApi basicGarnetApi, StringBasicContext stringBasicContext, VectorBasicContext vectorBasicContext, INetworkSender networkSender, ILogger logger = null) { @@ -64,6 +65,13 @@ public ClusterSession(ClusterProvider clusterProvider, TransactionManager txnMan this.vectorBasicContext = vectorBasicContext; this.networkSender = networkSender; this.logger = logger; + var riManager = clusterProvider.storeWrapper.DefaultDatabase.RangeIndexManager; + rangeIndexMigrationState = riManager != null ? new RangeIndexMigrationReceiveState(riManager) : null; + } + + public void Dispose() + { + rangeIndexMigrationState?.Dispose(); } public void ProcessClusterCommands(RespCommand command, VectorManager vectorManager, ref SessionParseState parseState, ref byte* dcurr, ref byte* dend) diff --git a/libs/cluster/Session/MigrationReceiveSession.cs b/libs/cluster/Session/MigrationReceiveSession.cs new file mode 100644 index 00000000000..5c6e38a52ab --- /dev/null +++ b/libs/cluster/Session/MigrationReceiveSession.cs @@ -0,0 +1,92 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +using System; +using Garnet.common; +using Garnet.server; + +namespace Garnet.cluster +{ + /// + /// Per- state for receiving inbound RangeIndex migration data. + /// Implements a state machine: IDLE → RECEIVING → IDLE. + /// + /// + /// Because the sender uses a single TCP connection, all CLUSTER MIGRATE commands + /// from one migration arrive on the same , guaranteeing + /// in-order delivery. 
+ /// + internal sealed class RangeIndexMigrationReceiveState : IDisposable + { + private readonly RangeIndexManager rangeIndexManager; + private RangeIndexChunkedDeserializer currentDeserializer; + private bool isReceiving; + + /// Whether a stream is currently in progress. + internal bool IsReceiving => isReceiving; + + internal RangeIndexMigrationReceiveState(RangeIndexManager rangeIndexManager) + { + this.rangeIndexManager = rangeIndexManager; + } + + /// + /// Process a SerializedRangeIndexStream record. + /// The first record creates the deserializer; subsequent records feed it. + /// On completion: the deserializer extracts the key, validates checksum, + /// does slot check, and recovers the BfTree. + /// + public bool ProcessRecord(ReadOnlySpan recordPayload, ClusterConfig currentConfig, ref StringBasicContext stringBasicContext, bool replaceOption) + { + if (recordPayload.Length == 0) + return false; + + if (!isReceiving) + { + currentDeserializer = new RangeIndexChunkedDeserializer(rangeIndexManager); + isReceiving = true; + } + + if (!currentDeserializer.ProcessChunk(recordPayload)) + { + Reset(); + return false; + } + + if (currentDeserializer.IsComplete) + { + var keyBytes = currentDeserializer.Key; + var slot = HashSlotUtils.HashSlot(keyBytes); + if (!currentConfig.IsImportingSlot(slot)) + { + Reset(); + return false; + } + + if (!currentDeserializer.Publish(ref stringBasicContext)) + { + Reset(); + return false; + } + + Reset(); + } + + return true; + } + + /// Reset state for the next key stream. 
+ private void Reset() + { + currentDeserializer?.Dispose(); + currentDeserializer = null; + isReceiving = false; + } + + /// + public void Dispose() + { + Reset(); + } + } +} diff --git a/libs/cluster/Session/RespClusterMigrateCommands.cs b/libs/cluster/Session/RespClusterMigrateCommands.cs index a5d56e6e742..bb9f2fa9621 100644 --- a/libs/cluster/Session/RespClusterMigrateCommands.cs +++ b/libs/cluster/Session/RespClusterMigrateCommands.cs @@ -135,6 +135,22 @@ void Process(BasicGarnetApi basicGarnetApi, byte[] input, bool replaceOption, bo if (!RespReadUtils.GetSerializedRecordSpan(out var payloadRaw, ref payloadPtr, payloadEndPtr)) return; + // An error has occurred + if (migrateState > 0) + { + i++; + continue; + } + + // Protocol enforcement: while receiving a RangeIndex stream, only SerializedRangeIndexStream records are valid + if (rangeIndexMigrationState.IsReceiving && kind != MigrationRecordSpanType.SerializedRangeIndexStream) + { + logger?.LogError("Protocol violation: expected SerializedRangeIndexStream continuation, got {Kind}", kind); + migrateState = 1; + i++; + continue; + } + if (kind == MigrationRecordSpanType.VectorSetElement) { // This is a Vector Set namespace key being migrated - it won't necessarily look like it's "in" a hash slot @@ -160,6 +176,15 @@ void Process(BasicGarnetApi basicGarnetApi, byte[] input, bool replaceOption, bo clusterProvider.storeWrapper.DefaultDatabase.VectorManager.HandleMigratedElementKey(ref stringBasicContext, ref vectorBasicContext, namespaceBytes, keyBytes, valueBytes); } + else if (kind == MigrationRecordSpanType.SerializedRangeIndexStream) + { + if (!rangeIndexMigrationState.ProcessRecord(payloadRaw.ReadOnlySpan, currentConfig, ref stringBasicContext, replaceOption)) + { + migrateState = 1; + i++; + continue; + } + } else if (kind == MigrationRecordSpanType.LogRecord) { // An error has occurred diff --git a/libs/host/GarnetServer.cs b/libs/host/GarnetServer.cs index a35f9829d4c..28da5c5bb43 100644 --- 
a/libs/host/GarnetServer.cs +++ b/libs/host/GarnetServer.cs @@ -193,9 +193,6 @@ private void InitializeServer() var clusterFactory = opts.EnableCluster ? new ClusterFactory() : null; - if (opts.EnableCluster && opts.EnableRangeIndexPreview) - throw new GarnetException("Range Index (preview) is not supported in cluster mode."); - this.logger = this.loggerFactory?.CreateLogger("GarnetServer"); logger?.LogInformation("Garnet {version} {bits} bit; {clusterMode} mode; Endpoint: [{endpoint}]", version, IntPtr.Size == 8 ? "64" : "32", diff --git a/libs/server/Cluster/IClusterSession.cs b/libs/server/Cluster/IClusterSession.cs index 539cbbac99f..5ecff1ce667 100644 --- a/libs/server/Cluster/IClusterSession.cs +++ b/libs/server/Cluster/IClusterSession.cs @@ -1,6 +1,7 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. +using System; using Garnet.common; using Garnet.server.ACL; using Tsavorite.core; @@ -10,7 +11,7 @@ namespace Garnet.server /// /// Cluster RESP session /// - public interface IClusterSession + public interface IClusterSession : IDisposable { /// /// If the current session is being used by a remote cluster node, the id that was last presented during a GOSSIP message. diff --git a/libs/server/Resp/LocalServerSession.cs b/libs/server/Resp/LocalServerSession.cs index 843a73dcc7b..d053d2f56d3 100644 --- a/libs/server/Resp/LocalServerSession.cs +++ b/libs/server/Resp/LocalServerSession.cs @@ -17,6 +17,12 @@ public class LocalServerSession : IDisposable readonly ILogger logger = null; readonly StoreWrapper storeWrapper; readonly StorageSession storageSession; + + /// + /// The underlying storage session. Used by migration paths that need + /// direct access to Tsavorite contexts (e.g. RangeIndex snapshot). 
+ /// + internal StorageSession StorageSession => storageSession; readonly ScratchBufferBuilder scratchBufferBuilder; readonly ScratchBufferAllocator scratchBufferAllocator; diff --git a/libs/server/Resp/RangeIndex/RangeIndexChunkedDeserializer.cs b/libs/server/Resp/RangeIndex/RangeIndexChunkedDeserializer.cs new file mode 100644 index 00000000000..60eaf09c9e5 --- /dev/null +++ b/libs/server/Resp/RangeIndex/RangeIndexChunkedDeserializer.cs @@ -0,0 +1,326 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +using System; +using System.Buffers.Binary; +using System.IO; +using System.IO.Hashing; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; +using Garnet.common; +using Garnet.server.BfTreeInterop; +using Microsoft.Extensions.Logging; +using Tsavorite.core; + +namespace Garnet.server +{ + /// + /// Deserializer that reassembles a RangeIndex key from incoming migration records. + /// Accumulates file data, validates an xxHash64 checksum, recovers the native BfTree, + /// and publishes the stub to the store. + /// + /// Stream format (across one or more chunks): + /// + /// [4-byte keyLen][key bytes][8-byte fileCount][file bytes][8-byte xxHash64][4-byte stubLen][stub] + /// + /// Key bytes and file bytes may span multiple chunks. + /// All other elements (keyLen, fileCount, hash, stubLen, stub) must fit entirely within a single chunk. 
+ /// + /// State machine: + /// + /// WaitingForKeyHeader → parses 4-byte key length + /// ReceivingKeyData → accumulates key bytes (may span chunks) + /// WaitingForFileHeader → parses 8-byte file size + /// ReceivingFileData → accumulates file bytes to temp file, updates running hash + /// Complete → trailer parsed, checksum valid, ready for + /// Error → irrecoverable (invalid protocol or checksum mismatch) + /// Disposed → resources released, temp file deleted + /// + /// + public sealed class RangeIndexChunkedDeserializer : IDisposable + { + private enum State : byte + { + WaitingForKeyHeader, + ReceivingKeyData, + WaitingForFileHeader, + ReceivingFileData, + Complete, + Error, + Disposed, + } + + private FileStream stream; + private readonly string tempPath; + private readonly RangeIndexManager manager; + private readonly XxHash64 hasher; + private long fileBytesRemaining; + private byte[] finalizerStub; + private byte[] finalizerKey; + private int keyBytesReceived; + private State state; + + /// Whether the stream completed successfully. + public bool IsComplete => state == State.Complete; + + /// Whether the stream encountered an irrecoverable error. + public bool HasError => state == State.Error; + + /// The key bytes extracted from the header. Valid only after . + public ReadOnlySpan Key => finalizerKey; + + public RangeIndexChunkedDeserializer(RangeIndexManager manager) + { + this.manager = manager; + hasher = new XxHash64(); + state = State.WaitingForKeyHeader; + tempPath = manager.DeriveTempMigrationPath(); + } + + /// Process an incoming record payload. + /// true if valid; false if corruption or invalid data detected. 
+ public bool ProcessChunk(ReadOnlySpan data) + { + switch (state) + { + case State.Error: + case State.Complete: + case State.Disposed: + return false; + + case State.WaitingForKeyHeader: + if (data.Length == 0) + return true; + + if (data.Length < sizeof(int)) + { + state = State.Error; + return false; + } + + var keyLen = BinaryPrimitives.ReadInt32LittleEndian(data); + data = data[sizeof(int)..]; + + if (keyLen < 0) + { + state = State.Error; + return false; + } + + finalizerKey = new byte[keyLen]; + keyBytesReceived = 0; + state = State.ReceivingKeyData; + goto case State.ReceivingKeyData; + + case State.ReceivingKeyData: + if (keyBytesReceived < finalizerKey.Length) + { + var n = Math.Min(data.Length, finalizerKey.Length - keyBytesReceived); + data[..n].CopyTo(finalizerKey.AsSpan(keyBytesReceived)); + keyBytesReceived += n; + data = data[n..]; + } + + if (keyBytesReceived < finalizerKey.Length) + return true; + + state = State.WaitingForFileHeader; + if (data.Length == 0) + return true; + + goto case State.WaitingForFileHeader; + + case State.WaitingForFileHeader: + if (data.Length == 0) + return true; + + if (data.Length < sizeof(long)) + { + manager.Logger?.LogError("RangeIndexChunkedDeserializer: split file count header ({Size} bytes)", data.Length); + state = State.Error; + return false; + } + + fileBytesRemaining = BinaryPrimitives.ReadInt64LittleEndian(data); + data = data[sizeof(long)..]; + + if (fileBytesRemaining < 0) + { + state = State.Error; + return false; + } + + state = State.ReceivingFileData; + + if (fileBytesRemaining > 0) + stream = new FileStream(tempPath, FileMode.Create, FileAccess.Write, FileShare.None); + + goto case State.ReceivingFileData; + + case State.ReceivingFileData: + if (fileBytesRemaining > 0) + WriteFileBytes(ref data); + + if (fileBytesRemaining == 0 && data.Length > 0) + { + CloseStream(); + return ParseTrailer(data); + } + + return true; + + default: + return false; + } + } + + private bool ParseTrailer(ReadOnlySpan data) 
+ { + // The trailer must arrive in a single chunk — serializer guarantees this. + // [8-byte xxHash64][4-byte stubLen][stub] + if (data.Length < sizeof(ulong) + sizeof(int)) + { + manager.Logger?.LogError("RangeIndexChunkedDeserializer: trailer too small ({Size} bytes)", data.Length); + state = State.Error; + return false; + } + + var receivedHash = BinaryPrimitives.ReadUInt64LittleEndian(data); + data = data[sizeof(ulong)..]; + var stubLen = BinaryPrimitives.ReadInt32LittleEndian(data); + data = data[sizeof(int)..]; + if (stubLen != RangeIndexManager.IndexSizeBytes) + { + manager.Logger?.LogError("RangeIndexChunkedDeserializer: invalid stub size {StubLen}, expected {Expected}", stubLen, RangeIndexManager.IndexSizeBytes); + state = State.Error; + return false; + } + + finalizerStub = data[..RangeIndexManager.IndexSizeBytes].ToArray(); + + Span computedHashBytes = stackalloc byte[sizeof(ulong)]; + hasher.GetHashAndReset(computedHashBytes); + var computedHash = BinaryPrimitives.ReadUInt64LittleEndian(computedHashBytes); + + if (receivedHash != computedHash) + { + manager.Logger?.LogError("RangeIndexChunkedDeserializer: checksum mismatch (received {Received:X16}, computed {Computed:X16})", receivedHash, computedHash); + state = State.Error; + return false; + } + + state = State.Complete; + return true; + } + + /// + /// Publish: rename temp file to key-hashed path, recover native tree, + /// publish stub via RICREATE RMW. 
+ /// + public unsafe bool Publish(ref StringBasicContext ctx) + { + if (state != State.Complete) + { + manager.Logger?.LogError("RangeIndexChunkedDeserializer.Publish: cannot finalize in state {State}", state); + return false; + } + + ReadOnlySpan keyBytes = finalizerKey; + + try + { + var workingPath = manager.DeriveWorkingPath(keyBytes); + Directory.CreateDirectory(Path.GetDirectoryName(workingPath)!); + + // If a data file already exists (e.g., from a previous migration of the same key + // that was later deleted), remove it so the new snapshot can take its place. + if (File.Exists(workingPath)) + File.Delete(workingPath); + + File.Move(tempPath, workingPath); + + ref readonly var srcStub = ref RangeIndexManager.ReadIndex(finalizerStub); + + var bfTree = BfTreeService.RecoverFromSnapshot( + workingPath, + (StorageBackendType)srcStub.StorageBackend, + srcStub.CacheSize, + srcStub.MinRecordSize, + srcStub.MaxRecordSize, + srcStub.MaxKeyLen, + srcStub.LeafPageSize); + + Span newStubBytes = stackalloc byte[RangeIndexManager.IndexSizeBytes]; + finalizerStub.CopyTo(newStubBytes); + ref var newStub = ref Unsafe.As(ref MemoryMarshal.GetReference(newStubBytes)); + newStub.TreeHandle = bfTree.NativePtr; + newStub.Flags = 0; + newStub.SerializationPhase = 0; + + var parseState = new SessionParseState(); + fixed (byte* stubPtr = newStubBytes) + { + var stubSlice = PinnedSpanByte.FromPinnedPointer(stubPtr, RangeIndexManager.IndexSizeBytes); + parseState.InitializeWithArgument(stubSlice); + + var input = new StringInput(RespCommand.RICREATE, ref parseState); + var output = new StringOutput(); + var pinnedKey = PinnedSpanByte.FromPinnedSpan(keyBytes); + var status = ctx.RMW((FixedSpanByteKey)pinnedKey, ref input, ref output); + if (status.IsPending) + StorageSession.CompletePendingForSession(ref status, ref output, ref ctx); + + if (status.Record.Created || status.Record.InPlaceUpdated || status.Record.CopyUpdated) + { + var keyHash = 
ctx.GetKeyHash((FixedSpanByteKey)pinnedKey); + manager.RegisterIndex(bfTree, keyHash, keyBytes); + } + else + { + bfTree.Dispose(); + } + } + + return true; + } + catch (Exception ex) + { + manager.Logger?.LogError(ex, "RangeIndexChunkedDeserializer.Publish: failed to recover BfTree"); + return false; + } + } + + private void WriteFileBytes(ref ReadOnlySpan data) + { + var count = (int)Math.Min(data.Length, fileBytesRemaining); + var filePart = data[..count]; + stream.Write(filePart); + hasher.Append(filePart); + fileBytesRemaining -= count; + data = data[count..]; + } + + private void CloseStream() + { + if (stream == null) return; + stream.Flush(); + stream.Dispose(); + stream = null; + } + + /// + public void Dispose() + { + if (state == State.Disposed) return; + state = State.Disposed; + CloseStream(); + + try + { + if (File.Exists(tempPath)) + File.Delete(tempPath); + } + catch { } + } + } +} diff --git a/libs/server/Resp/RangeIndex/RangeIndexChunkedSerializer.cs b/libs/server/Resp/RangeIndex/RangeIndexChunkedSerializer.cs new file mode 100644 index 00000000000..c1fa1395c60 --- /dev/null +++ b/libs/server/Resp/RangeIndex/RangeIndexChunkedSerializer.cs @@ -0,0 +1,166 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +using System; +using System.Buffers.Binary; +using System.IO.Hashing; + +namespace Garnet.server +{ + /// + /// Pure state-machine serializer that frames a RangeIndex key, file data, and stub into + /// a chunked migration stream. Does not perform any I/O — file data is supplied by the + /// caller via the fileData parameter of . + /// + /// Stream format (across one or more chunks): + /// + /// [4-byte keyLen][key bytes][8-byte fileCount][file bytes][8-byte xxHash64][4-byte stubLen][stub] + /// + /// Key bytes and file bytes may span multiple chunks. + /// All other elements (keyLen, fileCount, hash, stubLen, stub) must fit entirely within a single chunk. 
/// The caller provides a destination span on each call;
/// the serializer writes at most destination.Length total bytes into it.
/// </summary>
public sealed class RangeIndexChunkedSerializer
{
    private readonly byte[] keyBytes;
    private readonly byte[] stubBytes;
    private readonly long totalFileBytes;
    private readonly XxHash64 hasher;
    private int keyBytesEmitted;
    private long fileBytesEmitted;
    private Phase phase;

    private enum Phase : byte { KeyHeader, KeyData, FileHeader, FileData, Trailer, Done }

    /// <summary>Whether the serializer has emitted all data.</summary>
    public bool IsComplete => phase == Phase.Done;

    /// <summary>Whether the serializer is in the FileData phase and needs file bytes from the caller.</summary>
    public bool NeedsFileData => phase == Phase.FileData && fileBytesEmitted < totalFileBytes;

    /// <summary>Number of file bytes remaining to be emitted.</summary>
    public long FileDataRemaining => totalFileBytes - fileBytesEmitted;

    /// <summary>
    /// Create a serializer for a RangeIndex key.
    /// </summary>
    /// <param name="keyBytes">The key bytes to include in the stream header.</param>
    /// <param name="stubBytes">The stub bytes to include in the stream trailer.</param>
    /// <param name="totalFileBytes">The total number of file data bytes that will be supplied via <see cref="MoveNext"/>.</param>
    public RangeIndexChunkedSerializer(byte[] keyBytes, byte[] stubBytes, long totalFileBytes)
    {
        this.keyBytes = keyBytes;
        this.stubBytes = stubBytes;
        this.totalFileBytes = totalFileBytes;
        hasher = new XxHash64();
        phase = Phase.KeyHeader;
    }

    /// <summary>
    /// Advance to the next chunk, filling <paramref name="destination"/> with as much framed
    /// payload as fits. While <see cref="NeedsFileData"/> is true the caller must supply file
    /// bytes through <paramref name="fileData"/>; otherwise pass an empty span.
    /// </summary>
    /// <param name="destination">Output buffer to write framed data into.</param>
    /// <param name="fileData">File data bytes supplied by the caller (only consumed during the FileData phase).</param>
    /// <param name="fileBytesConsumed">Number of bytes consumed from <paramref name="fileData"/>.</param>
    /// <returns>Number of bytes written to <paramref name="destination"/>.</returns>
    public int MoveNext(Span<byte> destination, ReadOnlySpan<byte> fileData, out int fileBytesConsumed)
    {
        fileBytesConsumed = 0;

        if (phase == Phase.Done)
            throw new InvalidOperationException("Serializer has already completed");

        var initialLength = destination.Length;

        // [4-byte keyLen] — must fit entirely in the current chunk
        if (phase == Phase.KeyHeader)
        {
            if (destination.Length < sizeof(int))
                return initialLength - destination.Length;

            BinaryPrimitives.WriteInt32LittleEndian(destination, keyBytes.Length);
            destination = destination[sizeof(int)..];
            phase = Phase.KeyData;
        }

        // [key bytes] — may straddle chunk boundaries
        if (phase == Phase.KeyData)
        {
            var copyLen = Math.Min(keyBytes.Length - keyBytesEmitted, destination.Length);
            keyBytes.AsSpan(keyBytesEmitted, copyLen).CopyTo(destination);
            destination = destination[copyLen..];
            keyBytesEmitted += copyLen;

            if (keyBytesEmitted < keyBytes.Length)
                return initialLength - destination.Length;

            phase = Phase.FileHeader;
        }

        // [8-byte fileCount] — must fit entirely in the current chunk
        if (phase == Phase.FileHeader)
        {
            if (destination.Length < sizeof(long))
                return initialLength - destination.Length;

            BinaryPrimitives.WriteInt64LittleEndian(destination, totalFileBytes);
            destination = destination[sizeof(long)..];
            phase = Phase.FileData;
        }

        // [file bytes] — may straddle chunk boundaries; data supplied by the caller
        if (phase == Phase.FileData)
        {
            if (fileBytesEmitted < totalFileBytes)
            {
                var room = (int)Math.Min(destination.Length, totalFileBytes - fileBytesEmitted);
                if (room == 0)
                    return initialLength - destination.Length;

                var copyLen = Math.Min(room, fileData.Length);
                if (copyLen == 0)
                    return initialLength - destination.Length;

                fileData[..copyLen].CopyTo(destination);
                hasher.Append(destination[..copyLen]);  // hash exactly the bytes that were framed
                destination = destination[copyLen..];
                fileBytesEmitted += copyLen;
                fileBytesConsumed = copyLen;
            }

            if (fileBytesEmitted >= totalFileBytes)
                phase = Phase.Trailer;
        }

        // [8-byte xxHash64][4-byte stubLen][stub] — must fit entirely in the current chunk
        if (phase == Phase.Trailer)
        {
            if (destination.Length < TrailerSize)
                return initialLength - destination.Length;

            WriteTrailer(destination);
            destination = destination[TrailerSize..];
            phase = Phase.Done;
        }

        return initialLength - destination.Length;
    }

    private int TrailerSize => sizeof(ulong) + sizeof(int) + stubBytes.Length;

    private void WriteTrailer(Span<byte> target)
    {
        // [8-byte xxHash64][4-byte stubLen][stub]
        hasher.GetHashAndReset(target);
        target = target[sizeof(ulong)..];
        BinaryPrimitives.WriteInt32LittleEndian(target, stubBytes.Length);
        target = target[sizeof(int)..];
        stubBytes.CopyTo(target);
    }
}
}
[MethodImpl(MethodImplOptions.AggressiveInlining)] - internal static ref readonly RangeIndexStub ReadIndex(ReadOnlySpan value) + public static ref readonly RangeIndexStub ReadIndex(ReadOnlySpan value) => ref Unsafe.As(ref MemoryMarshal.GetReference(value)); /// diff --git a/libs/server/Resp/RangeIndex/RangeIndexManager.Migration.cs b/libs/server/Resp/RangeIndex/RangeIndexManager.Migration.cs new file mode 100644 index 00000000000..5743ff22c90 --- /dev/null +++ b/libs/server/Resp/RangeIndex/RangeIndexManager.Migration.cs @@ -0,0 +1,168 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +using System; +using System.Collections.Generic; +using System.IO; +using Garnet.common; +using Garnet.server.BfTreeInterop; +using Microsoft.Extensions.Logging; +using Tsavorite.core; + +namespace Garnet.server +{ + /// + /// Migration support for RangeIndex keys: source-side snapshot and factory methods. + /// + public sealed partial class RangeIndexManager + { + /// + /// Discover which of the given keys are RangeIndex keys by reading each via + /// through the main store string context. + /// Returns OK + stub bytes for RI keys, WRONGTYPE for non-RI keys. + /// Mirrors pattern. 
/// </summary>
public unsafe Dictionary<byte[], byte[]> GetRangeIndexKeysForMigration(StoreWrapper storeWrapper, IEnumerable<byte[]> keys)
{
    var rangeIndexKeys = new Dictionary<byte[], byte[]>(ByteArrayComparer.Instance);

    using var storageSession = new StorageSession(storeWrapper, new(), new(), null, null,
        storeWrapper.DefaultDatabase.Id, storeWrapper.DefaultDatabase.VectorManager, logger);

    // Stub is fixed-size, so a stack buffer suffices for the common case
    Span<byte> stubSpan = stackalloc byte[IndexSizeBytes];

    foreach (var key in keys)
    {
        fixed (byte* keyPtr = key)
        {
            var pinnedKey = PinnedSpanByte.FromPinnedPointer(keyPtr, key.Length);

            // RIGET returns OK + stub bytes for RI keys, WRONGTYPE for everything else
            StringInput input = default;
            input.header.cmd = RespCommand.RIGET;

            var output = StringOutput.FromPinnedSpan(stubSpan);
            var status = storageSession.Read_MainStore(pinnedKey.ReadOnlySpan, ref input, ref output, ref storageSession.stringBasicContext);

            if (status != GarnetStatus.OK)
                continue;

            // Output may have spilled to heap memory if it outgrew the stack span
            var resultSpan = output.SpanByteAndMemory.IsSpanByte
                ? output.SpanByteAndMemory.SpanByte.ReadOnlySpan
                : output.SpanByteAndMemory.MemoryReadOnlySpan;

            rangeIndexKeys[key] = resultSpan.ToArray();

            if (!output.SpanByteAndMemory.IsSpanByte)
                output.SpanByteAndMemory.Dispose();
        }
    }

    return rangeIndexKeys;
}

/// <summary>
/// Default chunk size for streaming BfTree snapshot data during migration.
/// </summary>
public const int DefaultMigrationChunkSize = 256 * 1024;

/// <summary>
/// Source side: create a migration reader that snapshots the BfTree under an exclusive lock
/// and produces chunked migration records via async file reads.
/// </summary>
/// <param name="localServerSession">The local server session for store access.</param>
/// <param name="keyBytes">The key bytes of the RangeIndex to serialize.</param>
/// <param name="stubBytes">The stub bytes for the RangeIndex.</param>
/// <param name="chunkSize">The chunk size for streaming. Defaults to <see cref="DefaultMigrationChunkSize"/>.</param>
public RangeIndexMigrationReader SnapshotRangeIndexAndCreateReader(LocalServerSession localServerSession, ReadOnlySpan<byte> keyBytes, ReadOnlySpan<byte> stubBytes, int chunkSize = DefaultMigrationChunkSize)
{
    if (!SnapshotForMigration(localServerSession.StorageSession, keyBytes, out var snapshotPath, out var totalBytes))
        throw new InvalidOperationException("Failed to snapshot BfTree for migration");

    var serializer = new RangeIndexChunkedSerializer(keyBytes.ToArray(), stubBytes.ToArray(), totalBytes);

    // FileOptions.Asynchronous makes RangeIndexMigrationReader's ReadAsync calls truly
    // asynchronous (without it each read is sync-over-async on a thread-pool thread);
    // SequentialScan hints the OS cache since the snapshot is read front-to-back once.
    var fileStream = new FileStream(snapshotPath, FileMode.Open, FileAccess.Read, FileShare.Read,
        bufferSize: chunkSize, options: FileOptions.Asynchronous | FileOptions.SequentialScan);
    return new RangeIndexMigrationReader(serializer, fileStream, chunkSize);
}

/// <summary>
/// Derive a temporary file path for an in-progress inbound migration.
/// Format: {dataDir}/rangeindex/.migration-tmp/{guid}.bftree
/// </summary>
public string DeriveTempMigrationPath()
{
    var tmpDir = Path.Combine(dataDir ?? string.Empty, "rangeindex", ".migration-tmp");
    Directory.CreateDirectory(tmpDir);
    return Path.Combine(tmpDir, $"{Guid.NewGuid():N}.bftree");
}

/// <summary>
/// Source side: snapshot a BfTree for migration under an exclusive lock.
/// Acquires the exclusive lock, re-reads the stub from the store to get a fresh
/// TreeHandle. If the tree is live, snapshots it to a temporary migration file.
/// If evicted, copies the existing flush/checkpoint snapshot file to a temporary
/// migration file. The temp file is safe from concurrent overwrites by
/// concurrent flushes or the native BfTree.
/// </summary>
internal bool SnapshotForMigration(StorageSession session, ReadOnlySpan<byte> keyBytes, out string path, out long totalBytes)
{
    path = null;
    totalBytes = 0;

    Span<byte> stubSpan = stackalloc byte[IndexSizeBytes];
    var keyHash = session.stringBasicContext.GetKeyHash((FixedSpanByteKey)PinnedSpanByte.FromPinnedSpan(keyBytes));
    var migrationPath = DeriveTempMigrationPath();

    rangeIndexLocks.AcquireExclusiveLock(keyHash, out var lockToken);
    try
    {
        // Re-read the stub under the exclusive lock so the TreeHandle is fresh
        StringInput input = default;
        input.header.cmd = RespCommand.RIGET;
        var output = StringOutput.FromPinnedSpan(stubSpan);

        var status = session.Read_MainStore(keyBytes, ref input, ref output, ref session.stringBasicContext);
        if (status != GarnetStatus.OK)
        {
            logger?.LogWarning("SnapshotForMigration: key not found in store");
            return false;
        }

        ref readonly var stub = ref ReadIndex(stubSpan);

        if (stub.StorageBackend == (byte)StorageBackendType.Memory)
        {
            logger?.LogWarning("SnapshotForMigration: memory-only trees cannot be migrated");
            return false;
        }

        if (stub.TreeHandle != nint.Zero && liveIndexes.TryGetValue(stub.TreeHandle, out var treeEntry))
        {
            // Tree is live — snapshot it to a temp migration file
            // (safe from concurrent flush overwrites)
            treeEntry.Tree.SnapshotToFile(migrationPath);
        }
        else
        {
            // Tree was evicted — copy the existing flush/checkpoint file instead.
            // Tsavorite guarantees OnFlush runs before OnEvict, so a snapshot file must exist.
            var existingPath = stub.IsRecovered && recoveredCheckpointToken != Guid.Empty
                ? DeriveCheckpointPath(keyBytes, recoveredCheckpointToken)
                : DeriveFlushPath(keyBytes);

            if (!File.Exists(existingPath))
            {
                logger?.LogWarning("SnapshotForMigration: expected snapshot file not found: {Path}", existingPath);
                return false;
            }

            File.Copy(existingPath, migrationPath, overwrite: true);
        }
    }
    finally
    {
        rangeIndexLocks.ReleaseExclusiveLock(lockToken);
    }

    path = migrationPath;
    totalBytes = new FileInfo(migrationPath).Length;
    return true;
}
}
}
/// @@ -130,6 +133,23 @@ public RangeIndexManager(bool enabled, string dataDir = null, bool removeOutdate this.removeOutdatedCheckpoints = removeOutdatedCheckpoints; this.logger = logger; rangeIndexLocks = new ReadOptimizedLock(Environment.ProcessorCount); + + // Clean up partial migration artifacts from prior crashes + if (dataDir != null) + { + var migrationTmpDir = Path.Combine(dataDir, "rangeindex", ".migration-tmp"); + if (Directory.Exists(migrationTmpDir)) + { + try + { + Directory.Delete(migrationTmpDir, recursive: true); + } + catch (Exception ex) + { + logger?.LogWarning(ex, "Failed to clean up migration temp directory: {Path}", migrationTmpDir); + } + } + } } /// @@ -209,7 +229,7 @@ private static uint RoundUpToPowerOf2(uint v) /// The BfTree instance to register. /// Hash of the Garnet key, used for lock striping. /// Raw key bytes, used to derive the snapshot directory name. - internal void RegisterIndex(BfTreeService bfTree, long keyHash, ReadOnlySpan keyBytes) + public void RegisterIndex(BfTreeService bfTree, long keyHash, ReadOnlySpan keyBytes) { var keyDir = Path.Combine(dataDir ?? string.Empty, "rangeindex", HashKeyToDirectoryName(keyBytes)); liveIndexes[bfTree.NativePtr] = new TreeEntry(bfTree, keyHash, keyDir); diff --git a/libs/server/Resp/RangeIndex/RangeIndexMigrationReader.cs b/libs/server/Resp/RangeIndex/RangeIndexMigrationReader.cs new file mode 100644 index 00000000000..9d88eed7ba1 --- /dev/null +++ b/libs/server/Resp/RangeIndex/RangeIndexMigrationReader.cs @@ -0,0 +1,94 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +using System; +using System.IO; +using System.Threading; +using System.Threading.Tasks; + +namespace Garnet.server +{ + /// + /// Async wrapper around that owns a file source + /// and reads file data asynchronously, then calls the sync serializer to frame it. 
/// </summary>
public sealed class RangeIndexMigrationReader : IDisposable
{
    private readonly RangeIndexChunkedSerializer serializer;
    private FileStream fileStream;
    private readonly byte[] fileBuffer;
    private int fileBufferOffset;
    private int fileBufferAvailable;
    private bool disposed;

    /// <summary>Whether the serializer has emitted all data.</summary>
    public bool IsComplete => serializer.IsComplete;

    /// <summary>
    /// Create a migration reader that wraps a serializer and file stream.
    /// </summary>
    /// <param name="serializer">The pure state-machine serializer.</param>
    /// <param name="fileStream">The file stream to read snapshot data from.</param>
    /// <param name="chunkSize">Buffer size for file reads.</param>
    public RangeIndexMigrationReader(RangeIndexChunkedSerializer serializer, FileStream fileStream, int chunkSize)
    {
        this.serializer = serializer;
        this.fileStream = fileStream;
        fileBuffer = new byte[chunkSize];
    }

    /// <summary>
    /// Read the next chunk: reads file data asynchronously when the serializer needs it,
    /// then calls the sync serializer to frame it into the destination buffer. Loops so a
    /// single call can cross phase transitions (headers → file data → trailer).
    /// </summary>
    /// <param name="destination">Output buffer.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>Number of bytes written to <paramref name="destination"/>.</returns>
    public async ValueTask<int> ReadNextChunkAsync(Memory<byte> destination, CancellationToken cancellationToken = default)
    {
        var totalWritten = 0;

        while (!serializer.IsComplete && destination.Length > 0)
        {
            // Refill when the serializer wants file data and the buffer is drained
            if (serializer.NeedsFileData && fileBufferAvailable == 0)
            {
                var maxRead = (int)Math.Min(fileBuffer.Length, serializer.FileDataRemaining);
                var bytesRead = await fileStream.ReadAsync(fileBuffer.AsMemory(0, maxRead), cancellationToken).ConfigureAwait(false);
                if (bytesRead == 0)
                    throw new EndOfStreamException($"RangeIndex file truncated: {serializer.FileDataRemaining} bytes remaining");

                fileBufferOffset = 0;
                fileBufferAvailable = bytesRead;
            }

            var pendingFileData = fileBufferAvailable > 0
                ? fileBuffer.AsSpan(fileBufferOffset, fileBufferAvailable)
                : ReadOnlySpan<byte>.Empty;

            var written = serializer.MoveNext(destination.Span, pendingFileData, out var consumed);
            fileBufferOffset += consumed;
            fileBufferAvailable -= consumed;

            // Destination too small for the next fixed-size element — stop here,
            // the serializer resumes on the next call
            if (written == 0)
                break;

            destination = destination[written..];
            totalWritten += written;
        }

        return totalWritten;
    }

    /// <inheritdoc/>
    public void Dispose()
    {
        if (disposed) return;
        disposed = true;

        fileStream?.Dispose();
        fileStream = null;
    }
}
}
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Allure.NUnit;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using NUnit.Framework;
using NUnit.Framework.Legacy;
using StackExchange.Redis;

namespace Garnet.test.cluster
{
    [AllureNUnit]
    [TestFixture, NonParallelizable]
    public class ClusterRangeIndexMigrateTests : AllureTestBase
    {
        ClusterTestContext context;
        readonly int defaultShards = 3;

        [SetUp]
        public void Setup()
        {
            context = new ClusterTestContext();
            context.Setup([]);
        }

        [TearDown]
        public void TearDown()
        {
            context?.TearDown();
        }

        #region Helpers

        /// <summary>
        /// Find a key name whose hash slot is owned by the node with the given ID.
        /// </summary>
        // NOTE(review): the generic argument of the slots parameter was lost in extraction —
        // restore the element type returned by ClusterTestUtils.ClusterSlots(...)
        private string FindKeyOnNode(string prefix, string nodeId, List slots)
        {
            for (var ix = 0; ; ix++)
            {
                var key = $"{prefix}_{ix}";
                var slot = context.clusterTestUtils.HashSlot(key);
                if (slots.Any(x => x.nnInfo.Any(y => y.nodeid == nodeId) && slot >= x.startSlot && slot <= x.endSlot))
                    return key;
            }
        }

        /// <summary>
        /// Create a RangeIndex key and insert fields on the given endpoint.
        /// </summary>
        private void CreateRangeIndexWithFields(IPEndPoint endpoint, string key, IEnumerable<(string Field, string Value)> fields)
        {
            var createResult = (string)context.clusterTestUtils.Execute(
                endpoint, "RI.CREATE",
                [key, "DISK", "CACHESIZE", "65536", "MINRECORD", "8"],
                flags: CommandFlags.NoRedirect);
            ClassicAssert.AreEqual("OK", createResult, $"RI.CREATE should succeed for key {key}");

            foreach (var (field, value) in fields)
            {
                var setResult = (string)context.clusterTestUtils.Execute(
                    endpoint, "RI.SET", [key, field, value],
                    flags: CommandFlags.NoRedirect);
                ClassicAssert.AreEqual("OK", setResult, $"RI.SET should succeed for {key}/{field}");
            }
        }

        /// <summary>
        /// Verify all fields are readable on the given endpoint.
        /// </summary>
        private void VerifyFieldsOnEndpoint(IPEndPoint endpoint, string key, IEnumerable<(string Field, string Value)> fields)
        {
            foreach (var (field, value) in fields)
            {
                var result = (string)context.clusterTestUtils.Execute(
                    endpoint, "RI.GET", [key, field],
                    flags: CommandFlags.NoRedirect);
                ClassicAssert.AreEqual(value, result, $"RI.GET {key}/{field} should return {value}");
            }
        }

        /// <summary>
        /// Wait for slot ownership to propagate: slot must be on target and not on source.
        /// </summary>
+ /// + private void WaitForSlotOwnership(IPEndPoint source, IPEndPoint target, int slot, int timeoutSeconds = 10) + { + var start = Stopwatch.GetTimestamp(); + while (Stopwatch.GetElapsedTime(start) < TimeSpan.FromSeconds(timeoutSeconds)) + { + var sourceSlots = context.clusterTestUtils.GetOwnedSlotsFromNode(source, NullLogger.Instance); + var targetSlots = context.clusterTestUtils.GetOwnedSlotsFromNode(target, NullLogger.Instance); + + if (!sourceSlots.Contains(slot) && targetSlots.Contains(slot)) + return; + + Thread.Sleep(100); + } + + ClassicAssert.Fail($"Slot {slot} ownership did not propagate within {timeoutSeconds}s"); + } + + #endregion + + /// + /// Verifies that a RangeIndex key (RI.CREATE + RI.SET) survives slot migration + /// and is accessible via RI.GET on the destination node. + /// + [Test, Order(1)] + [Category("CLUSTER")] + public void ClusterMigrateRangeIndexSlot() + { + context.CreateInstances(defaultShards, enableRangeIndexPreview: true); + context.CreateConnection(); + + var (_, _) = context.clusterTestUtils.SimpleSetupCluster(logger: context.logger); + var riKey = "{ri-migrate-test}"; + var slot = context.clusterTestUtils.HashSlot(riKey); + var sourceNodeIndex = context.clusterTestUtils.GetSourceNodeIndexFromSlot((ushort)slot, context.logger); + + // Determine source and target endpoints + var sourceEndpoint = context.clusterTestUtils.GetEndPoint(sourceNodeIndex); + var targetNodeIndex = (sourceNodeIndex + 1) % defaultShards; + var targetEndpoint = context.clusterTestUtils.GetEndPoint(targetNodeIndex); + + context.logger?.LogWarning("RI migration test: slot={slot}, source=node{sourceIndex}({sourcePort}), target=node{targetIndex}({targetPort})", + slot, sourceNodeIndex, ((IPEndPoint)sourceEndpoint).Port, targetNodeIndex, ((IPEndPoint)targetEndpoint).Port); + + // Create RangeIndex and insert data on source node + var createResult = (string)context.clusterTestUtils.Execute( + (IPEndPoint)sourceEndpoint, "RI.CREATE", + [riKey, "DISK", 
"CACHESIZE", "65536", "MINRECORD", "8"], + flags: CommandFlags.NoRedirect); + ClassicAssert.AreEqual("OK", createResult, "RI.CREATE should succeed on source node"); + + var setResult = (string)context.clusterTestUtils.Execute( + (IPEndPoint)sourceEndpoint, "RI.SET", + [riKey, "field1", "value1"], + flags: CommandFlags.NoRedirect); + ClassicAssert.AreEqual("OK", setResult, "RI.SET should succeed on source node"); + + // Verify data is readable on source before migration + var getResult = (string)context.clusterTestUtils.Execute( + (IPEndPoint)sourceEndpoint, "RI.GET", + [riKey, "field1"], + flags: CommandFlags.NoRedirect); + ClassicAssert.AreEqual("value1", getResult, "RI.GET should return correct value before migration"); + + // Migrate the slot from source to target + context.logger?.LogWarning("Initiating slot migration"); + context.clusterTestUtils.MigrateSlots( + (IPEndPoint)sourceEndpoint, + (IPEndPoint)targetEndpoint, + new List { slot }, + logger: context.logger); + + context.clusterTestUtils.WaitForMigrationCleanup(logger: context.logger); + context.logger?.LogWarning("Migration cleanup complete"); + + // Verify data is accessible on the target node + var retries = 0; + string targetGetResult = null; + while (retries < 50) + { + try + { + targetGetResult = (string)context.clusterTestUtils.Execute( + (IPEndPoint)targetEndpoint, "RI.GET", + [riKey, "field1"], + flags: CommandFlags.NoRedirect); + if (targetGetResult != null) + break; + } + catch + { + // Slot may not be fully transferred yet + } + Thread.Sleep(100); + retries++; + } + + ClassicAssert.AreEqual("value1", targetGetResult, "RI.GET should return correct value on target node after migration"); + + context.logger?.LogWarning("ClusterMigrateRangeIndexSlot test passed"); + } + + /// + /// Single RI key with multiple fields, slot-based migration between 2 primaries. + /// Verifies all fields survive and source returns MOVED. 
+ /// + [Test] + [Category("CLUSTER")] + public void ClusterMigrateRangeIndexSingleBySlot() + { + const int shards = 2; + context.CreateInstances(shards, enableRangeIndexPreview: true); + context.CreateConnection(); + + _ = context.clusterTestUtils.SimpleSetupCluster(logger: context.logger); + + var primary0 = (IPEndPoint)context.clusterTestUtils.GetEndPoint(0); + var primary1 = (IPEndPoint)context.clusterTestUtils.GetEndPoint(1); + var primary0Id = context.clusterTestUtils.ClusterMyId(primary0); + var slots = context.clusterTestUtils.ClusterSlots(primary0); + + var riKey = FindKeyOnNode(nameof(ClusterMigrateRangeIndexSingleBySlot), primary0Id, slots); + var slot = context.clusterTestUtils.HashSlot(riKey); + + // Create RI with multiple fields + var fields = new[] + { + ("field1", "value1"), ("field2", "value2"), ("field3", "value3"), + ("field4", "value4"), ("field5", "value5"), + }; + CreateRangeIndexWithFields(primary0, riKey, fields); + VerifyFieldsOnEndpoint(primary0, riKey, fields); + + // Migrate + context.clusterTestUtils.MigrateSlots(primary0, primary1, [slot]); + context.clusterTestUtils.WaitForMigrationCleanup(0); + context.clusterTestUtils.WaitForMigrationCleanup(1); + + WaitForSlotOwnership(primary0, primary1, slot); + + // Verify on target + VerifyFieldsOnEndpoint(primary1, riKey, fields); + + // Verify source returns MOVED + var movedResult = (string)context.clusterTestUtils.Execute( + primary0, "RI.GET", [riKey, "field1"], + flags: CommandFlags.NoRedirect); + ClassicAssert.IsTrue(movedResult.StartsWith("Key has MOVED to "), + $"Expected MOVED response from source, got: {movedResult}"); + } + + /// + /// Key-based migration of multiple RI keys in random order. 
+ /// + [Test] + [Category("CLUSTER")] + public void ClusterMigrateRangeIndexByKeys() + { + const int shardCount = 3; + const int keyCount = 10; + + context.CreateInstances(shardCount, enableRangeIndexPreview: true); + context.CreateConnection(); + + var (_, _) = context.clusterTestUtils.SimpleSetupCluster(logger: context.logger); + + var sourceNodeIndex = 1; + var targetNodeIndex = 2; + var sourceNodeId = context.clusterTestUtils.GetNodeIdFromNode(sourceNodeIndex, NullLogger.Instance); + var targetNodeId = context.clusterTestUtils.GetNodeIdFromNode(targetNodeIndex, NullLogger.Instance); + var sourceEndpoint = (IPEndPoint)context.clusterTestUtils.GetEndPoint(sourceNodeIndex); + var targetEndpoint = (IPEndPoint)context.clusterTestUtils.GetEndPoint(targetNodeIndex); + + var keyBase = Encoding.ASCII.GetBytes("{abc}ri_"); + var workingSlot = ClusterTestUtils.HashSlot(keyBase); + + var rand = new Random(2025_05_03_00); + var allKeys = new List<(byte[] Key, List<(string Field, string Value)> Fields)>(); + + for (var i = 0; i < keyCount; i++) + { + var newKey = new byte[keyBase.Length + 1]; + Array.Copy(keyBase, 0, newKey, 0, keyBase.Length); + newKey[^1] = (byte)('a' + i); + ClassicAssert.AreEqual(workingSlot, ClusterTestUtils.HashSlot(newKey)); + + var keyStr = Encoding.ASCII.GetString(newKey); + var fields = new List<(string Field, string Value)>(); + var fieldCount = rand.Next(1, 4); + for (var f = 0; f < fieldCount; f++) + fields.Add(($"field_{f:D4}", $"value_{i}_{f}_{rand.Next(10000)}")); + + CreateRangeIndexWithFields(sourceEndpoint, keyStr, fields); + allKeys.Add((newKey, fields)); + } + + // Manual slot migration setup + var respImport = context.clusterTestUtils.SetSlot(targetNodeIndex, workingSlot, "IMPORTING", sourceNodeId); + ClassicAssert.AreEqual("OK", respImport); + var respMigrate = context.clusterTestUtils.SetSlot(sourceNodeIndex, workingSlot, "MIGRATING", targetNodeId); + ClassicAssert.AreEqual("OK", respMigrate); + + // Migrate keys one at a time in 
random order + var toMigrate = allKeys.Select(k => k.Key).ToList(); + while (toMigrate.Count > 0) + { + var ix = rand.Next(toMigrate.Count); + context.clusterTestUtils.MigrateKeys(sourceEndpoint, targetEndpoint, [toMigrate[ix]], NullLogger.Instance); + toMigrate.RemoveAt(ix); + } + + // Complete migration + var respNodeTarget = context.clusterTestUtils.SetSlot(targetNodeIndex, workingSlot, "NODE", targetNodeId); + ClassicAssert.AreEqual("OK", respNodeTarget); + context.clusterTestUtils.BumpEpoch(targetNodeIndex, waitForSync: true); + + var respNodeSource = context.clusterTestUtils.SetSlot(sourceNodeIndex, workingSlot, "NODE", targetNodeId); + ClassicAssert.AreEqual("OK", respNodeSource); + context.clusterTestUtils.BumpEpoch(sourceNodeIndex, waitForSync: true); + + context.clusterTestUtils.WaitForMigrationCleanup(); + + // Verify all keys and fields on target + foreach (var (key, fields) in allKeys) + { + var keyStr = Encoding.ASCII.GetString(key); + VerifyFieldsOnEndpoint(targetEndpoint, keyStr, fields); + } + } + + /// + /// Multiple RI keys in the same slot, slot-based migration. 
+ /// + [Test] + [Category("CLUSTER")] + public void ClusterMigrateRangeIndexManyBySlot() + { + const int shards = 2; + const int keysPerPrimary = 4; + + context.CreateInstances(shards, enableRangeIndexPreview: true); + context.CreateConnection(); + + _ = context.clusterTestUtils.SimpleSetupCluster(logger: context.logger); + + var primary0 = (IPEndPoint)context.clusterTestUtils.GetEndPoint(0); + var primary1 = (IPEndPoint)context.clusterTestUtils.GetEndPoint(1); + var primary0Id = context.clusterTestUtils.ClusterMyId(primary0); + var slots = context.clusterTestUtils.ClusterSlots(primary0); + + var rand = new Random(42); + var primary0Keys = new List<(string Key, int Slot, List<(string Field, string Value)> Fields)>(); + + var ix = 0; + while (primary0Keys.Count < keysPerPrimary) + { + var key = $"{nameof(ClusterMigrateRangeIndexManyBySlot)}_{ix}"; + var slot = context.clusterTestUtils.HashSlot(key); + if (slots.Any(x => x.nnInfo.Any(y => y.nodeid == primary0Id) && slot >= x.startSlot && slot <= x.endSlot)) + { + var fields = new List<(string Field, string Value)>(); + var fieldCount = rand.Next(1, 6); + for (var f = 0; f < fieldCount; f++) + fields.Add(($"field_{f:D4}", $"value_{ix}_{f:D4}")); + + primary0Keys.Add((key, slot, fields)); + } + ix++; + } + + // Create all keys on primary0 + foreach (var (key, _, fields) in primary0Keys) + CreateRangeIndexWithFields(primary0, key, fields); + + // Migrate all distinct slots + var migrateSlots = primary0Keys.Select(k => k.Slot).Distinct().ToList(); + context.clusterTestUtils.MigrateSlots(primary0, primary1, migrateSlots); + context.clusterTestUtils.WaitForMigrationCleanup(0); + context.clusterTestUtils.WaitForMigrationCleanup(1); + + foreach (var slot in migrateSlots) + WaitForSlotOwnership(primary0, primary1, slot); + + // Verify all keys on primary1 + foreach (var (key, _, fields) in primary0Keys) + VerifyFieldsOnEndpoint(primary1, key, fields); + + // Verify source returns MOVED + foreach (var (key, _, _) in 
primary0Keys) + { + var result = (string)context.clusterTestUtils.Execute( + primary0, "RI.GET", [key, "field_0000"], + flags: CommandFlags.NoRedirect); + ClassicAssert.IsTrue(result.StartsWith("Key has MOVED to "), + $"Expected MOVED from source for key {key}"); + } + } + + /// + /// Client writes RI.SET continuously while migration happens, following MOVED redirects. + /// + [Test] + [Category("CLUSTER")] + public async Task ClusterMigrateRangeIndexWhileModifyingAsync() + { + const int shards = 2; + + context.CreateInstances(shards, enableRangeIndexPreview: true); + context.CreateConnection(); + + _ = context.clusterTestUtils.SimpleSetupCluster(logger: context.logger); + + var primary0 = (IPEndPoint)context.clusterTestUtils.GetEndPoint(0); + var primary1 = (IPEndPoint)context.clusterTestUtils.GetEndPoint(1); + var primary0Id = context.clusterTestUtils.ClusterMyId(primary0); + var slots = context.clusterTestUtils.ClusterSlots(primary0); + + var riKey = FindKeyOnNode(nameof(ClusterMigrateRangeIndexWhileModifyingAsync), primary0Id, slots); + var slot = context.clusterTestUtils.HashSlot(riKey); + + // Create RI key + var createResult = (string)context.clusterTestUtils.Execute( + primary0, "RI.CREATE", + [riKey, "DISK", "CACHESIZE", "65536", "MINRECORD", "8"], + flags: CommandFlags.NoRedirect); + ClassicAssert.AreEqual("OK", createResult); + + // Start background writer + using var cts = new CancellationTokenSource(); + var written = new ConcurrentBag<(string Field, string Value)>(); + + var writeTask = Task.Run(async () => + { + await Task.Yield(); + + using var con = ConnectionMultiplexer.Connect(context.clusterTestUtils.GetRedisConfig(context.endpoints)); + var db = con.GetDatabase(); + var ix = 0; + + while (!cts.IsCancellationRequested) + { + var field = $"field_{ix}"; + var value = $"value_{ix}"; + + try + { + var result = (string)db.Execute("RI.SET", [new RedisKey(riKey), field, value]); + if (result == "OK") + written.Add((field, value)); + } + catch 
(Exception exc) when ( + exc is RedisTimeoutException + || exc is RedisConnectionException + || (exc is RedisServerException rse && ( + rse.Message.StartsWith("MOVED ") + || rse.Message.StartsWith("Key has MOVED to ")))) + { + continue; + } + + ix++; + } + }); + + await Task.Delay(1_000).ConfigureAwait(false); + + var countPreMigration = written.Count; + ClassicAssert.IsTrue(countPreMigration > 0, "Should have some writes before migration"); + + // Migrate + using (var migrateToken = new CancellationTokenSource()) + { + migrateToken.CancelAfter(30_000); + + context.clusterTestUtils.MigrateSlots(primary0, primary1, [slot]); + context.clusterTestUtils.WaitForMigrationCleanup(0, cancellationToken: migrateToken.Token); + context.clusterTestUtils.WaitForMigrationCleanup(1, cancellationToken: migrateToken.Token); + } + + WaitForSlotOwnership(primary0, primary1, slot); + + // Wait for more writes after migration + var countPostMigration = written.Count; + await Task.Delay(2_000).ConfigureAwait(false); + var countAfterPause = written.Count; + + ClassicAssert.IsTrue(countAfterPause > countPostMigration, "Writes should resume after migration"); + + cts.Cancel(); + await writeTask.ConfigureAwait(false); + + // Write directly to the target after migration completes and verify readback + for (var i = 0; i < 5; i++) + { + var field = $"verify_field_{i}"; + var value = $"verify_value_{i}"; + var setRes = (string)context.clusterTestUtils.Execute( + primary1, "RI.SET", [riKey, field, value], + flags: CommandFlags.NoRedirect); + ClassicAssert.AreEqual("OK", setRes, $"RI.SET {field} should succeed on target after migration"); + + var getRes = (string)context.clusterTestUtils.Execute( + primary1, "RI.GET", [riKey, field], + flags: CommandFlags.NoRedirect); + ClassicAssert.AreEqual(value, getRes, $"RI.GET {field} should return correct value on target"); + } + } + + /// + /// Round-trip migration: P0 → P1 → P0, with data additions between each migration. 
+ /// + [Test] + [Category("CLUSTER")] + public void ClusterMigrateRangeIndexBack() + { + const int shards = 2; + + context.CreateInstances(shards, enableRangeIndexPreview: true); + context.CreateConnection(); + + _ = context.clusterTestUtils.SimpleSetupCluster(logger: context.logger); + + var primary0 = (IPEndPoint)context.clusterTestUtils.GetEndPoint(0); + var primary1 = (IPEndPoint)context.clusterTestUtils.GetEndPoint(1); + var primary0Id = context.clusterTestUtils.ClusterMyId(primary0); + var slots = context.clusterTestUtils.ClusterSlots(primary0); + + var riKey = FindKeyOnNode(nameof(ClusterMigrateRangeIndexBack), primary0Id, slots); + var slot = context.clusterTestUtils.HashSlot(riKey); + + // Create RI with initial data on P0 + CreateRangeIndexWithFields(primary0, riKey, [("field_00", "value_00"), ("field_01", "value_01")]); + + // Migrate P0 → P1 + { + using var migrateToken = new CancellationTokenSource(); + migrateToken.CancelAfter(30_000); + + context.clusterTestUtils.MigrateSlots(primary0, primary1, [slot]); + context.clusterTestUtils.WaitForMigrationCleanup(0, cancellationToken: migrateToken.Token); + context.clusterTestUtils.WaitForMigrationCleanup(1, cancellationToken: migrateToken.Token); + } + + WaitForSlotOwnership(primary0, primary1, slot); + + // Verify on P1 and add more data + VerifyFieldsOnEndpoint(primary1, riKey, [("field_00", "value_00"), ("field_01", "value_01")]); + + var setResult = (string)context.clusterTestUtils.Execute( + primary1, "RI.SET", [riKey, "field_02", "value_02"], + flags: CommandFlags.NoRedirect); + ClassicAssert.AreEqual("OK", setResult); + + // Migrate P1 → P0 + { + using var migrateToken = new CancellationTokenSource(); + migrateToken.CancelAfter(30_000); + + context.clusterTestUtils.MigrateSlots(primary1, primary0, [slot]); + context.clusterTestUtils.WaitForMigrationCleanup(0, cancellationToken: migrateToken.Token); + context.clusterTestUtils.WaitForMigrationCleanup(1, cancellationToken: migrateToken.Token); + } + + 
WaitForSlotOwnership(primary1, primary0, slot); + + // Verify all data (original + added) survived round-trip + VerifyFieldsOnEndpoint(primary0, riKey, [("field_00", "value_00"), ("field_01", "value_01"), ("field_02", "value_02")]); + + // Add more data on P0 + var setResult2 = (string)context.clusterTestUtils.Execute( + primary0, "RI.SET", [riKey, "field_03", "value_03"], + flags: CommandFlags.NoRedirect); + ClassicAssert.AreEqual("OK", setResult2); + + VerifyFieldsOnEndpoint(primary0, riKey, [("field_00", "value_00"), ("field_01", "value_01"), ("field_02", "value_02"), ("field_03", "value_03")]); + } + + /// + /// Stress test: concurrent reads + writes + repeated back-and-forth migrations. + /// Verifies zero data loss. + /// + /// + /// Currently marked Explicit because RI.SET via cluster-mode client redirect can hit + /// "ERR no such range index" on the target if the RI key was just migrated and the + /// BfTree native instance isn't yet registered. This needs investigation in the + /// migration pipeline before this test can run reliably. 
+ /// + [Test, Explicit("RI.SET via cluster redirect not yet reliable during migration")] + [Category("CLUSTER")] + public async Task ClusterMigrateRangeIndexStressAsync() + { + const int shards = 2; + const int keysPerPrimary = 2; + + context.CreateInstances(shards, enableRangeIndexPreview: true); + context.CreateConnection(); + + _ = context.clusterTestUtils.SimpleSetupCluster(logger: context.logger); + + var primary0 = (IPEndPoint)context.clusterTestUtils.GetEndPoint(0); + var primary1 = (IPEndPoint)context.clusterTestUtils.GetEndPoint(1); + var primary0Id = context.clusterTestUtils.ClusterMyId(primary0); + var slots = context.clusterTestUtils.ClusterSlots(primary0); + + // Find keys on each primary + var allKeys = new List<(string Key, int Slot, bool OnPrimary0)>(); + var numP0 = 0; + var numP1 = 0; + var ix = 0; + + while (numP0 < keysPerPrimary || numP1 < keysPerPrimary) + { + var key = $"{nameof(ClusterMigrateRangeIndexStressAsync)}_{ix}"; + var slot = context.clusterTestUtils.HashSlot(key); + var isOnP0 = slots.Any(x => x.nnInfo.Any(y => y.nodeid == primary0Id) && slot >= x.startSlot && slot <= x.endSlot); + + if (isOnP0 && numP0 < keysPerPrimary) + { + allKeys.Add((key, slot, true)); + numP0++; + } + else if (!isOnP0 && numP1 < keysPerPrimary) + { + allKeys.Add((key, slot, false)); + numP1++; + } + ix++; + } + + // Create RI keys on their respective primaries + foreach (var (key, _, onP0) in allKeys) + { + var endpoint = onP0 ? 
primary0 : primary1; + var createResult = (string)context.clusterTestUtils.Execute( + endpoint, "RI.CREATE", + [key, "DISK", "CACHESIZE", "65536", "MINRECORD", "8"], + flags: CommandFlags.NoRedirect); + ClassicAssert.AreEqual("OK", createResult); + } + + // Start concurrent writers + using var writeCancel = new CancellationTokenSource(); + var writeResults = new ConcurrentBag<(string Field, string Value)>[allKeys.Count]; + var writeTasks = new Task[allKeys.Count]; + var mostRecentWrite = 0L; + + using var readWriteCon = ConnectionMultiplexer.Connect(context.clusterTestUtils.GetRedisConfig(context.endpoints)); + var readWriteDb = readWriteCon.GetDatabase(); + + for (var i = 0; i < allKeys.Count; i++) + { + var (key, _, _) = allKeys[i]; + var bag = writeResults[i] = new ConcurrentBag<(string, string)>(); + + writeTasks[i] = Task.Run(async () => + { + await Task.Yield(); + var wix = 0; + + while (!writeCancel.IsCancellationRequested) + { + var field = $"field_{wix}"; + var value = $"value_{wix}"; + + try + { + var result = (string)readWriteDb.Execute("RI.SET", [new RedisKey(key), field, value]); + if (result == "OK") + bag.Add((field, value)); + } + catch (Exception exc) when ( + exc is RedisTimeoutException + || exc is RedisConnectionException + || (exc is RedisServerException rse && ( + rse.Message.StartsWith("MOVED ") + || rse.Message.StartsWith("Key has MOVED to ")))) + { + if (writeCancel.IsCancellationRequested) return; + continue; + } + + var now = DateTime.UtcNow.Ticks; + var prev = Interlocked.CompareExchange(ref mostRecentWrite, now, mostRecentWrite); + while (prev < now) + prev = Interlocked.CompareExchange(ref mostRecentWrite, now, prev); + + wix++; + } + }); + } + + // Start concurrent readers + using var readCancel = new CancellationTokenSource(); + var readTasks = new Task[allKeys.Count]; + + for (var i = 0; i < allKeys.Count; i++) + { + var (key, _, _) = allKeys[i]; + var bag = writeResults[i]; + + readTasks[i] = Task.Run(async () => + { + await 
Task.Yield(); + var successfulReads = 0; + var rng = new Random(i); + + while (!readCancel.IsCancellationRequested) + { + var snapshot = bag.ToList(); + if (snapshot.Count == 0) + { + await Task.Delay(10).ConfigureAwait(false); + continue; + } + + var (field, expectedValue) = snapshot[rng.Next(snapshot.Count)]; + + try + { + var result = (string)readWriteDb.Execute("RI.GET", [new RedisKey(key), field]); + if (result != null) + { + ClassicAssert.AreEqual(expectedValue, result, $"Read mismatch for {key}/{field}"); + successfulReads++; + } + } + catch (Exception exc) when ( + exc is RedisTimeoutException + || exc is RedisConnectionException + || (exc is RedisServerException rse && ( + rse.Message.StartsWith("MOVED ") + || rse.Message.StartsWith("Key has MOVED to ")))) + { + continue; + } + } + + return successfulReads; + }); + } + + await Task.Delay(1_000).ConfigureAwait(false); + ClassicAssert.IsTrue(writeResults.All(r => !r.IsEmpty), "Should have writes before migration"); + + // Migrator: ping-pong slots between primaries + using var migrateCancel = new CancellationTokenSource(); + + var migrateTask = Task.Run(async () => + { + var slotsOnP0 = allKeys.Where(k => k.OnPrimary0).Select(k => k.Slot).Distinct().ToList(); + var slotsOnP1 = allKeys.Where(k => !k.OnPrimary0).Select(k => k.Slot).Distinct().ToList(); + var migrationCount = 0; + var mostRecentMigration = 0L; + + while (!migrateCancel.IsCancellationRequested) + { + await Task.Delay(100).ConfigureAwait(false); + + // Wait for at least one write since last migration + if (Interlocked.CompareExchange(ref mostRecentWrite, 0, 0) < mostRecentMigration) + continue; + + // Move P0 → P1 + if (slotsOnP0.Count > 0) + { + using var token = new CancellationTokenSource(); + token.CancelAfter(30_000); + + context.clusterTestUtils.MigrateSlots(primary0, primary1, slotsOnP0); + context.clusterTestUtils.WaitForMigrationCleanup(0, cancellationToken: token.Token); + context.clusterTestUtils.WaitForMigrationCleanup(1, 
cancellationToken: token.Token); + } + + // Move P1 → P0 + if (slotsOnP1.Count > 0) + { + using var token = new CancellationTokenSource(); + token.CancelAfter(30_000); + + context.clusterTestUtils.MigrateSlots(primary1, primary0, slotsOnP1); + context.clusterTestUtils.WaitForMigrationCleanup(0, cancellationToken: token.Token); + context.clusterTestUtils.WaitForMigrationCleanup(1, cancellationToken: token.Token); + } + + mostRecentMigration = DateTime.UtcNow.Ticks; + migrationCount++; + + // Flip for next pass + (slotsOnP0, slotsOnP1) = (slotsOnP1, slotsOnP0); + } + + return migrationCount; + }); + + await Task.Delay(10_000).ConfigureAwait(false); + + migrateCancel.Cancel(); + var migrations = await migrateTask.ConfigureAwait(false); + ClassicAssert.IsTrue(migrations >= 2, $"Should have at least 2 migrations, had {migrations}"); + + writeCancel.Cancel(); + await Task.WhenAll(writeTasks).ConfigureAwait(false); + + readCancel.Cancel(); + var readResults = await Task.WhenAll(readTasks).ConfigureAwait(false); + ClassicAssert.IsTrue(readResults.All(r => r > 0), "Should have successful reads on all keys"); + + // Final verification: every written field must be readable + var curP0Slots = context.clusterTestUtils.GetOwnedSlotsFromNode(primary0, NullLogger.Instance); + + for (var i = 0; i < allKeys.Count; i++) + { + var (key, slot, _) = allKeys[i]; + var endpoint = curP0Slots.Contains(slot) ? primary0 : primary1; + + foreach (var (field, value) in writeResults[i]) + { + var result = (string)context.clusterTestUtils.Execute( + endpoint, "RI.GET", [key, field], + flags: CommandFlags.NoRedirect); + ClassicAssert.AreEqual(value, result, $"Data loss: {key}/{field} not found after stress"); + } + } + } + + /// + /// Test migration with different chunk sizes to exercise multi-chunk paths. 
+ /// + [Test] + [Category("CLUSTER")] + [TestCase(1024)] // 1 KB — forces many chunks + [TestCase(4096)] // 4 KB + [TestCase(256 * 1024)] // 256 KB — default + public void ClusterMigrateRangeIndexWithChunkSize(int chunkSize) + { + const int shards = 2; + + context.CreateInstances(shards, enableRangeIndexPreview: true); + context.CreateConnection(); + + _ = context.clusterTestUtils.SimpleSetupCluster(logger: context.logger); + + var primary0 = (IPEndPoint)context.clusterTestUtils.GetEndPoint(0); + var primary1 = (IPEndPoint)context.clusterTestUtils.GetEndPoint(1); + var primary0Id = context.clusterTestUtils.ClusterMyId(primary0); + var slots = context.clusterTestUtils.ClusterSlots(primary0); + + var riKey = FindKeyOnNode($"{nameof(ClusterMigrateRangeIndexWithChunkSize)}_{chunkSize}", primary0Id, slots); + var slot = context.clusterTestUtils.HashSlot(riKey); + + // Create RI with enough data to span multiple chunks at small sizes + var fields = new List<(string Field, string Value)>(); + for (var i = 0; i < 50; i++) + fields.Add(($"field_{i}", new string('x', 100) + $"_{i}")); + + CreateRangeIndexWithFields(primary0, riKey, fields); + VerifyFieldsOnEndpoint(primary0, riKey, fields); + + // Migrate + context.clusterTestUtils.MigrateSlots(primary0, primary1, [slot]); + context.clusterTestUtils.WaitForMigrationCleanup(0); + context.clusterTestUtils.WaitForMigrationCleanup(1); + + WaitForSlotOwnership(primary0, primary1, slot); + + // Verify all fields on target + VerifyFieldsOnEndpoint(primary1, riKey, fields); + } + + /// + /// Large RangeIndex migration that generates enough data to span multiple chunks + /// even at the default 256 KB chunk size. Verifies all data survives. 
+ /// + [Test] + [Category("CLUSTER")] + [TestCase(1024)] // 1 KB chunks — many chunks for large tree + [TestCase(256 * 1024)] // default — still multiple chunks with enough data + public void ClusterMigrateRangeIndexLargeTree(int chunkSize) + { + const int shards = 2; + const int fieldCount = 500; + + context.CreateInstances(shards, enableRangeIndexPreview: true); + context.CreateConnection(); + + _ = context.clusterTestUtils.SimpleSetupCluster(logger: context.logger); + + var primary0 = (IPEndPoint)context.clusterTestUtils.GetEndPoint(0); + var primary1 = (IPEndPoint)context.clusterTestUtils.GetEndPoint(1); + var primary0Id = context.clusterTestUtils.ClusterMyId(primary0); + var slots = context.clusterTestUtils.ClusterSlots(primary0); + + var riKey = FindKeyOnNode($"{nameof(ClusterMigrateRangeIndexLargeTree)}_{chunkSize}", primary0Id, slots); + var slot = context.clusterTestUtils.HashSlot(riKey); + + // Create RI with large data — 500 fields × ~1 KB values ≈ 500 KB of data + var rand = new Random(42); + var fields = new List<(string Field, string Value)>(); + for (var i = 0; i < fieldCount; i++) + { + var valueBytes = new byte[512]; + rand.NextBytes(valueBytes); + var value = Convert.ToBase64String(valueBytes); + fields.Add(($"field_{i:D5}", value)); + } + + // Use a larger max record size to accommodate the ~700-byte base64 values + var createResult = (string)context.clusterTestUtils.Execute( + primary0, "RI.CREATE", + [riKey, "DISK", "CACHESIZE", "65536", "MINRECORD", "8", "MAXRECORD", "1024"], + flags: CommandFlags.NoRedirect); + ClassicAssert.AreEqual("OK", createResult, "RI.CREATE should succeed"); + + foreach (var (field, value) in fields) + { + var setResult = (string)context.clusterTestUtils.Execute( + primary0, "RI.SET", [riKey, field, value], + flags: CommandFlags.NoRedirect); + ClassicAssert.AreEqual("OK", setResult, $"RI.SET should succeed for {field}"); + } + + // Verify a sample before migration + VerifyFieldsOnEndpoint(primary0, riKey, 
fields.Take(10)); + + // Migrate + context.clusterTestUtils.MigrateSlots(primary0, primary1, [slot]); + context.clusterTestUtils.WaitForMigrationCleanup(0); + context.clusterTestUtils.WaitForMigrationCleanup(1); + + WaitForSlotOwnership(primary0, primary1, slot); + + // Verify ALL fields on target — every single one must survive + VerifyFieldsOnEndpoint(primary1, riKey, fields); + + // Verify source returns MOVED + var movedResult = (string)context.clusterTestUtils.Execute( + primary0, "RI.GET", [riKey, "field_00000"], + flags: CommandFlags.NoRedirect); + ClassicAssert.IsTrue(movedResult.StartsWith("Key has MOVED to "), + $"Expected MOVED from source, got: {movedResult}"); + } + } +} diff --git a/test/Garnet.test.cluster/ClusterTestContext.cs b/test/Garnet.test.cluster/ClusterTestContext.cs index 94ea171cb48..750a0096ac2 100644 --- a/test/Garnet.test.cluster/ClusterTestContext.cs +++ b/test/Garnet.test.cluster/ClusterTestContext.cs @@ -304,7 +304,8 @@ public void CreateInstances( ClusterPreferredEndpointType clusterPreferredEndpointType = ClusterPreferredEndpointType.Ip, bool useClusterAnnounceHostname = false, int vectorSetReplayTaskCount = 0, - int threadPoolMinIOCompletionThreads = 0) + int threadPoolMinIOCompletionThreads = 0, + bool enableRangeIndexPreview = false) { var ipAddress = IPAddress.Loopback; TestUtils.EndPoint = new IPEndPoint(ipAddress, 7000); @@ -363,7 +364,8 @@ public void CreateInstances( clusterPreferredEndpointType: clusterPreferredEndpointType, clusterAnnounceHostname: useClusterAnnounceHostname ? 
"localhost" : null, vectorSetReplayTaskCount: vectorSetReplayTaskCount, - threadPoolMinIOCompletionThreads: threadPoolMinIOCompletionThreads); + threadPoolMinIOCompletionThreads: threadPoolMinIOCompletionThreads, + enableRangeIndexPreview: enableRangeIndexPreview); foreach (var node in nodes) node.Start(); diff --git a/test/Garnet.test/RangeIndexChunkedSerializerTests.cs b/test/Garnet.test/RangeIndexChunkedSerializerTests.cs new file mode 100644 index 00000000000..5b7fee69524 --- /dev/null +++ b/test/Garnet.test/RangeIndexChunkedSerializerTests.cs @@ -0,0 +1,1016 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +using System; +using System.Buffers.Binary; +using System.IO; +using System.IO.Hashing; +using System.Text; +using Allure.NUnit; +using Garnet.server; +using NUnit.Framework; +using NUnit.Framework.Legacy; + +namespace Garnet.test +{ + /// + /// Unit tests for and . + /// Tests the serialization wire format, checksum validation, state machine transitions, + /// and round-trip correctness without requiring a running Garnet server. 
+ /// + [AllureNUnit] + [TestFixture] + public class RangeIndexChunkedSerializerTests : AllureTestBase + { + private string testDir; + + [SetUp] + public void Setup() + { + testDir = Path.Combine(TestUtils.MethodTestDir, "ri-serializer-test"); + if (Directory.Exists(testDir)) + Directory.Delete(testDir, recursive: true); + Directory.CreateDirectory(testDir); + } + + [TearDown] + public void TearDown() + { + if (Directory.Exists(testDir)) + Directory.Delete(testDir, recursive: true); + TestUtils.OnTearDown(); + } + + private static byte[] CreateStub() + { + var stub = new byte[RangeIndexManager.IndexSizeBytes]; + for (var i = 0; i < stub.Length; i++) + stub[i] = (byte)(0xA0 + i); + return stub; + } + + private static byte[] CreateBuffer() + { + return new byte[RangeIndexManager.DefaultMigrationChunkSize]; + } + + /// + /// Test helper: drives the serializer with file data from a FileStream, mimicking the + /// old single-call MoveNext API. Reads file data synchronously and feeds it to the serializer. + /// May call MoveNext multiple times to handle phase transitions within one "logical" chunk. 
+        /// </summary>
+        private static int SerializerMoveNext(RangeIndexChunkedSerializer serializer, Span<byte> destination, FileStream fileStream)
+        {
+            var totalWritten = 0;
+
+            while (!serializer.IsComplete && destination.Length > 0)
+            {
+                ReadOnlySpan<byte> fileData = default;
+                if (serializer.NeedsFileData)
+                {
+                    var fileBuffer = new byte[destination.Length];
+                    var maxRead = (int)Math.Min(fileBuffer.Length, serializer.FileDataRemaining);
+                    var bytesRead = fileStream.Read(fileBuffer, 0, maxRead);
+                    if (bytesRead == 0 && serializer.FileDataRemaining > 0)
+                        throw new EndOfStreamException($"RangeIndex file truncated: {serializer.FileDataRemaining} bytes remaining");
+                    fileData = fileBuffer.AsSpan(0, bytesRead);
+                }
+
+                var written = serializer.MoveNext(destination, fileData, out _);
+                if (written == 0)
+                    break;
+
+                destination = destination[written..];
+                totalWritten += written;
+            }
+
+            return totalWritten;
+        }
+
+        /// <summary>
+        /// Test helper: wraps the deserializer ProcessChunk with file writing, mimicking the
+        /// old single-call ProcessChunk API.
+        /// </summary>
+        private bool DeserializerProcessChunk(RangeIndexChunkedDeserializer deserializer, ReadOnlySpan<byte> data, FileStream stream)
+        {
+            var result = deserializer.ProcessChunk(data, out var fileDataOffset, out var fileDataLength);
+            if (fileDataLength > 0 && stream != null)
+                stream.Write(data.Slice(fileDataOffset, fileDataLength));
+            return result;
+        }
+
+        /// <summary>
+        /// Test helper: creates a migration writer (deserializer + temp file stream) and processes
+        /// a chunk, handling file writes automatically.
+        /// </summary>
+        private (bool Result, RangeIndexMigrationWriter Writer) ProcessChunkWithWriter(RangeIndexManager manager, ReadOnlySpan<byte> data)
+        {
+            // NOTE(review): original added code referenced an undeclared `writer` while constructing
+            // an unused RangeIndexChunkedDeserializer — a compile error. Constructing the writer
+            // (which the return type requires) is the minimal coherent fix; confirm against
+            // RangeIndexMigrationWriter's intended usage.
+            var writer = new RangeIndexMigrationWriter(manager);
+            var result = writer.ProcessChunkAsync(data.ToArray()).GetAwaiter().GetResult();
+            return (result, writer);
+        }
+
+        /// <summary>
+        /// Small file that fits in a single chunk — serializer should emit exactly one MoveNext.
+ /// + [Test] + public void SingleChunkRoundTrip() + { + var fileData = new byte[1024]; + new Random(42).NextBytes(fileData); + var filePath = Path.Combine(testDir, "small.bftree"); + File.WriteAllBytes(filePath, fileData); + + var key = Encoding.UTF8.GetBytes("mykey"); + var stub = CreateStub(); + var buffer = CreateBuffer(); + + using var fs = new FileStream(filePath, FileMode.Open, FileAccess.Read); + var serializer = new RangeIndexChunkedSerializer(key, stub, fileData.Length); + + var len = SerializerMoveNext(serializer, buffer, fs); + ClassicAssert.Greater(len, 0); + var payload = buffer.AsSpan(0, len).ToArray(); + ClassicAssert.IsTrue(serializer.IsComplete); + + // Verify wire format: [4-byte keyLen][key][8-byte fileCount]... + var offset = 0; + var keyLenFromPayload = BinaryPrimitives.ReadInt32LittleEndian(payload); + ClassicAssert.AreEqual(key.Length, keyLenFromPayload); + offset += sizeof(int); + ClassicAssert.AreEqual(key, payload.AsSpan(offset, key.Length).ToArray()); + offset += key.Length; + var fileSizeFromPayload = BinaryPrimitives.ReadInt64LittleEndian(payload.AsSpan(offset)); + ClassicAssert.AreEqual(fileData.Length, fileSizeFromPayload); + + // Round-trip through deserializer + var manager = new RangeIndexManager(enabled: true, dataDir: testDir); + using var deserializer = new RangeIndexChunkedDeserializer(manager); + + ClassicAssert.IsTrue(deserializer.ProcessChunk(payload)); + ClassicAssert.IsTrue(deserializer.IsComplete); + ClassicAssert.IsFalse(deserializer.HasError); + ClassicAssert.AreEqual(key, deserializer.Key.ToArray()); + } + + /// + /// Large file that spans multiple chunks — verify all chunks round-trip correctly. 
+ /// + [Test] + public void MultiChunkRoundTrip() + { + var fileSize = RangeIndexManager.DefaultMigrationChunkSize * 3 + 1000; + var fileData = new byte[fileSize]; + new Random(123).NextBytes(fileData); + var filePath = Path.Combine(testDir, "large.bftree"); + File.WriteAllBytes(filePath, fileData); + + var key = Encoding.UTF8.GetBytes("largekey"); + var stub = CreateStub(); + var buffer = CreateBuffer(); + + using var fs = new FileStream(filePath, FileMode.Open, FileAccess.Read); + var serializer = new RangeIndexChunkedSerializer(key, stub, fileSize); + + var manager = new RangeIndexManager(enabled: true, dataDir: testDir); + using var deserializer = new RangeIndexChunkedDeserializer(manager); + + var chunkCount = 0; + while (!serializer.IsComplete) + { + var len = SerializerMoveNext(serializer, buffer, fs); + ClassicAssert.IsTrue(deserializer.ProcessChunk(buffer.AsSpan(0, len).ToArray(), out _, out _)); + chunkCount++; + } + + ClassicAssert.Greater(chunkCount, 1); + ClassicAssert.IsTrue(deserializer.IsComplete); + ClassicAssert.IsFalse(deserializer.HasError); + ClassicAssert.AreEqual(key, deserializer.Key.ToArray()); + } + + /// + /// Empty file (0 bytes) — should still produce a valid record with key + header + trailer. 
+        /// </summary>
+        [Test]
+        public void EmptyFileRoundTrip()
+        {
+            var filePath = Path.Combine(testDir, "empty.bftree");
+            File.WriteAllBytes(filePath, Array.Empty<byte>());
+
+            var key = Encoding.UTF8.GetBytes("emptykey");
+            var stub = CreateStub();
+            var buffer = CreateBuffer();
+
+            using var fs = new FileStream(filePath, FileMode.Open, FileAccess.Read);
+            var serializer = new RangeIndexChunkedSerializer(key, stub, 0);
+
+            var len = SerializerMoveNext(serializer, buffer, fs);
+            ClassicAssert.Greater(len, 0);
+            var payload = buffer.AsSpan(0, len).ToArray();
+            ClassicAssert.IsTrue(serializer.IsComplete);
+
+            // File count should be 0
+            var fileCountOffset = sizeof(int) + key.Length;
+            var fileSizeFromPayload = BinaryPrimitives.ReadInt64LittleEndian(payload.AsSpan(fileCountOffset));
+            ClassicAssert.AreEqual(0, fileSizeFromPayload);
+
+            var manager = new RangeIndexManager(enabled: true, dataDir: testDir);
+            using var deserializer = new RangeIndexChunkedDeserializer(manager);
+
+            ClassicAssert.IsTrue(deserializer.ProcessChunk(payload));
+            ClassicAssert.IsTrue(deserializer.IsComplete);
+            ClassicAssert.IsFalse(deserializer.HasError);
+            ClassicAssert.AreEqual(key, deserializer.Key.ToArray());
+        }
+
+        /// <summary>
+        /// Corrupted checksum should cause the deserializer to enter Error state.
+ /// + [Test] + public void CorruptedChecksumDetected() + { + var fileData = new byte[512]; + new Random(99).NextBytes(fileData); + var filePath = Path.Combine(testDir, "corrupt.bftree"); + File.WriteAllBytes(filePath, fileData); + + var key = Encoding.UTF8.GetBytes("corruptkey"); + var stub = CreateStub(); + var buffer = CreateBuffer(); + + using var fs = new FileStream(filePath, FileMode.Open, FileAccess.Read); + var serializer = new RangeIndexChunkedSerializer(key, stub, fileData.Length); + + var len = SerializerMoveNext(serializer, buffer, fs); + ClassicAssert.Greater(len, 0); + var payload = buffer.AsSpan(0, len).ToArray(); + + // Corrupt a file data byte (after keyLen + key + fileCount) + var fileDataOffset = sizeof(int) + key.Length + sizeof(long); + payload[fileDataOffset + 10] ^= 0xFF; + + var manager = new RangeIndexManager(enabled: true, dataDir: testDir); + using var deserializer = new RangeIndexChunkedDeserializer(manager); + + ClassicAssert.IsFalse(deserializer.ProcessChunk(payload)); + ClassicAssert.IsTrue(deserializer.HasError); + ClassicAssert.IsFalse(deserializer.IsComplete); + } + + /// + /// Negative file size should cause Error state. + /// + [Test] + public void NegativeFileSizeIsError() + { + // [4-byte keyLen=0][8-byte negative fileCount] + var payload = new byte[sizeof(int) + sizeof(long)]; + BinaryPrimitives.WriteInt32LittleEndian(payload, 0); + BinaryPrimitives.WriteInt64LittleEndian(payload.AsSpan(sizeof(int)), -1); + + var manager = new RangeIndexManager(enabled: true, dataDir: testDir); + using var deserializer = new RangeIndexChunkedDeserializer(manager); + + ClassicAssert.IsFalse(deserializer.ProcessChunk(payload)); + ClassicAssert.IsTrue(deserializer.HasError); + } + + /// + /// Too-small first record (less than 4 bytes for key header) should cause Error state. 
+ /// + [Test] + public void TooSmallHeaderIsError() + { + var payload = new byte[2]; // Less than sizeof(int) + + var manager = new RangeIndexManager(enabled: true, dataDir: testDir); + using var deserializer = new RangeIndexChunkedDeserializer(manager); + + ClassicAssert.IsFalse(deserializer.ProcessChunk(payload)); + ClassicAssert.IsTrue(deserializer.HasError); + } + + /// + /// After Error state, subsequent ProcessChunk calls should return false. + /// + [Test] + public void ErrorStateIsTerminal() + { + // Trigger error with negative file size + var payload = new byte[sizeof(int) + sizeof(long)]; + BinaryPrimitives.WriteInt32LittleEndian(payload, 0); + BinaryPrimitives.WriteInt64LittleEndian(payload.AsSpan(sizeof(int)), -1); + + var manager = new RangeIndexManager(enabled: true, dataDir: testDir); + using var deserializer = new RangeIndexChunkedDeserializer(manager); + + ClassicAssert.IsFalse(deserializer.ProcessChunk(payload)); + ClassicAssert.IsTrue(deserializer.HasError); + + ClassicAssert.IsFalse(deserializer.ProcessChunk(new byte[100])); + ClassicAssert.IsTrue(deserializer.HasError); + } + + /// + /// Verify the serializer preserves exact file content through round-trip. 
+ /// + [Test] + public void FileContentPreservedInRoundTrip() + { + var fileData = new byte[RangeIndexManager.DefaultMigrationChunkSize * 2 + 500]; + new Random(77).NextBytes(fileData); + var filePath = Path.Combine(testDir, "content.bftree"); + File.WriteAllBytes(filePath, fileData); + + var key = Encoding.UTF8.GetBytes("contentkey"); + var stub = CreateStub(); + var buffer = CreateBuffer(); + + using var fs = new FileStream(filePath, FileMode.Open, FileAccess.Read); + var serializer = new RangeIndexChunkedSerializer(key, stub, fileData.Length); + + var manager = new RangeIndexManager(enabled: true, dataDir: testDir); + using var deserializer = new RangeIndexChunkedDeserializer(manager); + + while (!serializer.IsComplete) + { + var len = SerializerMoveNext(serializer, buffer, fs); + ClassicAssert.IsTrue(deserializer.ProcessChunk(buffer.AsSpan(0, len).ToArray())); + } + + ClassicAssert.IsTrue(deserializer.IsComplete); + + var tmpDir = Path.Combine(testDir, "rangeindex", ".migration-tmp"); + var tmpFiles = Directory.GetFiles(tmpDir, "*.bftree"); + ClassicAssert.AreEqual(1, tmpFiles.Length); + + var restoredData = File.ReadAllBytes(tmpFiles[0]); + ClassicAssert.AreEqual(fileData.Length, restoredData.Length); + ClassicAssert.AreEqual(fileData, restoredData); + } + + /// + /// Stub bytes round-trip correctly through serializer → deserializer. 
+ /// + [Test] + public void StubPreservedInRoundTrip() + { + var filePath = Path.Combine(testDir, "stubtest.bftree"); + File.WriteAllBytes(filePath, new byte[100]); + + var key = Encoding.UTF8.GetBytes("stubkey"); + var stub = CreateStub(); + var buffer = CreateBuffer(); + + using var fs = new FileStream(filePath, FileMode.Open, FileAccess.Read); + var serializer = new RangeIndexChunkedSerializer(key, stub, 100); + + var manager = new RangeIndexManager(enabled: true, dataDir: testDir); + using var deserializer = new RangeIndexChunkedDeserializer(manager); + + while (!serializer.IsComplete) + { + var len = SerializerMoveNext(serializer, buffer, fs); + deserializer.ProcessChunk(buffer.AsSpan(0, len).ToArray(), out _, out _); + } + + ClassicAssert.IsTrue(deserializer.IsComplete); + ClassicAssert.AreEqual(key, deserializer.Key.ToArray()); + } + + /// + /// Dispose cleans up temp file. + /// + [Test] + public void DisposeCleansTempFile() + { + var manager = new RangeIndexManager(enabled: true, dataDir: testDir); + var deserializer = new RangeIndexChunkedDeserializer(manager); + + // Feed key header + key + file header with fileCount > 0 to create the file stream + var key = Encoding.UTF8.GetBytes("tmp"); + var payload = new byte[sizeof(int) + key.Length + sizeof(long) + 10]; + var offset = 0; + BinaryPrimitives.WriteInt32LittleEndian(payload, key.Length); + offset += sizeof(int); + key.CopyTo(payload.AsSpan(offset)); + offset += key.Length; + BinaryPrimitives.WriteInt64LittleEndian(payload.AsSpan(offset), 100); + // Remaining 10 bytes are file data (partial) + deserializer.ProcessChunk(payload); + + var tmpDir = Path.Combine(testDir, "rangeindex", ".migration-tmp"); + var tmpFiles = Directory.GetFiles(tmpDir, "*.bftree"); + ClassicAssert.AreEqual(1, tmpFiles.Length); + ClassicAssert.IsTrue(File.Exists(tmpFiles[0])); + + deserializer.Dispose(); + + ClassicAssert.IsFalse(File.Exists(tmpFiles[0])); + } + + /// + /// Startup cleanup removes .migration-tmp directory. 
+ /// + [Test] + public void StartupCleansUpMigrationTmpDir() + { + var tmpDir = Path.Combine(testDir, "rangeindex", ".migration-tmp"); + Directory.CreateDirectory(tmpDir); + File.WriteAllText(Path.Combine(tmpDir, "orphan.bftree"), "leftover"); + + var manager = new RangeIndexManager(enabled: true, dataDir: testDir); + + ClassicAssert.IsFalse(Directory.Exists(tmpDir)); + manager.Dispose(); + } + + /// + /// Invalid stub size in trailer should cause Error state. + /// + [Test] + public void InvalidStubSizeIsError() + { + var badStubSize = 10; // Not IndexSizeBytes + var key = Encoding.UTF8.GetBytes("badstub"); + var badStub = new byte[badStubSize]; + + // Compute hash of empty file + var hasher = new XxHash64(); + Span hashBytes = stackalloc byte[sizeof(ulong)]; + hasher.GetHashAndReset(hashBytes); + + // [4-byte keyLen][key][8-byte fileCount=0][8-byte hash][4-byte badStubLen][badStub] + var trailerSize = sizeof(ulong) + sizeof(int) + badStubSize; + var payload = new byte[sizeof(int) + key.Length + sizeof(long) + trailerSize]; + var offset = 0; + BinaryPrimitives.WriteInt32LittleEndian(payload, key.Length); + offset += sizeof(int); + key.CopyTo(payload.AsSpan(offset)); + offset += key.Length; + BinaryPrimitives.WriteInt64LittleEndian(payload.AsSpan(offset), 0); + offset += sizeof(long); + hashBytes.CopyTo(payload.AsSpan(offset)); + offset += sizeof(ulong); + BinaryPrimitives.WriteInt32LittleEndian(payload.AsSpan(offset), badStubSize); + offset += sizeof(int); + badStub.CopyTo(payload.AsSpan(offset)); + + var manager = new RangeIndexManager(enabled: true, dataDir: testDir); + using var deserializer = new RangeIndexChunkedDeserializer(manager); + + ClassicAssert.IsFalse(deserializer.ProcessChunk(payload)); + ClassicAssert.IsTrue(deserializer.HasError); + } + + /// + /// Key spanning multiple chunks with a tiny chunkSize round-trips correctly. 
+ /// + [Test] + public void KeySpanningMultipleChunksRoundTrip() + { + var filePath = Path.Combine(testDir, "tinyChunk.bftree"); + var fileData = new byte[100]; + new Random(55).NextBytes(fileData); + File.WriteAllBytes(filePath, fileData); + + // Key larger than chunkSize to force key chunking + var key = new byte[200]; + new Random(66).NextBytes(key); + var stub = CreateStub(); + const int tinyChunkSize = 50; + var buffer = new byte[tinyChunkSize]; + + using var fs = new FileStream(filePath, FileMode.Open, FileAccess.Read); + var serializer = new RangeIndexChunkedSerializer(key, stub, fileData.Length); + + var manager = new RangeIndexManager(enabled: true, dataDir: testDir); + using var deserializer = new RangeIndexChunkedDeserializer(manager); + + var chunkCount = 0; + while (!serializer.IsComplete) + { + var len = SerializerMoveNext(serializer, buffer, fs); + ClassicAssert.IsTrue(deserializer.ProcessChunk(buffer.AsSpan(0, len).ToArray(), out _, out _)); + chunkCount++; + } + + // Key (200 bytes) at chunkSize=50 → 4 key chunks + file chunks + ClassicAssert.Greater(chunkCount, 4); + ClassicAssert.IsTrue(deserializer.IsComplete); + ClassicAssert.IsFalse(deserializer.HasError); + ClassicAssert.AreEqual(key, deserializer.Key.ToArray()); + } + + /// + /// Calling MoveNext after the serializer is complete should throw InvalidOperationException. 
+ /// + [Test] + public void MoveNextAfterDoneThrows() + { + var filePath = Path.Combine(testDir, "done.bftree"); + File.WriteAllBytes(filePath, new byte[64]); + + var key = Encoding.UTF8.GetBytes("k"); + var stub = CreateStub(); + var buffer = CreateBuffer(); + + using var fs = new FileStream(filePath, FileMode.Open, FileAccess.Read); + var serializer = new RangeIndexChunkedSerializer(key, stub, 64); + + while (!serializer.IsComplete) + SerializerMoveNext(serializer, buffer, fs); + + Assert.Throws(() => serializer.MoveNext(buffer, default, out _)); + } + + /// + /// When totalFileBytes exceeds the actual file size, the serializer should throw EndOfStreamException. + /// + [Test] + public void TruncatedFileThrowsEndOfStreamException() + { + var filePath = Path.Combine(testDir, "truncated.bftree"); + File.WriteAllBytes(filePath, new byte[50]); + + var key = Encoding.UTF8.GetBytes("k"); + var stub = CreateStub(); + var buffer = CreateBuffer(); + + using var fs = new FileStream(filePath, FileMode.Open, FileAccess.Read); + var serializer = new RangeIndexChunkedSerializer(key, stub, totalFileBytes: 1000); + + Assert.Throws(() => + { + while (!serializer.IsComplete) + SerializerMoveNext(serializer, buffer, fs); + }); + } + + /// + /// When the destination buffer is smaller than sizeof(int), the serializer should + /// return 0 and defer the key header to the next call. 
+ /// + [Test] + public void BufferTooSmallForKeyHeaderDefersToNextChunk() + { + var filePath = Path.Combine(testDir, "keyheader.bftree"); + File.WriteAllBytes(filePath, new byte[32]); + + var key = Encoding.UTF8.GetBytes("mykey"); + var stub = CreateStub(); + + using var fs = new FileStream(filePath, FileMode.Open, FileAccess.Read); + var serializer = new RangeIndexChunkedSerializer(key, stub, 32); + + // Buffer too small for 4-byte key header + var tinyBuf = new byte[3]; + var written = SerializerMoveNext(serializer, tinyBuf, fs); + ClassicAssert.AreEqual(0, written); + ClassicAssert.IsFalse(serializer.IsComplete); + + // Retry with adequate buffer — should complete successfully + var buffer = CreateBuffer(); + var manager = new RangeIndexManager(enabled: true, dataDir: testDir); + using var deserializer = new RangeIndexChunkedDeserializer(manager); + + while (!serializer.IsComplete) + { + var len = SerializerMoveNext(serializer, buffer, fs); + ClassicAssert.IsTrue(deserializer.ProcessChunk(buffer.AsSpan(0, len).ToArray(), out _, out _)); + } + + ClassicAssert.IsTrue(deserializer.IsComplete); + ClassicAssert.IsFalse(deserializer.HasError); + ClassicAssert.AreEqual(key, deserializer.Key.ToArray()); + } + + /// + /// When the remaining buffer after key data is too small for the 8-byte file header, + /// the serializer defers the file header to the next chunk. 
+ /// + [Test] + public void BufferTooSmallForFileHeaderDefersToNextChunk() + { + var filePath = Path.Combine(testDir, "fileheader.bftree"); + var fileData = new byte[64]; + new Random(42).NextBytes(fileData); + File.WriteAllBytes(filePath, fileData); + + var key = Encoding.UTF8.GetBytes("k"); + var stub = CreateStub(); + + using var fs = new FileStream(filePath, FileMode.Open, FileAccess.Read); + var serializer = new RangeIndexChunkedSerializer(key, stub, fileData.Length); + + // Buffer fits key header (4) + key (1) but leaves < 8 bytes for file header + // sizeof(int) + 1 + 6 = 11 → remaining after key = 6, which is < sizeof(long) + var smallBuf = new byte[sizeof(int) + key.Length + 6]; + var written = SerializerMoveNext(serializer, smallBuf, fs); + + // Should have written key header + key data only + ClassicAssert.AreEqual(sizeof(int) + key.Length, written); + ClassicAssert.IsFalse(serializer.IsComplete); + + // Verify key header was written correctly + var keyLen = BinaryPrimitives.ReadInt32LittleEndian(smallBuf); + ClassicAssert.AreEqual(key.Length, keyLen); + + // Continue with adequate buffer — should round-trip + var buffer = CreateBuffer(); + var manager = new RangeIndexManager(enabled: true, dataDir: testDir); + using var deserializer = new RangeIndexChunkedDeserializer(manager); + + // Feed the first partial chunk + ClassicAssert.IsTrue(deserializer.ProcessChunk(smallBuf.AsSpan(0, written).ToArray(), out _, out _)); + + while (!serializer.IsComplete) + { + var len = SerializerMoveNext(serializer, buffer, fs); + ClassicAssert.IsTrue(deserializer.ProcessChunk(buffer.AsSpan(0, len).ToArray(), out _, out _)); + } + + ClassicAssert.IsTrue(deserializer.IsComplete); + ClassicAssert.IsFalse(deserializer.HasError); + ClassicAssert.AreEqual(key, deserializer.Key.ToArray()); + } + + /// + /// When the remaining buffer after file data is too small for the trailer, + /// the serializer defers the trailer to the next chunk. 
+ /// + [Test] + public void BufferTooSmallForTrailerDefersToNextChunk() + { + var fileData = new byte[64]; + new Random(42).NextBytes(fileData); + var filePath = Path.Combine(testDir, "trailer.bftree"); + File.WriteAllBytes(filePath, fileData); + + var key = Encoding.UTF8.GetBytes("k"); + var stub = CreateStub(); + var trailerSize = sizeof(ulong) + sizeof(int) + stub.Length; + + using var fs = new FileStream(filePath, FileMode.Open, FileAccess.Read); + var serializer = new RangeIndexChunkedSerializer(key, stub, fileData.Length); + + // Buffer fits everything except the trailer: keyHeader(4) + key(1) + fileHeader(8) + fileData(64) + (trailerSize - 1) + var bufSize = sizeof(int) + key.Length + sizeof(long) + fileData.Length + trailerSize - 1; + var buf = new byte[bufSize]; + var written = SerializerMoveNext(serializer, buf, fs); + + // Should have written everything except the trailer + ClassicAssert.AreEqual(sizeof(int) + key.Length + sizeof(long) + fileData.Length, written); + ClassicAssert.IsFalse(serializer.IsComplete); + + // Next call with adequate buffer should emit the trailer + var buffer = CreateBuffer(); + var trailerLen = SerializerMoveNext(serializer, buffer, fs); + ClassicAssert.AreEqual(trailerSize, trailerLen); + ClassicAssert.IsTrue(serializer.IsComplete); + + // Round-trip through deserializer + var manager = new RangeIndexManager(enabled: true, dataDir: testDir); + using var deserializer = new RangeIndexChunkedDeserializer(manager); + + ClassicAssert.IsTrue(deserializer.ProcessChunk(buf.AsSpan(0, written).ToArray(), out _, out _)); + ClassicAssert.IsFalse(deserializer.IsComplete); + ClassicAssert.IsTrue(deserializer.ProcessChunk(buffer.AsSpan(0, trailerLen).ToArray(), out _, out _)); + ClassicAssert.IsTrue(deserializer.IsComplete); + ClassicAssert.IsFalse(deserializer.HasError); + } + + /// + /// File size that is an exact multiple of the chunk size — verifies that + /// phase transitions at exact chunk boundaries work correctly. 
+ /// + [Test] + public void ExactPhaseBoundaryTransitions() + { + var key = Encoding.UTF8.GetBytes("k"); + var stub = CreateStub(); + const int chunkSize = 64; + var headerOverhead = sizeof(int) + key.Length + sizeof(long); + + // File size = chunkSize - headerOverhead so that key+fileHeader+fileData fills exactly one chunk, + // leaving the trailer for the next chunk + var fileSize = chunkSize - headerOverhead; + var fileData = new byte[fileSize]; + new Random(77).NextBytes(fileData); + var filePath = Path.Combine(testDir, "boundary.bftree"); + File.WriteAllBytes(filePath, fileData); + + using var fs = new FileStream(filePath, FileMode.Open, FileAccess.Read); + var serializer = new RangeIndexChunkedSerializer(key, stub, fileSize); + + var buffer = new byte[chunkSize]; + + // First chunk: should contain key header + key + file header + all file data + var len1 = SerializerMoveNext(serializer, buffer, fs); + ClassicAssert.AreEqual(chunkSize, len1); + ClassicAssert.IsFalse(serializer.IsComplete); + + // Second chunk: should contain trailer only + var len2 = SerializerMoveNext(serializer, buffer, fs); + var trailerSize = sizeof(ulong) + sizeof(int) + stub.Length; + ClassicAssert.AreEqual(trailerSize, len2); + ClassicAssert.IsTrue(serializer.IsComplete); + + // Round-trip + var manager = new RangeIndexManager(enabled: true, dataDir: testDir); + using var deserializer = new RangeIndexChunkedDeserializer(manager); + + var allPayload = new byte[len1 + len2]; + buffer.AsSpan(0, len1).CopyTo(allPayload); // reuse buffer for chunk 2, so must reconstruct + + // Re-run to get clean data + fs.Seek(0, SeekOrigin.Begin); + var serializer2 = new RangeIndexChunkedSerializer(key, stub, fileSize); + using var deserializer2 = new RangeIndexChunkedDeserializer(manager); + + while (!serializer2.IsComplete) + { + var len = SerializerMoveNext(serializer2, buffer, fs); + ClassicAssert.IsTrue(deserializer2.ProcessChunk(buffer.AsSpan(0, len).ToArray(), out _, out _)); + } + + 
ClassicAssert.IsTrue(deserializer2.IsComplete); + ClassicAssert.IsFalse(deserializer2.HasError); + } + + /// + /// Calling Dispose twice on the migration reader should not throw. + /// + [Test] + public void DoubleDisposeIsIdempotent() + { + var filePath = Path.Combine(testDir, "dispose.bftree"); + File.WriteAllBytes(filePath, new byte[16]); + + var fs = new FileStream(filePath, FileMode.Open, FileAccess.Read); + var serializer = new RangeIndexChunkedSerializer(Encoding.UTF8.GetBytes("k"), CreateStub(), 16); + var reader = new RangeIndexMigrationReader(serializer, fs, 256); + + reader.Dispose(); + reader.Dispose(); // Should not throw + } + + /// + /// Zero-length key should serialize and round-trip correctly. + /// + [Test] + public void ZeroLengthKeyRoundTrip() + { + var fileData = new byte[100]; + new Random(88).NextBytes(fileData); + var filePath = Path.Combine(testDir, "emptykey.bftree"); + File.WriteAllBytes(filePath, fileData); + + var key = Array.Empty(); + var stub = CreateStub(); + var buffer = CreateBuffer(); + + using var fs = new FileStream(filePath, FileMode.Open, FileAccess.Read); + var serializer = new RangeIndexChunkedSerializer(key, stub, fileData.Length); + + var manager = new RangeIndexManager(enabled: true, dataDir: testDir); + using var deserializer = new RangeIndexChunkedDeserializer(manager); + + while (!serializer.IsComplete) + { + var len = SerializerMoveNext(serializer, buffer, fs); + ClassicAssert.IsTrue(deserializer.ProcessChunk(buffer.AsSpan(0, len).ToArray(), out _, out _)); + } + + ClassicAssert.IsTrue(deserializer.IsComplete); + ClassicAssert.IsFalse(deserializer.HasError); + ClassicAssert.AreEqual(0, deserializer.Key.Length); + } + + /// + /// Verify the xxHash64 checksum in the trailer matches a manually computed hash + /// over the file data bytes. 
+ /// + [Test] + public void TrailerChecksumAndStubContentVerification() + { + var fileData = new byte[256]; + new Random(44).NextBytes(fileData); + var filePath = Path.Combine(testDir, "checksum.bftree"); + File.WriteAllBytes(filePath, fileData); + + var key = Encoding.UTF8.GetBytes("hashkey"); + var stub = CreateStub(); + var buffer = CreateBuffer(); + + using var fs = new FileStream(filePath, FileMode.Open, FileAccess.Read); + var serializer = new RangeIndexChunkedSerializer(key, stub, fileData.Length); + + var len = SerializerMoveNext(serializer, buffer, fs); + ClassicAssert.IsTrue(serializer.IsComplete); + + var payload = buffer.AsSpan(0, len); + + // Parse trailer from the end: [8-byte hash][4-byte stubLen][stub] + var trailerSize = sizeof(ulong) + sizeof(int) + stub.Length; + var trailerStart = len - trailerSize; + + var hashFromPayload = BinaryPrimitives.ReadUInt64LittleEndian(payload[trailerStart..]); + var stubLenFromPayload = BinaryPrimitives.ReadInt32LittleEndian(payload[(trailerStart + sizeof(ulong))..]); + var stubFromPayload = payload[(trailerStart + sizeof(ulong) + sizeof(int)).. 
+ (trailerStart + sizeof(ulong) + sizeof(int) + stub.Length)].ToArray(); + + // Verify stub content + ClassicAssert.AreEqual(RangeIndexManager.IndexSizeBytes, stubLenFromPayload); + ClassicAssert.AreEqual(stub, stubFromPayload); + + // Manually compute xxHash64 over file data bytes in the payload + var fileDataOffset = sizeof(int) + key.Length + sizeof(long); + var fileDataFromPayload = payload[fileDataOffset..(fileDataOffset + fileData.Length)]; + + var manualHasher = new XxHash64(); + manualHasher.Append(fileDataFromPayload); + Span manualHashBytes = stackalloc byte[sizeof(ulong)]; + manualHasher.GetHashAndReset(manualHashBytes); + var manualHash = BinaryPrimitives.ReadUInt64LittleEndian(manualHashBytes); + + ClassicAssert.AreEqual(manualHash, hashFromPayload); + } + + /// + /// IsComplete should be false during the entire serialization process + /// and only become true after the final MoveNext emits the trailer. + /// + [Test] + public void IsCompleteTransitionsCorrectly() + { + var fileData = new byte[RangeIndexManager.DefaultMigrationChunkSize + 100]; + new Random(33).NextBytes(fileData); + var filePath = Path.Combine(testDir, "complete.bftree"); + File.WriteAllBytes(filePath, fileData); + + var key = Encoding.UTF8.GetBytes("progresskey"); + var stub = CreateStub(); + var buffer = CreateBuffer(); + + using var fs = new FileStream(filePath, FileMode.Open, FileAccess.Read); + var serializer = new RangeIndexChunkedSerializer(key, stub, fileData.Length); + + ClassicAssert.IsFalse(serializer.IsComplete); + + var chunkCount = 0; + while (!serializer.IsComplete) + { + if (chunkCount > 0) + ClassicAssert.IsFalse(serializer.IsComplete); + SerializerMoveNext(serializer, buffer, fs); + chunkCount++; + } + + ClassicAssert.IsTrue(serializer.IsComplete); + ClassicAssert.Greater(chunkCount, 1); + } + + /// + /// When totalFileBytes is less than the actual file size, the serializer + /// should only emit the declared number of bytes (truncated prefix). 
+ /// + [Test] + public void DeclaredSizeSmallerThanActualFileEmitsTruncatedPrefix() + { + var fullFileData = new byte[500]; + new Random(55).NextBytes(fullFileData); + var filePath = Path.Combine(testDir, "shorter.bftree"); + File.WriteAllBytes(filePath, fullFileData); + + var declaredSize = 200L; + var key = Encoding.UTF8.GetBytes("shortkey"); + var stub = CreateStub(); + var buffer = CreateBuffer(); + + using var fs = new FileStream(filePath, FileMode.Open, FileAccess.Read); + var serializer = new RangeIndexChunkedSerializer(key, stub, declaredSize); + + var len = SerializerMoveNext(serializer, buffer, fs); + ClassicAssert.IsTrue(serializer.IsComplete); + + // Verify file count in payload matches declared size, not actual + var fileSizeOffset = sizeof(int) + key.Length; + var fileSizeFromPayload = BinaryPrimitives.ReadInt64LittleEndian(buffer.AsSpan(fileSizeOffset)); + ClassicAssert.AreEqual(declaredSize, fileSizeFromPayload); + + // Round-trip through writer + var manager = new RangeIndexManager(enabled: true, dataDir: testDir); + using var deserializer = new RangeIndexChunkedDeserializer(manager); + + ClassicAssert.IsTrue(deserializer.ProcessChunk(buffer.AsSpan(0, len).ToArray())); + ClassicAssert.IsTrue(deserializer.IsComplete); + ClassicAssert.IsFalse(deserializer.HasError); + + // Verify the temp file contains exactly declaredSize bytes + var tmpDir = Path.Combine(testDir, "rangeindex", ".migration-tmp"); + var tmpFiles = Directory.GetFiles(tmpDir, "*.bftree"); + ClassicAssert.AreEqual(1, tmpFiles.Length); + var restoredData = File.ReadAllBytes(tmpFiles[0]); + ClassicAssert.AreEqual(declaredSize, restoredData.Length); + ClassicAssert.AreEqual(fullFileData.AsSpan(0, (int)declaredSize).ToArray(), restoredData); + } + + /// + /// When the destination buffer becomes empty exactly when entering the FileData phase + /// (e.g., the buffer was fully consumed by key header + key + file header), the + /// serializer should return the bytes written so far without 
throwing. + /// + [Test] + public void BufferExhaustedAtFileDataPhaseDoesNotThrow() + { + var fileData = new byte[100]; + new Random(42).NextBytes(fileData); + var filePath = Path.Combine(testDir, "exhausted.bftree"); + File.WriteAllBytes(filePath, fileData); + + var key = Encoding.UTF8.GetBytes("k"); + var stub = CreateStub(); + + using var fs = new FileStream(filePath, FileMode.Open, FileAccess.Read); + var serializer = new RangeIndexChunkedSerializer(key, stub, fileData.Length); + + // Buffer exactly fits key header (4) + key (1) + file header (8) = 13 + // This leaves 0 bytes for file data + var exactBuf = new byte[sizeof(int) + key.Length + sizeof(long)]; + var written = SerializerMoveNext(serializer, exactBuf, fs); + + // Should have written all 13 bytes without throwing + ClassicAssert.AreEqual(exactBuf.Length, written); + ClassicAssert.IsFalse(serializer.IsComplete); + + // Continue with adequate buffer — should complete and round-trip + var buffer = CreateBuffer(); + var manager = new RangeIndexManager(enabled: true, dataDir: testDir); + using var deserializer = new RangeIndexChunkedDeserializer(manager); + + ClassicAssert.IsTrue(deserializer.ProcessChunk(exactBuf.AsSpan(0, written).ToArray(), out _, out _)); + + while (!serializer.IsComplete) + { + var len = SerializerMoveNext(serializer, buffer, fs); + ClassicAssert.IsTrue(deserializer.ProcessChunk(buffer.AsSpan(0, len).ToArray(), out _, out _)); + } + + ClassicAssert.IsTrue(deserializer.IsComplete); + ClassicAssert.IsFalse(deserializer.HasError); + } + + /// + /// Serializing with a very small buffer that forces key data and file data + /// to span many chunks. The buffer must be at least as large as the trailer + /// (the largest element that must fit entirely), but forces data to be emitted + /// in small increments. 
+ /// + [Test] + public void SmallBufferRoundTrip() + { + var fileData = new byte[50]; + new Random(99).NextBytes(fileData); + var filePath = Path.Combine(testDir, "smallbuf.bftree"); + File.WriteAllBytes(filePath, fileData); + + var key = Encoding.UTF8.GetBytes("abcdef"); + var stub = CreateStub(); + + using var fs = new FileStream(filePath, FileMode.Open, FileAccess.Read); + var serializer = new RangeIndexChunkedSerializer(key, stub, fileData.Length); + + var manager = new RangeIndexManager(enabled: true, dataDir: testDir); + using var deserializer = new RangeIndexChunkedDeserializer(manager); + + // Buffer must fit the trailer (largest atomic element): + // sizeof(ulong) + sizeof(int) + stub.Length + var trailerSize = sizeof(ulong) + sizeof(int) + stub.Length; + var buffer = new byte[trailerSize]; + + using var allChunks = new MemoryStream(); + var chunkCount = 0; + + while (!serializer.IsComplete) + { + var len = SerializerMoveNext(serializer, buffer, fs); + if (len > 0) + allChunks.Write(buffer, 0, len); + chunkCount++; + + ClassicAssert.Less(chunkCount, 1000, "Serializer did not complete within expected iterations"); + } + + // With a small buffer, data should span multiple chunks + ClassicAssert.Greater(chunkCount, 1); + + // Feed entire concatenated payload to deserializer at once + ClassicAssert.IsTrue(deserializer.ProcessChunk(allChunks.ToArray())); + ClassicAssert.IsTrue(deserializer.IsComplete); + ClassicAssert.IsFalse(deserializer.HasError); + ClassicAssert.AreEqual(key, deserializer.Key.ToArray()); + } + } +} diff --git a/test/Garnet.test/TestUtils.cs b/test/Garnet.test/TestUtils.cs index 806d91153ec..b07feba0b62 100644 --- a/test/Garnet.test/TestUtils.cs +++ b/test/Garnet.test/TestUtils.cs @@ -530,7 +530,8 @@ public static (GarnetServer[] Nodes, GarnetServerOptions[] Options) CreateGarnet ClusterPreferredEndpointType clusterPreferredEndpointType = ClusterPreferredEndpointType.Ip, string clusterAnnounceHostname = null, int vectorSetReplayTaskCount = 
0, - int threadPoolMinIOCompletionThreads = 0) + int threadPoolMinIOCompletionThreads = 0, + bool enableRangeIndexPreview = false) { if (UseAzureStorage) IgnoreIfNotRunningAzureTests(); @@ -597,7 +598,8 @@ public static (GarnetServer[] Nodes, GarnetServerOptions[] Options) CreateGarnet clusterPreferredEndpointType: clusterPreferredEndpointType, clusterAnnounceHostname: clusterAnnounceHostname, vectorSetReplayTaskCount: vectorSetReplayTaskCount, - threadPoolMinIOCompletionThreads: threadPoolMinIOCompletionThreads); + threadPoolMinIOCompletionThreads: threadPoolMinIOCompletionThreads, + enableRangeIndexPreview: enableRangeIndexPreview); ClassicAssert.IsNotNull(opts); diff --git a/website/docs/dev/range-index-resp-api.md b/website/docs/dev/range-index-resp-api.md index 4dc61630fd4..d02feaa852f 100644 --- a/website/docs/dev/range-index-resp-api.md +++ b/website/docs/dev/range-index-resp-api.md @@ -2495,115 +2495,230 @@ and restores from the snapshot files at the expected paths. ### E. Key Migration (Cluster Slot Migration) -> **Reference:** `libs/cluster/Server/Migration/MigrateSessionKeys.cs` — -> migrates individual keys during cluster slot migration via `MigrateKeysFromStore()`. -> Key scanning: `MigrateScanFunctions.cs` — iterates store records for slot matching. -> Receiver: `libs/cluster/Session/RespClusterMigrateCommands.cs`. -> Currently, migration handles only standard string and object records. RangeIndex -> keys require special 2-phase migration since BfTree data lives outside Tsavorite. - -**Problem:** During slot migration, individual keys are transferred to the target node. -For a RangeIndex key, we can't just send the 51-byte stub — we must also send the -entire BfTree data (all entries in the index). The target node must recreate the BfTree -from this data. 
- -**Solution: 2-Phase Migration** - -#### Phase 1: Serialize BfTree data - -When the migration scanner encounters a RangeIndex record (identified by `RecordType`): - -```csharp -// In MigrateScanFunctions.cs (add RangeIndex detection): -if (srcLogRecord.RecordType == RangeIndexManager.RangeIndexRecordType) -{ - mss.EncounteredRangeIndex(ref key, ref value); - return; // Don't transmit the stub yet — Phase 2 -} -``` - -```csharp -// In MigrateSessionKeys.cs (new method): -internal void EncounteredRangeIndex(ReadOnlySpan key, ReadOnlySpan value) -{ - // Save key + value for Phase 2 - rangeIndexKeysToMigrate.Add((key.ToByteArray(), value.ToByteArray())); -} -``` - -#### Phase 2: Transmit BfTree snapshot + stub - -```csharp -// In MigrateSessionKeys.cs, TransmitRangeIndexKeys() (new method): -internal bool TransmitRangeIndexKeys() -{ - foreach (var (keyBytes, stubBytes) in rangeIndexKeysToMigrate) - { - // 1. Read stub to get TreePtr - RangeIndexManager.ReadIndex(stubBytes, out var treePtr, out var cacheSize, - out var minRecordSize, out var maxRecordSize, - out var maxKeyLen, out var leafPageSize, - out var storageBackend, ...); - - // 2. Snapshot BfTree to a temporary file - var tempSnapshotPath = Path.GetTempFileName(); - rangeIndexManager.SnapshotToPath(treePtr, tempSnapshotPath); - - // 3. Read snapshot file bytes - var snapshotBytes = File.ReadAllBytes(tempSnapshotPath); - - // 4. Send to target: [stub (51 bytes)] + [snapshot_length (4 bytes)] - // + [snapshot_bytes (N bytes)] - var payload = new byte[stubBytes.Length + 4 + snapshotBytes.Length]; - Buffer.BlockCopy(stubBytes, 0, payload, 0, stubBytes.Length); - BitConverter.TryWriteBytes(payload.AsSpan(stubBytes.Length), snapshotBytes.Length); - Buffer.BlockCopy(snapshotBytes, 0, payload, stubBytes.Length + 4, - snapshotBytes.Length); - - // 5. Transmit as a special store type "RISTORE" - gcs.TryWriteRangeIndexMigration(keyBytes, payload); - - // 6. Cleanup temp file - File.Delete(tempSnapshotPath); - - // 7. 
Delete local key (if not COPY) - if (!isCopy) DeleteLocalKey(keyBytes); - } - return true; -} -``` - -#### Receiver side +> **Reference:** +> - `libs/cluster/Server/Migration/MigrationDriver.cs` — `MigrateSession.BeginAsyncMigrationTask` +> orchestrates state transitions (IMPORTING / MIGRATING) and calls `MigrateSlotsDriverInline`. +> - `libs/cluster/Server/Migration/MigrateSessionSlots.cs` — `CreateAndRunMigrateTasks` spawns +> parallel `ScanStoreTask` workers; after the scan completes it runs inline loops for +> special record types (Vector Sets today; RangeIndex to be added). +> - `libs/cluster/Server/Migration/MigrateSessionKeys.cs` — handles the `MIGRATE … KEYS` path. +> - `libs/cluster/Server/Migration/MigrateScanFunctions.cs` — per-record scan callback that +> classifies each record and either ships it directly or captures it for a post-scan phase. +> - `libs/cluster/Server/Migration/MigrateOperation.cs` — per-batch state that accumulates +> special record types encountered during a scan (e.g. `VectorSets`). +> - `libs/cluster/Session/RespClusterMigrateCommands.cs` — destination dispatcher +> (`NetworkClusterMigrate`) that peels `MigrationRecordSpanType` tags and routes each record +> to the appropriate handler. +> - `libs/client/ClientSession/GarnetClientSessionIncremental.cs` — defines +> `MigrationRecordSpanType` and `TryWriteRecordSpan`, the transport envelope. + +**Problem.** During slot migration, a stub-only transfer is insufficient: the 51-byte stub +points to a process-local native `BfTree` whose on-disk data file lives outside Tsavorite +(`{dataDir}/rangeindex/{key_hash}/data.bftree`). The target node has no access to that +file. We must ship the entire tree-data file alongside the stub, and the target must +rebuild the native `BfTree` from it before publishing a usable stub into its main store. 
+
+#### Design goals
+
+- **Piggyback on the `DiskLogRecord` transport.** The existing migration pipeline
+  (`TryWriteRecordSpan`) is already chunked, flushed, ACK'd, and integrated with the
+  cluster state machine. We reuse it by adding a new `MigrationRecordSpanType` tag
+  rather than introducing a parallel channel. This is the same pattern Vector Sets use
+  for `VectorSetIndex` / `VectorSetElement`.
+- **Stream the snapshot file as raw bytes — no compression.** The native
+  `BfTree::snapshot()` already writes a compact on-disk representation; an extra
+  compression pass costs CPU on both ends with limited win. Streaming raw bytes also
+  keeps the destination logic simple: it only has to append chunks to a file.
+- **Decouple data transfer from stub publication.** The target writes incoming bytes to
+  a working file but does **not** publish a stub into its main store until the full
+  snapshot has arrived and the native `BfTree` is successfully recovered. This keeps the
+  target in a consistent state if the migration aborts mid-stream.
+- **Integrate with the existing `MIGRATING` / `IMPORTING` slot state machine.** No new
+  coordination between primaries is needed beyond what slot migration already provides
+  — RangeIndex keys live in the main keyspace and inherit its hash-slot semantics.
+
+#### Wire format
+
+One new value is added to `MigrationRecordSpanType` in
+`libs/client/ClientSession/GarnetClientSessionIncremental.cs`:
+
+| Tag | Name | Logical stream layout (before chunking) |
+|---|---|---|
+| `4` | `SerializedRangeIndexStream` | `[int keyLen][key bytes][long fileLen][file bytes][ulong xxHash64][int stubLen][51-byte stub]` |
+
+All integers are little-endian. The logical stream above is produced by
+`RangeIndexChunkedSerializer` and cut into chunks of whatever size fits the destination
+buffer; the receiving `RangeIndexChunkedDeserializer` is a state machine that consumes
+chunks strictly in order, so no per-chunk header is needed. The 8-byte `fileLen` lets
+the destination preallocate and sanity-check the incoming file bytes, the trailing
+xxHash64 checksum (computed over the file bytes) validates integrity, and a trailer
+whose `stubLen` differs from `RangeIndexManager.IndexSizeBytes` puts the deserializer
+into the Error state.
+
+**Max chunk size.** Bounded by the outgoing network send buffer in `GarnetClientSession`.
+A constant (`RangeIndexManager.DefaultMigrationChunkSize`, e.g. `256 * 1024`) is chosen
+to comfortably fit inside the default send buffer while still amortizing per-chunk
+framing overhead. The last chunk is whatever remains.
+
+#### Source side — producing the stream
+
+1. **Scan detection.** `MigrateScanFunctions.Reader` classifies each record by
+   `logRecord.RecordType`. When it encounters
+   `RangeIndexManager.RangeIndexRecordType == 2`, it does **not** ship the record
+   verbatim. Instead it calls `mo.EncounteredRangeIndex(key, stubBytes)` to capture the
+   key + stub bytes onto the batch's `MigrateOperation`. This mirrors
+   `EncounteredVectorSet`.
+
+2. **Post-scan drain.** After `Task.WhenAll` on the scan workers completes,
+   `MigrateSessionSlots.CreateAndRunMigrateTasks` runs a new inline loop over
+   `mo.RangeIndexes` (analogous to the existing Vector Set drain loop). For each
+   captured key it calls `TransmitRangeIndex(mo, keyBytes, stubBytes)`.
+
+3. **Snapshot + chunk transmit.** `MigrateSessionRangeIndex.TransmitRangeIndex` does:
+   ```
+   a. RangeIndexManager.CreateSerializer(localServerSession, keyBytes, stubBytes)
+      — internally calls SnapshotForMigration which:
+        1. Acquires the per-key exclusive lock.
+        2. Re-reads the stub from the main store (RIGET) under the lock to get a
+           fresh TreeHandle — the stub from discovery may be stale.
+        3. If the tree is live (TreeHandle ≠ 0, in liveIndexes):
+           snapshots it to a temporary migration-specific file
+           ({dataDir}/rangeindex/.migration-tmp/{guid}.bftree).
+        4. If the tree was evicted (TreeHandle == 0):
+           copies the existing flush.bftree or checkpoint snapshot file to the
+           same temp directory. Tsavorite guarantees OnFlush runs before OnEvict,
+           so a snapshot file must exist on disk.
+        5. Releases the exclusive lock.
+ — The temp file is safe from concurrent overwrites because: + • It has a unique GUID name (no collision with other migrations). + • OnFlush writes to flush.bftree, not the migration temp file. + • The native BfTree writes to data.bftree, not the migration temp file. + b. Open FileStream(tempMigrationPath, Read). + c. Stream chunks via RangeIndexChunkedSerializer. + d. For each chunk: + - Read up to DefaultMigrationChunkSize bytes into a pooled buffer. + - Build the SerializedRangeIndexStream payload. + - gcs.TryWriteRecordSpan(payload, MigrationRecordSpanType.SerializedRangeIndexStream). + - If the session send buffer is full, the existing flush/ACK logic drains it. + e. Force flush and await ACK before deleting the source key. + f. On ACK success: mo.DeleteRangeIndex(keyBytes) deletes the local key + (normal MIGRATE semantics; skipped under COPY). + g. RangeIndexChunkedSerializer.Dispose() closes the file stream and + deletes the temp migration file. + ``` -```csharp -// In RespClusterMigrateCommands.cs, add case for RISTORE: -case "RISTORE": - // 1. Parse payload: stub + snapshot bytes - var stub = payload.AsSpan(0, RangeIndexManager.IndexSizeBytes); - var snapshotLen = BitConverter.ToInt32( - payload.AsSpan(RangeIndexManager.IndexSizeBytes)); - var snapshotBytes = payload.AsSpan( - RangeIndexManager.IndexSizeBytes + 4, snapshotLen); - - // 2. Write snapshot to local file - var localSnapshotPath = DeriveSnapshotPath(key); - File.WriteAllBytes(localSnapshotPath, snapshotBytes.ToArray()); - - // 3. Restore BfTree from snapshot - RangeIndexManager.ReadIndex(stub, out _, out var cacheSize, ...); - var newTreePtr = rangeIndexManager.RestoreFromSnapshot( - localSnapshotPath, cacheSize, minRecordSize, maxRecordSize, - maxKeyLen, leafPageSize); - - // 4. 
Build new stub with updated TreePtr + ProcessInstanceId - var newStubBytes = new byte[RangeIndexManager.IndexSizeBytes]; - stub.CopyTo(newStubBytes); - rangeIndexManager.UpdateStubPointer(newStubBytes, newTreePtr); - - // 5. Insert into local main store with RangeIndex RecordType - InsertRangeIndexKey(keyBytes, newStubBytes); - break; -``` +4. **Headers.** Transmission uses `SetClusterMigrateHeader(sourceNodeId, replaceOption, + isVectorSets: false)` — RangeIndex keys live in moving hash slots, so the destination + applies the normal importing-slot validation. + +**Per-migration temp file for safety.** `SnapshotForMigration` writes to a unique temp +file under `{dataDir}/rangeindex/.migration-tmp/{guid}.bftree` rather than streaming +from `data.bftree` or `flush.bftree` directly. This is necessary because: +- **Live tree:** The native BfTree actively writes to `data.bftree`. A concurrent Tsavorite + page flush would trigger `OnFlush → SnapshotTreeForFlush`, overwriting `flush.bftree`. + Neither file is safe to read without holding the exclusive lock for the entire stream + duration, which would block all RI operations. +- **Evicted tree:** `flush.bftree` could be overwritten if the tree is concurrently restored + (by another client operation) and then flushed again by Tsavorite. +- **Checkpoints:** A checkpoint can run concurrently with migration (both use the same + exclusive lock but at different times). The checkpoint writes to `snapshot.{token}.bftree` + and `PurgeOldCheckpointSnapshots` can delete old snapshot files. The temp migration file + is immune to checkpoint lifecycle operations. + +> [!IMPORTANT] +> Once the slot transitions to `MIGRATING` the existing slot-verify guard in +> `ClusterSession.SingleKeySlotVerify` rejects further writes to keys in that slot, so +> the tree is stable during snapshot. We do **not** need Vector-Set-style "pause WRITE +> commands" plumbing — RangeIndex keys map 1:1 to hash slots. 
+ +#### Destination side — reassembly + finalization + +On the destination, `RespClusterMigrateCommands.NetworkClusterMigrate` peels the +`MigrationRecordSpanType` tag and dispatches: + +- **`RangeIndexSnapshotChunk`** → `RangeIndexManager.HandleMigratedRangeIndexChunk(payload)`. + No importing-slot check yet — we're just assembling bytes. Behavior: + 1. Parse the key + chunk header from the payload. + 2. Look up (or create) an `InboundRestoreState` entry in `inboundRestoreByKey`, keyed + by the migrating key. Holds the open `FileStream` pointing at + `{dataDir}/rangeindex/{key_hash}/restore.bftree`, the expected `totalChunks`, the + next expected `chunkIndex`, and the running byte count. + 3. Validate `chunkIndex` is the next expected one, write the chunk bytes, advance. + 4. When `chunkIndex == totalChunks - 1`, close the file but leave the entry in + `inboundRestoreByKey` — we still need the stub. + +- **`RangeIndexStub`** → importing-slot check on the key, then + `RangeIndexManager.HandleMigratedRangeIndexStub(ref stringBasicContext, payload)`. + Behavior: + 1. Look up the `InboundRestoreState` entry — error if the stub arrives without all + chunks assembled. + 2. Rename `restore.bftree` → `data.bftree` (atomic move on both Windows and Linux). + 3. Decode the 51-byte stub to extract `cacheSize`, `minRecordSize`, `maxRecordSize`, + `maxKeyLen`, `leafPageSize`, `storageBackend`. + 4. `bfTree = BfTreeService.RecoverFromSnapshot(workingPath, storageBackend, cacheSize, + minRecordSize, maxRecordSize, maxKeyLen, leafPageSize)`. On failure, delete the + working file and return an error — the migration will be retried / aborted. + 5. Rewrite the 51-byte stub: `TreeHandle = bfTree.NativePtr`, clear `Flags` + (not flushed / no serialization in-flight), stamp the local + `ProcessInstanceId`, zero `SerializationPhase`. + 6. 
Publish the stub into the main store by issuing an internal + `RICREATE` RMW inline via `stringBasicContext.RMW` + `StorageSession.CompletePendingForSession`. + The RMW path is used (not `Upsert`) so a pre-existing RangeIndex key under the + same name is detected and the migration fails cleanly rather than silently + clobbering state. + 7. `RangeIndexManager.RegisterIndex(bfTree, keyHash, keyBytes)` wires the new native + instance into the per-key lock map and instance-id registry. + 8. Remove the entry from `inboundRestoreByKey`. + +Files on the destination live at the **same deterministic path** as source-side trees: +`{dataDir}/rangeindex/{key_hash}/data.bftree`. No bookkeeping beyond the transient +`restore.bftree` rename is needed. + +> [!NOTE] +> The destination's `RangeIndexManager` already generates a fresh `processInstanceId` at +> startup (see Recovery above), so stamping it onto the migrated stub is sufficient to +> make all subsequent cold-read / promote logic work transparently on the new node. + +#### The `MIGRATE … KEYS` path + +`MigrateSessionKeys.MigrateKeysFromStore` does not run the parallel store scanner. Instead +it handles key discovery and transmission in distinct phases: + +1. **Upfront discovery.** Before transmitting any keys, two discovery passes identify + special key types that require out-of-band migration: + - `VectorManager.GetNamespacesForKeys` — reads each key via `VSIM` through the string + context. VectorSet keys return OK; non-VectorSet keys return WRONGTYPE and are skipped. + Also extracts namespace IDs for VectorSet element migration. + - `RangeIndexManager.GetRangeIndexKeysForMigration` — reads each key via `RIGET` through + the string context. RangeIndex keys return OK + stub bytes; non-RI keys return WRONGTYPE. + Returns a `Dictionary` mapping RI key bytes to stub bytes. + +2. **Regular key transmission.** `TransmitKeys` iterates `sketch.Keys` and ships each key + via `Read_UnifiedStore(MIGRATE)` → `WriteOrSendRecord`. 
Keys identified as VectorSet or + RangeIndex in step 1 are **skipped** (not transmitted as `LogRecord` records) and not + marked for deletion by `DeleteKeys`. + +3. **VectorSet migration.** Element data is transmitted via a full log scan + (`TransmitKeysNamespaces`), then VectorSet index records are shipped with namespace + remapping. (Same as before — see VectorSet migration docs.) + +4. **RangeIndex migration.** Each discovered RI key is transmitted via `TransmitRangeIndex` + (same source-side driver as the SLOTS path). `CreateSerializer` calls + `SnapshotForMigration` which acquires the exclusive lock, re-reads the stub for a fresh + `TreeHandle`, and snapshots to a temp migration file. After the destination ACKs, the + source key is deleted via `DeleteRangeIndex`. + +All destination dispatch code is shared with the SLOTS path. + +#### Failure modes + +- **Chunk out of order / missing.** Destination rejects with an error and drops the + `InboundRestoreState`. The source retries the batch under the normal migration retry + policy; a retry re-snapshots and restarts chunk 0. +- **Stub arrives without a completed chunk stream.** Destination rejects; same retry. +- **`RecoverFromSnapshot` fails.** Destination returns error to source; source leaves + the key intact (no `DeleteRangeIndex`) and marks the migration as failed so + `MigrateSession.TryRecoverFromFailure` can revert slot state. +- **Source crashes mid-stream.** The partial `restore.bftree` on the destination is a + stray file in the per-key directory; it is overwritten on retry and cleaned up by a + periodic sweep (same mechanism used for orphaned `flush.bftree` files). --- @@ -2665,15 +2780,19 @@ These are not yet implemented and will be added when migration support is built. 
| # | File Path | Purpose | |---|---|---| | NEW | `libs/server/Resp/RangeIndex/RangeIndexManager.Persistence.cs` | `DisposeRecord` handler, `OnSnapshotRecord` handler, snapshot path derivation | -| NEW | `libs/server/Resp/RangeIndex/RangeIndexManager.Migration.cs` | `HandleMigratedRangeIndexKey()`, migration serialization/deserialization | +| NEW | `libs/server/Resp/RangeIndex/RangeIndexManager.Migration.cs` | Source: `SnapshotForMigration()`, `CleanupMigrationArtifacts()`. Destination: `HandleMigratedRangeIndexChunk()`, `HandleMigratedRangeIndexStub()`, `InboundRestoreState`, path helpers. `DefaultMigrationChunkSize` const. | +| NEW | `libs/cluster/Server/Migration/MigrateSessionRangeIndex.cs` | Source-side driver `TransmitRangeIndex(MigrateOperation, keyBytes, stubBytes)` — snapshots, opens the data file, streams chunks via `TryWriteRecordSpan`, emits the final stub record. | | MOD | `libs/server/Databases/DatabaseManagerBase.cs` | *(no RangeIndex-specific changes needed — checkpoint handled via per-record callback)* | | MOD | `libs/server/Databases/SingleDatabaseManager.cs` | *(no RangeIndex-specific changes needed — recovery is lazy)* | | MOD | `libs/cluster/Server/Replication/CheckpointFileType.cs` | Add `RANGEINDEX_SNAPSHOT` enum value | | MOD | `libs/cluster/Server/Replication/PrimaryOps/ReplicaSyncSession.cs` | Send BfTree snapshot files during replica sync | -| MOD | `libs/cluster/Session/RespClusterMigrateCommands.cs` | Handle `RISTORE` type during key migration | -| MOD | `libs/cluster/Server/Migration/MigrateSessionKeys.cs` | Detect RangeIndex `RecordType`, serialize BfTree, 2-phase transmit | -| MOD | `libs/cluster/Server/Migration/MigrateScanFunctions.cs` | Check `RecordType` during slot scan | -| MOD | `libs/native/bftree-garnet/src/lib.rs` | *(future)* Add `bftree_serialize_to_buffer`, `bftree_deserialize_from_buffer` for migration | +| MOD | `libs/cluster/Session/RespClusterMigrateCommands.cs` | Destination dispatcher: add `RangeIndexSnapshotChunk` 
and `RangeIndexStub` branches inside `NetworkClusterMigrate`. | +| MOD | `libs/cluster/Server/Migration/MigrateOperation.cs` | Add `rangeIndexKeysToMigrate` dict, `RangeIndexes`, `EncounteredRangeIndex`, `DeleteRangeIndex`. Mirrors the Vector Set hooks. | +| MOD | `libs/cluster/Server/Migration/MigrateSessionSlots.cs` | After the scan workers finish and the Vector Set drain, run a new inline loop over `mo.RangeIndexes` that calls `TransmitRangeIndex`. | +| MOD | `libs/cluster/Server/Migration/MigrateSessionKeys.cs` | KEYS path: when the per-key probe sees `RangeIndexRecordType`, route through `TransmitRangeIndex` instead of the default `TryWriteRecordSpan(LogRecord)` branch. | +| MOD | `libs/cluster/Server/Migration/MigrateScanFunctions.cs` | `Reader`: detect `RangeIndexRecordType` and call `mo.EncounteredRangeIndex(key, stubBytes)` instead of shipping the record verbatim. | +| MOD | `libs/client/ClientSession/GarnetClientSessionIncremental.cs` | Add `MigrationRecordSpanType.SerializedRangeIndexStream = 4` (the single record-span type used for the chunked RangeIndex stream, as in the enum change above). | +| MOD | `libs/server/Resp/RangeIndex/RangeIndexManager.cs` | Make `RangeIndexRecordType` and migration entry points `public` — they are called from `Garnet.cluster`, which is a separate assembly without `InternalsVisibleTo`. | ---