Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions cmd/downloader/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,13 +544,13 @@ func manifestVerify(ctx context.Context, logger log.Logger) error {
}

if strings.HasPrefix(webseed, "v1:") {
withoutVerisonPrefix := webseed[3:]
if !strings.HasPrefix(withoutVerisonPrefix, "https:") {
withoutVersionPrefix := webseed[3:]
if !strings.HasPrefix(withoutVersionPrefix, "https:") {
continue
}
uri, err := url.ParseRequestURI(withoutVerisonPrefix)
uri, err := url.ParseRequestURI(withoutVersionPrefix)
if err != nil {
log.Warn("[webseed] can't parse url", "err", err, "url", withoutVerisonPrefix)
log.Warn("[webseed] can't parse url", "err", err, "url", withoutVersionPrefix)
continue
}
webseedHttpProviders = append(webseedHttpProviders, uri)
Expand Down
51 changes: 51 additions & 0 deletions cmd/utils/app/flags.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package app

import (
"fmt"
"os"
"runtime"

"github.com/anacrolix/missinggo/v2/panicif"
"github.com/erigontech/erigon/db/datadir"
"github.com/erigontech/erigon/db/snapcfg"
"github.com/urfave/cli/v2"
)

// CLI flags shared by the snapshot-related subcommands in this package.
var (
// preverifiedFlag selects which preverified snapshot set to use. The accepted values are
// interpreted by handlePreverifiedFlag; the default is "remote".
preverifiedFlag = cli.StringFlag{
Name: "preverified",
Category: "Snapshots",
Usage: "preverified to use (remote, local, embedded)",
Value: "remote",
}
// concurrencyFlag bounds how many operations run at once. The default is the GOMAXPROCS
// setting observed when this package is initialised (GOMAXPROCS(0) only queries the value).
concurrencyFlag = cli.IntFlag{
Name: "concurrency",
Usage: "level of concurrency for some operation",
Value: runtime.GOMAXPROCS(0),
}
// verifyChainFlag names a single chain to verify. It has no default value.
verifyChainFlag = cli.StringFlag{
Name: "verify.chain",
Usage: "name of the chain to verify",
}
)

// handlePreverifiedFlag applies the value of the "preverified" flag:
//
//   - "embedded": nothing to do, the embedded set should already be loaded.
//   - "remote":   loads the remote preverified set via snapcfg.LoadRemotePreverified.
//   - "local":    points snapcfg.RemotePreverifiedEnvKey at dirs.PreverifiedPath() and then
//     performs the same load as "remote".
//
// Any other value yields an error. A failure to set the environment variable panics; a failure
// to load is returned wrapped.
func handlePreverifiedFlag(cliCtx *cli.Context, dirs *datadir.Dirs) (err error) {
	source := preverifiedFlag.Get(cliCtx)

	if source == "embedded" {
		// Should already be loaded.
		return nil
	}
	if source != "local" && source != "remote" {
		return fmt.Errorf("invalid preverified flag value %q", source)
	}

	if source == "local" {
		// Redirect the loader to the datadir's own preverified file before loading.
		panicif.Err(os.Setenv(snapcfg.RemotePreverifiedEnvKey, dirs.PreverifiedPath()))
	}

	if loadErr := snapcfg.LoadRemotePreverified(cliCtx.Context); loadErr != nil {
		// TODO: Check if we should continue? What if we ask for a git revision and
		// can't get it? What about a branch? Can we reset to the embedded snapshot hashes?
		return fmt.Errorf("loading remote preverified snapshots: %w", loadErr)
	}
	return nil
}
7 changes: 7 additions & 0 deletions cmd/utils/app/make_app.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,13 @@ func MakeApp(name string, action cli.ActionFunc, cliFlags []cli.Flag) *cli.App {
return action(context)
}

app.Before = func(c *cli.Context) error {
var cancel context.CancelFunc
c.Context, cancel = context.WithCancel(c.Context)
go debug.ListenSignals(cancel, log.Root())
return nil
}

app.Flags = appFlags(cliFlags)

app.After = func(ctx *cli.Context) error {
Expand Down
24 changes: 2 additions & 22 deletions cmd/utils/app/reset-datadir.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"os"

g "github.com/anacrolix/generics"
"github.com/anacrolix/missinggo/v2/panicif"
"github.com/erigontech/erigon/common/dir"
"github.com/erigontech/erigon/db/datadir/reset"
"github.com/erigontech/erigon/db/kv"
Expand Down Expand Up @@ -37,12 +36,6 @@ var (
Aliases: []string{"n"},
Category: "Reset",
}
preverifiedFlag = cli.StringFlag{
Name: "preverified",
Category: "Reset",
Usage: "preverified to use (remote, local, embedded)",
Value: "remote",
}
)

// Checks if a value was explicitly set in the given CLI command context or any of its parents. In
Expand Down Expand Up @@ -96,21 +89,8 @@ func resetCliAction(cliCtx *cli.Context) (err error) {
}
defer unlock()

switch value := preverifiedFlag.Get(cliCtx); value {
case "local":
panicif.Err(os.Setenv(snapcfg.RemotePreverifiedEnvKey, dirs.PreverifiedPath()))
fallthrough
case "remote":
err = snapcfg.LoadRemotePreverified(cliCtx.Context)
if err != nil {
// TODO: Check if we should continue? What if we ask for a git revision and
// can't get it? What about a branch? Can we reset to the embedded snapshot hashes?
return fmt.Errorf("loading remote preverified snapshots: %w", err)
}
case "embedded":
// Should already be loaded.
default:
err = fmt.Errorf("invalid preverified flag value %q", value)
err = handlePreverifiedFlag(cliCtx, &dirs)
if err != nil {
return
}

Expand Down
10 changes: 10 additions & 0 deletions cmd/utils/app/snapshots_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,16 @@ var snapshotCommand = cli.Command{
},
},
},
{
Name: "preverified",
Action: verifyWebseeds,
Flags: []cli.Flag{
&utils.DataDirFlag,
&preverifiedFlag,
&verifyChainFlag,
&concurrencyFlag,
},
},
},
}

