Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions libs/client/ClientSession/GarnetClientSessionIncremental.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,18 @@ public enum MigrationRecordSpanType : byte
/// Bespoke encoding for Vector Set indexes.
/// </summary>
VectorSetIndex = 3,

/// <summary>
/// A single chunk of a gzipped BfTree snapshot. Streamed before <see cref="RangeIndexStub"/>.
/// Payload format: <c>[int keyLen][key bytes][int chunkIndex][int totalChunks][long uncompressedSnapshotSize][int chunkLen][chunk bytes]</c>.
/// </summary>
RangeIndexSnapshotChunk = 4,

/// <summary>
/// Finalizer for a RangeIndex (BfTree) migration. Sent after all <see cref="RangeIndexSnapshotChunk"/> entries for this key.
/// Payload format: <c>[int keyLen][key bytes][stub bytes]</c>.
/// </summary>
RangeIndexStub = 5,
}

public sealed unsafe partial class GarnetClientSession : IServerHook, IMessageConsumer
Expand Down
22 changes: 22 additions & 0 deletions libs/cluster/Server/Migration/MigrateOperation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ internal sealed partial class MigrateOperation
public StoreScan storeScan;

private readonly ConcurrentDictionary<byte[], byte[]> vectorSetsIndexKeysToMigrate;
private readonly ConcurrentDictionary<byte[], byte[]> rangeIndexKeysToMigrate;

readonly MigrateSession session;
readonly GarnetClientSession gcs;
Expand All @@ -30,6 +31,8 @@ internal sealed partial class MigrateOperation

/// <summary>Vector Set index keys (key → value bytes) captured during scan, to be migrated after their associated namespaces.</summary>
public IEnumerable<KeyValuePair<byte[], byte[]>> VectorSets => vectorSetsIndexKeysToMigrate;

/// <summary>RangeIndex (BfTree) keys (key → stub bytes) captured during scan, to be migrated by the post-scan snapshot/chunk driver.</summary>
public IEnumerable<KeyValuePair<byte[], byte[]>> RangeIndexes => rangeIndexKeysToMigrate;

/// <summary>Throws <see cref="OperationCanceledException"/> if the owning migrate session has been cancelled.</summary>
public void ThrowIfCancelled() => session._cts.Token.ThrowIfCancellationRequested();

/// <summary>Returns true if <paramref name="slot"/> is part of the slot set being migrated by this session.</summary>
public bool Contains(int slot) => session._sslots.Contains(slot);
Expand All @@ -45,6 +48,9 @@ public bool ContainsNamespace(ReadOnlySpan<byte> namespaceBytes)
/// <summary>
/// Remember a Vector Set index record seen during scan so it can be migrated later.
/// Uses TryAdd, so the first value captured for a key is the one kept.
/// </summary>
public void EncounteredVectorSet(byte[] key, byte[] value)
    => vectorSetsIndexKeysToMigrate.TryAdd(key, value);

/// <summary>
/// Record a RangeIndex stub discovered during the store scan so its BfTree can be
/// shipped after the regular record scan. TryAdd means the first sighting of a key wins.
/// </summary>
public void EncounteredRangeIndex(byte[] key, byte[] stubBytes)
{
    _ = rangeIndexKeysToMigrate.TryAdd(key, stubBytes);
}

public MigrateOperation(MigrateSession session, Sketch sketch = null, int batchSize = 1 << 18)
{
this.session = session;
Expand All @@ -54,6 +60,7 @@ public MigrateOperation(MigrateSession session, Sketch sketch = null, int batchS
storeScan = new StoreScan(this);
keysToDelete = [];
vectorSetsIndexKeysToMigrate = new(ByteArrayComparer.Instance);
rangeIndexKeysToMigrate = new(ByteArrayComparer.Instance);
}

public bool Initialize()
Expand Down Expand Up @@ -284,6 +291,21 @@ public void DeleteVectorSet(PinnedSpanByte key)

session.logger?.LogDebug("Deleting Vector Set {key} after migration: {delRes}", System.Text.Encoding.UTF8.GetString(key), delRes);
}

