Skip to content

Commit

Permalink
Merge pull request #110 from ethpandaops/pk910/fix-missing-pruned-blocks
Browse files Browse the repository at this point in the history
Fix block selection methods to include pruned missed slots properly
  • Loading branch information
pk910 authored Aug 23, 2024
2 parents ce4ddc8 + 8a772a2 commit 2304064
Show file tree
Hide file tree
Showing 7 changed files with 361 additions and 298 deletions.
70 changes: 38 additions & 32 deletions db/slots.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"math"
"strings"

"github.com/attestantio/go-eth2-client/spec/phase0"
"github.com/ethpandaops/dora/dbtypes"
"github.com/jmoiron/sqlx"
"github.com/mitchellh/mapstructure"
Expand Down Expand Up @@ -83,38 +84,6 @@ func InsertMissingSlot(block *dbtypes.SlotHeader, tx *sqlx.Tx) error {
return nil
}

func GetSlots(firstSlot uint64, limit uint32, withMissing bool, withOrphaned bool) []*dbtypes.AssignedSlot {
var sql strings.Builder
fmt.Fprintf(&sql, `SELECT slots.slot, slots.proposer`)
blockFields := []string{
"state_root", "root", "slot", "proposer", "status", "parent_root", "graffiti", "graffiti_text",
"attestation_count", "deposit_count", "exit_count", "withdraw_count", "withdraw_amount", "attester_slashing_count",
"proposer_slashing_count", "bls_change_count", "eth_transaction_count", "eth_block_number", "eth_block_hash",
"eth_block_extra", "eth_block_extra_text", "sync_participation", "fork_id",
}
for _, blockField := range blockFields {
fmt.Fprintf(&sql, ", slots.%v AS \"block.%v\"", blockField, blockField)
}
fmt.Fprintf(&sql, ` FROM slots `)
fmt.Fprintf(&sql, ` WHERE slot <= $1 `)

if !withMissing {
fmt.Fprintf(&sql, ` AND slots.status != 0 `)
}
if !withOrphaned {
fmt.Fprintf(&sql, ` AND slots.status != 2 `)
}
fmt.Fprintf(&sql, ` ORDER BY slot DESC LIMIT $2`)

rows, err := ReaderDb.Query(sql.String(), firstSlot, limit)
if err != nil {
logger.WithError(err).Errorf("Error while fetching slots: %v", sql.String())
return nil
}

return parseAssignedSlots(rows, blockFields, 2)
}

