Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions assigner/command/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"io"
"log/slog"
"os"

"github.com/ipfs/boxo/bootstrap"
Expand All @@ -17,10 +18,22 @@ import (
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/gologshim"
"github.com/multiformats/go-multiaddr"
"github.com/urfave/cli/v2"
)

func init() {
// Set go-log's slog handler as the application-wide default. This ensures
// all slog-based logging uses go-log's formatting.
slog.SetDefault(slog.New(logging.SlogHandler()))

// Wire go-log's slog bridge to go-libp2p's gologshim. This provides
// go-libp2p loggers with the "logger" attribute for per-subsystem level
// control.
gologshim.SetDefaultHandler(logging.SlogHandler())
}

var log = logging.Logger("assigner")

const progName = "assigner"
Expand Down
39 changes: 23 additions & 16 deletions command/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ import (
"context"
"errors"
"fmt"
"log/slog"
"os"
"os/signal"
"syscall"
"time"

pbl "github.com/cockroachdb/pebble"
"github.com/cockroachdb/pebble/bloom"
pbl "github.com/cockroachdb/pebble/v2"
"github.com/cockroachdb/pebble/v2/bloom"
"github.com/ipfs/boxo/bootstrap"
"github.com/ipfs/boxo/peering"
logging "github.com/ipfs/go-log/v2"
Expand All @@ -34,10 +35,22 @@ import (
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/gologshim"
"github.com/multiformats/go-multiaddr"
"github.com/urfave/cli/v2"
)

func init() {
// Set go-log's slog handler as the application-wide default. This ensures
// all slog-based logging uses go-log's formatting.
slog.SetDefault(slog.New(logging.SlogHandler()))

// Wire go-log's slog bridge to go-libp2p's gologshim. This provides
// go-libp2p loggers with the "logger" attribute for per-subsystem level
// control.
gologshim.SetDefaultHandler(logging.SlogHandler())
}

// Recognized valuestore type names.
const (
vstoreDHStore = "dhstore"
Expand Down Expand Up @@ -567,7 +580,7 @@ func createValueStore(ctx context.Context, cfgIndexer config.Indexer) (indexer.I
L0CompactionThreshold: 2,
L0StopWritesThreshold: 1000,
LBaseMaxBytes: 64 << 20, // 64 MiB
MaxConcurrentCompactions: func() int { return 10 },
CompactionConcurrencyRange: func() (int, int) { return 1, 10 },
MemTableSize: 64 << 20, // 64 MiB
MemTableStopWritesThreshold: 4,
WALBytesPerSync: 10 << 20, // 10 MiB
Expand All @@ -592,21 +605,15 @@ func createValueStore(ctx context.Context, cfgIndexer config.Indexer) (indexer.I

pebbleOpts.Experimental.ReadCompactionRate = 10 << 20 // 20 MiB

const numLevels = 7
pebbleOpts.Levels = make([]pbl.LevelOptions, numLevels)
for i := 0; i < numLevels; i++ {
l := &pebbleOpts.Levels[i]
l.BlockSize = 32 << 10 // 32 KiB
l.IndexBlockSize = 256 << 10 // 256 KiB
l.FilterPolicy = bloom.FilterPolicy(10)
l.FilterType = pbl.TableFilter
if i > 0 {
l.TargetFileSize = pebbleOpts.Levels[i-1].TargetFileSize * 2
}
l.EnsureDefaults()
for i := range pebbleOpts.Levels {
pebbleOpts.Levels[i].BlockSize = 32 << 10 // 32 KiB
pebbleOpts.Levels[i].IndexBlockSize = 256 << 10 // 256 KiB
pebbleOpts.Levels[i].FilterPolicy = bloom.FilterPolicy(10)
pebbleOpts.Levels[i].FilterType = pbl.TableFilter
}
pebbleOpts.Levels[numLevels-1].FilterPolicy = nil
pebbleOpts.Levels[len(pebbleOpts.Levels)-1].FilterPolicy = nil
pebbleOpts.Cache = pbl.NewCache(int64(cfgIndexer.PebbleBlockCacheSize))
pebbleOpts.EnsureDefaults()

vs, err = pebble.New(dir, pebbleOpts)
case vstoreRelayx:
Expand Down
1 change: 0 additions & 1 deletion command/gc/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ func daemonAction(cctx *cli.Context) error {
reaper.WithDatastoreDir(dsDir),
reaper.WithDatastoreTempDir(dsTmpDir),
reaper.WithPCache(pc),
reaper.WithTopicName(cfg.Ingest.PubSubTopic),
reaper.WithHttpTimeout(time.Duration(cfg.Ingest.HttpSyncTimeout)),
reaper.WithSyncSegmentSize(cctx.Int("sync-segment-size")),
)
Expand Down
1 change: 0 additions & 1 deletion command/gc/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,6 @@ func providerAction(cctx *cli.Context) error {
reaper.WithEntriesFromPublisher(cctx.Bool("ents-from-pub")),
reaper.WithPCache(pc),
reaper.WithSegmentSize(cctx.Int("segment-size")),
reaper.WithTopicName(cfg.Ingest.PubSubTopic),
reaper.WithHttpTimeout(time.Duration(cfg.Ingest.HttpSyncTimeout)),
reaper.WithSyncSegmentSize(cctx.Int("sync-segment-size")),
)
Expand Down
14 changes: 0 additions & 14 deletions config/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,6 @@ type Ingest struct {
// recent advertisement, set this to 1. A value of 0, the default, means
// unlimited depth.
FirstSyncDepth int
// GsMaxInRequests is the maximum number of incoming in-progress graphsync
// requests. Default is 1024.
GsMaxInRequests uint64
// GsMaxOutRequests is the maximum number of outgoing in-progress graphsync
// requests. Default is 1024.
GsMaxOutRequests uint64
// HttpSyncRetryMax sets the maximum number of times HTTP sync requests
// should be retried. A value of zero, the default, means no retry.
HttpSyncRetryMax int
Expand Down Expand Up @@ -97,8 +91,6 @@ func NewIngest() Ingest {
AdvertisementDepthLimit: 33554432,
AdvertisementMirror: NewMirror(),
EntriesDepthLimit: 65536,
GsMaxInRequests: 1024,
GsMaxOutRequests: 1024,
HttpSyncRetryWaitMax: Duration(30 * time.Second),
HttpSyncRetryWaitMin: Duration(1 * time.Second),
HttpSyncTimeout: Duration(10 * time.Second),
Expand All @@ -122,12 +114,6 @@ func (c *Ingest) populateUnset() {
if c.EntriesDepthLimit == 0 {
c.EntriesDepthLimit = def.EntriesDepthLimit
}
if c.GsMaxInRequests == 0 {
c.GsMaxInRequests = def.GsMaxInRequests
}
if c.GsMaxOutRequests == 0 {
c.GsMaxOutRequests = def.GsMaxOutRequests
}
if c.HttpSyncRetryWaitMax == 0 {
c.HttpSyncRetryWaitMax = def.HttpSyncRetryWaitMax
}
Expand Down
2 changes: 1 addition & 1 deletion config/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"fmt"
"io"

"github.com/cockroachdb/pebble"
"github.com/cockroachdb/pebble/v2"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
)
Expand Down
5 changes: 1 addition & 4 deletions config/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@ func NewLogging() Logging {
return Logging{
Level: "info",
Loggers: map[string]string{
"bootstrap": "warn",
"dt_graphsync": "warn",
"dt-impl": "warn",
"graphsync": "warn",
"bootstrap": "warn",
},
}
}
Expand Down
5 changes: 1 addition & 4 deletions deploy/manifests/base/storetheindex-single/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,7 @@
"Logging": {
"Level": "info",
"Loggers": {
"bootstrap": "warn",
"dt-impl": "warn",
"dt_graphsync": "warn",
"graphsync": "warn"
"bootstrap": "warn"
}
},
"Peering": {
Expand Down
5 changes: 1 addition & 4 deletions deploy/manifests/base/storetheindex/indexer-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,7 @@ data:
"Logging": {
"Level": "info",
"Loggers": {
"bootstrap": "warn",
"dt-impl": "warn",
"dt_graphsync": "warn",
"graphsync": "warn"
"bootstrap": "warn"
}
},
"Peering": {
Expand Down
14 changes: 2 additions & 12 deletions e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,10 @@ func TestEndToEndWithAllProviderTypes(t *testing.T) {
testEndToEndWithReferenceProvider(t, "http")
})

// Test with publisher running plain HTTP only, not over libp2p.
// Test with publisher running both plain HTTP, and HTTP over libp2p.
t.Run("Libp2pWithHTTPProvider", func(t *testing.T) {
testEndToEndWithReferenceProvider(t, "libp2phttp")
})

// Test with publisher running dtsync over libp2p.
t.Run("DTSyncProvider", func(t *testing.T) {
testEndToEndWithReferenceProvider(t, "dtsync")
})
}

func testEndToEndWithReferenceProvider(t *testing.T, publisherProto string) {
Expand Down Expand Up @@ -98,13 +93,10 @@ func testEndToEndWithReferenceProvider(t *testing.T, publisherProto string) {

// install index-provider
switch publisherProto {
case "dtsync":
// Install index-provider that supports dtsync.
rnr.Run(ctx, "go", "install", "github.com/ipni/index-provider/cmd/[email protected]")
case "libp2p", "libp2phttp", "http":
rnr.Run(ctx, "go", "install", "github.com/ipni/index-provider/cmd/provider@latest")
default:
panic("providerProto must be one of: libp2phttp, http, dtsync")
panic("providerProto must be one of: libp2p, libp2phttp, http")
}
// install dhstore
rnr.Run(ctx, "go", "install", "-tags", "nofdb", "github.com/ipni/dhstore/cmd/dhstore@latest")
Expand All @@ -117,8 +109,6 @@ func testEndToEndWithReferenceProvider(t *testing.T, publisherProto string) {

// initialize index-provider
switch publisherProto {
case "dtsync":
rnr.Run(ctx, provider, "init")
case "http":
rnr.Run(ctx, provider, "init", "--pubkind=http")
case "libp2p":
Expand Down
12 changes: 0 additions & 12 deletions gc/reaper/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ const (
defaultHttpTimeout = 10 * time.Second
defaultSegmentSize = 16384
defaultSyncSegSize = 4096
defaultTopic = "/indexer/ingest/mainnet"
)

type config struct {
Expand All @@ -32,7 +31,6 @@ type config struct {
pcache *pcache.ProviderCache
segmentSize int
syncSegSize int
topic string
}

// Option is a function that sets a value in a config.
Expand All @@ -47,7 +45,6 @@ func getOpts(opts []Option) (config, error) {
httpTimeout: defaultHttpTimeout,
segmentSize: defaultSegmentSize,
syncSegSize: defaultSyncSegSize,
topic: defaultTopic,
}

for i, opt := range opts {
Expand Down Expand Up @@ -169,15 +166,6 @@ func WithSyncSegmentSize(size int) Option {
}
}

// WithTopicName sets the topic name on which the provider announces advertised
// content. Defaults to '/indexer/ingest/mainnet'.
func WithTopicName(topic string) Option {
return func(c *config) error {
c.topic = topic
return nil
}
}

// WithEntriesDepthLimit sets the depth limit when syncing an
// advertisement entries chain. Setting to 0 means no limit.
func WithEntriesDepthLimit(depthLimit int64) Option {
Expand Down
17 changes: 14 additions & 3 deletions gc/reaper/reaper.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"io"
"io/fs"
"log/slog"
"os"
"path"
"path/filepath"
Expand All @@ -34,12 +35,24 @@ import (
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/gologshim"

// Import so these codecs get registered.
_ "github.com/ipld/go-ipld-prime/codec/dagcbor"
_ "github.com/ipld/go-ipld-prime/codec/dagjson"
)

func init() {
// Set go-log's slog handler as the application-wide default. This ensures
// all slog-based logging uses go-log's formatting.
slog.SetDefault(slog.New(logging.SlogHandler()))

// Wire go-log's slog bridge to go-libp2p's gologshim. This provides
// go-libp2p loggers with the "logger" attribute for per-subsystem level
// control.
gologshim.SetDefaultHandler(logging.SlogHandler())
}

var log = logging.Logger("ipni-gc")

var (
Expand Down Expand Up @@ -98,7 +111,6 @@ type Reaper struct {
stats GCStats
statsMutex sync.Mutex
syncSegSize int
topic string
}

type scythe struct {
Expand Down Expand Up @@ -193,7 +205,6 @@ func New(idxr indexer.Interface, fileStore filestore.Interface, options ...Optio
pcache: opts.pcache,
segmentSize: opts.segmentSize,
syncSegSize: opts.syncSegSize,
topic: opts.topic,
}, nil
}

Expand Down Expand Up @@ -616,7 +627,7 @@ func (r *Reaper) makeSubscriber(dstoreTmp datastore.Batching) (*dagsync.Subscrib
}, nil
}

return dagsync.NewSubscriber(r.host, dstoreTmp, linksys, r.topic,
return dagsync.NewSubscriber(r.host, linksys,
dagsync.HttpTimeout(r.httpTimeout),
dagsync.SegmentDepthLimit(int64(r.syncSegSize)))
}
Expand Down
5 changes: 0 additions & 5 deletions gc/reaper/reaper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ import (
"github.com/stretchr/testify/require"
)

const testTopic = "/indexer/ingest/test"

var pid1, pid2, pid3 peer.ID
var adCid cid.Cid

Expand Down Expand Up @@ -68,7 +66,6 @@ func TestReaper(t *testing.T) {
reaper.WithDatastoreDir(dsDir),
reaper.WithDatastoreTempDir(dsTmpDir),
reaper.WithPCache(pc),
reaper.WithTopicName(testTopic),
)
require.NoError(t, err)
defer gc.Close()
Expand All @@ -94,7 +91,6 @@ func TestReaper(t *testing.T) {
reaper.WithDatastoreTempDir(dsTmpDir),
reaper.WithDeleteNotFound(true),
reaper.WithPCache(pc),
reaper.WithTopicName(testTopic),
)
require.NoError(t, err)
defer gc2.Close()
Expand All @@ -116,7 +112,6 @@ func TestReaper(t *testing.T) {
reaper.WithDatastoreTempDir(dsTmpDir),
reaper.WithDeleteNotFound(true),
reaper.WithPCache(pc),
reaper.WithTopicName(testTopic),
)
require.NoError(t, err)

Expand Down
Loading
Loading