Skip to content

Commit

Permalink
core/vote: vote before committing state and writing block (#2589)
Browse files Browse the repository at this point in the history
  • Loading branch information
buddh0 authored Jul 19, 2024
1 parent 74078e1 commit 7b8d28b
Show file tree
Hide file tree
Showing 10 changed files with 88 additions and 50 deletions.
3 changes: 3 additions & 0 deletions consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ type ChainHeaderReader interface {
// GetHighestVerifiedHeader retrieves the highest header verified.
GetHighestVerifiedHeader() *types.Header

// GetVerifiedBlockByHash retrieves the highest verified block.
GetVerifiedBlockByHash(hash common.Hash) *types.Header

// ChasingHead return the best chain head of peers.
ChasingHead() *types.Header
}
Expand Down
2 changes: 1 addition & 1 deletion consensus/parlia/parlia.go
Original file line number Diff line number Diff line change
Expand Up @@ -1362,7 +1362,7 @@ func (p *Parlia) IsActiveValidatorAt(chain consensus.ChainHeaderReader, header *
func (p *Parlia) VerifyVote(chain consensus.ChainHeaderReader, vote *types.VoteEnvelope) error {
targetNumber := vote.Data.TargetNumber
targetHash := vote.Data.TargetHash
header := chain.GetHeaderByHash(targetHash)
header := chain.GetVerifiedBlockByHash(targetHash)
if header == nil {
log.Warn("BlockHeader at current voteBlockNumber is nil", "targetNumber", targetNumber, "targetHash", targetHash)
return errors.New("BlockHeader at current voteBlockNumber is nil")
Expand Down
33 changes: 20 additions & 13 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,23 +259,25 @@ type BlockChain struct {
triesInMemory uint64
txIndexer *txIndexer // Transaction indexer, might be nil if not enabled

hc *HeaderChain
rmLogsFeed event.Feed
chainFeed event.Feed
chainSideFeed event.Feed
chainHeadFeed event.Feed
chainBlockFeed event.Feed
logsFeed event.Feed
blockProcFeed event.Feed
finalizedHeaderFeed event.Feed
scope event.SubscriptionScope
genesisBlock *types.Block
hc *HeaderChain
rmLogsFeed event.Feed
chainFeed event.Feed
chainSideFeed event.Feed
chainHeadFeed event.Feed
chainBlockFeed event.Feed
logsFeed event.Feed
blockProcFeed event.Feed
finalizedHeaderFeed event.Feed
highestVerifiedBlockFeed event.Feed
scope event.SubscriptionScope
genesisBlock *types.Block

// This mutex synchronizes chain write operations.
// Readers don't need to take it, they can just read the database.
chainmu *syncx.ClosableMutex

highestVerifiedHeader atomic.Pointer[types.Header]
highestVerifiedBlock atomic.Pointer[types.Header]
currentBlock atomic.Pointer[types.Header] // Current head of the chain
currentSnapBlock atomic.Pointer[types.Header] // Current head of snap-sync
currentFinalBlock atomic.Pointer[types.Header] // Latest (consensus) finalized block
Expand Down Expand Up @@ -400,6 +402,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
}

bc.highestVerifiedHeader.Store(nil)
bc.highestVerifiedBlock.Store(nil)
bc.currentBlock.Store(nil)
bc.currentSnapBlock.Store(nil)
bc.chasingHead.Store(nil)
Expand Down Expand Up @@ -1925,8 +1928,12 @@ func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types
if err != nil {
return NonStatTy, err
}
if reorg && mux != nil {
mux.Post(NewSealedBlockEvent{Block: block})
if reorg {
bc.highestVerifiedBlock.Store(types.CopyHeader(block.Header()))
bc.highestVerifiedBlockFeed.Send(HighestVerifiedBlockEvent{Header: block.Header()})
if mux != nil {
mux.Post(NewSealedBlockEvent{Block: block})
}
}

if err := bc.writeBlockWithState(block, receipts, state); err != nil {
Expand Down
14 changes: 14 additions & 0 deletions core/blockchain_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,15 @@ func (bc *BlockChain) GetHeaderByHash(hash common.Hash) *types.Header {
return bc.hc.GetHeaderByHash(hash)
}

// GetVerifiedBlockByHash retrieves the header of a verified block, it may be only in memory.
func (bc *BlockChain) GetVerifiedBlockByHash(hash common.Hash) *types.Header {
highestVerifiedBlock := bc.highestVerifiedBlock.Load()
if highestVerifiedBlock != nil && highestVerifiedBlock.Hash() == hash {
return highestVerifiedBlock
}
return bc.hc.GetHeaderByHash(hash)
}

// GetHeaderByNumber retrieves a block header from the database by number,
// caching it (associated with its hash) if found.
func (bc *BlockChain) GetHeaderByNumber(number uint64) *types.Header {
Expand Down Expand Up @@ -486,6 +495,11 @@ func (bc *BlockChain) SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Su
return bc.scope.Track(bc.chainHeadFeed.Subscribe(ch))
}

// SubscribeHighestVerifiedBlockEvent registers a subscription of HighestVerifiedBlockEvent.
func (bc *BlockChain) SubscribeHighestVerifiedHeaderEvent(ch chan<- HighestVerifiedBlockEvent) event.Subscription {
return bc.scope.Track(bc.highestVerifiedBlockFeed.Subscribe(ch))
}

// SubscribeChainBlockEvent registers a subscription of ChainBlockEvent.
func (bc *BlockChain) SubscribeChainBlockEvent(ch chan<- ChainHeadEvent) event.Subscription {
return bc.scope.Track(bc.chainBlockFeed.Subscribe(ch))
Expand Down
4 changes: 4 additions & 0 deletions core/chain_makers.go
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,10 @@ func (cm *chainMaker) GetHighestVerifiedHeader() *types.Header {
panic("not supported")
}

func (cm *chainMaker) GetVerifiedBlockByHash(hash common.Hash) *types.Header {
return cm.GetHeaderByHash(hash)
}

func (cm *chainMaker) ChasingHead() *types.Header {
panic("not supported")
}
4 changes: 4 additions & 0 deletions core/data_availability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,10 @@ func (r *mockDAHeaderReader) GetHighestVerifiedHeader() *types.Header {
panic("not supported")
}

func (r *mockDAHeaderReader) GetVerifiedBlockByHash(hash common.Hash) *types.Header {
panic("not supported")
}

func createMockDATx(config *params.ChainConfig, sidecar *types.BlobTxSidecar) *types.Transaction {
if sidecar == nil {
tx := &types.DynamicFeeTx{
Expand Down
2 changes: 2 additions & 0 deletions core/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,5 @@ type ChainSideEvent struct {
}

type ChainHeadEvent struct{ Block *types.Block }

type HighestVerifiedBlockEvent struct{ Header *types.Header }
4 changes: 4 additions & 0 deletions core/headerchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,10 @@ func (hc *HeaderChain) GetHighestVerifiedHeader() *types.Header {
return nil
}

func (hc *HeaderChain) GetVerifiedBlockByHash(hash common.Hash) *types.Header {
return hc.GetHeaderByHash(hash)
}

func (hc *HeaderChain) ChasingHead() *types.Header {
return nil
}
Expand Down
30 changes: 15 additions & 15 deletions core/vote/vote_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ type VoteManager struct {

chain *core.BlockChain

chainHeadCh chan core.ChainHeadEvent
chainHeadSub event.Subscription
highestVerifiedBlockCh chan core.HighestVerifiedBlockEvent
highestVerifiedBlockSub event.Subscription

// used for backup validators to sync votes from corresponding mining validator
syncVoteCh chan core.NewVoteEvent
Expand All @@ -49,12 +49,12 @@ type VoteManager struct {

func NewVoteManager(eth Backend, chain *core.BlockChain, pool *VotePool, journalPath, blsPasswordPath, blsWalletPath string, engine consensus.PoSA) (*VoteManager, error) {
voteManager := &VoteManager{
eth: eth,
chain: chain,
chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize),
syncVoteCh: make(chan core.NewVoteEvent, voteBufferForPut),
pool: pool,
engine: engine,
eth: eth,
chain: chain,
highestVerifiedBlockCh: make(chan core.HighestVerifiedBlockEvent, highestVerifiedBlockChanSize),
syncVoteCh: make(chan core.NewVoteEvent, voteBufferForPut),
pool: pool,
engine: engine,
}

// Create voteSigner.
Expand All @@ -74,7 +74,7 @@ func NewVoteManager(eth Backend, chain *core.BlockChain, pool *VotePool, journal
voteManager.journal = voteJournal

// Subscribe to chain head event.
voteManager.chainHeadSub = voteManager.chain.SubscribeChainHeadEvent(voteManager.chainHeadCh)
voteManager.highestVerifiedBlockSub = voteManager.chain.SubscribeHighestVerifiedHeaderEvent(voteManager.highestVerifiedBlockCh)
voteManager.syncVoteSub = voteManager.pool.SubscribeNewVoteEvent(voteManager.syncVoteCh)

go voteManager.loop()
Expand All @@ -84,7 +84,7 @@ func NewVoteManager(eth Backend, chain *core.BlockChain, pool *VotePool, journal

func (voteManager *VoteManager) loop() {
log.Debug("vote manager routine loop started")
defer voteManager.chainHeadSub.Unsubscribe()
defer voteManager.highestVerifiedBlockSub.Unsubscribe()
defer voteManager.syncVoteSub.Unsubscribe()

events := voteManager.eth.EventMux().Subscribe(downloader.StartEvent{}, downloader.DoneEvent{}, downloader.FailedEvent{})
Expand Down Expand Up @@ -119,7 +119,7 @@ func (voteManager *VoteManager) loop() {
log.Debug("downloader is in DoneEvent mode, set the startVote flag to true")
startVote = true
}
case cHead := <-voteManager.chainHeadCh:
case cHead := <-voteManager.highestVerifiedBlockCh:
if !startVote {
log.Debug("startVote flag is false, continue")
continue
Expand All @@ -135,12 +135,12 @@ func (voteManager *VoteManager) loop() {
continue
}

if cHead.Block == nil {
log.Debug("cHead.Block is nil, continue")
if cHead.Header == nil {
log.Debug("cHead.Header is nil, continue")
continue
}

curHead := cHead.Block.Header()
curHead := cHead.Header
if p, ok := voteManager.engine.(*parlia.Parlia); ok {
nextBlockMinedTime := time.Unix(int64((curHead.Time + p.Period())), 0)
timeForBroadcast := 50 * time.Millisecond // enough to broadcast a vote
Expand Down Expand Up @@ -217,7 +217,7 @@ func (voteManager *VoteManager) loop() {
case <-voteManager.syncVoteSub.Err():
log.Debug("voteManager subscribed votes failed")
return
case <-voteManager.chainHeadSub.Err():
case <-voteManager.highestVerifiedBlockSub.Err():
log.Debug("voteManager subscribed chainHead failed")
return
}
Expand Down
42 changes: 21 additions & 21 deletions core/vote/vote_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ const (
lowerLimitOfVoteBlockNumber = 256
upperLimitOfVoteBlockNumber = 11 // refer to fetcher.maxUncleDist

chainHeadChanSize = 10 // chainHeadChanSize is the size of channel listening to ChainHeadEvent.
highestVerifiedBlockChanSize = 10 // highestVerifiedBlockChanSize is the size of channel listening to HighestVerifiedBlockEvent.
)

var (
Expand Down Expand Up @@ -57,8 +57,8 @@ type VotePool struct {
curVotesPq *votesPriorityQueue
futureVotesPq *votesPriorityQueue

chainHeadCh chan core.ChainHeadEvent
chainHeadSub event.Subscription
highestVerifiedBlockCh chan core.HighestVerifiedBlockEvent
highestVerifiedBlockSub event.Subscription

votesCh chan *types.VoteEnvelope

Expand All @@ -69,38 +69,38 @@ type votesPriorityQueue []*types.VoteData

func NewVotePool(chain *core.BlockChain, engine consensus.PoSA) *VotePool {
votePool := &VotePool{
chain: chain,
receivedVotes: mapset.NewSet[common.Hash](),
curVotes: make(map[common.Hash]*VoteBox),
futureVotes: make(map[common.Hash]*VoteBox),
curVotesPq: &votesPriorityQueue{},
futureVotesPq: &votesPriorityQueue{},
chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize),
votesCh: make(chan *types.VoteEnvelope, voteBufferForPut),
engine: engine,
chain: chain,
receivedVotes: mapset.NewSet[common.Hash](),
curVotes: make(map[common.Hash]*VoteBox),
futureVotes: make(map[common.Hash]*VoteBox),
curVotesPq: &votesPriorityQueue{},
futureVotesPq: &votesPriorityQueue{},
highestVerifiedBlockCh: make(chan core.HighestVerifiedBlockEvent, highestVerifiedBlockChanSize),
votesCh: make(chan *types.VoteEnvelope, voteBufferForPut),
engine: engine,
}

// Subscribe events from blockchain and start the main event loop.
votePool.chainHeadSub = votePool.chain.SubscribeChainHeadEvent(votePool.chainHeadCh)
votePool.highestVerifiedBlockSub = votePool.chain.SubscribeHighestVerifiedHeaderEvent(votePool.highestVerifiedBlockCh)

go votePool.loop()
return votePool
}

// loop is the vote pool's main even loop, waiting for and reacting to outside blockchain events and votes channel event.
func (pool *VotePool) loop() {
defer pool.chainHeadSub.Unsubscribe()
defer pool.highestVerifiedBlockSub.Unsubscribe()

for {
select {
// Handle ChainHeadEvent.
case ev := <-pool.chainHeadCh:
if ev.Block != nil {
latestBlockNumber := ev.Block.NumberU64()
case ev := <-pool.highestVerifiedBlockCh:
if ev.Header != nil {
latestBlockNumber := ev.Header.Number.Uint64()
pool.prune(latestBlockNumber)
pool.transferVotesFromFutureToCur(ev.Block.Header())
pool.transferVotesFromFutureToCur(ev.Header)
}
case <-pool.chainHeadSub.Err():
case <-pool.highestVerifiedBlockSub.Err():
return

// Handle votes channel and put the vote into vote pool.
Expand Down Expand Up @@ -135,7 +135,7 @@ func (pool *VotePool) putIntoVotePool(vote *types.VoteEnvelope) bool {
var votesPq *votesPriorityQueue
isFutureVote := false

voteBlock := pool.chain.GetHeaderByHash(targetHash)
voteBlock := pool.chain.GetVerifiedBlockByHash(targetHash)
if voteBlock == nil {
votes = pool.futureVotes
votesPq = pool.futureVotesPq
Expand Down Expand Up @@ -226,7 +226,7 @@ func (pool *VotePool) transferVotesFromFutureToCur(latestBlockHeader *types.Head
futurePqBuffer := make([]*types.VoteData, 0)
for futurePq.Len() > 0 && futurePq.Peek().TargetNumber <= latestBlockNumber {
blockHash := futurePq.Peek().TargetHash
header := pool.chain.GetHeaderByHash(blockHash)
header := pool.chain.GetVerifiedBlockByHash(blockHash)
if header == nil {
// Put into pq buffer used for later put again into futurePq
futurePqBuffer = append(futurePqBuffer, heap.Pop(futurePq).(*types.VoteData))
Expand Down

0 comments on commit 7b8d28b

Please sign in to comment.