Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions libs/cluster/Server/ClusterConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ internal sealed partial class ClusterConfig
/// </summary>
public const int MAX_HASH_SLOT_VALUE = 16384;

/// <summary>
/// Version of the cluster config serialization format.
/// <see cref="ToByteArray"/> writes this value as the first byte of the payload, and
/// <see cref="FromByteArray"/> throws <see cref="System.IO.InvalidDataException"/> when the leading byte differs from it.
/// Increment when the binary layout of <see cref="ToByteArray"/>/<see cref="FromByteArray"/> changes.
/// </summary>
public const byte ClusterConfigVersion = 1;

/// <summary>
///
/// </summary>
Expand Down
36 changes: 32 additions & 4 deletions libs/cluster/Server/ClusterConfigSerializer.cs
Original file line number Diff line number Diff line change
@@ -1,20 +1,40 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

using System;
using System.IO;
using System.Text;

namespace Garnet.cluster
{
internal sealed partial class ClusterConfig
{
/// <summary>
/// Read the leading version byte of a serialized config payload without deserializing the rest of it.
/// </summary>
/// <param name="data">Serialized cluster config payload.</param>
/// <param name="version">First byte of <paramref name="data"/>, or 0 when the payload is empty.</param>
/// <returns>True when <paramref name="data"/> holds at least one byte; false otherwise.</returns>
public static bool TryPeekVersion(ReadOnlySpan<byte> data, out byte version)
{
    var hasVersionByte = !data.IsEmpty;
    version = hasVersionByte ? data[0] : (byte)0;
    return hasVersionByte;
}

/// <summary>
/// Serialize config to byte array
/// </summary>
public byte[] ToByteArray()
{
var ms = new MemoryStream();
var writer = new BinaryWriter(ms, Encoding.ASCII);
var writer = new BinaryWriter(ms);

// Write serialization format version
writer.Write(ClusterConfigVersion);

SerializeSlotMap(ref ms, ref writer);

Expand Down Expand Up @@ -60,6 +80,7 @@ public byte[] ToByteArray()
private void SerializeSlotMap(ref MemoryStream ms, ref BinaryWriter writer)
{
//serialize slotMap
var segmentCountPosition = ms.Position;
ms.Position += 2;
ushort segmentCount = 0;
ushort count = 1;
Expand Down Expand Up @@ -93,9 +114,9 @@ private void SerializeSlotMap(ref MemoryStream ms, ref BinaryWriter writer)
writer.Write(workerId);
writer.Write(state);

//Write segment count in the beginning of memory stream
//Write segment count at the reserved position
var _position = ms.Position;
ms.Position = 0;
ms.Position = segmentCountPosition;
writer.Write(segmentCount);
ms.Position = _position;
}
Expand All @@ -108,6 +129,13 @@ public static ClusterConfig FromByteArray(byte[] other)
var ms = new MemoryStream(other);
var reader = new BinaryReader(ms);
Comment thread
vazois marked this conversation as resolved.

// Read and validate serialization format version
if (other.Length < 1)
throw new InvalidDataException("Invalid ClusterConfig payload: too short to contain a version");
var version = reader.ReadByte();
if (version != ClusterConfigVersion)
throw new InvalidDataException($"Incompatible ClusterConfig version: expected {ClusterConfigVersion}, got {version}");

Comment thread
vazois marked this conversation as resolved.
var newSlotMap = DeserializeSlotMap(ref reader);

int numWorkers = reader.ReadInt32();
Expand Down
19 changes: 14 additions & 5 deletions libs/cluster/Server/Failover/ReplicaFailoverSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -216,13 +216,22 @@ private async Task BroadcastConfigAndRequestAttachAsync(string replicaId, byte[]
{
clusterProvider.clusterManager.gossipStats.UpdateGossipBytesRecv(resp.Length);
var returnedConfigArray = resp.Span.ToArray();
var other = ClusterConfig.FromByteArray(returnedConfigArray);

// Check if gossip is from a node that is known and trusted before merging
if (current.IsKnown(other.LocalNodeId))
_ = clusterProvider.clusterManager.TryMerge(ClusterConfig.FromByteArray(returnedConfigArray));
// Validate config version before full deserialization
if (!ClusterConfig.TryPeekVersion(returnedConfigArray, out var version) || version != ClusterConfig.ClusterConfigVersion)
{
logger?.LogWarning("Received failover gossip response with incompatible config version: {version}", version);
}
else
logger?.LogWarning("Received gossip from unknown node: {node-id}", other.LocalNodeId);
{
var other = ClusterConfig.FromByteArray(returnedConfigArray);

// Check if gossip is from a node that is known and trusted before merging
if (current.IsKnown(other.LocalNodeId))
_ = clusterProvider.clusterManager.TryMerge(other);
else
logger?.LogWarning("Received gossip from unknown node: {node-id}", other.LocalNodeId);
}
}
}
catch (Exception ex)
Expand Down
10 changes: 9 additions & 1 deletion libs/cluster/Server/Gossip/GarnetServerNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -189,11 +189,19 @@ private async Task GossipAsync(byte[] configByteArray)
{
clusterProvider.clusterManager.gossipStats.UpdateGossipBytesRecv(resp.Length);
var returnedConfigArray = resp.Span.ToArray();

// Validate config version before full deserialization
if (!ClusterConfig.TryPeekVersion(returnedConfigArray, out var version) || version != ClusterConfig.ClusterConfigVersion)
{
logger?.LogWarning("Received gossip response with incompatible config version: {version}", version);
return;
}

var other = ClusterConfig.FromByteArray(returnedConfigArray);
var current = clusterProvider.clusterManager.CurrentConfig;
// Check if gossip is from a node that is known and trusted before merging
if (current.IsKnown(other.LocalNodeId))
clusterProvider.clusterManager.TryMerge(ClusterConfig.FromByteArray(returnedConfigArray));
clusterProvider.clusterManager.TryMerge(other);
else
logger?.LogWarning("Received gossip from unknown node: {node-id}", other.LocalNodeId);
}
Expand Down
36 changes: 24 additions & 12 deletions libs/cluster/Server/Gossip/Gossip.cs
Original file line number Diff line number Diff line change
Expand Up @@ -186,21 +186,33 @@ public async Task TryMeetAsync(string address, int port, bool acquireLock = true
resp = await gsn.TryMeetAsync(conf.ToByteArray()).ConfigureAwait(false);
if (resp.Length > 0)
{
var other = ClusterConfig.FromByteArray(resp.Span.ToArray());
nodeId = other.LocalNodeId;
gsn.NodeId = nodeId;
var respArray = resp.Span.ToArray();

logger?.LogInformation("MEET {nodeId} {address} {port}", nodeId, address, port);
// Merge without a check because node is trusted as meet was issued by admin
_ = TryMerge(other, acquireLock);
// Validate config version before full deserialization
if (!ClusterConfig.TryPeekVersion(respArray, out var version) || version != ClusterConfig.ClusterConfigVersion)
{
logger?.LogWarning("MEET response has incompatible config version: {version}", version);
Comment thread
vazois marked this conversation as resolved.
if (created) gsn?.Dispose();
gossipStats.UpdateMeetRequestsFailed();
}
else
{
var other = ClusterConfig.FromByteArray(respArray);
nodeId = other.LocalNodeId;
gsn.NodeId = nodeId;

logger?.LogInformation("MEET {nodeId} {address} {port}", nodeId, address, port);
// Merge without a check because node is trusted as meet was issued by admin
_ = TryMerge(other, acquireLock);

gossipStats.UpdateMeetRequestsSucceed();
gossipStats.UpdateMeetRequestsSucceed();

// If failed to add newly created connection dispose of it to reclaim resources
// Dispose only connections that this meet task has created to avoid conflicts with existing connections from gossip main thread
// After connection is added we are no longer the owner. Background gossip task will be owner
if (created && !await clusterConnectionStore.AddConnectionAsync(gsn).ConfigureAwait(false))
gsn.Dispose();
// If failed to add newly created connection dispose of it to reclaim resources
// Dispose only connections that this meet task has created to avoid conflicts with existing connections from gossip main thread
// After connection is added we are no longer the owner. Background gossip task will be owner
if (created && !await clusterConnectionStore.AddConnectionAsync(gsn).ConfigureAwait(false))
gsn.Dispose();
}
}
}
catch (Exception ex)
Expand Down
34 changes: 25 additions & 9 deletions libs/cluster/Server/Replication/ReplicationHistoryManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

using System;
using System.IO;
using System.Text;
using System.Threading;
using Garnet.common;
using Garnet.server;
Expand All @@ -14,6 +13,12 @@ namespace Garnet.cluster
{
internal sealed class ReplicationHistory
{
/// <summary>
/// Version of the replication history serialization format.
/// <see cref="ToByteArray"/> writes this value as the first byte of the payload, and
/// <see cref="FromByteArray"/> throws <see cref="System.IO.InvalidDataException"/> when the leading byte differs from it.
/// Increment when the binary layout of <see cref="ToByteArray"/>/<see cref="FromByteArray"/> changes.
/// </summary>
public const byte ReplicationHistoryVersion = 1;

public string PrimaryReplId => primary_replid;
string primary_replid;
public string PrimaryReplId2 => primary_replid2;
Expand Down Expand Up @@ -43,8 +48,9 @@ public ReplicationHistory Copy()
public byte[] ToByteArray()
{
using var ms = new MemoryStream();
using var writer = new BinaryWriter(ms, Encoding.ASCII);
using var writer = new BinaryWriter(ms);

writer.Write(ReplicationHistoryVersion);
writer.Write(primary_replid);
writer.Write(primary_replid2);
replicationOffset.Serialize(writer);
Expand All @@ -59,6 +65,14 @@ public static ReplicationHistory FromByteArray(byte[] data)
using var ms = new MemoryStream(data);
using var reader = new BinaryReader(ms);
Comment thread
vazois marked this conversation as resolved.

// Read and validate serialization format version
if (data.Length < 1)
throw new InvalidDataException("Invalid ReplicationHistory payload: too short to contain a version");

var version = reader.ReadByte();
if (version != ReplicationHistoryVersion)
throw new InvalidDataException($"Incompatible ReplicationHistory version: expected {ReplicationHistoryVersion}, got {version}");

var primary_replid = reader.ReadString();
var primary_replid2 = reader.ReadString();
var replicationOffset = AofAddress.Deserialize(reader);
Expand Down Expand Up @@ -105,13 +119,15 @@ private void InitializeReplicationHistory(int aofPhysicalSublogCount)
/// <summary>
/// Load the persisted replication history from the replication config device.
/// When the stored payload cannot be deserialized (version mismatch, truncated or unreadable data),
/// a warning is logged and the history is reinitialized instead of failing recovery.
/// </summary>
private void RecoverReplicationHistory()
{
    var replConfig = ClusterUtils.ReadDevice(replicationConfigDevice, replicationConfigDevicePool, logger);

    //TODO: handle scenario where replica crashed before became a primary and it has two replication ids
    //var current = storeWrapper.clusterManager.CurrentConfig;
    //if(current.GetLocalNodeRole() == NodeRole.REPLICA && !primary_replid2.Equals(Generator.DefaultHexId()))
    //{

    //}
    try
    {
        // Deserialize exactly once, and only inside the guarded region: an unguarded call ahead of the
        // try block would throw on a bad payload before the fallback below could run.
        currentReplicationConfig = ReplicationHistory.FromByteArray(replConfig);
    }
    catch (Exception ex) when (ex is InvalidDataException or EndOfStreamException or IOException)
    {
        logger?.LogWarning(ex, "Corrupt or incompatible replication history on disk, reinitializing fresh state");
        InitializeReplicationHistory(storeWrapper.serverOptions.AofPhysicalSublogCount);
    }
}

private void TryUpdateMyPrimaryReplId(string primaryReplicationId)
Expand Down
44 changes: 26 additions & 18 deletions libs/cluster/Session/RespClusterBasicCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -385,28 +385,36 @@ private bool NetworkClusterGossip(out bool invalidParameters)
// Try merge if not just a ping message
if (gossipMessage.Length > 0)
{
var other = ClusterConfig.FromByteArray(gossipMessage);
// Accept the gossip message if it is a gossipWithMeet or comes from a node that is already known and trusted
// GossipWithMeet messages are only sent through a call to CLUSTER MEET at the remote node
if (gossipWithMeet || current.IsKnown(other.LocalNodeId))
// Validate config version before full deserialization
if (!ClusterConfig.TryPeekVersion(gossipMessage, out var version) || version != ClusterConfig.ClusterConfigVersion)
{
// NOTE: release the epoch to avoid deadlock with MIGRATE config suspension
ReleaseCurrentEpoch();
try
{
_ = clusterProvider.clusterManager.TryMerge(other);
}
finally
logger?.LogWarning("Received gossip with incompatible config version: {version}", version);
}
else
{
var other = ClusterConfig.FromByteArray(gossipMessage);
// Accept the gossip message if it is a gossipWithMeet or comes from a node that is already known and trusted
// GossipWithMeet messages are only sent through a call to CLUSTER MEET at the remote node
if (gossipWithMeet || current.IsKnown(other.LocalNodeId))
{
AcquireCurrentEpoch();
// NOTE: release the epoch to avoid deadlock with MIGRATE config suspension
ReleaseCurrentEpoch();
try
{
_ = clusterProvider.clusterManager.TryMerge(other);
}
finally
{
AcquireCurrentEpoch();
}

// Remember that this connection is being used for another cluster node to talk to us
Debug.Assert(RemoteNodeId is null || RemoteNodeId == other.LocalNodeId, "Node Id shouldn't change once set for a connection");
RemoteNodeId = other.LocalNodeId;
}

// Remember that this connection is being used for another cluster node to talk to us
Debug.Assert(RemoteNodeId is null || RemoteNodeId == other.LocalNodeId, "Node Id shouldn't change once set for a connection");
RemoteNodeId = other.LocalNodeId;
else
logger?.LogWarning("Received gossip from unknown node: {node-id}", other.LocalNodeId);
}
else
logger?.LogWarning("Received gossip from unknown node: {node-id}", other.LocalNodeId);
}

// Respond if configuration has changed or gossipWithMeet option is specified
Expand Down
83 changes: 83 additions & 0 deletions test/Garnet.test.cluster/ClusterConfigTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -157,5 +157,88 @@ public void ClusterAnyIPAnnounce()
resp = client.QuitAsync().GetAwaiter().GetResult();
ClassicAssert.AreEqual("OK", resp);
}

[Test, Order(4)]
[Category("CLUSTER-CONFIG"), CancelAfter(1000)]
public void ClusterConfigVersionRoundTripTest()
{
    // Arrange: a config holding a single local primary worker
    var localNodeId = Generator.CreateHexId();
    var original = new ClusterConfig().InitializeLocalWorker(
        localNodeId,
        "127.0.0.1",
        7001,
        configEpoch: 1,
        Garnet.cluster.NodeRole.PRIMARY,
        null,
        "");

    // Act
    var payload = original.ToByteArray();

    // Assert: the payload starts with the current format version
    var peeked = ClusterConfig.TryPeekVersion(payload, out var peekedVersion);
    Assert.That(peeked, Is.True);
    Assert.That(peekedVersion, Is.EqualTo(ClusterConfig.ClusterConfigVersion));

    // Assert: deserializing the payload yields the same local node id
    var roundTripped = ClusterConfig.FromByteArray(payload);
    Assert.That(roundTripped.LocalNodeId, Is.EqualTo(original.LocalNodeId));
}

[Test, Order(5)]
[Category("CLUSTER-CONFIG"), CancelAfter(1000)]
public void ClusterConfigVersionMismatchThrowsTest()
{
    // A version value the current build does not understand
    const byte unsupportedVersion = (byte)(ClusterConfig.ClusterConfigVersion + 1);

    // Arrange: serialize a valid config
    var localNodeId = Generator.CreateHexId();
    var payload = new ClusterConfig()
        .InitializeLocalWorker(
            localNodeId,
            "127.0.0.1",
            7001,
            configEpoch: 1,
            Garnet.cluster.NodeRole.PRIMARY,
            null,
            "")
        .ToByteArray();

    // Act: overwrite the version byte, which sits at index 0
    payload[0] = unsupportedVersion;

    // Assert: deserialization rejects the payload
    Assert.Throws<System.IO.InvalidDataException>(() => { _ = ClusterConfig.FromByteArray(payload); });
}

[Test, Order(6)]
[Category("CLUSTER-CONFIG"), CancelAfter(1000)]
public void ClusterConfigTryPeekVersionEmptyDataTest()
{
    // An empty payload has no version byte to report
    var peeked = ClusterConfig.TryPeekVersion([], out _);

    Assert.That(peeked, Is.False);
}

[Test, Order(7)]
[Category("CLUSTER-CONFIG"), CancelAfter(1000)]
public void ReplicationHistoryVersionRoundTripTest()
{
    // Arrange / Act: serialize a freshly constructed history
    var original = new ReplicationHistory(1);
    var payload = original.ToByteArray();

    // Assert: the payload starts with the current format version
    var leadingByte = payload[0];
    Assert.That(leadingByte, Is.EqualTo(ReplicationHistory.ReplicationHistoryVersion));

    // Assert: deserializing the payload preserves both replication ids
    var roundTripped = ReplicationHistory.FromByteArray(payload);
    Assert.That(roundTripped.PrimaryReplId, Is.EqualTo(original.PrimaryReplId));
    Assert.That(roundTripped.PrimaryReplId2, Is.EqualTo(original.PrimaryReplId2));
}

[Test, Order(9)]
[Category("CLUSTER-CONFIG"), CancelAfter(1000)]
public void ReplicationHistoryVersionMismatchThrowsTest()
{
    // A version value the current build does not understand
    const byte unsupportedVersion = (byte)(ReplicationHistory.ReplicationHistoryVersion + 1);

    // Arrange: serialize a valid history
    var payload = new ReplicationHistory(1).ToByteArray();

    // Act: overwrite the version byte, which sits at index 0
    payload[0] = unsupportedVersion;

    // Assert: deserialization rejects the payload
    Assert.Throws<System.IO.InvalidDataException>(() => { _ = ReplicationHistory.FromByteArray(payload); });
}
}
}
Loading