Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve fork detection performance #115

Merged
merged 9 commits into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions db/forks.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,16 @@ func UpdateFinalizedForkParents(finalizedRoots [][]byte, tx *sqlx.Tx) error {

return nil
}

func UpdateForkParent(parentRoot []byte, parentForkId uint64, tx *sqlx.Tx) error {
_, err := tx.Exec(`
UPDATE forks
SET parent_fork = $1
WHERE base_root = $2
`, parentForkId, parentRoot)
if err != nil {
return err
}

return nil
}
69 changes: 47 additions & 22 deletions indexer/beacon/blockcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,17 @@ type blockCache struct {
lowestSlot int64
slotMap map[phase0.Slot][]*Block
rootMap map[phase0.Root]*Block
parentMap map[phase0.Root][]*Block
latestBlock *Block // latest added block (might not be the head block, just a marker for cache changes)
}

// newBlockCache creates a new instance of blockCache.
func newBlockCache(indexer *Indexer) *blockCache {
return &blockCache{
indexer: indexer,
slotMap: map[phase0.Slot][]*Block{},
rootMap: map[phase0.Root]*Block{},
indexer: indexer,
slotMap: map[phase0.Slot][]*Block{},
rootMap: map[phase0.Root]*Block{},
parentMap: map[phase0.Root][]*Block{},
}
}

Expand Down Expand Up @@ -60,6 +62,25 @@ func (cache *blockCache) createOrGetBlock(root phase0.Root, slot phase0.Slot) (*
return cacheBlock, true
}

// addBlockToParentMap adds the given block to the parent map.
func (cache *blockCache) addBlockToParentMap(block *Block) {
cache.cacheMutex.Lock()
defer cache.cacheMutex.Unlock()

parentRoot := block.GetParentRoot()
if parentRoot == nil {
return
}

for _, parentBlock := range cache.parentMap[*parentRoot] {
if parentBlock == block {
return
}
}

cache.parentMap[*parentRoot] = append(cache.parentMap[*parentRoot], block)
}

// getBlockByRoot returns the cached block with the given root.
func (cache *blockCache) getBlockByRoot(root phase0.Root) *Block {
cache.cacheMutex.RLock()
Expand All @@ -86,27 +107,13 @@ func (cache *blockCache) getBlocksByParentRoot(parentRoot phase0.Root) []*Block
cache.cacheMutex.RLock()
defer cache.cacheMutex.RUnlock()

parentBlock := cache.rootMap[parentRoot]

resBlocks := []*Block{}
for slot, blocks := range cache.slotMap {
if parentBlock != nil && slot <= parentBlock.Slot {
continue
}

for _, block := range blocks {
blockParentRoot := block.GetParentRoot()
if blockParentRoot == nil {
continue
}

if bytes.Equal((*blockParentRoot)[:], parentRoot[:]) {
resBlocks = append(resBlocks, block)
}
}
cachedBlocks := cache.parentMap[parentRoot]
blocks := make([]*Block, len(cachedBlocks))
if len(blocks) > 0 {
copy(blocks, cachedBlocks)
}

return resBlocks
return blocks
}

// getBlockByStateRoot returns the block with the given state root.
Expand Down Expand Up @@ -265,8 +272,10 @@ func (cache *blockCache) removeBlock(block *Block) {
cache.cacheMutex.Lock()
defer cache.cacheMutex.Unlock()

// remove the block from the root map.
delete(cache.rootMap, block.Root)

// remove the block from the slot map.
slotBlocks := cache.slotMap[block.Slot]
if len(slotBlocks) == 1 && slotBlocks[0] == block {
delete(cache.slotMap, block.Slot)
Expand All @@ -278,6 +287,22 @@ func (cache *blockCache) removeBlock(block *Block) {
}
}
}

// remove the block from the parent map.
if parentRoot := block.GetParentRoot(); parentRoot != nil {
parentBlocks := cache.parentMap[*parentRoot]
if len(parentBlocks) == 1 && parentBlocks[0] == block {
delete(cache.parentMap, *parentRoot)
} else if len(parentBlocks) > 1 {
for i, parentBlock := range parentBlocks {
if parentBlock == block {
cache.parentMap[*parentRoot] = append(parentBlocks[:i], parentBlocks[i+1:]...)
break
}
}
}
}

}