Expand Down
217 changes: 217 additions & 0 deletions cmd/utils/app/verify-webseeds.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
package app

import (
"context"
"crypto/sha1"
"errors"
"fmt"
"io"
"iter"
"net/http"
"sync/atomic"
"unsafe"

g "github.com/anacrolix/generics"
"github.com/anacrolix/generics/result"
"github.com/anacrolix/missinggo/v2/panicif"
"github.com/anacrolix/torrent/metainfo"
"github.com/erigontech/erigon/cmd/utils"
"github.com/erigontech/erigon/common/log/v3"
"github.com/erigontech/erigon/db/datadir"
"github.com/erigontech/erigon/db/downloader"
"github.com/erigontech/erigon/db/snapcfg"
"github.com/urfave/cli/v2"
"golang.org/x/sync/errgroup"
)

// verifyWebseeds is the CLI action that checks preverified snapshot items against a webseed.
//
// For every chain in snapcfg.KnownWebseeds (or only the chain named by the verify.chain flag when
// it is set) it resolves the first webseed that converts to a URL and then checks each preverified
// item of that chain through webseedChecker.checkPreverifiedItem. Checks run concurrently, bounded
// by the concurrency flag, and each item is attempted up to three times.
//
// It returns the first error reported by a check. If scheduling was cut short by cancellation
// and no check reported an error, the context's error is returned so that an incomplete run is
// not mistaken for a successful verification. Broken invariants (a chain without preverified
// data, no usable webseed, a local preverified set) panic.
func verifyWebseeds(cliCtx *cli.Context) (err error) {
	// A limit of zero makes errgroup refuse to start any goroutine, so the first Go call below
	// would block forever. Reject it up front. Negative values mean "no limit" and stay allowed.
	concurrency := concurrencyFlag.Get(cliCtx)
	if concurrency == 0 {
		return fmt.Errorf("invalid concurrency flag value %d", concurrency)
	}

	var dirs datadir.Dirs
	if cliCtx.IsSet(utils.DataDirFlag.Name) {
		dirs = datadir.Open(cliCtx.String(utils.DataDirFlag.Name))
	}
	err = handlePreverifiedFlag(cliCtx, &dirs)
	if err != nil {
		return err
	}

	allPreverified := snapcfg.GetAllCurrentPreverified()
	for chain, webseeds := range snapcfg.KnownWebseeds {
		log.Info("loaded preverified", "chain", chain, "webseeds", webseeds)
		panicif.False(g.MapContains(allPreverified, chain))
	}

	// Create the group first so the checker's requests run under the group's context: once one
	// item fails, in-flight downloads for the other items are cancelled instead of running to
	// completion.
	items, ctx := errgroup.WithContext(cliCtx.Context)
	items.SetLimit(concurrency)

	logger := log.Root()
	checker := webseedChecker{
		ctx:    ctx,
		logger: logger,
		httpClient: &http.Client{
			Transport: downloader.MakeWebseedRoundTripper(),
		},
	}
	defer func() {
		logger.Info("finished check",
			"total bytes read", checker.totalBytesRead.Load(),
			"total request count", checker.totalRequestCount.Load())
	}()

	var targetChain g.Option[string]
	if cliCtx.IsSet(verifyChainFlag.Name) {
		targetChain.Set(verifyChainFlag.Get(cliCtx))
	}

	// stopErr records why scheduling stopped early, if it did.
	var stopErr error
addItems:
	for chain, webseeds := range snapcfg.KnownWebseeds {
		logger.Debug("maybe skip chain", "target", targetChain, "chain", chain)
		if targetChain.Ok && targetChain.Value != chain {
			continue
		}
		// Use the first webseed that converts to a URL. If none does (or there are none), the
		// last error is kept and treated as a broken invariant.
		var baseUrl string
		urlErr := errors.New("no valid webseeds")
		for _, webseed := range webseeds {
			baseUrl, urlErr = snapcfg.WebseedToUrl(webseed)
			if urlErr == nil {
				break
			}
		}
		panicif.Err(urlErr)
		preverified := g.MapMustGet(allPreverified, chain)
		panicif.True(preverified.Local)
		for _, item := range preverified.Items {
			// Stop scheduling as soon as the run is cancelled or another item has failed.
			if stopErr = ctx.Err(); stopErr != nil {
				break addItems
			}
			items.Go(func() (err error) {
				for range 3 {
					if err != nil {
						logger.Warn("retrying failed preverified item check", "baseUrl", baseUrl, "item", item, "err", err)
					}
					var done bool
					done, err = checker.checkPreverifiedItem(baseUrl, item)
					if done || ctx.Err() != nil {
						return err
					}
					// A check that is not done must have reported why.
					panicif.True(err == nil && !done)
				}
				return err
			})
		}
	}

	// Always wait for started checks; a worker's error takes precedence over the stop reason.
	if waitErr := items.Wait(); waitErr != nil {
		return waitErr
	}
	return stopErr
}

