Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 4 additions & 69 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,17 @@ package main
import (
"fmt"
"github.com/ardanlabs/conf"
"github.com/cockroachdb/pebble"
"github.com/pkg/errors"
"github.com/qubic/go-archiver/processor"
"github.com/qubic/go-archiver/rpc"
"github.com/qubic/go-archiver/store"
"github.com/qubic/go-archiver/validator/tick"
qubic "github.com/qubic/go-node-connector"
"github.com/qubic/go-node-connector/types"
"log"
"net/http"
_ "net/http/pprof"
"os"
"os/signal"
"runtime"
"syscall"
"time"
)
Expand Down Expand Up @@ -58,6 +55,7 @@ func run() error {
}
Store struct {
ResetEmptyTickKeys bool `conf:"default:false"`
EpochCount int `conf:"default:2"`
}
}

Expand Down Expand Up @@ -87,74 +85,11 @@ func run() error {
}
log.Printf("main: Config :\n%v\n", out)

l1Options := pebble.LevelOptions{
BlockRestartInterval: 16,
BlockSize: 4096,
BlockSizeThreshold: 90,
Compression: pebble.NoCompression,
FilterPolicy: nil,
FilterType: pebble.TableFilter,
IndexBlockSize: 4096,
TargetFileSize: 268435456, // 256 MB
}
l2Options := pebble.LevelOptions{
BlockRestartInterval: 16,
BlockSize: 4096,
BlockSizeThreshold: 90,
Compression: pebble.ZstdCompression,
FilterPolicy: nil,
FilterType: pebble.TableFilter,
IndexBlockSize: 4096,
TargetFileSize: l1Options.TargetFileSize * 10, // 2.5 GB
}
l3Options := pebble.LevelOptions{
BlockRestartInterval: 16,
BlockSize: 4096,
BlockSizeThreshold: 90,
Compression: pebble.ZstdCompression,
FilterPolicy: nil,
FilterType: pebble.TableFilter,
IndexBlockSize: 4096,
TargetFileSize: l2Options.TargetFileSize * 10, // 25 GB
}
l4Options := pebble.LevelOptions{
BlockRestartInterval: 16,
BlockSize: 4096,
BlockSizeThreshold: 90,
Compression: pebble.ZstdCompression,
FilterPolicy: nil,
FilterType: pebble.TableFilter,
IndexBlockSize: 4096,
TargetFileSize: l3Options.TargetFileSize * 10, // 250 GB
}

pebbleOptions := pebble.Options{
Levels: []pebble.LevelOptions{l1Options, l2Options, l3Options, l4Options},
MaxConcurrentCompactions: func() int { return runtime.NumCPU() },
MemTableSize: 268435456, // 256 MB
EventListener: store.NewPebbleEventListener(),
}

db, err := pebble.Open(cfg.Qubic.StorageFolder, &pebbleOptions)
if err != nil {
return errors.Wrap(err, "opening db with zstd compression")
}
defer db.Close()

ps := store.NewPebbleStore(db, nil)

if cfg.Store.ResetEmptyTickKeys {
fmt.Printf("Resetting empty ticks for all epochs...\n")
err = tick.ResetEmptyTicksForAllEpochs(ps)
if err != nil {
return errors.Wrap(err, "resetting empty ticks keys")
}
}

err = tick.CalculateEmptyTicksForAllEpochs(ps)
ps, err := store.NewPebbleStore(cfg.Qubic.StorageFolder, nil, cfg.Store.EpochCount)
if err != nil {
return errors.Wrap(err, "calculating empty ticks for all epochs")
return errors.Wrap(err, "creating info store")
}
defer ps.Close()