// getEpochBlocks returns the blocks that belong to the specified epoch.
Expand Down
61 changes: 51 additions & 10 deletions indexer/beacon/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"fmt"
"runtime/debug"
"strings"
"time"

v1 "github.com/attestantio/go-eth2-client/api/v1"
Expand Down Expand Up @@ -105,6 +106,25 @@ func (c *Client) startClientLoop() {
}
}

func (c *Client) formatProcessingTimes(processingTimes []time.Duration) string {
if len(processingTimes) == 0 {
return ""
}

str := strings.Builder{}
str.WriteString(" (")
for i, pt := range processingTimes {
if i > 0 {
str.WriteString(", ")
}

str.WriteString(fmt.Sprintf("%v ms", pt.Milliseconds()))
}
str.WriteString(")")

return str.String()
}

// runClientLoop runs the client event processing subroutine.
func (c *Client) runClientLoop() error {
// 1 - load & process head block
Expand All @@ -116,7 +136,7 @@ func (c *Client) runClientLoop() error {

c.headRoot = headRoot

headBlock, isNew, err := c.processBlock(headSlot, headRoot, nil)
headBlock, isNew, processingTimes, err := c.processBlock(headSlot, headRoot, nil)
if err != nil {
return fmt.Errorf("failed processing head block: %v", err)
}
Expand All @@ -126,9 +146,9 @@ func (c *Client) runClientLoop() error {
}

if isNew {
c.logger.Infof("received block %v:%v [0x%x] head", c.client.GetPool().GetChainState().EpochOfSlot(headSlot), headSlot, headRoot)
c.logger.Infof("received block %v:%v [0x%x] head %v", c.client.GetPool().GetChainState().EpochOfSlot(headSlot), headSlot, headRoot, c.formatProcessingTimes(processingTimes))
} else {
c.logger.Debugf("received known block %v:%v [0x%x] head", c.client.GetPool().GetChainState().EpochOfSlot(headSlot), headSlot, headRoot)
c.logger.Debugf("received known block %v:%v [0x%x] head %v", c.client.GetPool().GetChainState().EpochOfSlot(headSlot), headSlot, headRoot, c.formatProcessingTimes(processingTimes))
}

// 2 - backfill old blocks up to the finalization checkpoint or known in cache
Expand Down Expand Up @@ -263,17 +283,17 @@ func (c *Client) processHeadEvent(headEvent *v1.HeadEvent) error {

// processStreamBlock processes a block received from the stream (either via block or head events).
func (c *Client) processStreamBlock(slot phase0.Slot, root phase0.Root) (*Block, error) {
block, isNew, err := c.processBlock(slot, root, nil)
block, isNew, processingTimes, err := c.processBlock(slot, root, nil)
if err != nil {
return nil, err
}

chainState := c.client.GetPool().GetChainState()

if isNew {
c.logger.Infof("received block %v:%v [0x%x] stream", chainState.EpochOfSlot(block.Slot), block.Slot, block.Root[:])
c.logger.Infof("received block %v:%v [0x%x] stream %v", chainState.EpochOfSlot(block.Slot), block.Slot, block.Root[:], c.formatProcessingTimes(processingTimes))
} else {
c.logger.Debugf("received known block %v:%v [0x%x] stream", chainState.EpochOfSlot(block.Slot), block.Slot, block.Root[:])
c.logger.Debugf("received known block %v:%v [0x%x] stream %v", chainState.EpochOfSlot(block.Slot), block.Slot, block.Root[:], c.formatProcessingTimes(processingTimes))
}

return block, nil
Expand Down Expand Up @@ -323,9 +343,10 @@ func (c *Client) processReorg(oldHead *Block, newHead *Block) error {
}

// processBlock processes a block (from stream & polling).
func (c *Client) processBlock(slot phase0.Slot, root phase0.Root, header *phase0.SignedBeaconBlockHeader) (block *Block, isNew bool, err error) {
func (c *Client) processBlock(slot phase0.Slot, root phase0.Root, header *phase0.SignedBeaconBlockHeader) (block *Block, isNew bool, processingTimes []time.Duration, err error) {
chainState := c.client.GetPool().GetChainState()
finalizedSlot := chainState.GetFinalizedSlot()
processingTimes = make([]time.Duration, 3)

if slot < finalizedSlot {
// block is in finalized epoch
Expand All @@ -349,20 +370,34 @@ func (c *Client) processBlock(slot phase0.Slot, root phase0.Root, header *phase0
return header, nil
}

t1 := time.Now()
defer func() {
processingTimes[0] += time.Since(t1)
}()

return LoadBeaconHeader(c.getContext(), c, root)
})
if err != nil {
return
}

isNew, err = block.EnsureBlock(func() (*spec.VersionedSignedBeaconBlock, error) {

t1 := time.Now()
defer func() {
processingTimes[0] += time.Since(t1)
}()

return LoadBeaconBlock(c.getContext(), c, root)
})
if err != nil {
return
}

if slot >= finalizedSlot && isNew {
c.indexer.blockCache.addBlockToParentMap(block)
t1 := time.Now()

// fork detection
err2 := c.indexer.forkCache.processBlock(block)
if err2 != nil {
Expand All @@ -376,6 +411,9 @@ func (c *Client) processBlock(slot phase0.Slot, root phase0.Root, header *phase0
return
}

processingTimes[1] = time.Since(t1)
t1 = time.Now()

// write to db
err = db.RunDBTransaction(func(tx *sqlx.Tx) error {
err := db.InsertUnfinalizedBlock(dbBlock, tx)
Expand All @@ -389,6 +427,8 @@ func (c *Client) processBlock(slot phase0.Slot, root phase0.Root, header *phase0
return
}

processingTimes[2] = time.Since(t1)

block.isInUnfinalizedDb = true
c.indexer.blockCache.latestBlock = block
}
Expand Down Expand Up @@ -444,19 +484,20 @@ func (c *Client) backfillParentBlocks(headBlock *Block) error {
break
}

var processingTimes []time.Duration
if parentBlock == nil {
var err error

parentBlock, isNewBlock, err = c.processBlock(parentSlot, parentRoot, parentHead)
parentBlock, isNewBlock, processingTimes, err = c.processBlock(parentSlot, parentRoot, parentHead)
if err != nil {
return fmt.Errorf("could not process block [0x%x]: %v", parentRoot, err)
}
}

if isNewBlock {
c.logger.Infof("received block %v:%v [0x%x] backfill", chainState.EpochOfSlot(parentSlot), parentSlot, parentRoot)
c.logger.Infof("received block %v:%v [0x%x] backfill %v", chainState.EpochOfSlot(parentSlot), parentSlot, parentRoot, c.formatProcessingTimes(processingTimes))
} else {
c.logger.Debugf("received known block %v:%v [0x%x] backfill", chainState.EpochOfSlot(parentSlot), parentSlot, parentRoot)
c.logger.Debugf("received known block %v:%v [0x%x] backfill %v", chainState.EpochOfSlot(parentSlot), parentSlot, parentRoot, c.formatProcessingTimes(processingTimes))
}

if parentSlot == 0 {
Expand Down
6 changes: 3 additions & 3 deletions indexer/beacon/fork.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ type Fork struct {
}

// newFork creates a new Fork instance.
func newFork(forkId ForkKey, baseBlock *Block, leafBlock *Block, parentFork ForkKey) *Fork {
func newFork(forkId ForkKey, baseSlot phase0.Slot, baseRoot phase0.Root, leafBlock *Block, parentFork ForkKey) *Fork {
fork := &Fork{
forkId: forkId,
baseSlot: baseBlock.Slot,
baseRoot: baseBlock.Root,
baseSlot: baseSlot,
baseRoot: baseRoot,
leafSlot: leafBlock.Slot,
leafRoot: leafBlock.Root,
parentFork: parentFork,
Expand Down
Loading
Loading