Skip to content

Commit

Permalink
CCIP-2791 Long term solution for snoozing and caching CommitRoots (#1182
Browse files Browse the repository at this point in the history
)

## Motivation

Detailed context is here
#726, but long story short
scanning the entire `permissionLessExecThreshold` /
`messageIntervalVisibility` from every Commit plugin instance (think of
it as a lane) puts a lot of pressure to the database. For most of the
cases, we don't need the entire window but only Roots that are not
executed yet.

Unfortunately, the solution suggested in the mentioned PR was faulty
because it relied on `block_timestamp` of unfinalized blocks. Re-orgs on
Polygon exposed that problem, details and revert here
#1155


## Solution

Divide the cache into two layers
* finalized commit roots are kept in memory because we assume these
never change
* unfinalized commit roots are fetched from LogPoller, and finalized
logs are promoted to the finalized part

There should be no additional memory footprint because we kept all
executed and finalized roots in cache memory anyway. The performance
impact of this approach should be similar to the #726 , because we keep
scanning only unfinalized chunks of the LogPoller so the number of logs
fetched from the database would be very limited.

However, there is one caveat here, LogPoller doesn't mark logs as
finalized/not finalized by default so we need to ask for the latest
finalized block in the Commit Reader directly and enrich logs with that
information (similar approach to what we do for Offramp reader).
Thankfully, asking the database for the latest log is extremely fast and
(if needed) could be optimized by caching latest finalized block in the
LogPoller/HeadTracker.

There will be load tests conducted here to confirm performance
assumptions.
Please see the code and tests for more details

chainlink-common PR
smartcontractkit/chainlink-common#650

### High level picture 