p, err := qubic.NewPoolConnection(qubic.PoolConfig{
InitialCap: cfg.Pool.InitialCap,
Expand Down
28 changes: 17 additions & 11 deletions processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (p *Processor) processStatus(ctx context.Context, lastTick *protobuff.Proce
return errors.Wrap(err, "processing skipped ticks")
}

err = p.ps.SetLastProcessedTick(ctx, nextTick)
err = p.ps.SetLastProcessedTick(nextTick)
if err != nil {
return errors.Wrapf(err, "setting last processed tick %d", nextTick.TickNumber)
}
Expand All @@ -130,7 +130,21 @@ func (p *Processor) processStatus(ctx context.Context, lastTick *protobuff.Proce
func (p *Processor) getNextProcessingTick(_ context.Context, lastTick *protobuff.ProcessedTick, currentTickInfo types.TickInfo) (*protobuff.ProcessedTick, error) {
//handles the case where the initial tick of epoch returned by the node is greater than the last processed tick
// which means that we are in the next epoch and we should start from the initial tick of the current epoch

//Simulate epoch transition
if lastTick.TickNumber > 22690020 {
currentTickInfo.InitialTick = 22690023
currentTickInfo.Epoch = 156
lastTick.Epoch = 156
}

if currentTickInfo.InitialTick > lastTick.TickNumber {

err := p.ps.HandleEpochTransition(uint32(currentTickInfo.Epoch))
if err != nil {
return nil, errors.Wrap(err, "handling epoch transition")
}

return &protobuff.ProcessedTick{TickNumber: currentTickInfo.InitialTick, Epoch: uint32(currentTickInfo.Epoch)}, nil
}

Expand All @@ -139,7 +153,7 @@ func (p *Processor) getNextProcessingTick(_ context.Context, lastTick *protobuff
}

func (p *Processor) getLastProcessedTick(ctx context.Context, currentTickInfo types.TickInfo) (*protobuff.ProcessedTick, error) {
lastTick, err := p.ps.GetLastProcessedTick(ctx)
lastTick, err := p.ps.GetLastProcessedTick()
if err != nil {
//handles first run of the archiver where there is nothing in storage
// in this case last tick is 0 and epoch is current tick info epoch
Expand All @@ -163,18 +177,10 @@ func (p *Processor) processSkippedTicks(ctx context.Context, lastTick *protobuff
return errors.Errorf("Next tick should not be equal to last tick %d", nextTick.TickNumber)
}

err := p.ps.AppendProcessedTickInterval(ctx, nextTick.Epoch, &protobuff.ProcessedTickInterval{InitialProcessedTick: nextTick.TickNumber, LastProcessedTick: nextTick.TickNumber})
err := p.ps.GetCurrentEpochStore().AppendProcessedTickInterval(&protobuff.ProcessedTickInterval{InitialProcessedTick: nextTick.TickNumber, LastProcessedTick: nextTick.TickNumber})
if err != nil {
return errors.Wrap(err, "appending processed tick interval")
}

err = p.ps.SetSkippedTicksInterval(ctx, &protobuff.SkippedTicksInterval{
StartTick: lastTick.TickNumber + 1,
EndTick: nextTick.TickNumber - 1,
})
if err != nil {
return errors.Wrap(err, "setting skipped ticks interval")
}

return nil
}
58 changes: 34 additions & 24 deletions processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package processor

import (
"context"
"github.com/cockroachdb/pebble"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
pb "github.com/qubic/go-archiver/protobuff"
Expand All @@ -11,6 +10,7 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
"log"
"os"
"path/filepath"
"testing"
Expand All @@ -24,12 +24,14 @@ func TestProcessor_GetLastProcessedTick(t *testing.T) {
require.NoError(t, err)
defer os.RemoveAll(dbDir)

db, err := pebble.Open(filepath.Join(dbDir, "testdb"), &pebble.Options{})
require.NoError(t, err)
defer db.Close()
testPath := filepath.Join(dbDir, "testdb")

logger, _ := zap.NewDevelopment()
s := store.NewPebbleStore(db, logger)
s, err := store.NewPebbleStore(testPath, logger, 10)
require.NoError(t, err)

err = s.HandleEpochTransition(1)
require.NoError(t, err)

p := Processor{ps: s}

Expand All @@ -39,6 +41,7 @@ func TestProcessor_GetLastProcessedTick(t *testing.T) {

got, err := p.getLastProcessedTick(ctx, currentTickInfo)
require.NoError(t, err)
log.Printf("GOT: %v EXPECTED: %v", got, &expected)
require.True(t, proto.Equal(got, &expected))
}

Expand All @@ -50,12 +53,11 @@ func TestProcessor_GetNextProcessingTick(t *testing.T) {
require.NoError(t, err)
defer os.RemoveAll(dbDir)

db, err := pebble.Open(filepath.Join(dbDir, "testdb"), &pebble.Options{})
require.NoError(t, err)
defer db.Close()
testPath := filepath.Join(dbDir, "testdb")

logger, _ := zap.NewDevelopment()
s := store.NewPebbleStore(db, logger)
s, err := store.NewPebbleStore(testPath, logger, 10)
require.NoError(t, err)

p := Processor{ps: s}

Expand Down Expand Up @@ -96,12 +98,14 @@ func TestProcessor_ProcessStatus(t *testing.T) {
require.NoError(t, err)
defer os.RemoveAll(dbDir)

db, err := pebble.Open(filepath.Join(dbDir, "testdb"), &pebble.Options{})
require.NoError(t, err)
defer db.Close()
testPath := filepath.Join(dbDir, "testdb")

logger, _ := zap.NewDevelopment()
s := store.NewPebbleStore(db, logger)
s, err := store.NewPebbleStore(testPath, logger, 10)
require.NoError(t, err)

err = s.HandleEpochTransition(1)
require.NoError(t, err)

p := Processor{ps: s}

Expand All @@ -123,7 +127,7 @@ func TestProcessor_ProcessStatus(t *testing.T) {
},
},
}
got, err := s.GetProcessedTickIntervals(ctx)
got, err := s.GetProcessedTickIntervals()
require.NoError(t, err)
diff := cmp.Diff(got, expected, cmpopts.IgnoreUnexported(pb.ProcessedTickInterval{}, pb.ProcessedTickIntervalsPerEpoch{}))
require.True(t, cmp.Equal(diff, ""))
Expand All @@ -135,7 +139,7 @@ func TestProcessor_ProcessStatus(t *testing.T) {
require.NoError(t, err)

expected[0].Intervals[0].LastProcessedTick = nextTick.TickNumber
got, err = s.GetProcessedTickIntervals(ctx)
got, err = s.GetProcessedTickIntervals()
require.NoError(t, err)

diff = cmp.Diff(got, expected, cmpopts.IgnoreUnexported(pb.ProcessedTickInterval{}, pb.ProcessedTickIntervalsPerEpoch{}))
Expand All @@ -151,7 +155,7 @@ func TestProcessor_ProcessStatus(t *testing.T) {
LastProcessedTick: nextTick.TickNumber,
})

got, err = s.GetProcessedTickIntervals(ctx)
got, err = s.GetProcessedTickIntervals()
require.NoError(t, err)

diff = cmp.Diff(got, expected, cmpopts.IgnoreUnexported(pb.ProcessedTickInterval{}, pb.ProcessedTickIntervalsPerEpoch{}))
Expand All @@ -163,13 +167,15 @@ func TestProcessor_ProcessStatus(t *testing.T) {
require.NoError(t, err)

expected[0].Intervals[1].LastProcessedTick = nextTick.TickNumber
got, err = s.GetProcessedTickIntervals(ctx)
got, err = s.GetProcessedTickIntervals()
require.NoError(t, err)

diff = cmp.Diff(got, expected, cmpopts.IgnoreUnexported(pb.ProcessedTickInterval{}, pb.ProcessedTickIntervalsPerEpoch{}))
require.True(t, cmp.Equal(diff, ""))

// new epoch
err = s.HandleEpochTransition(2)
require.NoError(t, err)
lastTick.TickNumber = nextTick.TickNumber
nextTick = pb.ProcessedTick{TickNumber: 200, Epoch: 2}
err = p.processStatus(ctx, &lastTick, &nextTick)
Expand All @@ -184,10 +190,11 @@ func TestProcessor_ProcessStatus(t *testing.T) {
},
})

got, err = s.GetProcessedTickIntervals(ctx)
got, err = s.GetProcessedTickIntervals()
require.NoError(t, err)

diff = cmp.Diff(got, expected, cmpopts.IgnoreUnexported(pb.ProcessedTickInterval{}, pb.ProcessedTickIntervalsPerEpoch{}))
log.Printf("DIFF: %s\n", diff)
require.True(t, cmp.Equal(diff, ""))

lastTick.TickNumber = nextTick.TickNumber
Expand All @@ -197,10 +204,11 @@ func TestProcessor_ProcessStatus(t *testing.T) {
require.NoError(t, err)

expected[1].Intervals[0].LastProcessedTick = nextTick.TickNumber
got, err = s.GetProcessedTickIntervals(ctx)
got, err = s.GetProcessedTickIntervals()
require.NoError(t, err)

diff = cmp.Diff(got, expected, cmpopts.IgnoreUnexported(pb.ProcessedTickInterval{}, pb.ProcessedTickIntervalsPerEpoch{}))
log.Printf("DIFF: %s\n", diff)
require.True(t, cmp.Equal(diff, ""))
}

Expand All @@ -212,12 +220,14 @@ func TestProcessor_ProcessStatusOnthefly(t *testing.T) {
require.NoError(t, err)
defer os.RemoveAll(dbDir)

db, err := pebble.Open(filepath.Join(dbDir, "testdb"), &pebble.Options{})
require.NoError(t, err)
defer db.Close()
testPath := filepath.Join(dbDir, "testdb")

logger, _ := zap.NewDevelopment()
s := store.NewPebbleStore(db, logger)
s, err := store.NewPebbleStore(testPath, logger, 10)
require.NoError(t, err)

err = s.HandleEpochTransition(1)
require.NoError(t, err)

p := Processor{ps: s}

Expand All @@ -239,7 +249,7 @@ func TestProcessor_ProcessStatusOnthefly(t *testing.T) {
},
},
}
got, err := s.GetProcessedTickIntervals(ctx)
got, err := s.GetProcessedTickIntervals()
require.NoError(t, err)
diff := cmp.Diff(got, expected, cmpopts.IgnoreUnexported(pb.ProcessedTickInterval{}, pb.ProcessedTickIntervalsPerEpoch{}))
require.True(t, cmp.Equal(diff, ""))
Expand Down
Loading