diff --git a/polygon/heimdall/waypoint.go b/polygon/heimdall/waypoint.go index d98cd55ad44..309ff89d7bf 100644 --- a/polygon/heimdall/waypoint.go +++ b/polygon/heimdall/waypoint.go @@ -7,9 +7,6 @@ import ( libcommon "github.com/ledgerwatch/erigon-lib/common" ) -// checkpoints and milestones are both hash hashAccumulators as defined -// here https://www.ethportal.net/concepts/hash-accumulators - type Waypoint interface { fmt.Stringer StartBlock() *big.Int diff --git a/polygon/sync/accumulated_headers_verifier.go b/polygon/sync/accumulated_headers_verifier.go index 89db9c988bc..97b8043691c 100644 --- a/polygon/sync/accumulated_headers_verifier.go +++ b/polygon/sync/accumulated_headers_verifier.go @@ -9,15 +9,15 @@ import ( "github.com/ledgerwatch/erigon/polygon/heimdall" ) -type AccumulatedHeadersVerifier func(hashAccumulator heimdall.Waypoint, headers []*types.Header) error +type AccumulatedHeadersVerifier func(waypoint heimdall.Waypoint, headers []*types.Header) error -func VerifyAccumulatedHeaders(accumulator heimdall.Waypoint, headers []*types.Header) error { +func VerifyAccumulatedHeaders(waypoint heimdall.Waypoint, headers []*types.Header) error { rootHash, err := bor.ComputeHeadersRootHash(headers) if err != nil { - return fmt.Errorf("VerifyStatePointHeaders: failed to compute headers root hash %w", err) + return fmt.Errorf("VerifyAccumulatedHeaders: failed to compute headers root hash %w", err) } - if !bytes.Equal(rootHash, accumulator.RootHash().Bytes()) { - return fmt.Errorf("VerifyStatePointHeaders: bad headers root hash") + if !bytes.Equal(rootHash, waypoint.RootHash().Bytes()) { + return fmt.Errorf("VerifyAccumulatedHeaders: bad headers root hash") } return nil } diff --git a/polygon/sync/header_downloader.go b/polygon/sync/header_downloader.go index 6ca3e4403ba..8f3cfe3c2f2 100644 --- a/polygon/sync/header_downloader.go +++ b/polygon/sync/header_downloader.go @@ -20,26 +20,19 @@ import ( const headerDownloaderLogPrefix = "HeaderDownloader" func NewHeaderDownloader(logger log.Logger, sentry Sentry, heimdall heimdall.Heimdall, verify AccumulatedHeadersVerifier) *HeaderDownloader { - statePointHeadersMemo, err := lru.New[common.Hash, []*types.Header](sentry.MaxPeers()) - if err != nil { - panic(err) - } - return &HeaderDownloader{ - logger: logger, - sentry: sentry, - heimdall: heimdall, - verify: verify, - statePointHeadersMemo: statePointHeadersMemo, + logger: logger, + sentry: sentry, + heimdall: heimdall, + verify: verify, } } type HeaderDownloader struct { - logger log.Logger - sentry Sentry - heimdall heimdall.Heimdall - verify AccumulatedHeadersVerifier - statePointHeadersMemo *lru.Cache[common.Hash, []*types.Header] // statePoint.rootHash->[headers part of state point] + logger log.Logger + sentry Sentry + heimdall heimdall.Heimdall + verify AccumulatedHeadersVerifier } func (hd *HeaderDownloader) DownloadUsingCheckpoints(ctx context.Context, store CheckpointStore, start uint64) error { @@ -70,8 +63,14 @@ func (hd *HeaderDownloader) DownloadUsingMilestones(ctx context.Context, store M return nil } -func (hd *HeaderDownloader) downloadUsingWaypoints(ctx context.Context, store HeaderStore, hashAccumulators heimdall.Waypoints) error { - for len(hashAccumulators) > 0 { +func (hd *HeaderDownloader) downloadUsingWaypoints(ctx context.Context, store HeaderStore, waypoints heimdall.Waypoints) error { + // waypoint rootHash->[headers part of waypoint] + waypointHeadersMemo, err := lru.New[common.Hash, []*types.Header](hd.sentry.MaxPeers()) + if err != nil { + return err + } + + for len(waypoints) > 0 { allPeers := hd.sentry.PeersWithBlockNumInfo() if len(allPeers) == 0 { hd.logger.Warn(fmt.Sprintf("[%s] zero peers, will try again", headerDownloaderLogPrefix)) @@ -79,12 +78,12 @@ func (hd *HeaderDownloader) downloadUsingWaypoints(ctx context.Context, store He } sort.Sort(allPeers) // sort by block num in asc order - peers := hd.choosePeers(allPeers, hashAccumulators) + peers := hd.choosePeers(allPeers, waypoints) if len(peers) == 0 { hd.logger.Warn( fmt.Sprintf("[%s] can't use any peers to sync, will try again", headerDownloaderLogPrefix), - "start", hashAccumulators[0].StartBlock(), - "end", hashAccumulators[len(hashAccumulators)-1].EndBlock(), + "start", waypoints[0].StartBlock(), + "end", waypoints[len(waypoints)-1].EndBlock(), "minPeerBlockNum", allPeers[0].BlockNum, "minPeerID", allPeers[0].ID, ) @@ -92,53 +91,53 @@ func (hd *HeaderDownloader) downloadUsingWaypoints(ctx context.Context, store He } peerCount := len(peers) - statePointsBatch := hashAccumulators[:peerCount] + waypointsBatch := waypoints[:peerCount] hd.logger.Info( fmt.Sprintf("[%s] downloading headers", headerDownloaderLogPrefix), - "start", statePointsBatch[0].StartBlock(), - "end", statePointsBatch[len(statePointsBatch)-1].EndBlock(), - "kind", reflect.TypeOf(statePointsBatch[0]), + "start", waypointsBatch[0].StartBlock(), + "end", waypointsBatch[len(waypointsBatch)-1].EndBlock(), + "kind", reflect.TypeOf(waypointsBatch[0]), "peerCount", peerCount, ) - headerBatches := make([][]*types.Header, len(statePointsBatch)) - maxStatePointLength := float64(0) + headerBatches := make([][]*types.Header, len(waypointsBatch)) + maxWaypointLength := float64(0) wg := sync.WaitGroup{} - for i, point := range statePointsBatch { - maxStatePointLength = math.Max(float64(point.Length()), maxStatePointLength) + for i, waypoint := range waypointsBatch { + maxWaypointLength = math.Max(float64(waypoint.Length()), maxWaypointLength) wg.Add(1) - go func(i int, statePoint heimdall.Waypoint, peerID string) { + go func(i int, waypoint heimdall.Waypoint, peerID string) { defer wg.Done() - if headers, ok := hd.statePointHeadersMemo.Get(statePoint.RootHash()); ok { + if headers, ok := waypointHeadersMemo.Get(waypoint.RootHash()); ok { headerBatches[i] = headers return } - headers, err := hd.sentry.DownloadHeaders(ctx, statePoint.StartBlock(), statePoint.EndBlock(), peerID) + headers, err := hd.sentry.DownloadHeaders(ctx, waypoint.StartBlock(), waypoint.EndBlock(), peerID) if err != nil { hd.logger.Debug( fmt.Sprintf("[%s] issue downloading headers, will try again", headerDownloaderLogPrefix), "err", err, - "start", statePoint.StartBlock(), - "end", statePoint.EndBlock(), - "rootHash", statePoint.RootHash(), - "kind", reflect.TypeOf(statePoint), + "start", waypoint.StartBlock(), + "end", waypoint.EndBlock(), + "rootHash", waypoint.RootHash(), + "kind", reflect.TypeOf(waypoint), "peerID", peerID, ) return } - if err := hd.verify(statePoint, headers); err != nil { + if err := hd.verify(waypoint, headers); err != nil { hd.logger.Debug( fmt.Sprintf( - "[%s] bad headers received from peer for state point - penalizing and will try again", + "[%s] bad headers received from peer for waypoint - penalizing and will try again", headerDownloaderLogPrefix, ), - "start", statePoint.StartBlock(), - "end", statePoint.EndBlock(), - "rootHash", statePoint.RootHash(), - "kind", reflect.TypeOf(statePoint), + "start", waypoint.StartBlock(), + "end", waypoint.EndBlock(), + "rootHash", waypoint.RootHash(), + "kind", reflect.TypeOf(waypoint), "peerID", peerID, ) @@ -146,22 +145,22 @@ func (hd *HeaderDownloader) downloadUsingWaypoints(ctx context.Context, store He return } - hd.statePointHeadersMemo.Add(statePoint.RootHash(), headers) + waypointHeadersMemo.Add(waypoint.RootHash(), headers) headerBatches[i] = headers - }(i, point, peers[i].ID) + }(i, waypoint, peers[i].ID) } wg.Wait() - headers := make([]*types.Header, 0, int(maxStatePointLength)*peerCount) + headers := make([]*types.Header, 0, int(maxWaypointLength)*peerCount) gapIndex := -1 for i, headerBatch := range headerBatches { if len(headerBatch) == 0 { hd.logger.Debug( fmt.Sprintf("[%s] no headers, will try again", headerDownloaderLogPrefix), - "start", statePointsBatch[i].StartBlock(), - "end", statePointsBatch[i].EndBlock(), - "rootHash", statePointsBatch[i].RootHash(), - "kind", reflect.TypeOf(statePointsBatch[i]), + "start", waypointsBatch[i].StartBlock(), + "end", waypointsBatch[i].EndBlock(), + "rootHash", waypointsBatch[i].RootHash(), + "kind", reflect.TypeOf(waypointsBatch[i]), ) gapIndex = i @@ -172,9 +171,9 @@ func (hd *HeaderDownloader) downloadUsingWaypoints(ctx context.Context, store He } if gapIndex >= 0 { - hashAccumulators = hashAccumulators[gapIndex:] + waypoints = waypoints[gapIndex:] } else { - hashAccumulators = hashAccumulators[len(statePointsBatch):] + waypoints = waypoints[len(waypointsBatch):] } dbWriteStartTime := time.Now() @@ -193,16 +192,16 @@ func (hd *HeaderDownloader) downloadUsingWaypoints(ctx context.Context, store He } // choosePeers assumes peers are sorted in ascending order based on block num -func (hd *HeaderDownloader) choosePeers(peers PeersWithBlockNumInfo, hashAccumulators heimdall.Waypoints) PeersWithBlockNumInfo { +func (hd *HeaderDownloader) choosePeers(peers PeersWithBlockNumInfo, waypoints heimdall.Waypoints) PeersWithBlockNumInfo { var peersIdx int chosenPeers := make(PeersWithBlockNumInfo, 0, len(peers)) - for _, statePoint := range hashAccumulators { + for _, waypoint := range waypoints { if peersIdx >= len(peers) { break } peer := peers[peersIdx] - if peer.BlockNum.Cmp(statePoint.EndBlock()) > -1 { + if peer.BlockNum.Cmp(waypoint.EndBlock()) > -1 { chosenPeers = append(chosenPeers, peer) } diff --git a/polygon/sync/header_downloader_test.go b/polygon/sync/header_downloader_test.go index 95fd61d1cb1..e424199a605 100644 --- a/polygon/sync/header_downloader_test.go +++ b/polygon/sync/header_downloader_test.go @@ -201,8 +201,8 @@ func TestHeaderDownloadWhenInvalidStateThenPenalizePeerAndReDownload(t *testing. var firstTimeInvalidReturned bool firstTimeInvalidReturnedPtr := &firstTimeInvalidReturned test := newHeaderDownloaderTestWithOpts(t, headerDownloaderTestOpts{ - headerVerifier: func(hashAccumulator heimdall.Waypoint, headers []*types.Header) error { - if hashAccumulator.StartBlock().Cmp(new(big.Int).SetUint64(2)) == 0 && !*firstTimeInvalidReturnedPtr { + headerVerifier: func(waypoint heimdall.Waypoint, headers []*types.Header) error { + if waypoint.StartBlock().Cmp(new(big.Int).SetUint64(2)) == 0 && !*firstTimeInvalidReturnedPtr { *firstTimeInvalidReturnedPtr = true return errors.New("invalid checkpoint") }