![image](https://github.com/smartcontractkit/ccip/assets/18304098/5c7e04e7-9703-4228-ac10-d264ab6e184a)


### Alternative solutions:

[I've tried implementing the solution in which the cache is simplified
and keeps only executed/snoozed
roots](#1170). Snoozed
Roots are passed to the database query and used for filtering out
executed and finalized roots. However, it works only for some number of
snoozed roots. After adding more than 250 snoozed roots to the LogPoller
query, the driver/database started to cut off some of the filters making
it unreliable (erroring or not filtering properly). Also, there is a
risk of hitting the Postgres query size limit (because every root has to
be passed in SQL) which might lead to some limbo state which the plugin
can't recover from.


## Tests

### Before - fetching all CommitReports always

<img width="1293" alt="image"
src="https://github.com/user-attachments/assets/ea32eccb-8d76-468c-9018-01d4989968f7">


### After - fetching only unfinalized CommitReports and caching
finalized ones

<img width="1283" alt="image"
src="https://github.com/user-attachments/assets/3732f438-063e-4e8f-86a8-849684e88970">


### Summary

Caching roots improved the performance, query is smaller in terms of the
objects scanned and returned. Number of roots returned is constant
almost during the entire test (around ~5 reports at once)
Sightly higher memory footprint (around 800MB higher) but lower CPU
utilization during tests (~15% drop)

<img width="2500" alt="image"
src="https://github.com/user-attachments/assets/027cb379-03ec-419c-a289-7b1df3e25cd8">
  • Loading branch information
mateusz-sekara authored Jul 31, 2024
1 parent b972fe9 commit 44b23a5
Show file tree
Hide file tree
Showing 8 changed files with 711 additions and 409 deletions.
3 changes: 2 additions & 1 deletion core/services/ocr2/plugins/ccip/ccipexec/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/smartcontractkit/libocr/offchainreporting2plus/types"

cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccip"

"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipcommon"

"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip"
Expand Down Expand Up @@ -141,7 +142,7 @@ func (rf *ExecutionReportingPluginFactory) NewReportingPluginFn(config types.Rep
offRampReader: rf.config.offRampReader,
tokenPoolBatchedReader: rf.config.tokenPoolBatchedReader,
inflightReports: newInflightExecReportsContainer(offchainConfig.InflightCacheExpiry.Duration()),
commitRootsCache: cache.NewCommitRootsCache(lggr, msgVisibilityInterval, offchainConfig.RootSnoozeTime.Duration()),
commitRootsCache: cache.NewCommitRootsCache(lggr, rf.config.commitStoreReader, msgVisibilityInterval, offchainConfig.RootSnoozeTime.Duration()),
metricsCollector: rf.config.metricsCollector,
chainHealthcheck: rf.config.chainHealthcheck,
batchingStrategy: batchingStrategy,
Expand Down
43 changes: 2 additions & 41 deletions core/services/ocr2/plugins/ccip/ccipexec/ocr2.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,11 @@ func (r *ExecutionReportingPlugin) Observation(ctx context.Context, timestamp ty
}

func (r *ExecutionReportingPlugin) getExecutableObservations(ctx context.Context, lggr logger.Logger, inflight []InflightInternalExecutionReport) ([]ccip.ObservedMessage, error) {
unexpiredReports, err := r.getUnexpiredCommitReports(ctx, r.commitStoreReader, lggr)
unexpiredReports, err := r.commitRootsCache.RootsEligibleForExecution(ctx)
if err != nil {
return nil, err
}
r.metricsCollector.UnexpiredCommitRoots(len(unexpiredReports))

if len(unexpiredReports) == 0 {
return []ccip.ObservedMessage{}, nil
Expand Down Expand Up @@ -703,46 +704,6 @@ func getTokensPrices(ctx context.Context, priceRegistry ccipdata.PriceRegistryRe
return tokenPrices, nil
}

func (r *ExecutionReportingPlugin) getUnexpiredCommitReports(
ctx context.Context,
commitStoreReader ccipdata.CommitStoreReader,
lggr logger.Logger,
) ([]cciptypes.CommitStoreReport, error) {
createdAfterTimestamp := r.commitRootsCache.OldestRootTimestamp()
lggr.Infow("Fetching unexpired commit roots from database", "createdAfterTimestamp", createdAfterTimestamp)
acceptedReports, err := commitStoreReader.GetAcceptedCommitReportsGteTimestamp(
ctx,
createdAfterTimestamp,
0,
)
if err != nil {
return nil, err
}

var reports []cciptypes.CommitStoreReport
for _, acceptedReport := range acceptedReports {
reports = append(reports, acceptedReport.CommitStoreReport)
r.commitRootsCache.AppendUnexecutedRoot(acceptedReport.MerkleRoot, time.UnixMilli(acceptedReport.TxMeta.BlockTimestampUnixMilli))
}

notSnoozedReports := make([]cciptypes.CommitStoreReport, 0)
for _, report := range reports {
if r.commitRootsCache.IsSkipped(report.MerkleRoot) {
lggr.Debugw("Skipping snoozed root",
"minSeqNr", report.Interval.Min,
"maxSeqNr", report.Interval.Max,
"root", hex.EncodeToString(report.MerkleRoot[:]),
)
continue
}
notSnoozedReports = append(notSnoozedReports, report)
}

r.metricsCollector.UnexpiredCommitRoots(len(notSnoozedReports))
lggr.Infow("Unexpired roots", "all", len(reports), "notSnoozed", len(notSnoozedReports))
return notSnoozedReports, nil
}

type execTokenData struct {
rateLimiterTokenBucket cciptypes.TokenBucketRateLimit
sourceTokenPrices map[cciptypes.Address]*big.Int
Expand Down
2 changes: 1 addition & 1 deletion core/services/ocr2/plugins/ccip/ccipexec/ocr2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func TestExecutionReportingPlugin_Observation(t *testing.T) {
mockOnRampPriceRegistryProvider.On("NewPriceRegistryReader", ctx, sourcePriceRegistryAddress).Return(sourcePriceRegReader, nil).Maybe()
p.sourcePriceRegistryProvider = mockOnRampPriceRegistryProvider

p.commitRootsCache = cache.NewCommitRootsCache(logger.TestLogger(t), time.Minute, time.Minute)
p.commitRootsCache = cache.NewCommitRootsCache(logger.TestLogger(t), commitStoreReader, time.Minute, time.Minute)
p.chainHealthcheck = cache.NewChainHealthcheck(p.lggr, mockOnRampReader, commitStoreReader)

bs := &BestEffortBatchingStrategy{}
Expand Down
Loading

0 comments on commit 44b23a5

Please sign in to comment.