Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
4e73507
LightEpoch user-word primitive; fold SafeTailAddress refresh into pro…
Apr 17, 2026
cfa9db4
Fix RefreshSafeTailAddress read ordering with memory barrier
badrishc Apr 17, 2026
9c57217
Rename SafeTailShiftCallback to SafeTailPageShiftCallback; fix silent…
badrishc Apr 17, 2026
90b03f2
Address review feedback; remove SubscriberRefreshFrequencyMs
badrishc Apr 17, 2026
eef496b
Use cached SafeTailAddress in GetNext hot path
badrishc Apr 18, 2026
c830605
Replace visitor-pattern scan with direct unsafe GetMinUserWord
badrishc Apr 18, 2026
42816f3
Remove ForEachUserWord and ILightEpochUserWordVisitor
badrishc Apr 18, 2026
4dba557
Pre-refresh SafeTailAddress when multiple iterators are registered
badrishc Apr 18, 2026
8824a3a
Add comments for activeSingleIteratorCount usage and race safety
badrishc Apr 18, 2026
c6f7424
Remove CommitInflightEnqueue; simplify to Begin/End protocol
badrishc Apr 18, 2026
72cf5ea
Add tail-based fast path to skip epoch scan in RefreshSafeTailAddress
badrishc Apr 18, 2026
eaf2363
Add exponential backoff to consumer scan in ScanSingle iterator
badrishc Apr 18, 2026
bdb90aa
Replace lock with CAS in user-word allocation; remove per-Acquire/Rel…
badrishc Apr 22, 2026
5b691ad
Split NotifyParkedWaiters into inline fast-path + NoInlining slow path
badrishc Apr 22, 2026
50f55cf
Address PR review: fix AllocateBlock inflight leak; remove hardcoded …
badrishc Apr 22, 2026
3807960
Fix GetNext hang: use cached SafeTailAddress with fallback to scan
badrishc Apr 23, 2026
58c27b0
Add spin-wait in GetNext before epoch-table scan
badrishc Apr 23, 2026
b51627e
Wrap all enqueue methods in try/finally for exception safety
badrishc Apr 24, 2026
9692491
Address GPT 5.5 review: exception safety for callbacks and TEpochAcce…
badrishc Apr 24, 2026
b0035b7
Fix AllocateBlock cannedException path: restore epoch+inflight before…
badrishc May 1, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion benchmark/BDN.benchmark/Embedded/EmbeddedRespServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ public EmbeddedRespServer(GarnetServerOptions opts, ILoggerFactory loggerFactory
new SubscribeBroker(
null,
opts.PubSubPageSizeBytes(),
opts.SubscriberRefreshFrequencyMs,
pubSubEpoch,
true);
}
Expand Down
1 change: 0 additions & 1 deletion benchmark/Resp.benchmark/OfflineBench/AOFBench/AofBench.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ public static GarnetServerOptions GetServerOptions(Options options)
AofReplayTaskCount = options.AofReplayTaskCount,
ReplicationOffsetMaxLag = 0,
CheckpointDir = OperatingSystem.IsLinux() ? "/tmp" : null,
AofReplicationRefreshFrequencyMs = 100,
};
return serverOptions;
}
Expand Down
1 change: 0 additions & 1 deletion benchmark/Resp.benchmark/OfflineBench/AOFBench/AofGen.cs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,6 @@ public AofGen(Options options)
EnableFastCommit = true,
CommitFrequencyMs = -1,
FastAofTruncate = true,
AofReplicationRefreshFrequencyMs = options.AofReplicationRefreshFrequencyMs,
EnableCluster = true,
ReplicationOffsetMaxLag = 0,
AofPhysicalSublogCount = options.AofPhysicalSublogCount
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ public static GarnetServerOptions GetServerOptions(Options options)
AofReplayTaskCount = options.AofReplayTaskCount,
ReplicationOffsetMaxLag = 0,
CheckpointDir = OperatingSystem.IsLinux() ? "/tmp" : null,
AofReplicationRefreshFrequencyMs = 100,
};
return serverOptions;
}
Expand Down
3 changes: 0 additions & 3 deletions benchmark/Resp.benchmark/Options.cs
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,6 @@ public partial class Options
[Option("aof-page-size", Required = false, Default = "4m", HelpText = "Size of each AOF page in bytes(rounds down to power of 2)")]
public string AofPageSize { get; set; }

