diff --git a/db/forks.go b/db/forks.go index c6472b2..0d21bc7 100644 --- a/db/forks.go +++ b/db/forks.go @@ -124,3 +124,16 @@ func UpdateFinalizedForkParents(finalizedRoots [][]byte, tx *sqlx.Tx) error { return nil } + +func UpdateForkParent(parentRoot []byte, parentForkId uint64, tx *sqlx.Tx) error { + _, err := tx.Exec(` + UPDATE forks + SET parent_fork = $1 + WHERE base_root = $2 + `, parentForkId, parentRoot) + if err != nil { + return err + } + + return nil +} diff --git a/indexer/beacon/blockcache.go b/indexer/beacon/blockcache.go index 5d4cf31..9aeaa2e 100644 --- a/indexer/beacon/blockcache.go +++ b/indexer/beacon/blockcache.go @@ -18,15 +18,17 @@ type blockCache struct { lowestSlot int64 slotMap map[phase0.Slot][]*Block rootMap map[phase0.Root]*Block + parentMap map[phase0.Root][]*Block latestBlock *Block // latest added block (might not be the head block, just a marker for cache changes) } // newBlockCache creates a new instance of blockCache. func newBlockCache(indexer *Indexer) *blockCache { return &blockCache{ - indexer: indexer, - slotMap: map[phase0.Slot][]*Block{}, - rootMap: map[phase0.Root]*Block{}, + indexer: indexer, + slotMap: map[phase0.Slot][]*Block{}, + rootMap: map[phase0.Root]*Block{}, + parentMap: map[phase0.Root][]*Block{}, } } @@ -60,6 +62,25 @@ func (cache *blockCache) createOrGetBlock(root phase0.Root, slot phase0.Slot) (* return cacheBlock, true } +// addBlockToParentMap adds the given block to the parent map. +func (cache *blockCache) addBlockToParentMap(block *Block) { + cache.cacheMutex.Lock() + defer cache.cacheMutex.Unlock() + + parentRoot := block.GetParentRoot() + if parentRoot == nil { + return + } + + for _, parentBlock := range cache.parentMap[*parentRoot] { + if parentBlock == block { + return + } + } + + cache.parentMap[*parentRoot] = append(cache.parentMap[*parentRoot], block) +} + // getBlockByRoot returns the cached block with the given root. func (cache *blockCache) getBlockByRoot(root phase0.Root) *Block { cache.cacheMutex.RLock() @@ -86,27 +107,13 @@ func (cache *blockCache) getBlocksByParentRoot(parentRoot phase0.Root) []*Block cache.cacheMutex.RLock() defer cache.cacheMutex.RUnlock() - parentBlock := cache.rootMap[parentRoot] - - resBlocks := []*Block{} - for slot, blocks := range cache.slotMap { - if parentBlock != nil && slot <= parentBlock.Slot { - continue - } - - for _, block := range blocks { - blockParentRoot := block.GetParentRoot() - if blockParentRoot == nil { - continue - } - - if bytes.Equal((*blockParentRoot)[:], parentRoot[:]) { - resBlocks = append(resBlocks, block) - } - } + cachedBlocks := cache.parentMap[parentRoot] + blocks := make([]*Block, len(cachedBlocks)) + if len(blocks) > 0 { + copy(blocks, cachedBlocks) } - return resBlocks + return blocks } // getBlockByStateRoot returns the block with the given state root. @@ -265,8 +272,10 @@ func (cache *blockCache) removeBlock(block *Block) { cache.cacheMutex.Lock() defer cache.cacheMutex.Unlock() + // remove the block from the root map. delete(cache.rootMap, block.Root) + // remove the block from the slot map. slotBlocks := cache.slotMap[block.Slot] if len(slotBlocks) == 1 && slotBlocks[0] == block { delete(cache.slotMap, block.Slot) @@ -278,6 +287,22 @@ func (cache *blockCache) removeBlock(block *Block) { } } } + + // remove the block from the parent map. + if parentRoot := block.GetParentRoot(); parentRoot != nil { + parentBlocks := cache.parentMap[*parentRoot] + if len(parentBlocks) == 1 && parentBlocks[0] == block { + delete(cache.parentMap, *parentRoot) + } else if len(parentBlocks) > 1 { + for i, parentBlock := range parentBlocks { + if parentBlock == block { + cache.parentMap[*parentRoot] = append(parentBlocks[:i], parentBlocks[i+1:]...) + break + } + } + } + } + } // getEpochBlocks returns the blocks that belong to the specified epoch. diff --git a/indexer/beacon/client.go b/indexer/beacon/client.go index fcc2dac..fd4b3c7 100644 --- a/indexer/beacon/client.go +++ b/indexer/beacon/client.go @@ -5,6 +5,7 @@ import ( "context" "fmt" "runtime/debug" + "strings" "time" v1 "github.com/attestantio/go-eth2-client/api/v1" @@ -105,6 +106,25 @@ func (c *Client) startClientLoop() { } } +func (c *Client) formatProcessingTimes(processingTimes []time.Duration) string { + if len(processingTimes) == 0 { + return "" + } + + str := strings.Builder{} + str.WriteString(" (") + for i, pt := range processingTimes { + if i > 0 { + str.WriteString(", ") + } + + str.WriteString(fmt.Sprintf("%v ms", pt.Milliseconds())) + } + str.WriteString(")") + + return str.String() +} + // runClientLoop runs the client event processing subroutine. func (c *Client) runClientLoop() error { // 1 - load & process head block @@ -116,7 +136,7 @@ func (c *Client) runClientLoop() error { c.headRoot = headRoot - headBlock, isNew, err := c.processBlock(headSlot, headRoot, nil) + headBlock, isNew, processingTimes, err := c.processBlock(headSlot, headRoot, nil) if err != nil { return fmt.Errorf("failed processing head block: %v", err) } @@ -126,9 +146,9 @@ func (c *Client) runClientLoop() error { } if isNew { - c.logger.Infof("received block %v:%v [0x%x] head", c.client.GetPool().GetChainState().EpochOfSlot(headSlot), headSlot, headRoot) + c.logger.Infof("received block %v:%v [0x%x] head %v", c.client.GetPool().GetChainState().EpochOfSlot(headSlot), headSlot, headRoot, c.formatProcessingTimes(processingTimes)) } else { - c.logger.Debugf("received known block %v:%v [0x%x] head", c.client.GetPool().GetChainState().EpochOfSlot(headSlot), headSlot, headRoot) + c.logger.Debugf("received known block %v:%v [0x%x] head %v", c.client.GetPool().GetChainState().EpochOfSlot(headSlot), headSlot, headRoot, c.formatProcessingTimes(processingTimes)) } // 2 - backfill old blocks up to the finalization checkpoint or known in cache @@ -263,7 +283,7 @@ func (c *Client) processHeadEvent(headEvent *v1.HeadEvent) error { // processStreamBlock processes a block received from the stream (either via block or head events). func (c *Client) processStreamBlock(slot phase0.Slot, root phase0.Root) (*Block, error) { - block, isNew, err := c.processBlock(slot, root, nil) + block, isNew, processingTimes, err := c.processBlock(slot, root, nil) if err != nil { return nil, err } @@ -271,9 +291,9 @@ func (c *Client) processStreamBlock(slot phase0.Slot, root phase0.Root) (*Block, chainState := c.client.GetPool().GetChainState() if isNew { - c.logger.Infof("received block %v:%v [0x%x] stream", chainState.EpochOfSlot(block.Slot), block.Slot, block.Root[:]) + c.logger.Infof("received block %v:%v [0x%x] stream %v", chainState.EpochOfSlot(block.Slot), block.Slot, block.Root[:], c.formatProcessingTimes(processingTimes)) } else { - c.logger.Debugf("received known block %v:%v [0x%x] stream", chainState.EpochOfSlot(block.Slot), block.Slot, block.Root[:]) + c.logger.Debugf("received known block %v:%v [0x%x] stream %v", chainState.EpochOfSlot(block.Slot), block.Slot, block.Root[:], c.formatProcessingTimes(processingTimes)) } return block, nil @@ -323,9 +343,10 @@ func (c *Client) processReorg(oldHead *Block, newHead *Block) error { } // processBlock processes a block (from stream & polling). -func (c *Client) processBlock(slot phase0.Slot, root phase0.Root, header *phase0.SignedBeaconBlockHeader) (block *Block, isNew bool, err error) { +func (c *Client) processBlock(slot phase0.Slot, root phase0.Root, header *phase0.SignedBeaconBlockHeader) (block *Block, isNew bool, processingTimes []time.Duration, err error) { chainState := c.client.GetPool().GetChainState() finalizedSlot := chainState.GetFinalizedSlot() + processingTimes = make([]time.Duration, 3) if slot < finalizedSlot { // block is in finalized epoch @@ -349,6 +370,11 @@ func (c *Client) processBlock(slot phase0.Slot, root phase0.Root, header *phase0 return header, nil } + t1 := time.Now() + defer func() { + processingTimes[0] += time.Since(t1) + }() + return LoadBeaconHeader(c.getContext(), c, root) }) if err != nil { @@ -356,6 +382,12 @@ func (c *Client) processBlock(slot phase0.Slot, root phase0.Root, header *phase0 } isNew, err = block.EnsureBlock(func() (*spec.VersionedSignedBeaconBlock, error) { + + t1 := time.Now() + defer func() { + processingTimes[0] += time.Since(t1) + }() + return LoadBeaconBlock(c.getContext(), c, root) }) if err != nil { @@ -363,6 +395,9 @@ func (c *Client) processBlock(slot phase0.Slot, root phase0.Root, header *phase0 } if slot >= finalizedSlot && isNew { + c.indexer.blockCache.addBlockToParentMap(block) + t1 := time.Now() + // fork detection err2 := c.indexer.forkCache.processBlock(block) if err2 != nil { @@ -376,6 +411,9 @@ func (c *Client) processBlock(slot phase0.Slot, root phase0.Root, header *phase0 return } + processingTimes[1] = time.Since(t1) + t1 = time.Now() + // write to db err = db.RunDBTransaction(func(tx *sqlx.Tx) error { err := db.InsertUnfinalizedBlock(dbBlock, tx) @@ -389,6 +427,8 @@ func (c *Client) processBlock(slot phase0.Slot, root phase0.Root, header *phase0 return } + processingTimes[2] = time.Since(t1) + block.isInUnfinalizedDb = true c.indexer.blockCache.latestBlock = block } @@ -444,19 +484,20 @@ func (c *Client) backfillParentBlocks(headBlock *Block) error { break } + var processingTimes []time.Duration if parentBlock == nil { var err error - parentBlock, isNewBlock, err = c.processBlock(parentSlot, parentRoot, parentHead) + parentBlock, isNewBlock, processingTimes, err = c.processBlock(parentSlot, parentRoot, parentHead) if err != nil { return fmt.Errorf("could not process block [0x%x]: %v", parentRoot, err) } } if isNewBlock { - c.logger.Infof("received block %v:%v [0x%x] backfill", chainState.EpochOfSlot(parentSlot), parentSlot, parentRoot) + c.logger.Infof("received block %v:%v [0x%x] backfill %v", chainState.EpochOfSlot(parentSlot), parentSlot, parentRoot, c.formatProcessingTimes(processingTimes)) } else { - c.logger.Debugf("received known block %v:%v [0x%x] backfill", chainState.EpochOfSlot(parentSlot), parentSlot, parentRoot) + c.logger.Debugf("received known block %v:%v [0x%x] backfill %v", chainState.EpochOfSlot(parentSlot), parentSlot, parentRoot, c.formatProcessingTimes(processingTimes)) } if parentSlot == 0 { diff --git a/indexer/beacon/fork.go b/indexer/beacon/fork.go index 0d1245d..5c3651f 100644 --- a/indexer/beacon/fork.go +++ b/indexer/beacon/fork.go @@ -21,11 +21,11 @@ type Fork struct { } // newFork creates a new Fork instance. -func newFork(forkId ForkKey, baseBlock *Block, leafBlock *Block, parentFork ForkKey) *Fork { +func newFork(forkId ForkKey, baseSlot phase0.Slot, baseRoot phase0.Root, leafBlock *Block, parentFork ForkKey) *Fork { fork := &Fork{ forkId: forkId, - baseSlot: baseBlock.Slot, - baseRoot: baseBlock.Root, + baseSlot: baseSlot, + baseRoot: baseRoot, leafSlot: leafBlock.Slot, leafRoot: leafBlock.Root, parentFork: parentFork, diff --git a/indexer/beacon/forkcache.go b/indexer/beacon/forkcache.go index bf0cbab..f0cebfe 100644 --- a/indexer/beacon/forkcache.go +++ b/indexer/beacon/forkcache.go @@ -4,7 +4,6 @@ import ( "bytes" "fmt" "sort" - "strings" "sync" "github.com/attestantio/go-eth2-client/spec/phase0" @@ -78,6 +77,7 @@ func (cache *forkCache) addFork(fork *Fork) { cache.forkMap[fork.forkId] = fork } +// getForkByLeaf retrieves a fork from the cache by its leaf root. func (cache *forkCache) getForkByLeaf(leafRoot phase0.Root) *Fork { cache.cacheMutex.Lock() defer cache.cacheMutex.Unlock() @@ -91,6 +91,21 @@ func (cache *forkCache) getForkByLeaf(leafRoot phase0.Root) *Fork { return nil } +// getForkByBase retrieves forks from the cache by their base root. +func (cache *forkCache) getForkByBase(baseRoot phase0.Root) []*Fork { + cache.cacheMutex.Lock() + defer cache.cacheMutex.Unlock() + + forks := []*Fork{} + for _, fork := range cache.forkMap { + if bytes.Equal(fork.baseRoot[:], baseRoot[:]) { + forks = append(forks, fork) + } + } + + return forks +} + // removeFork removes a fork from the cache. func (cache *forkCache) removeFork(forkId ForkKey) { cache.cacheMutex.Lock() @@ -99,6 +114,7 @@ func (cache *forkCache) removeFork(forkId ForkKey) { delete(cache.forkMap, forkId) } +// getParentForkIds returns the parent fork ids of the given fork. func (cache *forkCache) getParentForkIds(forkId ForkKey) []ForkKey { parentForks := []ForkKey{forkId} @@ -205,307 +221,3 @@ func (cache *forkCache) setFinalizedEpoch(finalizedSlot phase0.Slot, justifiedRo cache.indexer.logger.Errorf("error while updating fork state: %v", err) } } - -// checkForkDistance checks the distance between two blocks in a fork and returns the base block and distances. -// If the fork happened before the latest finalized slot, only the side of the fork that does not include the finalized block gets returned. -func (cache *forkCache) checkForkDistance(block1 *Block, block2 *Block, parentsMap map[phase0.Root]bool) (baseBlock *Block, block1Distance uint64, leafBlock1 *Block, block2Distance uint64, leafBlock2 *Block) { - finalizedSlot := cache.indexer.consensusPool.GetChainState().GetFinalizedSlot() - _, finalizedRoot := cache.indexer.consensusPool.GetChainState().GetFinalizedCheckpoint() - leafBlock1 = block1 - leafBlock2 = block2 - - var block1IsFinalized, block2IsFinalized bool - - for { - parentsMap[block1.Root] = true - parentsMap[block2.Root] = true - - if bytes.Equal(block1.Root[:], block2.Root[:]) { - baseBlock = block1 - return - } - - if !block1IsFinalized && bytes.Equal(block1.Root[:], finalizedRoot[:]) { - block1IsFinalized = true - } - - if !block2IsFinalized && bytes.Equal(block2.Root[:], finalizedRoot[:]) { - block2IsFinalized = true - } - - if block1.Slot <= finalizedSlot && block2.Slot <= finalizedSlot { - if block1IsFinalized { - baseBlock = block2 - leafBlock1 = nil - return - } - if block2IsFinalized { - baseBlock = block1 - leafBlock2 = nil - return - } - - break - } - - block1Slot := block1.Slot - block2Slot := block2.Slot - - if block1Slot <= block2Slot && !block2IsFinalized { - leafBlock2 = block2 - parentRoot := block2.GetParentRoot() - if parentRoot == nil { - break - } - - block2 = cache.indexer.blockCache.getBlockByRoot(*parentRoot) - if block2 == nil { - dbBlockHead := db.GetBlockHeadByRoot(parentRoot[:]) - if dbBlockHead != nil { - block2 = newBlock(cache.indexer.dynSsz, phase0.Root(dbBlockHead.Root), phase0.Slot(dbBlockHead.Slot)) - block2.isInFinalizedDb = true - block2.parentRoot = (*phase0.Root)(dbBlockHead.ParentRoot) - } else { - break - } - } - - block2Distance++ - } - - if block2Slot <= block1Slot && !block1IsFinalized { - leafBlock1 = block1 - parentRoot := block1.GetParentRoot() - if parentRoot == nil { - break - } - - block1 = cache.indexer.blockCache.getBlockByRoot(*parentRoot) - if block1 == nil { - dbBlockHead := db.GetBlockHeadByRoot(parentRoot[:]) - if dbBlockHead != nil { - block1 = newBlock(cache.indexer.dynSsz, phase0.Root(dbBlockHead.Root), phase0.Slot(dbBlockHead.Slot)) - block1.isInFinalizedDb = true - block1.parentRoot = (*phase0.Root)(dbBlockHead.ParentRoot) - } else { - break - } - } - - block1Distance++ - } - } - - return nil, 0, nil, 0, nil -} - -// processBlock processes a block and detects new forks if any. -// It persists the new forks to the database, updates any subsequent blocks building on top of the given block and returns the fork ID. -func (cache *forkCache) processBlock(block *Block) error { - cache.forkProcessLock.Lock() - defer cache.forkProcessLock.Unlock() - - parentForkId := ForkKey(1) - // get fork id from parent block - parentRoot := block.GetParentRoot() - if parentRoot != nil { - parentBlock := cache.indexer.blockCache.getBlockByRoot(*parentRoot) - if parentBlock == nil { - blockHead := db.GetBlockHeadByRoot((*parentRoot)[:]) - if blockHead != nil { - parentForkId = ForkKey(blockHead.ForkId) - } - } else if parentBlock.fokChecked { - parentForkId = parentBlock.forkId - } - } - - forkBlocks := cache.indexer.blockCache.getForkBlocks(parentForkId) - sort.Slice(forkBlocks, func(i, j int) bool { - return forkBlocks[i].Slot > forkBlocks[j].Slot - }) - - var fork1, fork2 *Fork - var fork1Roots, fork2Roots [][]byte - currentForkId := parentForkId - - parentsMap := map[phase0.Root]bool{} - for _, forkBlock := range forkBlocks { - if forkBlock == block || parentsMap[forkBlock.Root] { - continue - } - - baseBlock, distance1, leaf1, distance2, leaf2 := cache.checkForkDistance(block, forkBlock, parentsMap) - if baseBlock != nil && distance1 > 0 && distance2 > 0 { - // new fork detected - - if leaf1 != nil { - if cache.getForkByLeaf(leaf1.Root) != nil { - cache.indexer.logger.Warnf("fork already exists for leaf %v [%v] (processing %v)", leaf1.Slot, leaf1.Root.String(), block.Slot) - } else { - cache.lastForkId++ - fork1 = newFork(cache.lastForkId, baseBlock, leaf1, parentForkId) - cache.addFork(fork1) - fork1Roots = cache.updateNewForkBlocks(fork1, forkBlocks, block) - } - } - - if leaf2 != nil { - if cache.getForkByLeaf(leaf2.Root) != nil { - cache.indexer.logger.Warnf("fork already exists for leaf %v [%v] (processing %v)", leaf2.Slot, leaf2.Root.String(), block.Slot) - } else { - cache.lastForkId++ - fork2 = newFork(cache.lastForkId, baseBlock, leaf2, parentForkId) - cache.addFork(fork2) - fork2Roots = cache.updateNewForkBlocks(fork2, forkBlocks, nil) - } - } - - if parentForkId > 0 { - parentFork := cache.getForkById(parentForkId) - if parentFork != nil { - parentFork.headBlock = baseBlock - } - } - - logbuf := strings.Builder{} - fmt.Fprintf(&logbuf, "new fork detected (base %v [%v]", baseBlock.Slot, baseBlock.Root.String()) - if leaf1 != nil { - fmt.Fprintf(&logbuf, ", head1: %v [%v]", leaf1.Slot, leaf1.Root.String()) - } - if leaf2 != nil { - fmt.Fprintf(&logbuf, ", head2: %v [%v]", leaf2.Slot, leaf2.Root.String()) - } - fmt.Fprintf(&logbuf, ")") - cache.indexer.logger.Infof(logbuf.String()) - - if fork1 != nil { - currentForkId = fork1.forkId - } - - break - } - } - - updatedBlocks := [][]byte{} - if currentForkId != 0 { - // apply fork id to all blocks building on top of this block - nextBlock := block - - for { - nextBlocks := cache.indexer.blockCache.getBlocksByParentRoot(nextBlock.Root) - if len(nextBlocks) > 1 { - // sub-fork detected, but that's probably already handled - // TODO (low prio): check if the sub-fork really exists? - break - } - - if len(nextBlocks) == 0 { - break - } - - nextBlock = nextBlocks[0] - - if !nextBlock.fokChecked { - break - } - - if nextBlock.forkId == currentForkId { - break - } - - nextBlock.forkId = currentForkId - updatedBlocks = append(updatedBlocks, nextBlock.Root[:]) - } - - fork := cache.getForkById(currentForkId) - if fork != nil && (fork.headBlock == nil || fork.headBlock.Slot < nextBlock.Slot) { - fork.headBlock = nextBlock - } - } - - block.forkId = currentForkId - block.fokChecked = true - - if fork1 != nil || fork2 != nil || len(updatedBlocks) > 0 { - err := db.RunDBTransaction(func(tx *sqlx.Tx) error { - if fork1 != nil { - err := db.InsertFork(fork1.toDbFork(), tx) - if err != nil { - return err - } - - if len(fork1Roots) > 0 { - err = db.UpdateUnfinalizedBlockForkId(fork1Roots, uint64(fork1.forkId), tx) - if err != nil { - return err - } - } - - cache.indexer.logger.Infof("fork %v created (base %v [%v], head %v [%v], updated blocks: %v)", fork1.forkId, fork1.baseSlot, fork1.baseRoot.String(), fork1.leafSlot, fork1.leafRoot.String(), len(fork1Roots)) - } - - if fork2 != nil { - err := db.InsertFork(fork2.toDbFork(), tx) - if err != nil { - return err - } - - if len(fork2Roots) > 0 { - err = db.UpdateUnfinalizedBlockForkId(fork2Roots, uint64(fork2.forkId), tx) - if err != nil { - return err - } - } - - cache.indexer.logger.Infof("fork %v created (base %v [%v], head %v [%v], updated blocks: %v)", fork2.forkId, fork2.baseSlot, fork2.baseRoot.String(), fork2.leafSlot, fork2.leafRoot.String(), len(fork2Roots)) - } - - if len(updatedBlocks) > 0 { - err := db.UpdateUnfinalizedBlockForkId(updatedBlocks, uint64(currentForkId), tx) - if err != nil { - return err - } - - cache.indexer.logger.Infof("updated %v blocks to fork %v", len(updatedBlocks), currentForkId) - } - - err := cache.updateForkState(tx) - if err != nil { - return fmt.Errorf("error while updating fork state: %v", err) - } - - return nil - }) - if err != nil { - return err - } - } - - return nil -} - -// updateNewForkBlocks updates the fork blocks with the given fork. returns the roots of the updated blocks. -func (cache *forkCache) updateNewForkBlocks(fork *Fork, blocks []*Block, ignoreBlock *Block) [][]byte { - updatedRoots := [][]byte{} - - for _, block := range blocks { - if block.Slot <= fork.baseSlot { - return updatedRoots - } - - if block == ignoreBlock { - continue - } - - isInFork, _ := cache.indexer.blockCache.getCanonicalDistance(fork.leafRoot, block.Root, 0) - if !isInFork { - continue - } - - block.forkId = fork.forkId - updatedRoots = append(updatedRoots, block.Root[:]) - } - - return updatedRoots -} diff --git a/indexer/beacon/forkdetection.go b/indexer/beacon/forkdetection.go new file mode 100644 index 0000000..12a6e34 --- /dev/null +++ b/indexer/beacon/forkdetection.go @@ -0,0 +1,329 @@ +package beacon + +import ( + "fmt" + "strings" + + "github.com/attestantio/go-eth2-client/spec/phase0" + "github.com/jmoiron/sqlx" + + "github.com/ethpandaops/dora/db" +) + +type newForkInfo struct { + fork *Fork + updateRoots [][]byte +} + +type updateForkInfo struct { + baseRoot []byte + parent ForkKey +} + +// processBlock processes a block and detects new forks if any. +// It persists the new forks to the database, sets the forkId of the supplied block +// and updates the forkId of all blocks affected by newly detected forks. +func (cache *forkCache) processBlock(block *Block) error { + cache.forkProcessLock.Lock() + defer cache.forkProcessLock.Unlock() + + parentRoot := block.GetParentRoot() + if parentRoot == nil { + return fmt.Errorf("parent root not found for block %v", block.Slot) + } + + chainState := cache.indexer.consensusPool.GetChainState() + + // get fork id from parent block + parentForkId := ForkKey(1) + parentSlot := phase0.Slot(0) + parentIsProcessed := false + parentIsFinalized := false + + if parentBlock := cache.indexer.blockCache.getBlockByRoot(*parentRoot); parentBlock == nil { + // parent block might already be finalized, check if it's in the database + blockHead := db.GetBlockHeadByRoot((*parentRoot)[:]) + if blockHead != nil { + parentForkId = ForkKey(blockHead.ForkId) + parentSlot = phase0.Slot(blockHead.Slot) + parentIsProcessed = true + parentIsFinalized = parentSlot < chainState.GetFinalizedSlot() + } + } else if parentBlock.fokChecked { + parentForkId = parentBlock.forkId + parentSlot = parentBlock.Slot + parentIsProcessed = true + parentIsFinalized = parentBlock.Slot < chainState.GetFinalizedSlot() + } + + // check if this block (c) introduces a new fork, it does so if: + // 1. the parent (p) is known & processed and has 1 or more child blocks besides this one (c1, c2, ...) + // c c1 c2 + // \ | / + // p + // 2. the current block (c) has 2 or more child blocks, multiple forks possible (c1, c2, ...) + // c1 c2 c3 + // \ | / + // c + + newForks := []*newForkInfo{} + updateForks := []*updateForkInfo{} + currentForkId := parentForkId // default to parent fork id + + // check scenario 1 + if parentIsProcessed { + otherChildren := []*Block{} + for _, child := range cache.indexer.blockCache.getBlocksByParentRoot(*parentRoot) { + if child == block { + continue + } + + otherChildren = append(otherChildren, child) + } + + if len(otherChildren) > 0 { + logbuf := strings.Builder{} + + // parent already has a children, so this block introduces a new fork + if cache.getForkByLeaf(block.Root) != nil { + cache.indexer.logger.Warnf("fork already exists for leaf %v [%v] (processing %v, scenario 1)", block.Slot, block.Root.String(), block.Slot) + } else { + cache.lastForkId++ + fork := newFork(cache.lastForkId, parentSlot, *parentRoot, block, parentForkId) + cache.addFork(fork) + + currentForkId = fork.forkId + newFork := &newForkInfo{ + fork: fork, + } + newForks = append(newForks, newFork) + + fmt.Fprintf(&logbuf, ", head1: %v [%v, ? upd]", block.Slot, block.Root.String()) + } + + if !parentIsFinalized && len(otherChildren) == 1 { + // parent (a) is not finalized and our new detected fork is the first fork based on this parent (c) + // we need to create another fork for the other chain that starts from our fork base (b1, b2, ) + // and update the blocks building on top of it + // we don't need to care about this if there are other forks already based on the parent + // b2 + // | + // b1 c + // | / + // a + + if cache.getForkByLeaf(otherChildren[0].Root) != nil { + cache.indexer.logger.Warnf("fork already exists for leaf %v [%v] (processing %v, scenario 1)", otherChildren[0].Slot, otherChildren[0].Root.String(), block.Slot) + } else { + cache.lastForkId++ + otherFork := newFork(cache.lastForkId, parentSlot, *parentRoot, otherChildren[0], parentForkId) + cache.addFork(otherFork) + + updatedRoots, updatedFork := cache.updateForkBlocks(otherChildren[0], otherFork.forkId, false) + newFork := &newForkInfo{ + fork: otherFork, + updateRoots: updatedRoots, + } + newForks = append(newForks, newFork) + + if updatedFork != nil { + updateForks = append(updateForks, updatedFork) + } + + fmt.Fprintf(&logbuf, ", head2: %v [%v, %v upd]", newFork.fork.leafSlot, newFork.fork.leafRoot.String(), len(newFork.updateRoots)) + } + } + + if logbuf.Len() > 0 { + cache.indexer.logger.Infof("new fork leaf detected (base %v [%v]%v)", parentSlot, parentRoot.String(), logbuf.String()) + } + } + } + + // check scenario 2 + childBlocks := make([]*Block, 0) + for _, child := range cache.indexer.blockCache.getBlocksByParentRoot(block.Root) { + if !child.fokChecked { + continue + } + + childBlocks = append(childBlocks, child) + } + + if len(childBlocks) > 1 { + // multiple blocks building on top of the current one, create a fork for each + logbuf := strings.Builder{} + for idx, child := range childBlocks { + if cache.getForkByLeaf(child.Root) != nil { + cache.indexer.logger.Warnf("fork already exists for leaf %v [%v] (processing %v, scenario 2)", child.Slot, child.Root.String(), block.Slot) + } else { + cache.lastForkId++ + fork := newFork(cache.lastForkId, block.Slot, block.Root, child, currentForkId) + cache.addFork(fork) + + updatedRoots, updatedFork := cache.updateForkBlocks(child, fork.forkId, false) + newFork := &newForkInfo{ + fork: fork, + updateRoots: updatedRoots, + } + newForks = append(newForks, newFork) + + if updatedFork != nil { + updateForks = append(updateForks, updatedFork) + } + + fmt.Fprintf(&logbuf, ", head%v: %v [%v, %v upd]", idx+1, newFork.fork.leafSlot, newFork.fork.leafRoot.String(), len(newFork.updateRoots)) + } + } + + if logbuf.Len() > 0 { + cache.indexer.logger.Infof("new child forks detected (base %v [%v]%v)", block.Slot, block.Root.String(), logbuf.String()) + } + } + + // update fork ids of all blocks building on top of the current block + updatedBlocks, updatedFork := cache.updateForkBlocks(block, currentForkId, true) + if updatedFork != nil { + updateForks = append(updateForks, updatedFork) + } + + // set detected fork id to the block + block.forkId = currentForkId + block.fokChecked = true + + // update fork head block if needed + fork := cache.getForkById(currentForkId) + if fork != nil { + lastBlock := block + if len(updatedBlocks) > 0 { + lastBlock = cache.indexer.blockCache.getBlockByRoot(phase0.Root(updatedBlocks[len(updatedBlocks)-1])) + } + if lastBlock != nil && (fork.headBlock == nil || lastBlock.Slot > fork.headBlock.Slot) { + fork.headBlock = lastBlock + } + } + + // persist new forks and updated blocks to the database + if len(newForks) > 0 || len(updatedBlocks) > 0 { + err := db.RunDBTransaction(func(tx *sqlx.Tx) error { + // helper function to update unfinalized block fork ids in batches + updateUnfinalizedBlockForkIds := func(updateRoots [][]byte, forkId ForkKey) error { + batchSize := 1000 + numBatches := (len(updateRoots) + batchSize - 1) / batchSize + + for i := 0; i < numBatches; i++ { + start := i * batchSize + end := (i + 1) * batchSize + if end > len(updateRoots) { + end = len(updateRoots) + } + + batchRoots := updateRoots[start:end] + + err := db.UpdateUnfinalizedBlockForkId(batchRoots, uint64(forkId), tx) + if err != nil { + return err + } + } + + return nil + } + + // add new forks + for _, newFork := range newForks { + err := db.InsertFork(newFork.fork.toDbFork(), tx) + if err != nil { + return err + } + + if len(newFork.updateRoots) > 0 { + err := updateUnfinalizedBlockForkIds(newFork.updateRoots, newFork.fork.forkId) + if err != nil { + return err + } + } + } + + // update blocks building on top of current block + if len(updatedBlocks) > 0 { + err := updateUnfinalizedBlockForkIds(updatedBlocks, currentForkId) + if err != nil { + return err + } + + cache.indexer.logger.Infof("updated %v blocks to fork %v", len(updatedBlocks), currentForkId) + } + + // update parents of forks building on top of current blocks chain segment + if len(updateForks) > 0 { + for _, updatedFork := range updateForks { + err := db.UpdateForkParent(updatedFork.baseRoot, uint64(updatedFork.parent), tx) + if err != nil { + return err + } + } + + cache.indexer.logger.Infof("updated %v fork parents", len(updateForks)) + } + + err := cache.updateForkState(tx) + if err != nil { + return fmt.Errorf("error while updating fork state: %v", err) + } + + return nil + }) + if err != nil { + return err + } + } + + return nil +} + +// updateForkBlocks updates the blocks building on top of the given block in the fork and returns the updated block roots. +func (cache *forkCache) updateForkBlocks(startBlock *Block, forkId ForkKey, skipStartBlock bool) (blockRoots [][]byte, updatedFork *updateForkInfo) { + blockRoots = [][]byte{} + + if !skipStartBlock { + blockRoots = append(blockRoots, startBlock.Root[:]) + } + + for { + nextBlocks := cache.indexer.blockCache.getBlocksByParentRoot(startBlock.Root) + if len(nextBlocks) == 0 { + break + } + + if len(nextBlocks) > 1 { + // potential fork ahead, check if the fork is already processed and has correct parent fork id + if forks := cache.getForkByBase(startBlock.Root); len(forks) > 0 && forks[0].parentFork != forkId { + for _, fork := range forks { + fork.parentFork = forkId + } + + updatedFork = &updateForkInfo{ + baseRoot: startBlock.Root[:], + parent: forkId, + } + } + break + } + + nextBlock := nextBlocks[0] + if !nextBlock.fokChecked { + break + } + + if nextBlock.forkId == forkId { + break + } + + nextBlock.forkId = forkId + blockRoots = append(blockRoots, nextBlock.Root[:]) + + startBlock = nextBlock + } + + return +} diff --git a/indexer/beacon/indexer.go b/indexer/beacon/indexer.go index bb8032a..5345fc1 100644 --- a/indexer/beacon/indexer.go +++ b/indexer/beacon/indexer.go @@ -318,6 +318,7 @@ func (indexer *Indexer) StartIndexer() { } block.SetHeader(header) + indexer.blockCache.addBlockToParentMap(block) blockBody, err := unmarshalVersionedSignedBeaconBlockSSZ(indexer.dynSsz, dbBlock.BlockVer, dbBlock.BlockSSZ) if err != nil {