Commit
remove check for finality depth in bhs and bhf. Do not update internal stored mapping based on IsStored() (#10391)

* remove check for finality depth in bhs and bhf. Do not update internal stored mapping based on IsStored()

* fix delegate_test.go

* address comments for trusted bhs. revert blockheaderfeeder changes

* add a debug log

* fix failing test
jinhoonbang authored Sep 2, 2023
1 parent 50d5570 commit ceea5fb
Showing 4 changed files with 140 additions and 132 deletions.
6 changes: 0 additions & 6 deletions core/services/blockhashstore/delegate.go
@@ -65,12 +65,6 @@ func (d *Delegate) ServicesForSpec(jb job.Job, qopts ...pg.QOpt) ([]job.ServiceC
return nil, errors.New("log poller must be enabled to run blockhashstore")
}

- if jb.BlockhashStoreSpec.WaitBlocks < int32(chain.Config().EVM().FinalityDepth()) {
- return nil, fmt.Errorf(
- "waitBlocks must be greater than or equal to chain's finality depth (%d), currently %d",
- chain.Config().EVM().FinalityDepth(), jb.BlockhashStoreSpec.WaitBlocks)
- }
-
keys, err := d.ks.EnabledKeysForChain(chain.ID())
if err != nil {
return nil, errors.Wrap(err, "getting sending keys")
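The guard removed in this hunk tied a job-level setting (waitBlocks) to the chain's finality depth, and the delegate no longer enforces it. For reference, a minimal standalone sketch of the deleted check, using hypothetical names (validateWaitBlocks, plain int32 arguments in place of the job spec and chain config):

package blockhashstore

import "fmt"

// validateWaitBlocks mirrors the guard deleted above. It is an illustrative,
// hypothetical helper, not a function in the repository: waitBlocks stands in
// for jb.BlockhashStoreSpec.WaitBlocks and finalityDepth for
// chain.Config().EVM().FinalityDepth().
func validateWaitBlocks(waitBlocks, finalityDepth int32) error {
	if waitBlocks < finalityDepth {
		return fmt.Errorf(
			"waitBlocks must be greater than or equal to chain's finality depth (%d), currently %d",
			finalityDepth, waitBlocks)
	}
	return nil
}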
8 changes: 0 additions & 8 deletions core/services/blockhashstore/delegate_test.go
@@ -128,14 +128,6 @@ func TestDelegate_ServicesForSpec(t *testing.T) {
assert.Error(t, err)
})

- t.Run("WaitBlocks less than EvmFinalityDepth", func(t *testing.T) {
- spec := job.Job{BlockhashStoreSpec: &job.BlockhashStoreSpec{
- WaitBlocks: defaultWaitBlocks - 1,
- }}
- _, err := delegate.ServicesForSpec(spec)
- assert.Error(t, err)
- })
-
t.Run("missing EnabledKeysForChain", func(t *testing.T) {
_, err := testData.ethKeyStore.Delete(testData.sendingKey.ID())
require.NoError(t, err)
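With the delegate-level check gone, the subtest that expected an error for WaitBlocks below the finality depth is dropped as well. A sketch of how the hypothetical validateWaitBlocks helper above could be exercised on its own (illustrative only, not a test in this repository):

package blockhashstore

import "testing"

// TestValidateWaitBlocks exercises the hypothetical helper sketched earlier.
func TestValidateWaitBlocks(t *testing.T) {
	if err := validateWaitBlocks(63, 64); err == nil {
		t.Error("expected an error when waitBlocks is below the finality depth")
	}
	if err := validateWaitBlocks(64, 64); err != nil {
		t.Errorf("expected no error when waitBlocks equals the finality depth, got %v", err)
	}
}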
48 changes: 25 additions & 23 deletions core/services/blockhashstore/feeder.go
@@ -37,6 +37,7 @@ func NewFeeder(
lookbackBlocks: lookbackBlocks,
latestBlock: latestBlock,
stored: make(map[uint64]struct{}),
+ storedTrusted: make(map[uint64]common.Hash),
lastRunBlock: 0,
wgStored: sync.WaitGroup{},
}
@@ -54,12 +55,12 @@ type Feeder struct {
lookbackBlocks int
latestBlock func(ctx context.Context) (uint64, error)

- stored map[uint64]struct{}
- lastRunBlock uint64
- wgStored sync.WaitGroup
- batchLock sync.Mutex
- storedLock sync.RWMutex
- errsLock sync.Mutex
+ stored map[uint64]struct{} // used for trustless feeder
+ storedTrusted map[uint64]common.Hash // used for trusted feeder
+ lastRunBlock uint64
+ wgStored sync.WaitGroup
+ batchLock sync.Mutex
+ errsLock sync.Mutex
}

// Run the feeder.
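The struct now carries two separate caches: the existing stored set used by the trustless feeder, and a new storedTrusted map used by the trusted feeder that remembers which blockhash was stored for each block number. A simplified sketch of the two shapes, assuming go-ethereum's common.Hash and hypothetical names otherwise:

package blockhashstore

import "github.com/ethereum/go-ethereum/common"

// caches is an illustrative stand-in for the two feeder fields above.
type caches struct {
	stored        map[uint64]struct{}    // trustless path: "we stored this block number"
	storedTrusted map[uint64]common.Hash // trusted path: block number -> blockhash we stored
}

func newCaches() *caches {
	return &caches{
		stored:        make(map[uint64]struct{}),
		storedTrusted: make(map[uint64]common.Hash),
	}
}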
@@ -103,10 +104,10 @@ func (f *Feeder) Run(ctx context.Context) error {
"block", block)
errs = multierr.Append(errs, errors.Wrap(err, "checking if stored"))
} else if stored {
+ // IsStored() can be based on unfinalized blocks. Therefore, f.stored mapping is not updated
f.lggr.Infow("Blockhash already stored",
"block", block, "latestBlock", latestBlock,
"unfulfilledReqIDs", LimitReqIDs(unfulfilledReqs, 50))
- f.stored[block] = struct{}{}
continue
}
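The one-line change in this hunk is the heart of the trustless fix: IsStored() may be answered from an unfinalized block, so a positive answer is no longer cached in f.stored; the block is skipped for this run and re-checked on the next one. A minimal sketch of the resulting flow, with a simplified, hypothetical BHS interface:

package blockhashstore

import "context"

// blockhashStore is a simplified, illustrative stand-in for the BHS used by
// the feeder.
type blockhashStore interface {
	IsStored(ctx context.Context, blockNum uint64) (bool, error)
	Store(ctx context.Context, blockNum uint64) error
}

// storeMissing sketches the adjusted trustless loop: an IsStored hit is skipped
// but not cached, because it may be based on unfinalized state; only blocks we
// store ourselves are recorded.
func storeMissing(ctx context.Context, bhs blockhashStore, stored map[uint64]struct{}, blocks []uint64) error {
	for _, block := range blocks {
		if ok, err := bhs.IsStored(ctx, block); err != nil {
			return err
		} else if ok {
			continue // do not add to stored; re-check on the next run
		}
		if err := bhs.Store(ctx, block); err != nil {
			return err
		}
		stored[block] = struct{}{}
	}
	return nil
}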

@@ -161,13 +162,6 @@ func (f *Feeder) runTrusted(
if len(unfulfilled) == 0 {
return
}
- f.storedLock.RLock()
- if _, ok := f.stored[block]; ok {
- // Already stored
- f.storedLock.RUnlock()
- return
- }
- f.storedLock.RUnlock()

// Do not store a block if it has been marked as stored; otherwise, store it even
// if the RPC call errors, as to be conservative.
@@ -185,9 +179,6 @@
f.lggr.Infow("Blockhash already stored",
"block", block, "latestBlock", latestBlock,
"unfulfilledReqIDs", LimitReqIDs(unfulfilled, 50))
- f.storedLock.Lock()
- f.stored[block] = struct{}{}
- f.storedLock.Unlock()
return
}
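As the comment in this hunk spells out, the trusted path is deliberately conservative: a block is skipped only when the contract itself reports it as stored, and an error from that RPC call means the blockhash gets stored anyway. A small illustrative sketch of that decision, with a simplified interface standing in for the BHS:

package blockhashstore

import "context"

// isStoredChecker is an illustrative, minimal view of the BHS used below.
type isStoredChecker interface {
	IsStored(ctx context.Context, blockNum uint64) (bool, error)
}

// shouldStore sketches the conservative rule described above: skip only on a
// confirmed "already stored"; if the RPC call errors, store the blockhash anyway.
func shouldStore(ctx context.Context, bhs isStoredChecker, blockNum uint64) bool {
	stored, err := bhs.IsStored(ctx, blockNum)
	if err != nil {
		return true // cannot tell for sure, so err on the side of storing
	}
	return !stored
}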

@@ -224,15 +215,23 @@ func (f *Feeder) runTrusted(
// append its blockhash to our blockhashes we want to store.
// If it is the log poller block pertaining to our recent block number, assign it.
for _, b := range lpBlocks {
- if b.BlockNumber == int64(latestBlock) {
- latestBlockhash = b.BlockHash
- }
+ if f.storedTrusted[uint64(b.BlockNumber)] == b.BlockHash {
+ // blockhash is already stored. skip to save gas
+ continue
+ }
if _, ok := batch[uint64(b.BlockNumber)]; ok {
blocksToStore = append(blocksToStore, uint64(b.BlockNumber))
blockhashesToStore = append(blockhashesToStore, b.BlockHash)
}
+ if b.BlockNumber == int64(latestBlock) {
+ latestBlockhash = b.BlockHash
+ }
}

+ if len(blocksToStore) == 0 {
+ f.lggr.Debugw("no blocks to store", "latestBlock", latestBlock)
+ return errs
+ }
// Store the batch of blocks and their blockhashes.
err = f.bhs.StoreTrusted(ctx, blocksToStore, blockhashesToStore, latestBlock, latestBlockhash)
if err != nil {
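The new dedupe above keys on both block number and blockhash: a log poller block whose hash already matches what was recorded in storedTrusted is skipped to save gas, while a block with the same number but a different hash (for example after a reorg) is queued again. A sketch of that selection, with a simplified, hypothetical log poller block type:

package blockhashstore

import "github.com/ethereum/go-ethereum/common"

// lpBlock is an illustrative stand-in for the log poller's block type.
type lpBlock struct {
	BlockNumber int64
	BlockHash   common.Hash
}

// selectBlocksToStore sketches the hash-aware dedupe above: batch holds the
// block numbers we intend to store, storedTrusted what was stored before.
func selectBlocksToStore(lpBlocks []lpBlock, batch map[uint64]struct{}, storedTrusted map[uint64]common.Hash) ([]uint64, []common.Hash) {
	var nums []uint64
	var hashes []common.Hash
	for _, b := range lpBlocks {
		if storedTrusted[uint64(b.BlockNumber)] == b.BlockHash {
			continue // same number and same hash already stored, skip to save gas
		}
		if _, ok := batch[uint64(b.BlockNumber)]; ok {
			// unknown or different hash (e.g. after a reorg): store it (again)
			nums = append(nums, uint64(b.BlockNumber))
			hashes = append(hashes, b.BlockHash)
		}
	}
	return nums, hashes
}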
@@ -246,12 +245,15 @@ func (f *Feeder) runTrusted(
errs = multierr.Append(errs, errors.Wrap(err, "checking if stored"))
return errs
}
+ for i, block := range blocksToStore {
+ f.storedTrusted[block] = blockhashesToStore[i]
+ }
}

- // Prune stored, anything older than fromBlock can be discarded.
- for b := range f.stored {
+ // Prune storedTrusted, anything older than fromBlock can be discarded.
+ for b := range f.storedTrusted {
if b < fromBlock {
- delete(f.stored, b)
+ delete(f.storedTrusted, b)
}
}
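Finally, storedTrusted is populated only after StoreTrusted succeeds, and anything older than fromBlock is pruned so the map stays bounded by the lookback window. A compact sketch of that record-then-prune step (illustrative helper, not repository code):

package blockhashstore

import "github.com/ethereum/go-ethereum/common"

// recordAndPrune sketches the bookkeeping above: remember what was just stored,
// then drop entries that have fallen out of the lookback window.
func recordAndPrune(storedTrusted map[uint64]common.Hash, blocks []uint64, hashes []common.Hash, fromBlock uint64) {
	for i, block := range blocks {
		storedTrusted[block] = hashes[i]
	}
	for block := range storedTrusted {
		if block < fromBlock {
			delete(storedTrusted, block)
		}
	}
}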