/// <summary>
/// Remove a RangeIndex key from the source node once its transfer has completed.
/// Skipped entirely when the COPY option is set, since the source must then keep the key.
/// Going through the normal DELETE path fires the RangeIndex eviction callback, which
/// tears down the native BfTree instance and removes its on-disk data files.
/// </summary>
public void DeleteRangeIndex(PinnedSpanByte key)
{
    if (!session._copyOption)
    {
        var deleted = localServerSession.BasicGarnetApi.DELETE(key);

        session.logger?.LogDebug("Deleting RangeIndex {key} after migration: {delRes}", System.Text.Encoding.UTF8.GetString(key), deleted);
    }
}
}
}
}
7 changes: 7 additions & 0 deletions libs/cluster/Server/Migration/MigrateScanFunctions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ public bool Reader<TSourceLogRecord>(in TSourceLogRecord srcLogRecord, RecordMet
// we just need to remember it to migrate once the associated namespaces are all moved over
migrateOperation.EncounteredVectorSet(key.ToArray(), srcLogRecord.ValueSpan.ToArray());
}
else if (srcLogRecord.RecordType == RangeIndexManager.RangeIndexRecordType)
{
// RangeIndex stubs can't be migrated via the normal LogRecord path because the
// actual BfTree data lives in a separate on-disk file. Capture the key + stub and
// defer to the post-scan RangeIndex migration driver which snapshots/compresses/chunks.
migrateOperation.EncounteredRangeIndex(key.ToArray(), srcLogRecord.ValueSpan.ToArray());
}
else if (!migrateOperation.sketch.TryHashAndStore(key))
{
return false;
Expand Down
181 changes: 181 additions & 0 deletions libs/cluster/Server/Migration/MigrateSessionRangeIndex.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

using System;
using System.Buffers;
using System.Buffers.Binary;
using System.IO;
using Garnet.client;
using Garnet.server;
using Microsoft.Extensions.Logging;
using Tsavorite.core;

namespace Garnet.cluster
{
internal sealed partial class MigrateSession : IDisposable
{
/// <summary>
/// Ship a single RangeIndex (BfTree) key from the source primary to the destination primary
/// as a sequence of <see cref="MigrationRecordSpanType.RangeIndexSnapshotChunk"/> payloads
/// followed by a <see cref="MigrationRecordSpanType.RangeIndexStub"/> finalizer.
/// </summary>
/// <remarks>
/// <para>The BfTree on-disk file format is not directly transportable as a single network record because
/// it may be large (GB-scale) and because the backing <c>TreeHandle</c> in the stub is a local process
/// pointer. To transport it we:
/// <list type="number">
/// <item>Take a point-in-time snapshot of the live tree under the per-key exclusive lock.</item>
/// <item>GZip-compress the snapshot into a sibling <c>.gz</c> file.</item>
/// <item>Stream the compressed file as fixed-size chunks via the existing migration transport.</item>
/// <item>Send the raw stub bytes last; the destination treats that record as the finalizer.</item>
/// <item>Clean up the temp files and delete the source key (unless <c>COPY</c> is set).</item>
/// </list></para>
/// </remarks>
/// <param name="mo">The source migrate operation whose client connects to the destination.</param>
/// <param name="keyBytes">The Garnet key of the BfTree being migrated.</param>
/// <param name="stubBytes">The raw stub bytes captured during scan (contains config fields).</param>
/// <returns><c>true</c> on a fully-successful transfer; <c>false</c> on any failure (migration should abort).</returns>
private bool TransmitRangeIndex(MigrateOperation mo, byte[] keyBytes, byte[] stubBytes)
{
var rim = clusterProvider.storeWrapper.DefaultDatabase.RangeIndexManager;
if (rim is null)
{
logger?.LogError("TransmitRangeIndex: RangeIndexManager is null on source — cannot migrate");
return false;
}

// Snapshot the live tree and gzip it into a sibling file. On success we get the
// compressed file path plus both sizes; on failure we abort before any network I/O.
// NOTE(review): presumably this runs under the per-key exclusive lock described in
// the remarks above — confirm inside SnapshotForMigration.
if (!rim.SnapshotForMigration(keyBytes, stubBytes, out var gzPath, out var uncompressedSize, out var compressedSize))
{
logger?.LogError("TransmitRangeIndex: snapshot+compress failed for key len={keyLen}", keyBytes.Length);
return false;
}

// Ceiling-divide the compressed size into fixed-size chunks. A zero-byte snapshot
// still produces one (empty) chunk so the destination always sees at least one
// RangeIndexSnapshotChunk before the stub finalizer.
var chunkSize = RangeIndexManager.DefaultMigrationChunkSize;
var totalChunks = compressedSize == 0 ? 1 : (int)((compressedSize + chunkSize - 1) / chunkSize);
// Per-chunk header: [int keyLen][key][int chunkIndex][int totalChunks][long uncompressedSize][int chunkLen]
var headerSize = sizeof(int) + keyBytes.Length + sizeof(int) + sizeof(int) + sizeof(long) + sizeof(int);
var recordSize = headerSize + chunkSize;

// Pooled buffers: one for the serialized record, one scratch for raw file reads.
// Rent may return larger arrays than requested; only the first recordSize/chunkSize
// bytes are ever used for framing.
var recordBuffer = ArrayPool<byte>.Shared.Rent(recordSize);
var chunkScratch = ArrayPool<byte>.Shared.Rent(chunkSize);

var gcs = mo.Client;

try
{
using (var input = new FileStream(gzPath, FileMode.Open, FileAccess.Read, FileShare.Read, bufferSize: 1 << 16))
{
for (var chunkIndex = 0; chunkIndex < totalChunks; chunkIndex++)
{
// Fill chunkScratch with up to chunkSize bytes; FileStream.Read may return
// short counts, so loop until full or EOF. The last chunk is naturally short.
var read = 0;
while (read < chunkSize)
{
var n = input.Read(chunkScratch, read, chunkSize - read);
if (n <= 0)
break;
read += n;
}

// Build the chunk payload: [keyLen][key][chunkIndex][totalChunks][uncompressedSize][chunkLen][chunk bytes]
// All integers are little-endian to match the destination-side parser.
var offset = 0;
BinaryPrimitives.WriteInt32LittleEndian(recordBuffer.AsSpan(offset), keyBytes.Length);
offset += sizeof(int);
keyBytes.CopyTo(recordBuffer.AsSpan(offset));
offset += keyBytes.Length;
BinaryPrimitives.WriteInt32LittleEndian(recordBuffer.AsSpan(offset), chunkIndex);
offset += sizeof(int);
BinaryPrimitives.WriteInt32LittleEndian(recordBuffer.AsSpan(offset), totalChunks);
offset += sizeof(int);
BinaryPrimitives.WriteInt64LittleEndian(recordBuffer.AsSpan(offset), uncompressedSize);
offset += sizeof(long);
BinaryPrimitives.WriteInt32LittleEndian(recordBuffer.AsSpan(offset), read);
offset += sizeof(int);
Array.Copy(chunkScratch, 0, recordBuffer, offset, read);
offset += read;

// The client needs a CLUSTER MIGRATE header on a fresh iteration buffer
// before the first record can be written.
if (gcs.NeedsInitialization)
gcs.SetClusterMigrateHeader(_sourceNodeId, _replaceOption, isVectorSets: false);

// TryWriteRecordSpan fails when the iteration buffer is full; in that case
// flush (HandleMigrateTaskResponse awaits the pending send), re-arm the
// header for the next iteration buffer, and retry the same record.
while (!gcs.TryWriteRecordSpan(recordBuffer.AsSpan(0, offset), MigrationRecordSpanType.RangeIndexSnapshotChunk, out var task))
{
if (!HandleMigrateTaskResponse(task))
{
logger?.LogCritical("TransmitRangeIndex: flush failed while sending chunk {idx}/{total}", chunkIndex, totalChunks);
return false;
}

gcs.SetClusterMigrateHeader(_sourceNodeId, _replaceOption, isVectorSets: false);
}
}
}

// Finalizer: [keyLen][key][stub bytes]
{
// Reuse recordBuffer when it is big enough (Rent may have over-allocated);
// otherwise rent a dedicated buffer and return it in the inner finally.
var finalSize = sizeof(int) + keyBytes.Length + stubBytes.Length;
var finalizer = finalSize <= recordBuffer.Length ? recordBuffer : ArrayPool<byte>.Shared.Rent(finalSize);
try
{
var offset = 0;
BinaryPrimitives.WriteInt32LittleEndian(finalizer.AsSpan(offset), keyBytes.Length);
offset += sizeof(int);
keyBytes.CopyTo(finalizer.AsSpan(offset));
offset += keyBytes.Length;
stubBytes.CopyTo(finalizer.AsSpan(offset));
offset += stubBytes.Length;

if (gcs.NeedsInitialization)
gcs.SetClusterMigrateHeader(_sourceNodeId, _replaceOption, isVectorSets: false);

// Same full-buffer retry protocol as the chunk loop above.
while (!gcs.TryWriteRecordSpan(finalizer.AsSpan(0, offset), MigrationRecordSpanType.RangeIndexStub, out var task))
{
if (!HandleMigrateTaskResponse(task))
{
logger?.LogCritical("TransmitRangeIndex: flush failed while sending stub finalizer");
return false;
}

gcs.SetClusterMigrateHeader(_sourceNodeId, _replaceOption, isVectorSets: false);
}
}
finally
{
// Only return the finalizer buffer if it is not the shared recordBuffer
// (that one is returned exactly once, in the outer finally).
if (!ReferenceEquals(finalizer, recordBuffer))
ArrayPool<byte>.Shared.Return(finalizer);
}
}

// Force a flush so the destination has definitely seen everything before we delete the source.
if (!HandleMigrateTaskResponse(gcs.SendAndResetIterationBuffer()))
{
logger?.LogCritical("TransmitRangeIndex: flush failed before source-side delete");
return false;
}

// Pin the key for the span-based DELETE path; DeleteRangeIndex is a no-op when
// the COPY option is set on the session.
unsafe
{
fixed (byte* keyPtr = keyBytes)
{
var pinned = PinnedSpanByte.FromPinnedPointer(keyPtr, keyBytes.Length);
mo.DeleteRangeIndex(pinned);
}
}

logger?.LogInformation(
"TransmitRangeIndex: migrated key len={keyLen} uncompressed={uncompressed}B compressed={compressed}B chunks={chunks}",
keyBytes.Length, uncompressedSize, compressedSize, totalChunks);
return true;
}
catch (Exception ex)
{
logger?.LogError(ex, "TransmitRangeIndex: unexpected failure");
return false;
}
finally
{
// Runs on every exit path (success, failure, exception): return pooled buffers
// and remove the temporary snapshot/.gz files created by SnapshotForMigration.
ArrayPool<byte>.Shared.Return(recordBuffer);
ArrayPool<byte>.Shared.Return(chunkScratch);
rim.CleanupMigrationArtifacts(keyBytes);
}
}
}
}
16 changes: 16 additions & 0 deletions libs/cluster/Server/Migration/MigrateSessionSlots.cs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,22 @@ async Task<bool> CreateAndRunMigrateTasks(long beginAddress, long tailAddress, i
}
}
}
// Handle migration of discovered RangeIndex (BfTree) keys. These are shipped as a
// sequence of GZipped snapshot chunks followed by a stub finalizer record.
var rangeIndexes = migrateOperation.SelectMany(static mo => mo.RangeIndexes).GroupBy(static g => g.Key, ByteArrayComparer.Instance).ToDictionary(static g => g.Key, g => g.First().Value, ByteArrayComparer.Instance);