// webseedChecker carries the shared state used while checking preverified items against a
// webseed: the context and HTTP client for requests, a logger, and counters for the work done.
type webseedChecker struct {
// ctx is passed to the metainfo and data requests, and consulted when a hash stream ends early.
ctx context.Context
logger log.Logger
httpClient *http.Client
// totalBytesRead accumulates the number of response body bytes fed to the piece hasher.
totalBytesRead atomic.Int64
// totalRequestCount is incremented once after each metainfo fetch and each data request that
// completes without a transport error.
totalRequestCount atomic.Int64
// cache is not read or written by any code visible in this file.
cache webseedCheckerCache
}

// webseedCheckerCache is the type of webseedChecker's cache field.
type webseedCheckerCache struct {
// DataEtags maps a string key to a list of hashes.
// NOTE(review): presumably keyed by the data response's ETag so identical bodies need not be
// rehashed; nothing visible here populates or reads it, so confirm the intended use.
DataEtags map[string][]metainfo.Hash
}

// checkPreverifiedItem downloads item from the webseed at baseUrl and compares the streamed data
// against the piece hashes in the item's metainfo, which is itself fetched from the webseed.
//
// done reports whether retrying is pointless, as decided by matchHashes; it is false for every
// failure that happens before hashing starts (metainfo fetch, transport error, non-200 status).
// Metainfo that cannot be decoded and a request that cannot be constructed panic.
func (me *webseedChecker) checkPreverifiedItem(baseUrl string, item snapcfg.PreverifiedItem) (done bool, err error) {
	me.logger.Debug("checking preverified item", "webseed", baseUrl, "item", item)

	torrent, err := downloader.GetMetainfoFromWebseed(me.ctx, baseUrl+"/", item.Name, me.httpClient, io.Discard)
	if err != nil {
		return false, fmt.Errorf("getting metainfo from webseed: %w", err)
	}
	me.totalRequestCount.Add(1)

	info, err := torrent.UnmarshalInfo()
	panicif.Err(err)
	me.logger.Debug("got metainfo", "piece length", info.PieceLength, "length", info.Length)

	dataUrl := baseUrl + "/" + item.Name
	request, err := http.NewRequestWithContext(me.ctx, http.MethodGet, dataUrl, nil)
	panicif.Err(err)

	resp, err := me.httpClient.Do(request)
	if err != nil {
		return false, err
	}
	defer resp.Body.Close()
	me.totalRequestCount.Add(1)

	if status := resp.StatusCode; status != http.StatusOK {
		return false, fmt.Errorf("bad http response status code: %v", status)
	}
	me.logger.Debug("item response", "etag", resp.Header.Get("etag"), "content length", resp.ContentLength)

	done, err = me.matchHashes(&info, resp)
	if err != nil {
		return done, err
	}
	me.logger.Info("snapshot matches",
		"url", dataUrl,
		"content length", resp.ContentLength,
	)
	return done, nil
}

