diff --git a/cmd/downloader/main.go b/cmd/downloader/main.go index 3145bcbb5cc..7ab9deb4a9b 100644 --- a/cmd/downloader/main.go +++ b/cmd/downloader/main.go @@ -544,13 +544,13 @@ func manifestVerify(ctx context.Context, logger log.Logger) error { } if strings.HasPrefix(webseed, "v1:") { - withoutVerisonPrefix := webseed[3:] - if !strings.HasPrefix(withoutVerisonPrefix, "https:") { + withoutVersionPrefix := webseed[3:] + if !strings.HasPrefix(withoutVersionPrefix, "https:") { continue } - uri, err := url.ParseRequestURI(withoutVerisonPrefix) + uri, err := url.ParseRequestURI(withoutVersionPrefix) if err != nil { - log.Warn("[webseed] can't parse url", "err", err, "url", withoutVerisonPrefix) + log.Warn("[webseed] can't parse url", "err", err, "url", withoutVersionPrefix) continue } webseedHttpProviders = append(webseedHttpProviders, uri) diff --git a/cmd/utils/app/flags.go b/cmd/utils/app/flags.go new file mode 100644 index 00000000000..325835a30ab --- /dev/null +++ b/cmd/utils/app/flags.go @@ -0,0 +1,51 @@ +package app + +import ( + "fmt" + "os" + "runtime" + + "github.com/anacrolix/missinggo/v2/panicif" + "github.com/erigontech/erigon/db/datadir" + "github.com/erigontech/erigon/db/snapcfg" + "github.com/urfave/cli/v2" +) + +var ( + preverifiedFlag = cli.StringFlag{ + Name: "preverified", + Category: "Snapshots", + Usage: "preverified to use (remote, local, embedded)", + Value: "remote", + } + concurrencyFlag = cli.IntFlag{ + Name: "concurrency", + Usage: "level of concurrency for some operation", + Value: runtime.GOMAXPROCS(0), + } + verifyChainFlag = cli.StringFlag{ + Name: "verify.chain", + Usage: "name of the chain to verify", + } +) + +func handlePreverifiedFlag(cliCtx *cli.Context, dirs *datadir.Dirs) (err error) { + switch value := preverifiedFlag.Get(cliCtx); value { + case "local": + panicif.Err(os.Setenv(snapcfg.RemotePreverifiedEnvKey, dirs.PreverifiedPath())) + fallthrough + case "remote": + err = snapcfg.LoadRemotePreverified(cliCtx.Context) + if err != nil { + // TODO: Check if we should continue? What if we ask for a git revision and + // can't get it? What about a branch? Can we reset to the embedded snapshot hashes? + return fmt.Errorf("loading remote preverified snapshots: %w", err) + } + case "embedded": + // Should already be loaded. + default: + err = fmt.Errorf("invalid preverified flag value %q", value) + return + } + return +} diff --git a/cmd/utils/app/make_app.go b/cmd/utils/app/make_app.go index 2de4d8e23a5..985b128ae20 100644 --- a/cmd/utils/app/make_app.go +++ b/cmd/utils/app/make_app.go @@ -70,6 +70,13 @@ func MakeApp(name string, action cli.ActionFunc, cliFlags []cli.Flag) *cli.App { return action(context) } + app.Before = func(c *cli.Context) error { + var cancel context.CancelFunc + c.Context, cancel = context.WithCancel(c.Context) + go debug.ListenSignals(cancel, log.Root()) + return nil + } + app.Flags = appFlags(cliFlags) app.After = func(ctx *cli.Context) error { diff --git a/cmd/utils/app/reset-datadir.go b/cmd/utils/app/reset-datadir.go index a07588d6dc6..982939651a5 100644 --- a/cmd/utils/app/reset-datadir.go +++ b/cmd/utils/app/reset-datadir.go @@ -6,7 +6,6 @@ import ( "os" g "github.com/anacrolix/generics" - "github.com/anacrolix/missinggo/v2/panicif" "github.com/erigontech/erigon/common/dir" "github.com/erigontech/erigon/db/datadir/reset" "github.com/erigontech/erigon/db/kv" @@ -37,12 +36,6 @@ var ( Aliases: []string{"n"}, Category: "Reset", } - preverifiedFlag = cli.StringFlag{ - Name: "preverified", - Category: "Reset", - Usage: "preverified to use (remote, local, embedded)", - Value: "remote", - } ) // Checks if a value was explicitly set in the given CLI command context or any of its parents. In @@ -96,21 +89,8 @@ func resetCliAction(cliCtx *cli.Context) (err error) { } defer unlock() - switch value := preverifiedFlag.Get(cliCtx); value { - case "local": - panicif.Err(os.Setenv(snapcfg.RemotePreverifiedEnvKey, dirs.PreverifiedPath())) - fallthrough - case "remote": - err = snapcfg.LoadRemotePreverified(cliCtx.Context) - if err != nil { - // TODO: Check if we should continue? What if we ask for a git revision and - // can't get it? What about a branch? Can we reset to the embedded snapshot hashes? - return fmt.Errorf("loading remote preverified snapshots: %w", err) - } - case "embedded": - // Should already be loaded. - default: - err = fmt.Errorf("invalid preverified flag value %q", value) + err = handlePreverifiedFlag(cliCtx, &dirs) + if err != nil { return } diff --git a/cmd/utils/app/snapshots_cmd.go b/cmd/utils/app/snapshots_cmd.go index a75c37f6375..95b916aa40e 100644 --- a/cmd/utils/app/snapshots_cmd.go +++ b/cmd/utils/app/snapshots_cmd.go @@ -500,6 +500,16 @@ var snapshotCommand = cli.Command{ }, }, }, + { + Name: "preverified", + Action: verifyWebseeds, + Flags: []cli.Flag{ + &utils.DataDirFlag, + &preverifiedFlag, + &verifyChainFlag, + &concurrencyFlag, + }, + }, }, } diff --git a/cmd/utils/app/verify-webseeds.go b/cmd/utils/app/verify-webseeds.go new file mode 100644 index 00000000000..039d459667f --- /dev/null +++ b/cmd/utils/app/verify-webseeds.go @@ -0,0 +1,217 @@ +package app + +import ( + "context" + "crypto/sha1" + "errors" + "fmt" + "io" + "iter" + "net/http" + "sync/atomic" + "unsafe" + + g "github.com/anacrolix/generics" + "github.com/anacrolix/generics/result" + "github.com/anacrolix/missinggo/v2/panicif" + "github.com/anacrolix/torrent/metainfo" + "github.com/erigontech/erigon/cmd/utils" + "github.com/erigontech/erigon/common/log/v3" + "github.com/erigontech/erigon/db/datadir" + "github.com/erigontech/erigon/db/downloader" + "github.com/erigontech/erigon/db/snapcfg" + "github.com/urfave/cli/v2" + "golang.org/x/sync/errgroup" +) + +func verifyWebseeds(cliCtx *cli.Context) (err error) { + var dirs datadir.Dirs + if cliCtx.IsSet(utils.DataDirFlag.Name) { + dirs = datadir.Open(cliCtx.String(utils.DataDirFlag.Name)) + } + err = handlePreverifiedFlag(cliCtx, &dirs) + if err != nil { + return + } + allPreverified := snapcfg.GetAllCurrentPreverified() + for chain, webseeds := range snapcfg.KnownWebseeds { + log.Info("loaded preverified", "chain", chain, "webseeds", webseeds) + panicif.False(g.MapContains(allPreverified, chain)) + } + ctx := cliCtx.Context + httpClient := &http.Client{ + Transport: downloader.MakeWebseedRoundTripper(), + } + logger := log.Root() + checker := webseedChecker{ + ctx: ctx, + logger: logger, + httpClient: httpClient, + } + defer func() { + logger.Info("finished check", + "total bytes read", checker.totalBytesRead.Load(), + "total request count", checker.totalRequestCount.Load()) + }() + items, ctx := errgroup.WithContext(ctx) + items.SetLimit(concurrencyFlag.Get(cliCtx)) + var targetChain g.Option[string] + if cliCtx.IsSet(verifyChainFlag.Name) { + targetChain.Set(verifyChainFlag.Get(cliCtx)) + } +addItems: + for chain, webseeds := range snapcfg.KnownWebseeds { + logger.Debug("maybe skip chain", "target", targetChain, "chain", chain) + if targetChain.Ok && targetChain.Value != chain { + continue + } + var baseUrl string + err := errors.New("no valid webseeds") + for _, webseed := range webseeds { + baseUrl, err = snapcfg.WebseedToUrl(webseed) + if err == nil { + break + } + } + panicif.Err(err) + preverified := g.MapMustGet(allPreverified, chain) + panicif.True(preverified.Local) + for _, item := range preverified.Items { + if checker.ctx.Err() != nil { + break addItems + } + items.Go(func() (err error) { + for range 3 { + if err != nil { + logger.Warn("retrying failed preverified item check", "baseUrl", baseUrl, "item", item, "err", err) + } + var done bool + done, err = checker.checkPreverifiedItem(baseUrl, item) + if done || ctx.Err() != nil { + return err + } + panicif.True(err == nil && !done) + } + return + }) + } + } + return items.Wait() +} + +type webseedChecker struct { + ctx context.Context + logger log.Logger + httpClient *http.Client + totalBytesRead atomic.Int64 + totalRequestCount atomic.Int64 + cache webseedCheckerCache +} + +type webseedCheckerCache struct { + DataEtags map[string][]metainfo.Hash +} + +func (me *webseedChecker) checkPreverifiedItem(baseUrl string, item snapcfg.PreverifiedItem) (done bool, err error) { + ctx := me.ctx + httpClient := me.httpClient + me.logger.Debug("checking preverified item", "webseed", baseUrl, "item", item) + mi, err := downloader.GetMetainfoFromWebseed(ctx, baseUrl+"/", item.Name, httpClient, io.Discard) + if err != nil { + err = fmt.Errorf("getting metainfo from webseed: %w", err) + return + } + me.totalRequestCount.Add(1) + info, err := mi.UnmarshalInfo() + panicif.Err(err) + me.logger.Debug("got metainfo", "piece length", info.PieceLength, "length", info.Length) + dataUrl := baseUrl + "/" + item.Name + req, err := http.NewRequestWithContext(ctx, http.MethodGet, dataUrl, nil) + panicif.Err(err) + resp, err := httpClient.Do(req) + if err != nil { + return + } + defer resp.Body.Close() + me.totalRequestCount.Add(1) + if resp.StatusCode != http.StatusOK { + err = fmt.Errorf("bad http response status code: %v", resp.StatusCode) + return + } + me.logger.Debug("item response", "etag", resp.Header.Get("etag"), "content length", resp.ContentLength) + done, err = me.matchHashes(&info, resp) + if err == nil { + me.logger.Info("snapshot matches", + "url", dataUrl, + //"name", item.Name, + "content length", resp.ContentLength, + //"etag", resp.Header.Get("etag"), + ) + } + return +} + +// Keep this open to multiple bodies, and matching ETags. +func (me *webseedChecker) matchHashes(info *metainfo.Info, resp *http.Response) ( + done bool, // Trying again won't change anything + err error, +) { + if resp.ContentLength != -1 { + panicif.NotEq(resp.ContentLength, info.Length) + } + nextHash, stop := iter.Pull(me.yieldHashes(resp.Body, info.PieceLength)) + defer stop() + for i := range info.NumPieces() { + p := info.Piece(i) + hr, ok := nextHash() + if !ok { + err = me.ctx.Err() + if err != nil { + return + } + } + panicif.False(ok) + if hr.Err != nil { + err = fmt.Errorf("getting hash for piece %v: %w", i, hr.Err) + return + } + h := hr.Ok + panicif.NotEq(h, p.V1Hash().Unwrap()) + log.Debug("matched piece hash", "hash", h, "url", resp.Request.URL) + } + // Trying again won't change anything. + done = true + panicif.Err(err) + _, ok := nextHash() + if ok { + err = errors.New("response longer than expected") + } + return +} + +func (me *webseedChecker) yieldHashes(r io.Reader, pieceLength int64) iter.Seq[g.Result[metainfo.Hash]] { + return func(yield func(g.Result[metainfo.Hash]) bool) { + h := sha1.New() + for { + h.Reset() + n, err := io.CopyN(h, r, pieceLength) + me.totalBytesRead.Add(n) + if err != nil { + if err != io.EOF { + yield(result.Err[metainfo.Hash](err)) + return + } + if n == 0 { + return + } + } + var mh metainfo.Hash + sumRet := h.Sum(mh[:0]) + panicif.NotEq(unsafe.SliceData(mh[:]), unsafe.SliceData(sumRet)) + if !yield(result.Ok(mh)) { + return + } + } + + } +} diff --git a/db/downloader/downloader.go b/db/downloader/downloader.go index 393f7afe768..4e22f5b38a6 100644 --- a/db/downloader/downloader.go +++ b/db/downloader/downloader.go @@ -148,8 +148,7 @@ func (me *AggStats) AllTorrentsComplete() bool { type requestHandler struct { // Separated this rather than embedded it to ensure our wrapper RoundTrip is called. - rt http.RoundTripper - downloader *Downloader + rt http.RoundTripper } var cloudflareHeaders = http.Header{ @@ -266,22 +265,18 @@ func configureHttp2(t *http.Transport) { h2t.MaxReadFrameSize = 1 << 20 // Same as net/http.Transport.ReadBufferSize? } -func New(ctx context.Context, cfg *downloadercfg.Cfg, logger log.Logger) (*Downloader, error) { +func MakeWebseedRoundTripper() http.RoundTripper { requestHandler := &requestHandler{} - { - requestHandler.rt = makeTransport() - cfg.ClientConfig.WebTransport = requestHandler - // requestHandler.downloader is set later. - } + requestHandler.rt = makeTransport() + return requestHandler +} + +func New(ctx context.Context, cfg *downloadercfg.Cfg, logger log.Logger) (*Downloader, error) { + cfg.ClientConfig.WebTransport = MakeWebseedRoundTripper() metainfoSourcesHttpClient := func() *http.Client { - tr := makeTransport() // Separate transport so webseed requests and metainfo fetching don't block each other. - // Additionally, we can tune for their specific workloads. return &http.Client{ - Transport: roundTripperFunc(func(req *http.Request) (*http.Response, error) { - insertCloudflareHeaders(req) - return tr.RoundTrip(req) - }), + Transport: MakeWebseedRoundTripper(), } }() cfg.ClientConfig.MetainfoSourcesClient = metainfoSourcesHttpClient @@ -335,8 +330,6 @@ func New(ctx context.Context, cfg *downloadercfg.Cfg, logger log.Logger) (*Downl d.logConfig() - requestHandler.downloader = d - d.ctx, d.stop = context.WithCancel(context.Background()) return d, nil @@ -1026,7 +1019,7 @@ func (d *Downloader) applyMetainfo( func (d *Downloader) webseedMetainfoUrls(snapshotName string) iter.Seq[string] { return func(yield func(string) bool) { for base := range d.webSeedUrlStrs() { - if !yield(d.webseedMetainfoUrl(base, snapshotName)) { + if !yield(webseedMetainfoUrl(base, snapshotName)) { return } } @@ -1039,7 +1032,8 @@ func (d *Downloader) fetchMetainfoFromWebseeds(ctx context.Context, name string, for base := range d.webSeedUrlStrs() { buf.Reset() var mi metainfo.MetaInfo - mi, err = d.fetchMetainfoFromWebseed(ctx, name, base, &buf) + var w io.Writer = &buf + mi, err = GetMetainfoFromWebseed(ctx, base, base, d.metainfoHttpClient, w) if err != nil { d.log(log.LvlDebug, "error fetching metainfo from webseed", "err", err, "name", name, "webseed", base) // Whither error? @@ -1060,20 +1054,21 @@ func (d *Downloader) fetchMetainfoFromWebseeds(ctx context.Context, name string, return } -func (d *Downloader) webseedMetainfoUrl(webseedUrlBase, snapshotName string) string { +func webseedMetainfoUrl(webseedUrlBase, snapshotName string) string { return webseedUrlBase + snapshotName + ".torrent" } -func (d *Downloader) fetchMetainfoFromWebseed( +func GetMetainfoFromWebseed( ctx context.Context, - name string, webseedUrlBase string, + name string, + httpClient *http.Client, w io.Writer, // Receives the serialized metainfo ) ( mi metainfo.MetaInfo, err error, ) { - url := d.webseedMetainfoUrl(webseedUrlBase, name) + url := webseedMetainfoUrl(webseedUrlBase, name) defer func() { if err != nil { err = fmt.Errorf("fetching from %q: %w", url, err) @@ -1083,15 +1078,15 @@ func (d *Downloader) fetchMetainfoFromWebseed( if err != nil { return } - resp, err := d.metainfoHttpClient.Do(req) + resp, err := httpClient.Do(req) if err != nil { return } + defer resp.Body.Close() if resp.StatusCode != http.StatusOK { err = fmt.Errorf("unexpected http response status code: %v", resp.StatusCode) return } - defer resp.Body.Close() tr := io.TeeReader(resp.Body, w) dec := bencode.NewDecoder(tr) err = dec.Decode(&mi) diff --git a/db/downloader/downloadercfg/downloadercfg.go b/db/downloader/downloadercfg/downloadercfg.go index 2ef208af296..145ebdb493e 100644 --- a/db/downloader/downloadercfg/downloadercfg.go +++ b/db/downloader/downloadercfg/downloadercfg.go @@ -258,14 +258,14 @@ func New( cfg.WebSeedUrls = make([]string, 0, len(webseeds)) for _, webseed := range webseeds { - if after, ok := strings.CutPrefix(webseed, "v1:"); ok { - // WebSeed URLs must have a trailing slash if the implementation should append the file - // name. In Erigon they don't, in anacrolix/torrent they do for our use case. - cfg.WebSeedUrls = append(cfg.WebSeedUrls, after+"/") - } else { - err = fmt.Errorf("unhandled webseed %q", webseed) + var after string + after, err = snapcfg.WebseedToUrl(webseed) + if err != nil { return } + // WebSeed URLs must have a trailing slash if the implementation should append the file + // name. In Erigon they don't, in anacrolix/torrent they do for our use case. + cfg.WebSeedUrls = append(cfg.WebSeedUrls, after+"/") } for value := range opts.WebseedDownloadRateLimit.Iter { diff --git a/db/snapcfg/util.go b/db/snapcfg/util.go index cf1c0a88dd5..745d28f48bd 100644 --- a/db/snapcfg/util.go +++ b/db/snapcfg/util.go @@ -22,6 +22,7 @@ import ( _ "embed" "errors" "fmt" + "maps" "os" "path/filepath" "slices" @@ -69,6 +70,12 @@ var registry = &preverifiedRegistry{ cached: make(map[string]*Cfg), } +func (r *preverifiedRegistry) All() map[string]Preverified { + r.mu.RLock() + defer r.mu.RUnlock() + return maps.Clone(r.data) +} + func (r *preverifiedRegistry) Get(networkName string) (*Cfg, bool) { r.mu.RLock() if cfg, ok := r.cached[networkName]; ok { @@ -577,3 +584,19 @@ func GetToml(networkName string) []byte { return nil } } + +// Gets the current preverified for all chains. +func GetAllCurrentPreverified() map[string]Preverified { + return registry.All() +} + +// Converts webseed value to URL. Mostly this is just stripping v1: for now, as nothing else is in +// active use. +func WebseedToUrl(s string) (_ string, err error) { + after, ok := strings.CutPrefix(s, "v1:") + if !ok { + err = fmt.Errorf("unhandled webseed %q", s) + return + } + return after, nil +} diff --git a/node/debug/signal.go b/node/debug/signal.go index 095a1018459..8c0da3dc31d 100644 --- a/node/debug/signal.go +++ b/node/debug/signal.go @@ -19,7 +19,6 @@ package debug import ( - "io" "os" "os/signal" "runtime/pprof" @@ -30,7 +29,7 @@ import ( "github.com/erigontech/erigon/common/log/v3" ) -func ListenSignals(stack io.Closer, logger log.Logger) { +func ListenSignals(handle func(), logger log.Logger) { sigc := make(chan os.Signal, 1) signal.Notify(sigc, unix.SIGINT, unix.SIGTERM) dbg.GetSigC(&sigc) @@ -42,9 +41,7 @@ func ListenSignals(stack io.Closer, logger log.Logger) { select { case <-sigc: logger.Info("Got interrupt, shutting down...") - if stack != nil { - go stack.Close() - } + go handle() for i := 10; i > 0; i-- { <-sigc if i > 1 { diff --git a/node/node.go b/node/node.go index e9ecdf3fb9c..7eb462118f2 100644 --- a/node/node.go +++ b/node/node.go @@ -411,5 +411,5 @@ func StartNode(stack *Node) { utils.Fatalf("Error starting protocol stack: %v", err) } - go debug.ListenSignals(stack, stack.logger) + go debug.ListenSignals(func() { stack.Close() }, stack.logger) }