func GetSlotsRange(firstSlot uint64, lastSlot uint64, withMissing bool, withOrphaned bool) []*dbtypes.AssignedSlot {
var sql strings.Builder
fmt.Fprintf(&sql, `SELECT slots.slot, slots.proposer`)
Expand Down Expand Up @@ -184,6 +153,43 @@ func GetSlotByRoot(root []byte) *dbtypes.Slot {
return &block
}

func GetSlotsByRoots(roots [][]byte) map[phase0.Root]*dbtypes.Slot {
argIdx := 0
args := make([]any, len(roots))
plcList := make([]string, len(roots))
for i, root := range roots {
plcList[i] = fmt.Sprintf("$%v", argIdx+1)
args[argIdx] = root
argIdx += 1
}

sql := fmt.Sprintf(
`SELECT
root, slot, parent_root, state_root, status, proposer, graffiti, graffiti_text,
attestation_count, deposit_count, exit_count, withdraw_count, withdraw_amount, attester_slashing_count,
proposer_slashing_count, bls_change_count, eth_transaction_count, eth_block_number, eth_block_hash,
eth_block_extra, eth_block_extra_text, sync_participation, fork_id
FROM slots
WHERE root IN (%v)
ORDER BY slot DESC`,
strings.Join(plcList, ", "),
)

slots := []*dbtypes.Slot{}
err := ReaderDb.Select(&slots, sql, args...)
if err != nil {
logger.Errorf("Error while fetching block by roots: %v", err)
return nil
}

slotMap := make(map[phase0.Root]*dbtypes.Slot)
for _, slot := range slots {
slotMap[phase0.Root(slot.Root)] = slot
}

return slotMap
}

func GetBlockHeadByRoot(root []byte) *dbtypes.BlockHead {
blockHead := dbtypes.BlockHead{}
err := ReaderDb.Get(&blockHead, `
Expand Down
22 changes: 17 additions & 5 deletions handlers/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func buildIndexPageData() (*models.IndexPageData, time.Duration) {
buildIndexPageRecentEpochsData(pageData, currentEpoch, finalizedEpoch, justifiedEpoch, recentEpochCount)

// load recent blocks
buildIndexPageRecentBlocksData(pageData, currentSlot, recentBlockCount)
buildIndexPageRecentBlocksData(pageData, recentBlockCount)

// load recent slots
buildIndexPageRecentSlotsData(pageData, currentSlot, recentSlotsCount)
Expand Down Expand Up @@ -257,14 +257,22 @@ func buildIndexPageRecentEpochsData(pageData *models.IndexPageData, currentEpoch
pageData.RecentEpochCount = uint64(len(pageData.RecentEpochs))
}

func buildIndexPageRecentBlocksData(pageData *models.IndexPageData, currentSlot phase0.Slot, recentBlockCount int) {
func buildIndexPageRecentBlocksData(pageData *models.IndexPageData, recentBlockCount int) {
pageData.RecentBlocks = make([]*models.IndexPageDataBlocks, 0)

chainState := services.GlobalBeaconService.GetChainState()

blocksData := services.GlobalBeaconService.GetDbBlocks(uint64(currentSlot), int32(recentBlockCount), false, false)
for i := 0; i < len(blocksData); i++ {
blockData := blocksData[i]
blocksData := services.GlobalBeaconService.GetDbBlocksByFilter(&dbtypes.BlockFilter{
WithOrphaned: 0,
WithMissing: 0,
}, 0, uint32(recentBlockCount), 0)
limit := len(blocksData)
if limit > recentBlockCount {
limit = recentBlockCount
}

for i := 0; i < limit; i++ {
blockData := blocksData[i].Block
if blockData == nil {
continue
}
Expand Down Expand Up @@ -327,6 +335,10 @@ func buildIndexPageRecentSlotsData(pageData *models.IndexPageData, firstSlot pha
pageData.RecentSlots = append(pageData.RecentSlots, slotData)
blockCount++
buildIndexPageSlotGraph(slotData, &maxOpenFork, openForks)

if blockCount >= uint64(slotLimit) {
break
}
}
}
pageData.RecentSlotCount = uint64(blockCount)
Expand Down
7 changes: 6 additions & 1 deletion handlers/slots_filtered.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,12 @@ func buildFilteredSlotsPageData(pageIdx uint64, pageSize uint64, graffiti string
blockFilter.ProposerIndex = &pidx
}

dbBlocks := services.GlobalBeaconService.GetDbBlocksByFilter(blockFilter, pageIdx, uint32(pageSize))
withScheduledCount := chainState.GetSpecs().SlotsPerEpoch - uint64(chainState.SlotToSlotIndex(currentSlot)) - 1
if withScheduledCount > 16 {
withScheduledCount = 16
}

dbBlocks := services.GlobalBeaconService.GetDbBlocksByFilter(blockFilter, pageIdx, uint32(pageSize), withScheduledCount)
haveMore := false
for idx, dbBlock := range dbBlocks {
if idx >= int(pageSize) {
Expand Down
2 changes: 1 addition & 1 deletion handlers/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func buildValidatorPageData(validatorIndex uint64) (*models.ValidatorPageData, t
ProposerIndex: &validatorIndex,
WithOrphaned: 1,
WithMissing: 1,
}, 0, 10)
}, 0, 10, chainState.GetSpecs().SlotsPerEpoch)
for _, blockData := range blocksData {
var blockStatus dbtypes.SlotStatus
if blockData.Block == nil {
Expand Down
2 changes: 1 addition & 1 deletion handlers/validator_slots.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func buildValidatorSlotsPageData(validator uint64, pageIdx uint64, pageSize uint
ProposerIndex: &validator,
WithOrphaned: 1,
WithMissing: 1,
}, pageIdx, uint32(pageSize))
}, pageIdx, uint32(pageSize), chainState.GetSpecs().SlotsPerEpoch)
haveMore := false
for idx, blockAssignment := range dbBlocks {
if idx >= int(pageSize) {
Expand Down
11 changes: 0 additions & 11 deletions indexer/beacon/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,17 +362,6 @@ func (block *Block) GetDbConsolidationRequests(indexer *Indexer) []*dbtypes.Cons
return indexer.dbWriter.buildDbConsolidationRequests(block, orphaned, nil)
}

// GetExecutionExtraData returns the execution extra data of this block.
func (block *Block) GetExecutionExtraData() []byte {
blockBody := block.GetBlock()
if blockBody == nil {
return []byte{}
}

data, _ := getBlockExecutionExtraData(blockBody)
return data
}

// GetForkId returns the fork ID of this block.
func (block *Block) GetForkId() ForkKey {
return block.forkId
Expand Down
Loading

0 comments on commit 2304064

Please sign in to comment.