Skip to content

Commit

Permalink
polygon/sync: cleanup waypoint naming (#9395)
Browse files Browse the repository at this point in the history
  • Loading branch information
taratorio authored Feb 7, 2024
1 parent 66b0aa7 commit 114baa0
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 63 deletions.
3 changes: 0 additions & 3 deletions polygon/heimdall/waypoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@ import (
libcommon "github.com/ledgerwatch/erigon-lib/common"
)

// checkpoints and milestones are both hash accumulators as defined
// here https://www.ethportal.net/concepts/hash-accumulators

type Waypoint interface {
fmt.Stringer
StartBlock() *big.Int
Expand Down
10 changes: 5 additions & 5 deletions polygon/sync/accumulated_headers_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ import (
"github.com/ledgerwatch/erigon/polygon/heimdall"
)

type AccumulatedHeadersVerifier func(hashAccumulator heimdall.Waypoint, headers []*types.Header) error
type AccumulatedHeadersVerifier func(waypoint heimdall.Waypoint, headers []*types.Header) error

func VerifyAccumulatedHeaders(accumulator heimdall.Waypoint, headers []*types.Header) error {
func VerifyAccumulatedHeaders(waypoint heimdall.Waypoint, headers []*types.Header) error {
rootHash, err := bor.ComputeHeadersRootHash(headers)
if err != nil {
return fmt.Errorf("VerifyStatePointHeaders: failed to compute headers root hash %w", err)
return fmt.Errorf("VerifyAccumulatedHeaders: failed to compute headers root hash %w", err)
}
if !bytes.Equal(rootHash, accumulator.RootHash().Bytes()) {
return fmt.Errorf("VerifyStatePointHeaders: bad headers root hash")
if !bytes.Equal(rootHash, waypoint.RootHash().Bytes()) {
return fmt.Errorf("VerifyAccumulatedHeaders: bad headers root hash")
}
return nil
}
105 changes: 52 additions & 53 deletions polygon/sync/header_downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,19 @@ import (
const headerDownloaderLogPrefix = "HeaderDownloader"

func NewHeaderDownloader(logger log.Logger, sentry Sentry, heimdall heimdall.Heimdall, verify AccumulatedHeadersVerifier) *HeaderDownloader {
statePointHeadersMemo, err := lru.New[common.Hash, []*types.Header](sentry.MaxPeers())
if err != nil {
panic(err)
}

return &HeaderDownloader{
logger: logger,
sentry: sentry,
heimdall: heimdall,
verify: verify,
statePointHeadersMemo: statePointHeadersMemo,
logger: logger,
sentry: sentry,
heimdall: heimdall,
verify: verify,
}
}

type HeaderDownloader struct {
logger log.Logger
sentry Sentry
heimdall heimdall.Heimdall
verify AccumulatedHeadersVerifier
statePointHeadersMemo *lru.Cache[common.Hash, []*types.Header] // statePoint.rootHash->[headers part of state point]
logger log.Logger
sentry Sentry
heimdall heimdall.Heimdall
verify AccumulatedHeadersVerifier
}

func (hd *HeaderDownloader) DownloadUsingCheckpoints(ctx context.Context, store CheckpointStore, start uint64) error {
Expand Down Expand Up @@ -70,98 +63,104 @@ func (hd *HeaderDownloader) DownloadUsingMilestones(ctx context.Context, store M
return nil
}

func (hd *HeaderDownloader) downloadUsingWaypoints(ctx context.Context, store HeaderStore, hashAccumulators heimdall.Waypoints) error {
for len(hashAccumulators) > 0 {
func (hd *HeaderDownloader) downloadUsingWaypoints(ctx context.Context, store HeaderStore, waypoints heimdall.Waypoints) error {
// waypoint rootHash->[headers part of waypoint]
waypointHeadersMemo, err := lru.New[common.Hash, []*types.Header](hd.sentry.MaxPeers())
if err != nil {
return err
}

for len(waypoints) > 0 {
allPeers := hd.sentry.PeersWithBlockNumInfo()
if len(allPeers) == 0 {
hd.logger.Warn(fmt.Sprintf("[%s] zero peers, will try again", headerDownloaderLogPrefix))
continue
}

sort.Sort(allPeers) // sort by block num in asc order
peers := hd.choosePeers(allPeers, hashAccumulators)
peers := hd.choosePeers(allPeers, waypoints)
if len(peers) == 0 {
hd.logger.Warn(
fmt.Sprintf("[%s] can't use any peers to sync, will try again", headerDownloaderLogPrefix),
"start", hashAccumulators[0].StartBlock(),
"end", hashAccumulators[len(hashAccumulators)-1].EndBlock(),
"start", waypoints[0].StartBlock(),
"end", waypoints[len(waypoints)-1].EndBlock(),
"minPeerBlockNum", allPeers[0].BlockNum,
"minPeerID", allPeers[0].ID,
)
continue
}

peerCount := len(peers)
statePointsBatch := hashAccumulators[:peerCount]
waypointsBatch := waypoints[:peerCount]
hd.logger.Info(
fmt.Sprintf("[%s] downloading headers", headerDownloaderLogPrefix),
"start", statePointsBatch[0].StartBlock(),
"end", statePointsBatch[len(statePointsBatch)-1].EndBlock(),
"kind", reflect.TypeOf(statePointsBatch[0]),
"start", waypointsBatch[0].StartBlock(),
"end", waypointsBatch[len(waypointsBatch)-1].EndBlock(),
"kind", reflect.TypeOf(waypointsBatch[0]),
"peerCount", peerCount,
)

headerBatches := make([][]*types.Header, len(statePointsBatch))
maxStatePointLength := float64(0)
headerBatches := make([][]*types.Header, len(waypointsBatch))
maxWaypointLength := float64(0)
wg := sync.WaitGroup{}
for i, point := range statePointsBatch {
maxStatePointLength = math.Max(float64(point.Length()), maxStatePointLength)
for i, waypoint := range waypointsBatch {
maxWaypointLength = math.Max(float64(waypoint.Length()), maxWaypointLength)
wg.Add(1)
go func(i int, statePoint heimdall.Waypoint, peerID string) {
go func(i int, waypoint heimdall.Waypoint, peerID string) {
defer wg.Done()

if headers, ok := hd.statePointHeadersMemo.Get(statePoint.RootHash()); ok {
if headers, ok := waypointHeadersMemo.Get(waypoint.RootHash()); ok {
headerBatches[i] = headers
return
}

headers, err := hd.sentry.DownloadHeaders(ctx, statePoint.StartBlock(), statePoint.EndBlock(), peerID)
headers, err := hd.sentry.DownloadHeaders(ctx, waypoint.StartBlock(), waypoint.EndBlock(), peerID)
if err != nil {
hd.logger.Debug(
fmt.Sprintf("[%s] issue downloading headers, will try again", headerDownloaderLogPrefix),
"err", err,
"start", statePoint.StartBlock(),
"end", statePoint.EndBlock(),
"rootHash", statePoint.RootHash(),
"kind", reflect.TypeOf(statePoint),
"start", waypoint.StartBlock(),
"end", waypoint.EndBlock(),
"rootHash", waypoint.RootHash(),
"kind", reflect.TypeOf(waypoint),
"peerID", peerID,
)
return
}

if err := hd.verify(statePoint, headers); err != nil {
if err := hd.verify(waypoint, headers); err != nil {
hd.logger.Debug(
fmt.Sprintf(
"[%s] bad headers received from peer for state point - penalizing and will try again",
"[%s] bad headers received from peer for waypoint - penalizing and will try again",
headerDownloaderLogPrefix,
),
"start", statePoint.StartBlock(),
"end", statePoint.EndBlock(),
"rootHash", statePoint.RootHash(),
"kind", reflect.TypeOf(statePoint),
"start", waypoint.StartBlock(),
"end", waypoint.EndBlock(),
"rootHash", waypoint.RootHash(),
"kind", reflect.TypeOf(waypoint),
"peerID", peerID,
)

hd.sentry.Penalize(peerID)
return
}

hd.statePointHeadersMemo.Add(statePoint.RootHash(), headers)
waypointHeadersMemo.Add(waypoint.RootHash(), headers)
headerBatches[i] = headers
}(i, point, peers[i].ID)
}(i, waypoint, peers[i].ID)
}

wg.Wait()
headers := make([]*types.Header, 0, int(maxStatePointLength)*peerCount)
headers := make([]*types.Header, 0, int(maxWaypointLength)*peerCount)
gapIndex := -1
for i, headerBatch := range headerBatches {
if len(headerBatch) == 0 {
hd.logger.Debug(
fmt.Sprintf("[%s] no headers, will try again", headerDownloaderLogPrefix),
"start", statePointsBatch[i].StartBlock(),
"end", statePointsBatch[i].EndBlock(),
"rootHash", statePointsBatch[i].RootHash(),
"kind", reflect.TypeOf(statePointsBatch[i]),
"start", waypointsBatch[i].StartBlock(),
"end", waypointsBatch[i].EndBlock(),
"rootHash", waypointsBatch[i].RootHash(),
"kind", reflect.TypeOf(waypointsBatch[i]),
)

gapIndex = i
Expand All @@ -172,9 +171,9 @@ func (hd *HeaderDownloader) downloadUsingWaypoints(ctx context.Context, store He
}

if gapIndex >= 0 {
hashAccumulators = hashAccumulators[gapIndex:]
waypoints = waypoints[gapIndex:]
} else {
hashAccumulators = hashAccumulators[len(statePointsBatch):]
waypoints = waypoints[len(waypointsBatch):]
}

dbWriteStartTime := time.Now()
Expand All @@ -193,16 +192,16 @@ func (hd *HeaderDownloader) downloadUsingWaypoints(ctx context.Context, store He
}

// choosePeers assumes peers are sorted in ascending order based on block num
func (hd *HeaderDownloader) choosePeers(peers PeersWithBlockNumInfo, hashAccumulators heimdall.Waypoints) PeersWithBlockNumInfo {
func (hd *HeaderDownloader) choosePeers(peers PeersWithBlockNumInfo, waypoints heimdall.Waypoints) PeersWithBlockNumInfo {
var peersIdx int
chosenPeers := make(PeersWithBlockNumInfo, 0, len(peers))
for _, statePoint := range hashAccumulators {
for _, waypoint := range waypoints {
if peersIdx >= len(peers) {
break
}

peer := peers[peersIdx]
if peer.BlockNum.Cmp(statePoint.EndBlock()) > -1 {
if peer.BlockNum.Cmp(waypoint.EndBlock()) > -1 {
chosenPeers = append(chosenPeers, peer)
}

Expand Down
4 changes: 2 additions & 2 deletions polygon/sync/header_downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,8 @@ func TestHeaderDownloadWhenInvalidStateThenPenalizePeerAndReDownload(t *testing.
var firstTimeInvalidReturned bool
firstTimeInvalidReturnedPtr := &firstTimeInvalidReturned
test := newHeaderDownloaderTestWithOpts(t, headerDownloaderTestOpts{
headerVerifier: func(hashAccumulator heimdall.Waypoint, headers []*types.Header) error {
if hashAccumulator.StartBlock().Cmp(new(big.Int).SetUint64(2)) == 0 && !*firstTimeInvalidReturnedPtr {
headerVerifier: func(waypoint heimdall.Waypoint, headers []*types.Header) error {
if waypoint.StartBlock().Cmp(new(big.Int).SetUint64(2)) == 0 && !*firstTimeInvalidReturnedPtr {
*firstTimeInvalidReturnedPtr = true
return errors.New("invalid checkpoint")
}
Expand Down

0 comments on commit 114baa0

Please sign in to comment.