@@ -10,6 +10,7 @@ import (
1010 "net/http"
1111 "os"
1212 "path/filepath"
13+ "runtime"
1314 "strings"
1415 "time"
1516
@@ -18,6 +19,7 @@ import (
1819 "github.com/btcsuite/btcd/wire"
1920 "github.com/lightninglabs/neutrino/headerfs"
2021 "golang.org/x/exp/mmap"
22+ "golang.org/x/sync/errgroup"
2123)
2224
2325const (
@@ -45,6 +47,40 @@ const (
4547 DefaultWriteBatchSizePerRegion = 16384
4648)
4749
50+ // OverlapMode defines how to handle headers that overlap between the import
51+ // source and existing headers in the stores.
52+ type OverlapMode uint8
53+
54+ const (
55+ // AppendOnly adds headers only beyond what already exists in each
56+ // store. If import source contains headers that overlap with existing
57+ // data, it will skip those without validation and only append the new
58+ // ones. This mode is optimized for performance with large header
59+ // datasets from trusted sources, avoiding the overhead of additional
60+ // validation between import and target sources while preserving
61+ // existing data. It is the default mode as it efficiently handles
62+ // imports while minimizing the risk of unintended chain
63+ // reorganizations.
64+ AppendOnly OverlapMode = iota
65+
66+ // ValidateAndAppend validates that any overlapping headers match before
67+ // appending new ones. If mismatches are found during validation, the
68+ // import operation is aborted.
69+ ValidateAndAppend
70+ )
71+
72+ // String returns a human-readable representation of the overlap mode.
73+ func (m OverlapMode ) String () string {
74+ switch m {
75+ case AppendOnly :
76+ return "AppendOnlyMode"
77+ case ValidateAndAppend :
78+ return "ValidateAndAppendMode"
79+ default :
80+ return fmt .Sprintf ("OverlapMode(%d)" , m )
81+ }
82+ }
83+
4884// HeaderMetadata contains the metadata about the header source.
4985type HeaderMetadata struct {
5086 BitcoinChainType wire.BitcoinNet
@@ -326,8 +362,8 @@ func (v *FilterHeadersImportSourceValidator) Validate(
326362 iterator HeaderIterator ,
327363 targetChainParams chaincfg.Params ) error {
328364
329- log .Debug ("Skipping filter headers validation - missing access to " +
330- "original compact filters" )
365+ log .Debug ("Skipping filter headers import validation - missing " +
366+ "access to original compact filters" )
331367 return nil
332368}
333369
@@ -772,27 +808,29 @@ func (s *HeadersImport) Import(ctx context.Context) (*ImportResult, error) {
772808 }
773809
774810 // Validate all block headers from import source.
775- log .Debugf ("Validating %d block headers" , metadata .HeadersCount )
811+ log .Debugf ("Validating %d block headers from import source" ,
812+ metadata .HeadersCount )
776813 if err := s .BlockHeadersValidator .Validate (
777814 s .BlockHeadersImportSource .Iterator (
778815 0 , metadata .HeadersCount - 1 ,
779816 ),
780817 s .options .TargetChainParams ,
781818 ); err != nil {
782819 return nil , fmt .Errorf ("failed to validate block " +
783- "headers: %w" , err )
820+ "headers from import source : %w" , err )
784821 }
785822
786823 // Validate all filter headers from import source.
787- log .Debugf ("Validating %d filter headers" , metadata .HeadersCount )
824+ log .Debugf ("Validating %d filter headers from import source" ,
825+ metadata .HeadersCount )
788826 if err := s .FilterHeadersValidator .Validate (
789827 s .FilterHeadersImportSource .Iterator (
790828 0 , metadata .HeadersCount - 1 ,
791829 ),
792830 s .options .TargetChainParams ,
793831 ); err != nil {
794832 return nil , fmt .Errorf ("failed to validate filter " +
795- "headers: %w" , err )
833+ "headers from import source : %w" , err )
796834 }
797835
798836 // Determine processing regions that partition the import task into
@@ -805,9 +843,15 @@ func (s *HeadersImport) Import(ctx context.Context) (*ImportResult, error) {
805843 result .StartHeight = regions .ImportStartHeight
806844 result .EndHeight = regions .ImportEndHeight
807845
808- // TODO(mohamedawnallah): process the overlap region. This mainly
809- // includes a validation strategy for the overlap region between headers
810- // from import and target sources.
846+ // Process overlap headers region.
847+ // Process headers in the overlap region by either skipping validation
848+ // (AppendOnly mode) or validating that headers match exactly between
849+ // import and target sources (ValidateAndAppend mode).
850+ err = s .processOverlapHeadersRegion (ctx , regions .Overlap , result )
851+ if err != nil {
852+ return nil , fmt .Errorf ("headers import failed: overlap region " +
853+ "headers validation failed: %w" , err )
854+ }
811855
812856 // TODO(mohamedawnallah): Process the divergence region. This includes
813857 // strategy/strategies for handling divergence region that may exist in
@@ -837,32 +881,6 @@ func (s *HeadersImport) Import(ctx context.Context) (*ImportResult, error) {
837881 return result , nil
838882}
839883
840- // isTargetFresh checks if the target header stores are in their initial state,
841- // meaning they contain only the genesis header (height 0).
842- func (s * HeadersImport ) isTargetFresh (
843- targetBlockHeaderStore headerfs.BlockHeaderStore ,
844- targetFilterHeaderStore headerfs.FilterHeaderStore ) (bool , error ) {
845-
846- // Get the chain tip from both target stores.
847- _ , blockTipHeight , err := targetBlockHeaderStore .ChainTip ()
848- if err != nil {
849- return false , fmt .Errorf ("failed to get target block header " +
850- "chain tip: %w" , err )
851- }
852-
853- _ , filterTipHeight , err := targetFilterHeaderStore .ChainTip ()
854- if err != nil {
855- return false , fmt .Errorf ("failed to get target filter header " +
856- "chain tip: %w" , err )
857- }
858-
859- if blockTipHeight == 0 && filterTipHeight == 0 {
860- return true , nil
861- }
862-
863- return false , nil
864- }
865-
866884// openSources initializes and opens all required header import sources. It
867885// verifies that all necessary import sources and validators are properly
868886// configured, then opens each source to prepare for data reading. Returns an
@@ -1301,16 +1319,131 @@ func (s *HeadersImport) verifyHeadersAtTargetHeight(height uint32) error {
13011319 sourceFilterHeaderHash , targetFilterHeaderHash )
13021320 }
13031321
1304- log .Debugf ("Headers from %s (block) and %s (filter) verified at " +
1305- "height %d" , s .BlockHeadersImportSource .GetURI (),
1306- s .FilterHeadersImportSource .GetURI (), height )
1322+ log .Debugf ("Headers from import sources verified at height %d" , height )
1323+ return nil
1324+ }
1325+
1326+ // processOverlapHeadersRegion processes the headers in the overlap region by
1327+ // either skipping validation (AppendOnly mode) or validating that headers match
1328+ // exactly between import and target sources (ValidateAndAppend mode). When
1329+ // using ValidateAndAppend mode, if any mismatches are found during validation,
1330+ // the import operation is aborted.
1331+ func (s * HeadersImport ) processOverlapHeadersRegion (ctx context.Context ,
1332+ region HeaderRegion , result * ImportResult ) error {
1333+
1334+ if ! region .Exists {
1335+ return nil
1336+ }
1337+
1338+ log .Infof ("Validating headers in the overlap region between import " +
1339+ "and target sources from heights %d to %d" , region .Start ,
1340+ region .End )
1341+
1342+ switch s .options .OverlapMode {
1343+ case AppendOnly :
1344+ // Skip all headers in overlap region.
1345+ log .Infof ("Skipping validating %d headers (block and filter) " +
1346+ "in overlap region due to %s mode" ,
1347+ region .End - region .Start + 1 , s .options .OverlapMode )
1348+ case ValidateAndAppend :
1349+ // Validate all headers in overlap region match. If mismatches
1350+ // are found during validation, the import operation is aborted.
1351+ if err := s .validateHeadersExactMatch (
1352+ ctx , region .Start , region .End ,
1353+ ); err != nil {
1354+ return fmt .Errorf ("overlap region validation failed: " +
1355+ "%w" , err )
1356+ }
1357+
1358+ log .Infof ("Successfully validated %d headers " +
1359+ "(block and filter) in overlap region" ,
1360+ region .End - region .Start + 1 )
1361+ }
1362+
1363+ result .SkippedCount += int (region .End - region .Start + 1 )
1364+
1365+ return nil
1366+ }
1367+
1368+ // validateHeadersExactMatch validates all headers in the specified range match
1369+ // exactly between import and target sources.
1370+ //
1371+ // The function heuristically processes sequentially for smaller ranges and uses
1372+ // parallel validation for larger ranges to optimize performance.
1373+ func (s * HeadersImport ) validateHeadersExactMatch (ctx context.Context ,
1374+ startHeight , endHeight uint32 ) error {
1375+
1376+ // Calculate the range size.
1377+ rangeSize := endHeight - startHeight + 1
1378+
1379+ // If range is small, heuristically process sequentially.
1380+ if rangeSize <= 100 {
1381+ return s .validateHeadersSequential (startHeight , endHeight )
1382+ }
1383+
1384+ // Create an errgroup with a derived context.
1385+ g , ctx := errgroup .WithContext (ctx )
1386+
1387+ // Set concurrency limit based on CPU cores, but cap reasonably.
1388+ g .SetLimit (min (runtime .NumCPU (), 8 ))
1389+
1390+ // Queue up all the heights to validate.
1391+ for height := startHeight ; height <= endHeight ; height ++ {
1392+ // Add work to the errgroup.
1393+ g .Go (func () error {
1394+ // Check if context has been canceled before starting
1395+ // work.
1396+ select {
1397+ case <- ctx .Done ():
1398+ return ctx .Err ()
1399+ default :
1400+ }
1401+
1402+ // Verify headers at this target height.
1403+ err := s .verifyHeadersAtTargetHeight (height )
1404+ if err != nil {
1405+ return fmt .Errorf ("header verification failed " +
1406+ "at height %d: %w" , height , err )
1407+ }
1408+
1409+ return nil
1410+ })
1411+ }
1412+
1413+ // Wait for all verification goroutines to complete or for any error.
1414+ err := g .Wait ()
1415+ if err != nil {
1416+ return err
1417+ }
1418+
1419+ log .Infof ("Validated %d headers in the overlap region between import " +
1420+ "and target sources from heights %d to %d" ,
1421+ endHeight - startHeight + 1 , startHeight , endHeight )
13071422
13081423 return nil
13091424}
13101425
1311- // processNewHeadersRegion imports headers from the specified region into the
1312- // target stores. This method handles the case where headers exist in the import
1313- // source but not in the target stores.
1426+ // validateHeadersSequential performs sequential validation for smaller ranges.
1427+ //
1428+ // The function validates headers sequentially for smaller ranges to optimize
1429+ // performance.
1430+ func (s * HeadersImport ) validateHeadersSequential (startHeight ,
1431+ endHeight uint32 ) error {
1432+
1433+ for height := startHeight ; height <= endHeight ; height ++ {
1434+ err := s .verifyHeadersAtTargetHeight (height )
1435+ if err != nil {
1436+ return fmt .Errorf ("header verification failed at " +
1437+ "height %d: %w" , height , err )
1438+ }
1439+ }
1440+
1441+ return nil
1442+ }
1443+
1444+ // processNewHeadersRegion processes the headers in the new headers region by
1445+ // importing them into the target stores. This method handles the case where
1446+ // headers exist in the import source but not in the target stores.
13141447func (s * HeadersImport ) processNewHeadersRegion (region HeaderRegion ,
13151448 result * ImportResult ) error {
13161449
@@ -1548,6 +1681,11 @@ type ImportOptions struct {
15481681 // each batch per region. This controls performance characteristics of
15491682 // the import.
15501683 WriteBatchSizePerRegion int
1684+
1685+ // OverlapMode defines how to handle headers that overlap between the
1686+ // import source and existing data in the target stores. Defaults to
1687+ // AppendOnly.
1688+ OverlapMode OverlapMode
15511689}
15521690
15531691// Import executes the header import process with the configuration specified in
0 commit comments