if (rangeIndexes.Count > 0)
{
var rimo = migrateOperation[0];
foreach (var (key, stub) in rangeIndexes)
{
if (!TransmitRangeIndex(rimo, key, stub))
{
logger?.LogCritical("Failed to migrate RangeIndex key during migration");
return false;
}
}
}
}
catch (Exception ex)
{
Expand Down
37 changes: 37 additions & 0 deletions libs/cluster/Session/RespClusterMigrateCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,43 @@ void Process(BasicGarnetApi basicGarnetApi, byte[] input, bool replaceOption, bo

clusterProvider.storeWrapper.DefaultDatabase.VectorManager.HandleMigratedElementKey(ref stringBasicContext, ref vectorBasicContext, namespaceBytes, keyBytes, valueBytes);
}
else if (kind == MigrationRecordSpanType.RangeIndexSnapshotChunk)
{
// Intermediate chunk for a BFTree snapshot. No slot check is performed per chunk;
// the final stub record (which carries the key) is validated against the importing
// slot below. If migrateState is already an error state, simply discard.
if (migrateState > 0)
{
i++;
continue;
}

clusterProvider.storeWrapper.DefaultDatabase.RangeIndexManager.HandleMigratedRangeIndexChunk(payloadRaw.ReadOnlySpan);
}
else if (kind == MigrationRecordSpanType.RangeIndexStub)
{
var payload = payloadRaw.ReadOnlySpan;

// Payload layout: [int keyLen][key bytes][stub bytes]
var keyLen = BinaryPrimitives.ReadInt32LittleEndian(payload);
var keyBytes = payload.Slice(sizeof(int), keyLen);

if (migrateState > 0)
{
i++;
continue;
}

var slot = HashSlotUtils.HashSlot(keyBytes);
if (!currentConfig.IsImportingSlot(slot))
{
migrateState = 1;
i++;
continue;
}

clusterProvider.storeWrapper.DefaultDatabase.RangeIndexManager.HandleMigratedRangeIndexStub(ref stringBasicContext, payload);
}
else if (kind == MigrationRecordSpanType.LogRecord)
{
// An error has occurred
Expand Down
Loading
Loading