Skip to content

Commit 44bbb30

Browse files
authored
fix(store): unexpected compaction after restart (#603)
Signed-off-by: James Yin <[email protected]>
1 parent 43db43b commit 44bbb30

File tree

6 files changed

+18
-25
lines changed

6 files changed

+18
-25
lines changed

internal/store/raft/storage/compaction.go

Lines changed: 3 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -255,23 +255,14 @@ func (w *WAL) removeNode(ctx context.Context, nodeID vanus.ID) error {
255255
}
256256

257257
func (w *WAL) recoverNode(nodeID vanus.ID, offset int64) {
258-
task := adminTask{
259-
nodeID: nodeID,
260-
offset: offset,
261-
}
262-
263-
w.closeMu.RLock()
264-
select {
265-
case <-w.closeC:
266-
default:
267-
w.compactC <- task.recoverNode
258+
w.nodes[nodeID] = true
259+
if offset != 0 {
260+
w.barrier.Set(offset, nodeID)
268261
}
269-
w.closeMu.RUnlock()
270262
}
271263

272264
type adminTask struct {
273265
nodeID vanus.ID
274-
offset int64
275266
ch chan<- error
276267
}
277268

@@ -309,13 +300,6 @@ func (t *adminTask) removeNode(w *WAL, cCtx *compactContext) {
309300
})
310301
}
311302

312-
func (t *adminTask) recoverNode(w *WAL, _ *compactContext) {
313-
w.nodes[t.nodeID] = true
314-
if t.offset != 0 {
315-
w.barrier.Set(t.offset, t.nodeID)
316-
}
317-
}
318-
319303
type compactInfo struct {
320304
index, term uint64
321305
}

internal/store/raft/storage/compaction_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ func TestStorage_Compact(t *testing.T) {
5353

5454
rawWAL, err := walog.Open(ctx, walDir, walog.WithFileSize(int64(fileSize)))
5555
So(err, ShouldBeNil)
56-
wal := newWAL(rawWAL, stateStore)
56+
wal := newWAL(rawWAL, stateStore, true)
5757

5858
s1, _ := NewStorage(ctx, nodeID1, wal, stateStore, hintStore, nil)
5959
s2, _ := NewStorage(ctx, nodeID2, wal, stateStore, hintStore, nil)

internal/store/raft/storage/recovery.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ func Recover(
123123
if err != nil {
124124
return nil, nil, err
125125
}
126-
wal2 := newWAL(wal, stateStore)
126+
wal2 := newWAL(wal, stateStore, false)
127127

128128
// convert, and set wal
129129
storages := make(map[vanus.ID]*Storage, len(sb.storages))
@@ -140,6 +140,9 @@ func Recover(
140140
storages[nodeID] = storage
141141
}
142142

143+
// Start compaction after recover nodes.
144+
wal2.startCompaction()
145+
143146
return storages, wal2, nil
144147
}
145148

internal/store/raft/storage/snapshot_storage_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ func TestStorage_SnapshotStorage(t *testing.T) {
6161

6262
rawWAL, err := walog.Open(ctx, walDir, walog.WithFileSize(int64(fileSize)))
6363
So(err, ShouldBeNil)
64-
wal := newWAL(rawWAL, stateStore)
64+
wal := newWAL(rawWAL, stateStore, true)
6565
defer wal.Close()
6666

6767
snapOp := NewMockSnapshotOperator(ctrl)

internal/store/raft/storage/storage_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ func TestStorage(t *testing.T) {
8686
Convey("create raft log", func() {
8787
rawWAL, err := walog.Open(ctx, raftDir, walog.WithFileSize(int64(fileSize)))
8888
So(err, ShouldBeNil)
89-
wal := newWAL(rawWAL, stateStore)
89+
wal := newWAL(rawWAL, stateStore, true)
9090
defer func() {
9191
wal.Close()
9292
wal.Wait()

internal/store/raft/storage/wal.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ type WAL struct {
4545
doneC chan struct{}
4646
}
4747

48-
func newWAL(wal *walog.WAL, stateStore *meta.SyncStore) *WAL {
48+
func newWAL(wal *walog.WAL, stateStore *meta.SyncStore, startCompaction bool) *WAL {
4949
w := &WAL{
5050
WAL: wal,
5151
stateStore: stateStore,
@@ -56,11 +56,17 @@ func newWAL(wal *walog.WAL, stateStore *meta.SyncStore) *WAL {
5656
doneC: make(chan struct{}),
5757
}
5858

59-
go w.runCompact()
59+
if startCompaction {
60+
w.startCompaction()
61+
}
6062

6163
return w
6264
}
6365

66+
func (w *WAL) startCompaction() {
67+
go w.runCompact()
68+
}
69+
6470
func (w *WAL) Close() {
6571
w.WAL.Close()
6672
go func() {

0 commit comments

Comments
 (0)