[Option("aof-tail-ref-freq", Required = false, Default = 100, HelpText = "Aof Tail Refresh Frequency.")]
public int AofReplicationRefreshFrequencyMs { get; set; }

/// <summary>
/// Parse size from string specification
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,18 @@ internal sealed class LogShiftTailCallback(int physicalSublogIdx, AofSyncDriverS
readonly int physicalSublogIdx = physicalSublogIdx;
readonly AofSyncDriverStore store = store;

internal void SafeTailShiftCallback(long oldTailAddress, long newTailAddress)
internal void SafeTailPageShiftCallback(long oldTailAddress, long newTailAddress)
{
var oldPage = oldTailAddress >> store.logPageSizeBits;
var newPage = newTailAddress >> store.logPageSizeBits;

// Call truncate only once per page
if (oldPage != newPage)
{
// Truncate 2 pages above ReadOnly mark, so that we have sufficient time to shift begin before we flush.
// Make sure this is page-aligned, in case we go to a non-page-aligned ReadOnlyAddress.
var truncateUntilAddress = store.clusterProvider.storeWrapper.appendOnlyFile.Log.UnsafeGetReadOnlyAddressAbove(physicalSublogIdx, newTailAddress, numPagesAbove: 2);
if (truncateUntilAddress > 0)
_ = store.SafeTruncateAof(truncateUntilAddress, physicalSublogIdx);
}
// Truncate 2 pages above ReadOnly mark, so that we have sufficient time to shift begin before we flush.
// Make sure this is page-aligned, in case we go to a non-page-aligned ReadOnlyAddress.
var truncateUntilAddress = store.clusterProvider.storeWrapper.appendOnlyFile.Log.UnsafeGetReadOnlyAddressAbove(physicalSublogIdx, newTailAddress, numPagesAbove: 2);
if (truncateUntilAddress > 0)
_ = store.SafeTruncateAof(truncateUntilAddress, physicalSublogIdx);
}
}

readonly ClusterProvider clusterProvider;
readonly ILogger logger;
readonly int logPageSizeBits, logPageSizeMask;

AofSyncDriver[] syncDrivers;
int numDrivers;
Expand All @@ -58,15 +50,12 @@ public AofSyncDriverStore(ClusterProvider clusterProvider, int initialSize = 1,
numDrivers = 0;
if (clusterProvider.storeWrapper.appendOnlyFile != null)
{
logPageSizeBits = clusterProvider.storeWrapper.appendOnlyFile.Log.UnsafeGetLogPageSizeBits();
var logPageSize = 1 << logPageSizeBits;
logPageSizeMask = logPageSize - 1;
if (clusterProvider.serverOptions.FastAofTruncate)
{
for (var i = 0; i < clusterProvider.serverOptions.AofPhysicalSublogCount; i++)
{
var logShiftTailCallback = new LogShiftTailCallback(i, this);
clusterProvider.storeWrapper.appendOnlyFile.Log.SetLogShiftTailCallback(i, logShiftTailCallback.SafeTailShiftCallback);
clusterProvider.storeWrapper.appendOnlyFile.Log.SetLogShiftTailCallback(i, logShiftTailCallback.SafeTailPageShiftCallback);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion libs/cluster/Server/Replication/ReplicationManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -689,7 +689,7 @@ async Task AdvanceTimeWorkerAsync(CancellationToken token)
else
converged = false;
}
await Task.Delay(storeWrapper.serverOptions.AofReplicationRefreshFrequencyMs, token).ConfigureAwait(false);
await Task.Delay(storeWrapper.serverOptions.AofTailWitnessFreqMs, token).ConfigureAwait(false);
}
}
}
Expand Down
9 changes: 0 additions & 9 deletions libs/host/Configuration/Options.cs
Original file line number Diff line number Diff line change
Expand Up @@ -228,14 +228,6 @@ internal sealed class Options : ICloneable
[Option("aof-size-limit-enforce-frequency", Required = false, HelpText = "Frequency (in secs) of execution of the AutoCheckpointBasedOnAofSizeLimit background task.")]
public int AofSizeLimitEnforceFrequencySecs { get; set; }

