From 2fe2e26afa6bcee7bef22d698aa17fd3b8ef1969 Mon Sep 17 00:00:00 2001 From: pk910 Date: Wed, 28 Aug 2024 21:13:52 +0200 Subject: [PATCH] improve canonical chain selection --- .hack/devnet/run.sh | 2 +- indexer/beacon/canonical.go | 196 +++++++++++++++++++++----------- indexer/beacon/forkdetection.go | 4 +- indexer/beacon/indexer.go | 2 +- 4 files changed, 133 insertions(+), 71 deletions(-) diff --git a/.hack/devnet/run.sh b/.hack/devnet/run.sh index 18f860a3..ef087ff0 100755 --- a/.hack/devnet/run.sh +++ b/.hack/devnet/run.sh @@ -37,7 +37,7 @@ server: port: "8080" frontend: enabled: true - debug: false + debug: true pprof: true minimize: false siteName: "Dora the Explorer" diff --git a/indexer/beacon/canonical.go b/indexer/beacon/canonical.go index e72bbf30..065e5d27 100644 --- a/indexer/beacon/canonical.go +++ b/indexer/beacon/canonical.go @@ -2,9 +2,11 @@ package beacon import ( "bytes" + "fmt" "math" "slices" "sort" + "strings" "time" v1 "github.com/attestantio/go-eth2-client/api/v1" @@ -15,10 +17,9 @@ const FarFutureEpoch = phase0.Epoch(math.MaxUint64) // ChainHead represents a head block of the chain. type ChainHead struct { - HeadBlock *Block // The head block of the chain. - AggregatedHeadVotes phase0.Gwei // The aggregated votes of the last 2 epochs for the head block. - LastEpochVotingPercent float64 // The voting percentage in the last epoch. - ThisEpochVotingPercent float64 // The voting percentage in the current epoch. + HeadBlock *Block // The head block of the chain. + AggregatedHeadVotes phase0.Gwei // The aggregated votes of the last 2 epochs for the head block. + PerEpochVotingPercent []float64 // The voting percentage in the last epochs (ascendeing order). } // GetCanonicalHead returns the canonical head block of the chain. @@ -47,12 +48,21 @@ func (indexer *Indexer) GetCanonicalHead(overrideForkId *ForkKey) *Block { if len(chainHeadCandidates) > 0 { sort.Slice(chainHeadCandidates, func(i, j int) bool { - if chainHeadCandidates[i].LastEpochVotingPercent != chainHeadCandidates[j].LastEpochVotingPercent { - return chainHeadCandidates[i].LastEpochVotingPercent > chainHeadCandidates[j].LastEpochVotingPercent + percentagesI := float64(0) + percentagesJ := float64(0) + for k := range chainHeadCandidates[i].PerEpochVotingPercent { + factor := float64(1) + if k == len(chainHeadCandidates[i].PerEpochVotingPercent)-1 { + factor = 0.5 + } + percentagesI += chainHeadCandidates[i].PerEpochVotingPercent[k] * factor + percentagesJ += chainHeadCandidates[j].PerEpochVotingPercent[k] * factor } - if chainHeadCandidates[i].ThisEpochVotingPercent != chainHeadCandidates[j].ThisEpochVotingPercent { - return chainHeadCandidates[i].ThisEpochVotingPercent > chainHeadCandidates[j].ThisEpochVotingPercent + + if percentagesI != percentagesJ { + return percentagesI > percentagesJ } + return chainHeadCandidates[i].HeadBlock.Slot > chainHeadCandidates[j].HeadBlock.Slot }) @@ -70,12 +80,21 @@ func (indexer *Indexer) GetChainHeads() []*ChainHead { heads := make([]*ChainHead, len(indexer.cachedChainHeads)) copy(heads, indexer.cachedChainHeads) sort.Slice(heads, func(i, j int) bool { - if heads[i].LastEpochVotingPercent != heads[j].LastEpochVotingPercent { - return heads[i].LastEpochVotingPercent > heads[j].LastEpochVotingPercent + percentagesI := float64(0) + percentagesJ := float64(0) + for k := range heads[i].PerEpochVotingPercent { + factor := float64(1) + if k == len(heads[i].PerEpochVotingPercent)-1 { + factor = 0.5 + } + percentagesI += heads[i].PerEpochVotingPercent[k] * factor + percentagesJ += heads[j].PerEpochVotingPercent[k] * factor } - if heads[i].ThisEpochVotingPercent != heads[j].ThisEpochVotingPercent { - return heads[i].ThisEpochVotingPercent > heads[j].ThisEpochVotingPercent + + if percentagesI != percentagesJ { + return percentagesI > percentagesJ } + return heads[i].HeadBlock.Slot > heads[j].HeadBlock.Slot }) @@ -103,6 +122,14 @@ func (indexer *Indexer) computeCanonicalChain() bool { var headBlock *Block = nil var chainHeads []*ChainHead = nil + + chainState := indexer.consensusPool.GetChainState() + specs := chainState.GetSpecs() + aggregateEpochs := (32 / specs.SlotsPerEpoch) + 1 // aggregate votes of last 48 slots (2 epochs for mainnet, 5 epochs for minimal config) + if aggregateEpochs < 2 { + aggregateEpochs = 2 + } + t1 := time.Now() defer func() { @@ -124,22 +151,26 @@ func (indexer *Indexer) computeCanonicalChain() bool { if len(latestBlocks) > 0 { headBlock = latestBlocks[0] - forkVotes, thisEpochPercent, lastEpochPercent := indexer.aggregateForkVotes(headBlock.forkId) + forkVotes, epochParticipation := indexer.aggregateForkVotes(headBlock.forkId, aggregateEpochs) + participationStr := make([]string, len(epochParticipation)) + for i, p := range epochParticipation { + participationStr[i] = fmt.Sprintf("%.2f%%", p) + } + indexer.logger.Debugf( - "fork %v votes in last 2 epochs: %v ETH (%.2f%%, %.2f%%), head: %v (%v)", + "fork %v votes in last %v epochs: %v ETH (%v), head: %v (%v)", headBlock.forkId, + aggregateEpochs, forkVotes/EtherGweiFactor, - lastEpochPercent, - thisEpochPercent, + strings.Join(participationStr, ", "), headBlock.Slot, headBlock.Root.String(), ) chainHeads = []*ChainHead{{ - HeadBlock: headBlock, - AggregatedHeadVotes: forkVotes, - LastEpochVotingPercent: lastEpochPercent, - ThisEpochVotingPercent: thisEpochPercent, + HeadBlock: headBlock, + AggregatedHeadVotes: forkVotes, + PerEpochVotingPercent: epochParticipation, }} } } else { @@ -153,22 +184,25 @@ func (indexer *Indexer) computeCanonicalChain() bool { continue } - forkVotes, thisEpochPercent, lastEpochPercent := indexer.aggregateForkVotes(fork.ForkId) + forkVotes, epochParticipation := indexer.aggregateForkVotes(fork.ForkId, aggregateEpochs) headForkVotes[fork.ForkId] = forkVotes chainHeads = append(chainHeads, &ChainHead{ - HeadBlock: fork.Block, - AggregatedHeadVotes: forkVotes, - LastEpochVotingPercent: lastEpochPercent, - ThisEpochVotingPercent: thisEpochPercent, + HeadBlock: fork.Block, + AggregatedHeadVotes: forkVotes, + PerEpochVotingPercent: epochParticipation, }) if forkVotes > 0 { + participationStr := make([]string, len(epochParticipation)) + for i, p := range epochParticipation { + participationStr[i] = fmt.Sprintf("%.2f%%", p) + } + indexer.logger.Infof( - "fork %v votes in last 2 epochs: %v ETH (%.2f%%, %.2f%%), head: %v (%v)", + "fork %v: votes in last 2 epochs: %v ETH (%v), head: %v (%v)", fork.ForkId, forkVotes/EtherGweiFactor, - lastEpochPercent, - thisEpochPercent, + strings.Join(participationStr, ", "), fork.Block.Slot, fork.Block.Root.String(), ) @@ -187,13 +221,19 @@ func (indexer *Indexer) computeCanonicalChain() bool { } // aggregateForkVotes aggregates the votes for a given fork. -func (indexer *Indexer) aggregateForkVotes(forkId ForkKey) (totalVotes phase0.Gwei, thisEpochPercent float64, lastEpochPercent float64) { +func (indexer *Indexer) aggregateForkVotes(forkId ForkKey, epochLimit uint64) (totalVotes phase0.Gwei, epochPercent []float64) { chainState := indexer.consensusPool.GetChainState() specs := chainState.GetSpecs() currentEpoch := chainState.CurrentEpoch() + + epochPercent = make([]float64, 0, epochLimit) + if epochLimit == 0 { + return + } + minAggregateEpoch := currentEpoch - if minAggregateEpoch > 1 { - minAggregateEpoch -= 1 + if minAggregateEpoch > phase0.Epoch(epochLimit)-1 { + minAggregateEpoch -= phase0.Epoch(epochLimit) - 1 } else { minAggregateEpoch = 0 } @@ -206,12 +246,12 @@ func (indexer *Indexer) aggregateForkVotes(forkId ForkKey) (totalVotes phase0.Gw return } - // get all blocks for given fork (and its parents) from the last 2 epochs + // get all blocks for given fork (and its parents) from the last epochs lastBlocks := []*Block{} lastSlot := phase0.Slot(0) thisForkId := forkId for { - for _, block := range indexer.blockCache.getLatestBlocks(2*specs.SlotsPerEpoch, &thisForkId) { + for _, block := range indexer.blockCache.getLatestBlocks(epochLimit*specs.SlotsPerEpoch, &thisForkId) { lastSlot = block.Slot if block.Slot < minAggregateSlot { break @@ -236,52 +276,72 @@ func (indexer *Indexer) aggregateForkVotes(forkId ForkKey) (totalVotes phase0.Gw } // already sorted descending by getLatestBlocks, reverse to ascending for aggregation - lastBlock := lastBlocks[0] slices.Reverse(lastBlocks) - // aggregate votes for last & current epoch - if chainState.EpochOfSlot(lastBlock.Slot) == currentEpoch { - thisEpochDependent := indexer.blockCache.getDependentBlock(chainState, lastBlock, nil) - if thisEpochDependent == nil { - return - } - lastBlock = thisEpochDependent - - thisEpochStats := indexer.epochCache.getEpochStats(currentEpoch, thisEpochDependent.Root) - if thisEpochStats != nil { - thisBlocks := []*Block{} - for _, block := range lastBlocks { - if chainState.EpochOfSlot(block.Slot) == currentEpoch { - thisBlocks = append(thisBlocks, block) - } - } - - epochVotes := indexer.aggregateEpochVotes(currentEpoch, chainState, thisBlocks, thisEpochStats) - if epochVotes.AmountIsCount { - totalVotes += epochVotes.CurrentEpoch.TargetVoteAmount * 32 * EtherGweiFactor + // aggregate votes per epoch + lastBlockIdx := 0 + for epoch := minAggregateEpoch; epoch <= currentEpoch; epoch++ { + epochVotingBlocks := []*Block{} + nextBlockIdx := 0 + for lastBlockIdx < len(lastBlocks) { + if chainState.EpochOfSlot(lastBlocks[lastBlockIdx].Slot) == epoch { + epochVotingBlocks = append(epochVotingBlocks, lastBlocks[lastBlockIdx]) + lastBlockIdx++ + } else if lastBlockIdx+nextBlockIdx < len(lastBlocks) && chainState.EpochOfSlot(lastBlocks[lastBlockIdx+nextBlockIdx].Slot) == epoch+1 { + epochVotingBlocks = append(epochVotingBlocks, lastBlocks[lastBlockIdx+nextBlockIdx]) + nextBlockIdx++ } else { - totalVotes += epochVotes.CurrentEpoch.TargetVoteAmount + break } - thisEpochPercent = epochVotes.TargetVotePercent } - } - if chainState.EpochOfSlot(lastBlock.Slot)+1 == currentEpoch { - lastEpochDependent := indexer.blockCache.getDependentBlock(chainState, lastBlock, nil) - if lastEpochDependent == nil { - return + if len(epochVotingBlocks) == 0 { + epochPercent = append(epochPercent, 0) + continue + } + + dependentRoot := epochVotingBlocks[0].GetParentRoot() + if dependentRoot == nil { + epochPercent = append(epochPercent, 0) + continue + } + + epochStats := indexer.epochCache.getEpochStats(epoch, *dependentRoot) + if epochStats == nil { + epochPercent = append(epochPercent, 0) + continue } - lastEpochStats := indexer.epochCache.getEpochStats(currentEpoch-1, lastEpochDependent.Root) - if lastEpochStats != nil { - epochVotes := indexer.aggregateEpochVotes(currentEpoch-1, chainState, lastBlocks, lastEpochStats) - if epochVotes.AmountIsCount { - totalVotes += (epochVotes.CurrentEpoch.TargetVoteAmount + epochVotes.NextEpoch.TargetVoteAmount) * 32 * EtherGweiFactor + epochVotes := indexer.aggregateEpochVotes(epoch, chainState, epochVotingBlocks, epochStats) + if epochVotes.AmountIsCount { + totalVotes += (epochVotes.CurrentEpoch.TargetVoteAmount + epochVotes.NextEpoch.TargetVoteAmount) * 32 * EtherGweiFactor + } else { + totalVotes += epochVotes.CurrentEpoch.TargetVoteAmount + epochVotes.NextEpoch.TargetVoteAmount + } + + lastBlock := epochVotingBlocks[len(epochVotingBlocks)-1] + epochProgress := float64(100) + + if chainState.EpochOfSlot(lastBlock.Slot) == epoch { + lastBlockIndex := chainState.SlotToSlotIndex(lastBlock.Slot) + if lastBlockIndex > 0 { + epochProgress = float64(100*lastBlockIndex) / float64(chainState.GetSpecs().SlotsPerEpoch) } else { - totalVotes += epochVotes.CurrentEpoch.TargetVoteAmount + epochVotes.NextEpoch.TargetVoteAmount + epochProgress = 0 } - lastEpochPercent = epochVotes.TargetVotePercent } + + var participationExtrapolation float64 + if epochProgress == 0 { + participationExtrapolation = 0 + } else { + participationExtrapolation = 100 * epochVotes.TargetVotePercent / epochProgress + } + if participationExtrapolation > 100 { + participationExtrapolation = 100 + } + + epochPercent = append(epochPercent, participationExtrapolation) } return diff --git a/indexer/beacon/forkdetection.go b/indexer/beacon/forkdetection.go index d62ac73e..428c9c8c 100644 --- a/indexer/beacon/forkdetection.go +++ b/indexer/beacon/forkdetection.go @@ -126,7 +126,8 @@ func (cache *forkCache) processBlock(block *Block) error { otherFork := newFork(cache.lastForkId, parentSlot, *parentRoot, otherChildren[0], parentForkId) cache.addFork(otherFork) - updatedRoots, updatedFork, _ := cache.updateForkBlocks(otherChildren[0], otherFork.forkId, false) + updatedRoots, updatedFork, headBlock := cache.updateForkBlocks(otherChildren[0], otherFork.forkId, false) + otherFork.headBlock = headBlock newFork := &newForkInfo{ fork: otherFork, updateRoots: updatedRoots, @@ -295,6 +296,7 @@ func (cache *forkCache) updateForkBlocks(startBlock *Block, forkId ForkKey, skip if !skipStartBlock { blockRoots = append(blockRoots, startBlock.Root[:]) + startBlock.forkId = forkId headBlock = startBlock } diff --git a/indexer/beacon/indexer.go b/indexer/beacon/indexer.go index 5345fc16..dcdee71d 100644 --- a/indexer/beacon/indexer.go +++ b/indexer/beacon/indexer.go @@ -278,7 +278,7 @@ func (indexer *Indexer) StartIndexer() { err = db.StreamUnfinalizedEpochs(uint64(finalizedEpoch), func(unfinalizedEpoch *dbtypes.UnfinalizedEpoch) { epochStats := indexer.epochCache.getEpochStats(phase0.Epoch(unfinalizedEpoch.Epoch), phase0.Root(unfinalizedEpoch.DependentRoot)) if epochStats == nil { - indexer.logger.Warnf("failed restoring epoch aggregations for epoch %v [%x] from db: epoch stats not found", unfinalizedEpoch.Epoch, unfinalizedEpoch.DependentRoot) + indexer.logger.Debugf("failed restoring epoch aggregations for epoch %v [%x] from db: epoch stats not found", unfinalizedEpoch.Epoch, unfinalizedEpoch.DependentRoot) return }