-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #12401 from smartcontractkit/release/2.10.0-cherry…
…pick BCF-3052 - Job Based KV Store and juelsFeePerCoin reboot persistence …
- Loading branch information
Showing
9 changed files
with
353 additions
and
41 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
--- | ||
"chainlink": patch | ||
--- | ||
|
||
Add kv store tied to jobs and use it for juels fee per coin cache to store persisted values for backup |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
package job | ||
|
||
import ( | ||
"encoding/json" | ||
"fmt" | ||
"time" | ||
|
||
"github.com/jmoiron/sqlx" | ||
"github.com/jmoiron/sqlx/types" | ||
|
||
"github.com/smartcontractkit/chainlink/v2/core/logger" | ||
"github.com/smartcontractkit/chainlink/v2/core/services/pg" | ||
) | ||
|
||
// KVStore is a simple KV store that can store and retrieve serializable data. | ||
// | ||
//go:generate mockery --quiet --name KVStore --output ./mocks/ --case=underscore | ||
type KVStore interface { | ||
Store(key string, val interface{}) error | ||
Get(key string, dest interface{}) error | ||
} | ||
|
||
type kVStore struct { | ||
jobID int32 | ||
q pg.Q | ||
lggr logger.SugaredLogger | ||
} | ||
|
||
var _ KVStore = (*kVStore)(nil) | ||
|
||
func NewKVStore(jobID int32, db *sqlx.DB, cfg pg.QConfig, lggr logger.Logger) kVStore { | ||
namedLogger := logger.Sugared(lggr.Named("JobORM")) | ||
return kVStore{ | ||
jobID: jobID, | ||
q: pg.NewQ(db, namedLogger, cfg), | ||
lggr: namedLogger, | ||
} | ||
} | ||
|
||
// Store saves serializable value by key. | ||
func (kv kVStore) Store(key string, val interface{}) error { | ||
jsonVal, err := json.Marshal(val) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
sql := `INSERT INTO job_kv_store (job_id, key, val) | ||
VALUES ($1, $2, $3) | ||
ON CONFLICT (job_id, key) DO UPDATE SET | ||
val = EXCLUDED.val, | ||
updated_at = $4;` | ||
|
||
if err = kv.q.ExecQ(sql, kv.jobID, key, types.JSONText(jsonVal), time.Now()); err != nil { | ||
return fmt.Errorf("failed to store value: %s for key: %s for jobID: %d : %w", string(jsonVal), key, kv.jobID, err) | ||
} | ||
return nil | ||
} | ||
|
||
// Get retrieves serializable value by key. | ||
func (kv kVStore) Get(key string, dest interface{}) error { | ||
var ret json.RawMessage | ||
sql := "SELECT val FROM job_kv_store WHERE job_id = $1 AND key = $2" | ||
if err := kv.q.Get(&ret, sql, kv.jobID, key); err != nil { | ||
return fmt.Errorf("failed to get value by key: %s for jobID: %d : %w", key, kv.jobID, err) | ||
} | ||
|
||
return json.Unmarshal(ret, dest) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,85 @@ | ||
package job_test | ||
|
||
import ( | ||
"fmt" | ||
"reflect" | ||
"testing" | ||
|
||
"github.com/stretchr/testify/require" | ||
|
||
"github.com/smartcontractkit/chainlink/v2/core/bridges" | ||
"github.com/smartcontractkit/chainlink/v2/core/internal/cltest" | ||
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/configtest" | ||
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" | ||
"github.com/smartcontractkit/chainlink/v2/core/logger" | ||
"github.com/smartcontractkit/chainlink/v2/core/services/directrequest" | ||
"github.com/smartcontractkit/chainlink/v2/core/services/job" | ||
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline" | ||
"github.com/smartcontractkit/chainlink/v2/core/testdata/testspecs" | ||
) | ||
|
||
func TestJobKVStore(t *testing.T) { | ||
config := configtest.NewTestGeneralConfig(t) | ||
db := pgtest.NewSqlxDB(t) | ||
|
||
lggr := logger.TestLogger(t) | ||
|
||
pipelineORM := pipeline.NewORM(db, logger.TestLogger(t), config.Database(), config.JobPipeline().MaxSuccessfulRuns()) | ||
bridgesORM := bridges.NewORM(db, logger.TestLogger(t), config.Database()) | ||
|
||
jobID := int32(1337) | ||
kvStore := job.NewKVStore(jobID, db, config.Database(), lggr) | ||
jobORM := NewTestORM(t, db, pipelineORM, bridgesORM, cltest.NewKeyStore(t, db, config.Database()), config.Database()) | ||
|
||
jb, err := directrequest.ValidatedDirectRequestSpec(testspecs.GetDirectRequestSpec()) | ||
require.NoError(t, err) | ||
jb.ID = jobID | ||
require.NoError(t, jobORM.CreateJob(&jb)) | ||
|
||
type testData struct { | ||
Test string | ||
} | ||
|
||
type nested struct { | ||
Contact testData // Nested struct | ||
} | ||
|
||
values := []interface{}{ | ||
42, // int | ||
"hello", // string | ||
3.14, // float64 | ||
true, // bool | ||
[]int{1, 2, 3}, // slice of ints | ||
map[string]int{"a": 1, "b": 2}, // map of string to int | ||
testData{Test: "value1"}, // regular struct | ||
nested{testData{"value2"}}, // nested struct | ||
} | ||
|
||
for i, value := range values { | ||
testKey := "test_key_" + fmt.Sprint(i) | ||
require.NoError(t, kvStore.Store(testKey, value)) | ||
|
||
// Get the type of the current value | ||
valueType := reflect.TypeOf(value) | ||
// Create a new instance of the value's type | ||
temp := reflect.New(valueType).Interface() | ||
|
||
require.NoError(t, kvStore.Get(testKey, &temp)) | ||
|
||
tempValue := reflect.ValueOf(temp).Elem().Interface() | ||
require.Equal(t, value, tempValue) | ||
} | ||
|
||
key := "test_key_updating" | ||
td1 := testData{Test: "value1"} | ||
td2 := testData{Test: "value2"} | ||
|
||
var retData testData | ||
require.NoError(t, kvStore.Store(key, td1)) | ||
require.NoError(t, kvStore.Get(key, &retData)) | ||
require.Equal(t, td1, retData) | ||
|
||
require.NoError(t, kvStore.Store(key, td2)) | ||
require.NoError(t, kvStore.Get(key, &retData)) | ||
require.Equal(t, td2, retData) | ||
} |
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.