[IntRangeValidation(0, int.MaxValue)]
[Option("aof-refresh-freq", Required = false, HelpText = "AOF replication (safe tail address) refresh frequency in milliseconds. 0 = auto refresh after every enqueue.")]
public int AofReplicationRefreshFrequencyMs { get; set; }

[IntRangeValidation(0, int.MaxValue)]
[Option("subscriber-refresh-freq", Required = false, HelpText = "Subscriber (safe tail address) refresh frequency in milliseconds (for pub-sub). 0 = auto refresh after every enqueue.")]
public int SubscriberRefreshFrequencyMs { get; set; }

[IntRangeValidation(0, int.MaxValue)]
[Option("compaction-freq", Required = false, HelpText = "Background hybrid log compaction frequency in seconds. 0 = disabled (compaction performed before checkpointing instead)")]
public int CompactionFrequencySecs { get; set; }
Expand Down Expand Up @@ -856,7 +848,6 @@ endpoint is IPEndPoint listenEp && clusterAnnounceEndpoint[0] is IPEndPoint anno
AofPhysicalSublogCount = AofPhysicalSublogCount,
AofReplayTaskCount = AofReplayTaskCount,
AofTailWitnessFreqMs = AofTailWitnessFreqMs,
AofReplicationRefreshFrequencyMs = AofReplicationRefreshFrequencyMs,
CommitFrequencyMs = CommitFrequencyMs,
WaitForCommit = WaitForCommit.GetValueOrDefault(),
AofSizeLimit = AofSizeLimit,
Expand Down
2 changes: 1 addition & 1 deletion libs/host/GarnetServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ private void InitializeServer()
CreateDatabase(dbId, opts, clusterFactory, customCommandManager);

if (!opts.DisablePubSub)
subscribeBroker = new SubscribeBroker(null, opts.PubSubPageSizeBytes(), opts.SubscriberRefreshFrequencyMs, pubSubEpoch, startFresh: true, logger);
subscribeBroker = new SubscribeBroker(null, opts.PubSubPageSizeBytes(), pubSubEpoch, startFresh: true, logger);

logger?.LogTrace("TLS is {tlsEnabled}", opts.TlsOptions == null ? "disabled" : "enabled");

Expand Down
6 changes: 0 additions & 6 deletions libs/host/defaults.conf
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,6 @@
/* Size of each AOF page in bytes(rounds down to power of 2) */
"AofPageSize" : "4m",

/* AOF replication (safe tail address) refresh frequency in milliseconds. 0 = auto refresh after every enqueue. */
"AofReplicationRefreshFrequencyMs": 10,

/* Number of AOF physical sublogs (i.e. TsavoriteLog instances) used (=1 equivalent to the legacy single log implementation >1: sharded log implementation. */
"AofPhysicalSublogCount": 1,

Expand All @@ -144,9 +141,6 @@
/* Polling frequency of the background task responsible for moving time ahead for all physical sublogs (Used only with physical sublog value >1). */
"AofTailWitnessFreqMs": 10,

/* Subscriber (safe tail address) refresh frequency in milliseconds (for pub-sub). 0 = auto refresh after every enqueue. */
"SubscriberRefreshFrequencyMs": 0,

/* Write ahead logging (append-only file) commit issue frequency in milliseconds. 0 = issue an immediate commit per operation, -1 = manually issue commits using COMMITAOF command */
"CommitFrequencyMs" : 0,