// matchHashes streams resp.Body piece by piece and asserts that every piece hash equals the
// corresponding v1 hash recorded in info. Keep this open to multiple bodies, and matching ETags.
//
// A declared Content-Length that differs from info.Length, a hash mismatch, or a body that ends
// before all pieces were read (while the context is still live) panics. A read error is returned
// with done == false so the caller may retry. When all pieces matched, done is true; an error is
// still returned if the body yields anything beyond the expected pieces.
func (me *webseedChecker) matchHashes(info *metainfo.Info, resp *http.Response) (
	done bool, // Trying again won't change anything
	err error,
) {
	if declared := resp.ContentLength; declared != -1 {
		panicif.NotEq(declared, info.Length)
	}

	nextHash, stop := iter.Pull(me.yieldHashes(resp.Body, info.PieceLength))
	defer stop()

	for pieceIndex := range info.NumPieces() {
		piece := info.Piece(pieceIndex)
		got, more := nextHash()
		if !more {
			// The stream ended early; cancellation is the only tolerated reason.
			if ctxErr := me.ctx.Err(); ctxErr != nil {
				return false, ctxErr
			}
		}
		panicif.False(more)
		if got.Err != nil {
			return false, fmt.Errorf("getting hash for piece %v: %w", pieceIndex, got.Err)
		}
		panicif.NotEq(got.Ok, piece.V1Hash().Unwrap())
		log.Debug("matched piece hash", "hash", got.Ok, "url", resp.Request.URL)
	}

	// Every expected piece matched, so trying again won't change anything.
	if _, extra := nextHash(); extra {
		return true, errors.New("response longer than expected")
	}
	return true, nil
}

// yieldHashes returns a sequence that consumes r in chunks of pieceLength bytes and yields the
// SHA-1 of each chunk. A final shorter chunk is hashed and yielded as well. A read error other
// than io.EOF is yielded as an error result and ends the sequence. Every byte copied is added to
// me.totalBytesRead.
func (me *webseedChecker) yieldHashes(r io.Reader, pieceLength int64) iter.Seq[g.Result[metainfo.Hash]] {
	return func(yield func(g.Result[metainfo.Hash]) bool) {
		hasher := sha1.New()
		for {
			hasher.Reset()
			copied, copyErr := io.CopyN(hasher, r, pieceLength)
			me.totalBytesRead.Add(copied)

			switch {
			case copyErr == nil:
				// A full piece was read.
			case copyErr != io.EOF:
				yield(result.Err[metainfo.Hash](copyErr))
				return
			case copied == 0:
				// The stream ended exactly on a piece boundary; nothing left to hash.
				return
			}

			var pieceHash metainfo.Hash
			sum := hasher.Sum(pieceHash[:0])
			// Sum must have appended into pieceHash's own storage rather than a new allocation.
			panicif.NotEq(unsafe.SliceData(pieceHash[:]), unsafe.SliceData(sum))
			if !yield(result.Ok(pieceHash)) {
				return
			}
		}
	}
}
Loading
Loading