Skip to content

Commit

Permalink
changes required to work with the updated chainlink common key value …
Browse files Browse the repository at this point in the history
…store (#12634)
  • Loading branch information
ettec authored Apr 2, 2024
1 parent 76e507e commit e9e903b
Show file tree
Hide file tree
Showing 17 changed files with 139 additions and 97 deletions.
5 changes: 5 additions & 0 deletions .changeset/large-flowers-agree.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

Update keyvalue store to be compatible with the interface required in chainlink common
2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ require (
github.com/prometheus/client_golang v1.17.0
github.com/shopspring/decimal v1.3.1
github.com/smartcontractkit/chainlink-automation v1.0.2
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240401172519-4bfc659b80bf
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240402105740-0be47ab9cf63
github.com/smartcontractkit/chainlink-vrf v0.0.0-20231120191722-fef03814f868
github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000
github.com/smartcontractkit/libocr v0.0.0-20240326191951-2bbe9382d052
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1187,8 +1187,8 @@ github.com/smartcontractkit/chain-selectors v1.0.10 h1:t9kJeE6B6G+hKD0GYR4kGJSCq
github.com/smartcontractkit/chain-selectors v1.0.10/go.mod h1:d4Hi+E1zqjy9HqMkjBE5q1vcG9VGgxf5VxiRHfzi2kE=
github.com/smartcontractkit/chainlink-automation v1.0.2 h1:xsfyuswL15q2YBGQT3qn2SBz6fnSKiSW7XZ8IZQLpnI=
github.com/smartcontractkit/chainlink-automation v1.0.2/go.mod h1:RjboV0Qd7YP+To+OrzHGXaxUxoSONveCoAK2TQ1INLU=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240401172519-4bfc659b80bf h1:yW8rTFycozLVnXRyOgZWGktnmzoFLxSWh1xPJXsp7vg=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240401172519-4bfc659b80bf/go.mod h1:kstYjAGqBswdZpl7YkSPeXBDVwaY1VaR6tUMPWl8ykA=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240402105740-0be47ab9cf63 h1:wX78l6lMQ6hfwqpOkavD/IyXqBDZ8MZOhhBE9z15Sd0=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240402105740-0be47ab9cf63/go.mod h1:kstYjAGqBswdZpl7YkSPeXBDVwaY1VaR6tUMPWl8ykA=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240213120401-01a23955f9f8 h1:I326nw5GwHQHsLKHwtu5Sb9EBLylC8CfUd7BFAS0jtg=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240213120401-01a23955f9f8/go.mod h1:a65NtrK4xZb01mf0dDNghPkN2wXgcqFQ55ADthVBgMc=
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240220203239-09be0ea34540 h1:xFSv8561jsLtF6gYZr/zW2z5qUUAkcFkApin2mnbYTo=
Expand Down
37 changes: 16 additions & 21 deletions core/services/job/kv_orm.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package job

import (
"encoding/json"
"context"
"fmt"
"time"

"github.com/jmoiron/sqlx"
"github.com/jmoiron/sqlx/types"

"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
Expand All @@ -16,8 +15,8 @@ import (
//
//go:generate mockery --quiet --name KVStore --output ./mocks/ --case=underscore
type KVStore interface {
Store(key string, val interface{}) error
Get(key string, dest interface{}) error
Store(ctx context.Context, key string, val []byte) error
Get(ctx context.Context, key string) ([]byte, error)
}

type kVStore struct {
Expand All @@ -37,32 +36,28 @@ func NewKVStore(jobID int32, db *sqlx.DB, cfg pg.QConfig, lggr logger.Logger) kV
}
}

// Store saves serializable value by key.
func (kv kVStore) Store(key string, val interface{}) error {
jsonVal, err := json.Marshal(val)
if err != nil {
return err
}
// Store saves []byte value by key.
func (kv kVStore) Store(ctx context.Context, key string, val []byte) error {

sql := `INSERT INTO job_kv_store (job_id, key, val)
sql := `INSERT INTO job_kv_store (job_id, key, val_bytea)
VALUES ($1, $2, $3)
ON CONFLICT (job_id, key) DO UPDATE SET
val = EXCLUDED.val,
val_bytea = EXCLUDED.val_bytea,
updated_at = $4;`

if err = kv.q.ExecQ(sql, kv.jobID, key, types.JSONText(jsonVal), time.Now()); err != nil {
return fmt.Errorf("failed to store value: %s for key: %s for jobID: %d : %w", string(jsonVal), key, kv.jobID, err)
if _, err := kv.q.ExecContext(ctx, sql, kv.jobID, key, val, time.Now()); err != nil {
return fmt.Errorf("failed to store value: %s for key: %s for jobID: %d : %w", string(val), key, kv.jobID, err)
}
return nil
}

// Get retrieves serializable value by key.
func (kv kVStore) Get(key string, dest interface{}) error {
var ret json.RawMessage
sql := "SELECT val FROM job_kv_store WHERE job_id = $1 AND key = $2"
if err := kv.q.Get(&ret, sql, kv.jobID, key); err != nil {
return fmt.Errorf("failed to get value by key: %s for jobID: %d : %w", key, kv.jobID, err)
// Get retrieves []byte value by key.
func (kv kVStore) Get(ctx context.Context, key string) ([]byte, error) {
var val []byte
sql := "SELECT val_bytea FROM job_kv_store WHERE job_id = $1 AND key = $2"
if err := kv.q.GetContext(ctx, &val, sql, kv.jobID, key); err != nil {
return nil, fmt.Errorf("failed to get value by key: %s for jobID: %d : %w", key, kv.jobID, err)
}

return json.Unmarshal(ret, dest)
return val, nil
}
63 changes: 26 additions & 37 deletions core/services/job/kv_orm_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package job_test

import (
"context"
"fmt"
"reflect"
"testing"

"github.com/stretchr/testify/assert"

"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink/v2/core/bridges"
Expand All @@ -19,6 +21,9 @@ import (
)

func TestJobKVStore(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

config := configtest.NewTestGeneralConfig(t)
db := pgtest.NewSqlxDB(t)

Expand All @@ -36,52 +41,36 @@ func TestJobKVStore(t *testing.T) {
jb.ID = jobID
require.NoError(t, jobORM.CreateJob(&jb))

type testData struct {
Test string
var values = [][]byte{
[]byte("Hello"),
[]byte("World"),
[]byte("Go"),
}

type nested struct {
Contact testData // Nested struct
}

values := []interface{}{
42, // int
"hello", // string
3.14, // float64
true, // bool
[]int{1, 2, 3}, // slice of ints
map[string]int{"a": 1, "b": 2}, // map of string to int
testData{Test: "value1"}, // regular struct
nested{testData{"value2"}}, // nested struct
}

for i, value := range values {
for i, insertBytes := range values {
testKey := "test_key_" + fmt.Sprint(i)
require.NoError(t, kvStore.Store(testKey, value))

// Get the type of the current value
valueType := reflect.TypeOf(value)
// Create a new instance of the value's type
temp := reflect.New(valueType).Interface()
require.NoError(t, kvStore.Store(ctx, testKey, insertBytes))

require.NoError(t, kvStore.Get(testKey, &temp))
var readBytes []byte
readBytes, err = kvStore.Get(ctx, testKey)
assert.NoError(t, err)

tempValue := reflect.ValueOf(temp).Elem().Interface()
require.Equal(t, value, tempValue)
require.Equal(t, insertBytes, readBytes)
}

key := "test_key_updating"
td1 := testData{Test: "value1"}
td2 := testData{Test: "value2"}
td1 := []byte("value1")
td2 := []byte("value2")

var retData testData
require.NoError(t, kvStore.Store(key, td1))
require.NoError(t, kvStore.Get(key, &retData))
require.Equal(t, td1, retData)
require.NoError(t, kvStore.Store(ctx, key, td1))
fetchedBytes, err := kvStore.Get(ctx, key)
require.NoError(t, err)
require.Equal(t, td1, fetchedBytes)

require.NoError(t, kvStore.Store(key, td2))
require.NoError(t, kvStore.Get(key, &retData))
require.Equal(t, td2, retData)
require.NoError(t, kvStore.Store(ctx, key, td2))
fetchedBytes, err = kvStore.Get(ctx, key)
require.NoError(t, err)
require.Equal(t, td2, fetchedBytes)

require.NoError(t, jobORM.DeleteJob(jobID))
}
44 changes: 30 additions & 14 deletions core/services/job/mocks/kv_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 7 additions & 3 deletions core/services/ocr2/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,8 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, jb job.Job) ([]job.Servi
return d.newServicesOCR2Functions(ctx, lggr, jb, bootstrapPeers, kb, ocrDB, thresholdPluginDB, s4PluginDB, lc, ocrLogger)

case types.GenericPlugin:
return d.newServicesGenericPlugin(ctx, lggr, jb, bootstrapPeers, kb, ocrDB, lc, ocrLogger, d.capabilitiesRegistry)
return d.newServicesGenericPlugin(ctx, lggr, jb, bootstrapPeers, kb, ocrDB, lc, ocrLogger, d.capabilitiesRegistry,
kvStore)

default:
return nil, errors.Errorf("plugin type %s not supported", spec.PluginType)
Expand Down Expand Up @@ -531,6 +532,7 @@ func (d *Delegate) newServicesGenericPlugin(
lc ocrtypes.LocalConfig,
ocrLogger commontypes.Logger,
capabilitiesRegistry types.CapabilitiesRegistry,
keyValueStore types.KeyValueStore,
) (srvs []job.ServiceCtx, err error) {
spec := jb.OCR2OracleSpec

Expand Down Expand Up @@ -649,7 +651,8 @@ func (d *Delegate) newServicesGenericPlugin(

switch pCfg.OCRVersion {
case 2:
plugin := reportingplugins.NewLOOPPService(pluginLggr, grpcOpts, cmdFn, pluginConfig, providerClientConn, pr, ta, errorLog)
plugin := reportingplugins.NewLOOPPService(pluginLggr, grpcOpts, cmdFn, pluginConfig, providerClientConn, pr, ta,
errorLog, keyValueStore)
oracleArgs := libocr2.OCR2OracleArgs{
BinaryNetworkEndpointFactory: d.peerWrapper.Peer2,
V2Bootstrappers: bootstrapPeers,
Expand All @@ -674,7 +677,8 @@ func (d *Delegate) newServicesGenericPlugin(

case 3:
//OCR3 with OCR2 OnchainKeyring and ContractTransmitter
plugin := ocr3.NewLOOPPService(pluginLggr, grpcOpts, cmdFn, pluginConfig, providerClientConn, pr, ta, errorLog, capabilitiesRegistry)
plugin := ocr3.NewLOOPPService(pluginLggr, grpcOpts, cmdFn, pluginConfig, providerClientConn, pr, ta, errorLog,
capabilitiesRegistry, keyValueStore)
contractTransmitter := ocrcommon.NewOCR3ContractTransmitterAdapter(provider.ContractTransmitter())
oracleArgs := libocr2.OCR3OracleArgs[[]byte]{
BinaryNetworkEndpointFactory: d.peerWrapper.Peer2,
Expand Down
21 changes: 18 additions & 3 deletions core/services/ocrcommon/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package ocrcommon

import (
"context"
"encoding/json"
errjoin "errors"
"fmt"
"math/big"
"sync"
"time"
Expand Down Expand Up @@ -308,7 +310,13 @@ func (ds *inMemoryDataSourceCache) updateCache(ctx context.Context) error {
}

// backup in case data source fails continuously and node gets rebooted
if err = ds.kvStore.Store(dataSourceCacheKey, &ResultTimePair{Result: *serializablebig.New(value), Time: time.Now()}); err != nil {

timePairBytes, err := json.Marshal(&ResultTimePair{Result: *serializablebig.New(value), Time: time.Now()})
if err != nil {
return fmt.Errorf("failed to marshal result time pair, err: %w", err)
}

if err = ds.kvStore.Store(ctx, dataSourceCacheKey, timePairBytes); err != nil {
ds.lggr.Errorf("failed to persist latest task run value, err: %v", err)
}

Expand Down Expand Up @@ -338,9 +346,16 @@ func (ds *inMemoryDataSourceCache) Observe(ctx context.Context, timestamp ocr2ty
latestResult, latestTrrs := ds.get(ctx)
if latestTrrs == nil {
ds.lggr.Warnf("cache is empty, returning persisted value now")
if err := ds.kvStore.Get(dataSourceCacheKey, &resTime); err != nil {
return nil, err

timePairBytes, err := ds.kvStore.Get(ctx, dataSourceCacheKey)
if err != nil {
return nil, fmt.Errorf("failed to get result time pair bytes, err: %w", err)
}

if err := json.Unmarshal(timePairBytes, &resTime); err != nil {
return nil, fmt.Errorf("failed to unmarshal result time pair bytes, err: %w", err)
}

if time.Since(resTime.Time) >= defaultCacheFreshnessAlert {
ds.lggr.Errorf("cache hasn't been updated for over %v, latestUpdateErr is: %v", defaultCacheFreshnessAlert, ds.latestUpdateErr)
}
Expand Down
15 changes: 8 additions & 7 deletions core/services/ocrcommon/data_source_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ocrcommon_test

import (
"encoding/json"
"fmt"
"math/big"
"testing"
Expand Down Expand Up @@ -75,8 +76,8 @@ func Test_CachedInMemoryDataSourceErrHandling(t *testing.T) {
runner := pipelinemocks.NewRunner(t)
ds := ocrcommon.NewInMemoryDataSource(runner, job.Job{}, pipeline.Spec{}, logger.TestLogger(t))
mockKVStore := mocks.KVStore{}
mockKVStore.On("Store", mock.Anything, mock.Anything).Return(nil)
mockKVStore.On("Get", mock.Anything, mock.IsType(&ocrcommon.ResultTimePair{})).Return(nil)
mockKVStore.On("Store", mock.Anything, mock.Anything, mock.Anything).Return(nil)
mockKVStore.On("Get", mock.Anything, mock.Anything).Return(nil, nil)
dsCache, err := ocrcommon.NewInMemoryDataSourceCache(ds, &mockKVStore, time.Second*2)
require.NoError(t, err)
servicetest.Run(t, dsCache)
Expand Down Expand Up @@ -105,10 +106,10 @@ func Test_CachedInMemoryDataSourceErrHandling(t *testing.T) {

mockKVStore := mocks.KVStore{}
persistedVal := serializablebig.NewI(1337)
mockKVStore.On("Get", mock.Anything, mock.IsType(&ocrcommon.ResultTimePair{})).Return(nil).Run(func(args mock.Arguments) {
arg := args.Get(1).(*ocrcommon.ResultTimePair)
arg.Result = *persistedVal
})

result, err := json.Marshal(&ocrcommon.ResultTimePair{Result: *persistedVal, Time: time.Now()})
assert.NoError(t, err)
mockKVStore.On("Get", mock.Anything, mock.Anything).Return(result, nil)

// set updater to a long time so that it doesn't log errors after the test is done
dsCache, err := ocrcommon.NewInMemoryDataSourceCache(ds, &mockKVStore, time.Hour*100)
Expand All @@ -127,7 +128,7 @@ func Test_CachedInMemoryDataSourceErrHandling(t *testing.T) {
ds := ocrcommon.NewInMemoryDataSource(runner, job.Job{}, pipeline.Spec{}, logger.TestLogger(t))

mockKVStore := mocks.KVStore{}
mockKVStore.On("Get", mock.Anything, mock.IsType(&ocrcommon.ResultTimePair{})).Return(nil).Return(assert.AnError)
mockKVStore.On("Get", mock.Anything, mock.Anything).Return(nil, assert.AnError)

// set updater to a long time so that it doesn't log errors after the test is done
dsCache, err := ocrcommon.NewInMemoryDataSourceCache(ds, &mockKVStore, time.Hour*100)
Expand Down
Loading

0 comments on commit e9e903b

Please sign in to comment.