Expand Down
8 changes: 4 additions & 4 deletions libs/server/AOF/GarnetLog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -309,16 +309,16 @@ public long GetBeginAddress(int sublogIdx)
/// Set log shift tail callbacks
/// </summary>
/// <param name="sublogIdx"></param>
/// <param name="SafeTailShiftCallback"></param>
public void SetLogShiftTailCallback(int sublogIdx, Action<long, long> SafeTailShiftCallback)
/// <param name="safeTailPageShiftCallback"></param>
public void SetLogShiftTailCallback(int sublogIdx, Action<long, long> safeTailPageShiftCallback)
{
if (singleLog != null)
{
singleLog.log.SafeTailShiftCallback = SafeTailShiftCallback;
singleLog.log.SafeTailPageShiftCallback = safeTailPageShiftCallback;
}
else
{
shardedLog.sublog[sublogIdx].SafeTailShiftCallback = SafeTailShiftCallback;
shardedLog.sublog[sublogIdx].SafeTailPageShiftCallback = safeTailPageShiftCallback;
}
}

Expand Down
5 changes: 2 additions & 3 deletions libs/server/PubSub/SubscribeBroker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,12 @@ public sealed class SubscribeBroker : IDisposable, ILogEntryConsumer
/// </summary>
/// <param name="logDir">Directory where the log will be stored</param>
/// <param name="pageSize">Page size of log used for pub/sub</param>
/// <param name="subscriberRefreshFrequencyMs">Subscriber log refresh frequency</param>
/// <param name="startFresh">start the log from scratch, do not continue</param>
public SubscribeBroker(string logDir, long pageSize, int subscriberRefreshFrequencyMs, LightEpoch epoch, bool startFresh = true, ILogger logger = null)
public SubscribeBroker(string logDir, long pageSize, LightEpoch epoch, bool startFresh = true, ILogger logger = null)
{
device = logDir == null ? new NullDevice() : Devices.CreateLogDevice(logDir + "/pubsubkv", preallocateFile: false);
device.Initialize((long)(1 << 30) * 64);
aof = new TsavoriteLog(new TsavoriteLogSettings { LogDevice = device, PageSize = pageSize, MemorySize = pageSize * 4, SafeTailRefreshFrequencyMs = subscriberRefreshFrequencyMs, Epoch = epoch });
aof = new TsavoriteLog(new TsavoriteLogSettings { LogDevice = device, PageSize = pageSize, MemorySize = pageSize * 4, Epoch = epoch });
pageSizeBits = aof.UnsafeGetLogPageSizeBits();
if (startFresh)
aof.TruncateUntil(aof.CommittedUntilAddress);
Expand Down
11 changes: 0 additions & 11 deletions libs/server/Servers/GarnetServerOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,6 @@ public class GarnetServerOptions : ServerOptions
/// </summary>
public string AofPageSize = "4m";

/// <summary>
/// AOF replication (safe tail address) refresh frequency in milliseconds. 0 = auto refresh after every enqueue.
/// </summary>
public int AofReplicationRefreshFrequencyMs = 10;

/// <summary>
/// Number of AOF physical sublogs (i.e. TsavoriteLog instances) used (=1 equivalent to the legacy single log implementation >1: sharded log implementation.
/// </summary>
Expand All @@ -91,11 +86,6 @@ public class GarnetServerOptions : ServerOptions
/// </summary>
public int AofTailWitnessFreqMs = 100;

/// <summary>
/// Subscriber (safe tail address) refresh frequency in milliseconds (for pub-sub). 0 = auto refresh after every enqueue.
/// </summary>
public int SubscriberRefreshFrequencyMs = 0;

/// <summary>
/// Write ahead logging (append-only file) commit issue frequency in milliseconds.
/// 0 = issue an immediate commit per operation
Expand Down Expand Up @@ -809,7 +799,6 @@ public void GetAofSettings(int dbId, out TsavoriteLogSettings[] tsavoriteLogSett
PageSizeBits = AofPageSizeBits(),
LogDevice = GetAofDevice(dbId, subLogIdx: AofPhysicalSublogCount == 1 ? -1 : i),
TryRecoverLatest = false,
SafeTailRefreshFrequencyMs = EnableCluster ? AofReplicationRefreshFrequencyMs : -1,
FastCommitMode = EnableFastCommit,
AutoCommit = AofAutoCommit && (AofPhysicalSublogCount == 1),
MutableFraction = 0.9,
Expand Down
Loading
Loading