Skip to content

Commit

Permalink
Merge pull request #12920 from smartcontractkit/fix-in-mem-ds-cache-e…
Browse files Browse the repository at this point in the history
…rr-handling

Fix in mem ds cache err handling cherry pick
  • Loading branch information
ilija42 authored Apr 23, 2024
2 parents 344ff06 + bb84abf commit fcd1d3f
Show file tree
Hide file tree
Showing 11 changed files with 115 additions and 67 deletions.
5 changes: 5 additions & 0 deletions .changeset/brown-penguins-grin.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Fix a bug in the in-memory data source cache that only accepted pipeline results in which none of the data sources failed. #bugfix
6 changes: 6 additions & 0 deletions .changeset/poor-masks-fold.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"chainlink": minor
---

Move JuelsPerFeeCoinCacheDuration under the JuelsPerFeeCoinCache struct in the config and rename it to updateInterval. Add stalenessAlertThreshold to the JuelsPerFeeCoinCache config.
The stalenessAlertThreshold option defaults to 24 hours, so it only needs to be set to override the duration after which a stale cache starts logging errors.
6 changes: 4 additions & 2 deletions core/internal/features/ocr2/features_ocr2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,8 @@ juelsPerFeeCoinSource = """
answer1 [type=median index=0];
"""
juelsPerFeeCoinCacheDuration = "1m"
[pluginConfig.juelsPerFeeCoinCache]
updateInterval = "1m"
`, ocrContractAddress, kbs[i].ID(), transmitters[i], fmt.Sprintf("bridge%d", i), i, slowServers[i].URL, i, blockBeforeConfig.Number().Int64(), chainReaderSpec, fmt.Sprintf("bridge%d", i), i, slowServers[i].URL, i), nil)
require.NoError(t, err)
err = apps[i].AddJobV2(testutils.Context(t), &ocrJob)
Expand Down Expand Up @@ -840,7 +841,8 @@ juelsPerFeeCoinSource = """
answer1 [type=median index=0];
"""
juelsPerFeeCoinCacheDuration = "1m"
[pluginConfig.juelsPerFeeCoinCache]
updateInterval = "1m"
`, ocrContractAddress, kbs[i].ID(), transmitters[i], fmt.Sprintf("bridge%d", i), i, slowServers[i].URL, i, fmt.Sprintf("bridge%d", i), i, slowServers[i].URL, i), nil)
require.NoError(t, err)
err = apps[i].AddJobV2(testutils.Context(t), &ocrJob)
Expand Down
9 changes: 6 additions & 3 deletions core/services/feeds/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ ds1_multiply [type=multiply times=1.23];
ds1 -> ds1_parse -> ds1_multiply -> answer1;
answer1 [type=median index=0];
"""
juelsPerFeeCoinCacheDuration = "1m"
[pluginConfig.juelsPerFeeCoinCache]
updateInterval = "1m"
`
const BootstrapTestSpecTemplate = `
type = "bootstrap"
Expand Down Expand Up @@ -2189,7 +2190,8 @@ ds1_multiply [type=multiply times=1.23];
ds1 -> ds1_parse -> ds1_multiply -> answer1;
answer1 [type=median index=0];
"""
juelsPerFeeCoinCacheDuration = "30s"
[pluginConfig.juelsPerFeeCoinCache]
updateInterval = "30s"
`
defn2 = `
name = 'LINK / ETH | version 3 | contract 0x0000000000000000000000000000000000000000'
Expand Down Expand Up @@ -2219,7 +2221,8 @@ ds1_multiply [type=multiply times=1.23];
ds1 -> ds1_parse -> ds1_multiply -> answer1;
answer1 [type=median index=0];
"""
juelsPerFeeCoinCacheDuration = "20m"
[pluginConfig.juelsPerFeeCoinCache]
updateInterval = "20m"
`

jp = &feeds.JobProposal{
Expand Down
24 changes: 15 additions & 9 deletions core/services/ocr2/plugins/median/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,14 @@ import (

// The PluginConfig struct contains the custom arguments needed for the Median plugin.
type PluginConfig struct {
JuelsPerFeeCoinPipeline string `json:"juelsPerFeeCoinSource"`
JuelsPerFeeCoinCacheDuration models.Interval `json:"juelsPerFeeCoinCacheDuration"`
JuelsPerFeeCoinCacheDisabled bool `json:"juelsPerFeeCoinCacheDisabled"`
JuelsPerFeeCoinPipeline string `json:"juelsPerFeeCoinSource"`
// JuelsPerFeeCoinCache is disabled when nil
JuelsPerFeeCoinCache *JuelsPerFeeCoinCache `json:"juelsPerFeeCoinCache"`
}

// JuelsPerFeeCoinCache holds the settings for the juelsPerFeeCoin data source cache.
// Zero-valued intervals are treated as unset and replaced with defaults when the
// cache is constructed.
type JuelsPerFeeCoinCache struct {
// UpdateInterval is the duration between cache updates. When set it must be
// between 30 seconds and 20 minutes; when unset it defaults to 5 minutes.
UpdateInterval models.Interval `json:"updateInterval"`
// StalenessAlertThreshold is how long the cache may go without a successful
// update before staleness is logged at error level; when unset it defaults to 24 hours.
StalenessAlertThreshold models.Interval `json:"stalenessAlertThreshold"`
}

// ValidatePluginConfig validates the arguments for the Median plugin.
Expand All @@ -25,12 +30,13 @@ func ValidatePluginConfig(config PluginConfig) error {
return errors.Wrap(err, "invalid juelsPerFeeCoinSource pipeline")
}

// unset duration defaults later
if config.JuelsPerFeeCoinCacheDuration != 0 {
if config.JuelsPerFeeCoinCacheDuration.Duration() < time.Second*30 {
return errors.Errorf("juelsPerFeeCoinSource cache duration: %s is below 30 second minimum", config.JuelsPerFeeCoinCacheDuration.Duration().String())
} else if config.JuelsPerFeeCoinCacheDuration.Duration() > time.Minute*20 {
return errors.Errorf("juelsPerFeeCoinSource cache duration: %s is above 20 minute maximum", config.JuelsPerFeeCoinCacheDuration.Duration().String())
// unset durations have a default set later
if config.JuelsPerFeeCoinCache != nil {
updateInterval := config.JuelsPerFeeCoinCache.UpdateInterval.Duration()
if updateInterval != 0 && updateInterval < time.Second*30 {
return errors.Errorf("juelsPerFeeCoinSourceCache update interval: %s is below 30 second minimum", updateInterval.String())
} else if updateInterval > time.Minute*20 {
return errors.Errorf("juelsPerFeeCoinSourceCache update interval: %s is above 20 minute maximum", updateInterval.String())
}
}

Expand Down
6 changes: 3 additions & 3 deletions core/services/ocr2/plugins/median/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ func TestValidatePluginConfig(t *testing.T) {

t.Run("cache duration validation", func(t *testing.T) {
for _, tc := range []testCase{
{"cache duration below minimum", `ds1 [type=bridge name=voter_turnout];`, models.Interval(time.Second * 29), fmt.Errorf("juelsPerFeeCoinSource cache duration: 29s is below 30 second minimum")},
{"cache duration above maximum", `ds1 [type=bridge name=voter_turnout];`, models.Interval(time.Minute*20 + time.Second), fmt.Errorf("juelsPerFeeCoinSource cache duration: 20m1s is above 20 minute maximum")},
{"cache duration below minimum", `ds1 [type=bridge name=voter_turnout];`, models.Interval(time.Second * 29), fmt.Errorf("juelsPerFeeCoinSourceCache update interval: 29s is below 30 second minimum")},
{"cache duration above maximum", `ds1 [type=bridge name=voter_turnout];`, models.Interval(time.Minute*20 + time.Second), fmt.Errorf("juelsPerFeeCoinSourceCache update interval: 20m1s is above 20 minute maximum")},
} {
t.Run(tc.name, func(t *testing.T) {
assert.EqualError(t, ValidatePluginConfig(PluginConfig{JuelsPerFeeCoinPipeline: tc.pipeline, JuelsPerFeeCoinCacheDuration: tc.cacheDuration}), tc.expectedError.Error())
assert.EqualError(t, ValidatePluginConfig(PluginConfig{JuelsPerFeeCoinPipeline: tc.pipeline, JuelsPerFeeCoinCache: &JuelsPerFeeCoinCache{UpdateInterval: tc.cacheDuration}}), tc.expectedError.Error())
})
}
})
Expand Down
4 changes: 2 additions & 2 deletions core/services/ocr2/plugins/median/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,9 @@ func NewMedianServices(ctx context.Context,
CreatedAt: time.Now(),
}, lggr)

if !pluginConfig.JuelsPerFeeCoinCacheDisabled {
if pluginConfig.JuelsPerFeeCoinCache != nil {
lggr.Infof("juelsPerFeeCoin data source caching is enabled")
juelsPerFeeCoinSourceCache, err2 := ocrcommon.NewInMemoryDataSourceCache(juelsPerFeeCoinSource, kvStore, pluginConfig.JuelsPerFeeCoinCacheDuration.Duration())
juelsPerFeeCoinSourceCache, err2 := ocrcommon.NewInMemoryDataSourceCache(juelsPerFeeCoinSource, kvStore, *pluginConfig.JuelsPerFeeCoinCache)
if err2 != nil {
return nil, err2
}
Expand Down
99 changes: 56 additions & 43 deletions core/services/ocrcommon/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package ocrcommon
import (
"context"
"encoding/json"
errjoin "errors"
"fmt"
"math/big"
"sync"
Expand All @@ -19,6 +18,7 @@ import (
serializablebig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/median/config"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"

"github.com/smartcontractkit/chainlink/v2/core/utils"
Expand Down Expand Up @@ -103,8 +103,8 @@ func NewInMemoryDataSource(pr pipeline.Runner, jb job.Job, spec pipeline.Spec, l
}
}

const defaultCacheFreshness = time.Minute * 5
const defaultCacheFreshnessAlert = time.Hour * 24
const defaultUpdateInterval = time.Minute * 5
const defaultStalenessAlertThreshold = time.Hour * 24
const dataSourceCacheKey = "dscache"

type DataSourceCacheService interface {
Expand All @@ -113,22 +113,27 @@ type DataSourceCacheService interface {
median.DataSource
}

func NewInMemoryDataSourceCache(ds median.DataSource, kvStore job.KVStore, cacheFreshness time.Duration) (DataSourceCacheService, error) {
func NewInMemoryDataSourceCache(ds median.DataSource, kvStore job.KVStore, cacheCfg config.JuelsPerFeeCoinCache) (DataSourceCacheService, error) {
inMemoryDS, ok := ds.(*inMemoryDataSource)
if !ok {
return nil, errors.Errorf("unsupported data source type: %T, only inMemoryDataSource supported", ds)
}

if cacheFreshness == 0 {
cacheFreshness = defaultCacheFreshness
updateInterval, stalenessAlertThreshold := cacheCfg.UpdateInterval.Duration(), cacheCfg.StalenessAlertThreshold.Duration()
if updateInterval == 0 {
updateInterval = defaultUpdateInterval
}
if stalenessAlertThreshold == 0 {
stalenessAlertThreshold = defaultStalenessAlertThreshold
}

dsCache := &inMemoryDataSourceCache{
kvStore: kvStore,
cacheFreshness: cacheFreshness,
inMemoryDataSource: inMemoryDS,
chStop: make(chan struct{}),
chDone: make(chan struct{}),
inMemoryDataSource: inMemoryDS,
kvStore: kvStore,
updateInterval: updateInterval,
stalenessAlertThreshold: stalenessAlertThreshold,
chStop: make(chan struct{}),
chDone: make(chan struct{}),
}
return dsCache, nil
}
Expand Down Expand Up @@ -231,16 +236,18 @@ func (ds *inMemoryDataSource) Observe(ctx context.Context, timestamp ocr2types.R
// If cache update is overdue Observe defaults to standard inMemoryDataSource behaviour.
type inMemoryDataSourceCache struct {
*inMemoryDataSource
// cacheFreshness indicates duration between cache updates.
// Even if updates fail, previous values are returned.
cacheFreshness time.Duration
mu sync.RWMutex
chStop services.StopChan
chDone chan struct{}
latestUpdateErr error
latestTrrs pipeline.TaskRunResults
latestResult pipeline.FinalResult
kvStore job.KVStore
// updateInterval indicates duration between cache updates.
// Even if an update fails, previous values are returned.
updateInterval time.Duration
// stalenessAlertThreshold indicates duration before logs raise severity level because of stale cache.
stalenessAlertThreshold time.Duration
mu sync.RWMutex
chStop services.StopChan
chDone chan struct{}
latestUpdateErr error
latestTrrs pipeline.TaskRunResults
latestResult pipeline.FinalResult
kvStore job.KVStore
}

func (ds *inMemoryDataSourceCache) Start(context.Context) error {
Expand All @@ -256,7 +263,7 @@ func (ds *inMemoryDataSourceCache) Close() error {

// updater periodically updates data source cache.
func (ds *inMemoryDataSourceCache) updater() {
ticker := time.NewTicker(ds.cacheFreshness)
ticker := time.NewTicker(ds.updateInterval)
updateCache := func() {
ctx, cancel := ds.chStop.CtxCancel(context.WithTimeout(context.Background(), time.Second*10))
defer cancel()
Expand Down Expand Up @@ -286,31 +293,30 @@ func (ds *inMemoryDataSourceCache) updateCache(ctx context.Context) error {
ds.mu.Lock()
defer ds.mu.Unlock()

// check for any errors
_, latestTrrs, latestUpdateErr := ds.executeRun(ctx)
if latestTrrs.FinalResult(ds.lggr).HasErrors() {
latestUpdateErr = errjoin.Join(append(latestTrrs.FinalResult(ds.lggr).AllErrors, latestUpdateErr)...)
}

if latestUpdateErr != nil {
_, latestTrrs, err := ds.executeRun(ctx)
if err != nil {
previousUpdateErr := ds.latestUpdateErr
ds.latestUpdateErr = latestUpdateErr
// raise log severity
ds.latestUpdateErr = err
// warn log if previous cache update also errored
if previousUpdateErr != nil {
ds.lggr.Warnf("consecutive cache updates errored: previous err: %v new err: %v", previousUpdateErr, ds.latestUpdateErr)
}
return errors.Wrapf(ds.latestUpdateErr, "error executing run for spec ID %v", ds.spec.ID)

return errors.Wrapf(ds.latestUpdateErr, "error updating in memory data source cache for spec ID %v", ds.spec.ID)
}

ds.latestTrrs = latestTrrs
ds.latestResult = ds.latestTrrs.FinalResult(ds.lggr)
value, err := ds.inMemoryDataSource.parse(ds.latestResult)
value, err := ds.inMemoryDataSource.parse(latestTrrs.FinalResult(ds.lggr))
if err != nil {
return errors.Wrapf(err, "invalid result")
ds.latestUpdateErr = errors.Wrapf(err, "invalid result")
return ds.latestUpdateErr
}

// backup in case data source fails continuously and node gets rebooted
// update cache values
ds.latestTrrs = latestTrrs
ds.latestResult = ds.latestTrrs.FinalResult(ds.lggr)
ds.latestUpdateErr = nil

// backup in case data source fails continuously and node gets rebooted
timePairBytes, err := json.Marshal(&ResultTimePair{Result: *serializablebig.New(value), Time: time.Now()})
if err != nil {
return fmt.Errorf("failed to marshal result time pair, err: %w", err)
Expand All @@ -333,7 +339,7 @@ func (ds *inMemoryDataSourceCache) get(ctx context.Context) (pipeline.FinalResul
ds.mu.RUnlock()

if err := ds.updateCache(ctx); err != nil {
ds.lggr.Warnf("failed to update cache err: %v, returning stale result now, err: %v", err)
ds.lggr.Warnf("failed to update cache, returning stale result now, err: %v", err)
}

ds.mu.RLock()
Expand All @@ -349,15 +355,15 @@ func (ds *inMemoryDataSourceCache) Observe(ctx context.Context, timestamp ocr2ty

timePairBytes, err := ds.kvStore.Get(ctx, dataSourceCacheKey)
if err != nil {
return nil, fmt.Errorf("failed to get result time pair bytes, err: %w", err)
return nil, fmt.Errorf("in memory data source cache is empty and failed to get backup persisted value, err: %w", err)
}

if err := json.Unmarshal(timePairBytes, &resTime); err != nil {
return nil, fmt.Errorf("failed to unmarshal result time pair bytes, err: %w", err)
if err = json.Unmarshal(timePairBytes, &resTime); err != nil {
return nil, fmt.Errorf("in memory data source cache is empty and failed to unmarshal backup persisted value, err: %w", err)
}

if time.Since(resTime.Time) >= defaultCacheFreshnessAlert {
ds.lggr.Errorf("cache hasn't been updated for over %v, latestUpdateErr is: %v", defaultCacheFreshnessAlert, ds.latestUpdateErr)
if time.Since(resTime.Time) >= ds.stalenessAlertThreshold {
ds.lggr.Errorf("in memory data source cache is empty and the persisted value hasn't been updated for over %v, latestUpdateErr is: %v", ds.stalenessAlertThreshold, ds.latestUpdateErr)
}
return resTime.Result.ToInt(), nil
}
Expand All @@ -368,6 +374,13 @@ func (ds *inMemoryDataSourceCache) Observe(ctx context.Context, timestamp ocr2ty
ConfigDigest: timestamp.ConfigDigest.Hex(),
})

// if last update was unsuccessful, check how much time passed since a successful update
if ds.latestUpdateErr != nil {
if time.Since(ds.latestTrrs.GetTaskRunResultsFinishedAt()) >= ds.stalenessAlertThreshold {
ds.lggr.Errorf("in memory cache is old and hasn't been updated for over %v, latestUpdateErr is: %v", ds.stalenessAlertThreshold, ds.latestUpdateErr)
}

}
return ds.parse(latestResult)
}

Expand Down
8 changes: 5 additions & 3 deletions core/services/ocrcommon/data_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/job/mocks"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/median/config"
"github.com/smartcontractkit/chainlink/v2/core/services/ocrcommon"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
pipelinemocks "github.com/smartcontractkit/chainlink/v2/core/services/pipeline/mocks"
"github.com/smartcontractkit/chainlink/v2/core/store/models"
)

var (
Expand Down Expand Up @@ -78,7 +80,7 @@ func Test_CachedInMemoryDataSourceErrHandling(t *testing.T) {
mockKVStore := mocks.KVStore{}
mockKVStore.On("Store", mock.Anything, mock.Anything, mock.Anything).Return(nil)
mockKVStore.On("Get", mock.Anything, mock.Anything).Return(nil, nil)
dsCache, err := ocrcommon.NewInMemoryDataSourceCache(ds, &mockKVStore, time.Second*2)
dsCache, err := ocrcommon.NewInMemoryDataSourceCache(ds, &mockKVStore, config.JuelsPerFeeCoinCache{UpdateInterval: models.Interval(time.Second * 2)})
require.NoError(t, err)
servicetest.Run(t, dsCache)

Expand Down Expand Up @@ -112,7 +114,7 @@ func Test_CachedInMemoryDataSourceErrHandling(t *testing.T) {
mockKVStore.On("Get", mock.Anything, mock.Anything).Return(result, nil)

// set updater to a long time so that it doesn't log errors after the test is done
dsCache, err := ocrcommon.NewInMemoryDataSourceCache(ds, &mockKVStore, time.Hour*100)
dsCache, err := ocrcommon.NewInMemoryDataSourceCache(ds, &mockKVStore, config.JuelsPerFeeCoinCache{UpdateInterval: models.Interval(time.Hour * 100)})
require.NoError(t, err)
changeResultValue(runner, "-1", true, false)
servicetest.Run(t, dsCache)
Expand All @@ -131,7 +133,7 @@ func Test_CachedInMemoryDataSourceErrHandling(t *testing.T) {
mockKVStore.On("Get", mock.Anything, mock.Anything).Return(nil, assert.AnError)

// set updater to a long time so that it doesn't log errors after the test is done
dsCache, err := ocrcommon.NewInMemoryDataSourceCache(ds, &mockKVStore, time.Hour*100)
dsCache, err := ocrcommon.NewInMemoryDataSourceCache(ds, &mockKVStore, config.JuelsPerFeeCoinCache{UpdateInterval: models.Interval(time.Hour * 100)})
require.NoError(t, err)
changeResultValue(runner, "-1", true, false)
servicetest.Run(t, dsCache)
Expand Down
11 changes: 11 additions & 0 deletions core/services/pipeline/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,17 @@ func (result *TaskRunResult) IsTerminal() bool {
// TaskRunResults represents a collection of results for all task runs for one pipeline run
type TaskRunResults []TaskRunResult

// GetTaskRunResultsFinishedAt scans every task run result and reports the most
// recent valid FinishedAt timestamp. Results whose FinishedAt is not valid are
// ignored; if none qualifies, the zero time.Time is returned.
func (trrs TaskRunResults) GetTaskRunResultsFinishedAt() time.Time {
	var latest time.Time
	for i := range trrs {
		finishedAt := trrs[i].FinishedAt
		if !finishedAt.Valid {
			// Task has no recorded finish time; nothing to compare.
			continue
		}
		if finishedAt.Time.After(latest) {
			latest = finishedAt.Time
		}
	}
	return latest
}

// FinalResult pulls the FinalResult for the pipeline_run from the task runs
// It needs to respect the output index of each task
func (trrs TaskRunResults) FinalResult(l logger.Logger) FinalResult {
Expand Down
4 changes: 2 additions & 2 deletions tools/ci/install_solana
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
#!/usr/bin/env bash

set -euo pipefail
VERSION=v1.13.3
SHASUM=3a063fe58e6f8bc9e9de84a8d1b96da87e9184cb357d462522f7ec8a2c23bec2
VERSION=v1.17.28
SHASUM=97faa4d14becfccd3bc539dbc0aaf28c84cfe9d80d299ec70092fb5844403724

echo "Installing solana@${VERSION}"
curl -sSfL https://release.solana.com/$VERSION/install --output install_solana.sh \
Expand Down

0 comments on commit fcd1d3f

Please sign in to comment.