Skip to content

Commit

Permalink
Update pipeline adapter to use JSONSerializable (#12617)
Browse files Browse the repository at this point in the history
* -Change JSONSerializable to point to chainlink-common
-Set TaskValue as JSONSerializable

* Fix lint

* Pin to latest version of chainlink-common PR

* Fix lint

* Pin to latest common
  • Loading branch information
george-dorin authored Mar 28, 2024
1 parent 3ec8cc9 commit 7992ed8
Show file tree
Hide file tree
Showing 33 changed files with 141 additions and 412 deletions.
5 changes: 3 additions & 2 deletions core/internal/cltest/factories.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/jmoiron/sqlx"

"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
"github.com/smartcontractkit/chainlink-common/pkg/utils/jsonserializable"
txmgrcommon "github.com/smartcontractkit/chainlink/v2/common/txmgr"
txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types"
"github.com/smartcontractkit/chainlink/v2/core/auth"
Expand Down Expand Up @@ -457,14 +458,14 @@ func MustInsertPipelineRun(t *testing.T, db *sqlx.DB) (run pipeline.Run) {

func MustInsertPipelineRunWithStatus(t *testing.T, db *sqlx.DB, pipelineSpecID int32, status pipeline.RunStatus) (run pipeline.Run) {
var finishedAt *time.Time
var outputs pipeline.JSONSerializable
var outputs jsonserializable.JSONSerializable
var allErrors pipeline.RunErrors
var fatalErrors pipeline.RunErrors
now := time.Now()
switch status {
case pipeline.RunStatusCompleted:
finishedAt = &now
outputs = pipeline.JSONSerializable{
outputs = jsonserializable.JSONSerializable{
Val: "foo",
Valid: true,
}
Expand Down
10 changes: 6 additions & 4 deletions core/internal/mocks/application.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ require (
github.com/prometheus/client_golang v1.17.0
github.com/shopspring/decimal v1.3.1
github.com/smartcontractkit/chainlink-automation v1.0.2-0.20240311111125-22812a072c35
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240327133125-eed636b9a6df
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240328193519-0336c0d1e64e
github.com/smartcontractkit/chainlink-vrf v0.0.0-20231120191722-fef03814f868
github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000
github.com/smartcontractkit/libocr v0.0.0-20240326191951-2bbe9382d052
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1187,8 +1187,8 @@ github.com/smartcontractkit/chain-selectors v1.0.10 h1:t9kJeE6B6G+hKD0GYR4kGJSCq
github.com/smartcontractkit/chain-selectors v1.0.10/go.mod h1:d4Hi+E1zqjy9HqMkjBE5q1vcG9VGgxf5VxiRHfzi2kE=
github.com/smartcontractkit/chainlink-automation v1.0.2-0.20240311111125-22812a072c35 h1:GNhRKD3izyzAoGMXDvVUAwEuzz4Atdj3U3RH7eak5Is=
github.com/smartcontractkit/chainlink-automation v1.0.2-0.20240311111125-22812a072c35/go.mod h1:2I0dWdYdK6jHPnSYYy7Y7Xp7L0YTnJ3KZtkhLQflsTU=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240327133125-eed636b9a6df h1:AKjckaIV8R53dLJwoQ3VlUI56L34Ca+nkkzjR1784zY=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240327133125-eed636b9a6df/go.mod h1:u2XnvJHl7sQ9HMlRnVLsOKgV9ihk0RGIYywB12p9gQQ=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240328193519-0336c0d1e64e h1:XbhclSD026YvXPlfogfE4IEOCk4QBzchIHulP4m00yQ=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240328193519-0336c0d1e64e/go.mod h1:kstYjAGqBswdZpl7YkSPeXBDVwaY1VaR6tUMPWl8ykA=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240213120401-01a23955f9f8 h1:I326nw5GwHQHsLKHwtu5Sb9EBLylC8CfUd7BFAS0jtg=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240213120401-01a23955f9f8/go.mod h1:a65NtrK4xZb01mf0dDNghPkN2wXgcqFQ55ADthVBgMc=
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240220203239-09be0ea34540 h1:xFSv8561jsLtF6gYZr/zW2z5qUUAkcFkApin2mnbYTo=
Expand Down
5 changes: 3 additions & 2 deletions core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
commonservices "github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
"github.com/smartcontractkit/chainlink-common/pkg/utils"
"github.com/smartcontractkit/chainlink-common/pkg/utils/jsonserializable"
"github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox"
"github.com/smartcontractkit/chainlink/v2/core/capabilities"
"github.com/smartcontractkit/chainlink/v2/core/static"
Expand Down Expand Up @@ -100,7 +101,7 @@ type Application interface {
TxmStorageService() txmgr.EvmTxStore
AddJobV2(ctx context.Context, job *job.Job) error
DeleteJob(ctx context.Context, jobID int32) error
RunWebhookJobV2(ctx context.Context, jobUUID uuid.UUID, requestBody string, meta pipeline.JSONSerializable) (int64, error)
RunWebhookJobV2(ctx context.Context, jobUUID uuid.UUID, requestBody string, meta jsonserializable.JSONSerializable) (int64, error)
ResumeJobV2(ctx context.Context, taskID uuid.UUID, result pipeline.Result) error
// Testing only
RunJobV2(ctx context.Context, jobID int32, meta map[string]interface{}) (int64, error)
Expand Down Expand Up @@ -742,7 +743,7 @@ func (app *ChainlinkApplication) DeleteJob(ctx context.Context, jobID int32) err
return app.jobSpawner.DeleteJob(jobID, pg.WithParentCtx(ctx))
}

func (app *ChainlinkApplication) RunWebhookJobV2(ctx context.Context, jobUUID uuid.UUID, requestBody string, meta pipeline.JSONSerializable) (int64, error) {
func (app *ChainlinkApplication) RunWebhookJobV2(ctx context.Context, jobUUID uuid.UUID, requestBody string, meta jsonserializable.JSONSerializable) (int64, error) {
return app.webhookJobRunner.RunJob(ctx, jobUUID, requestBody, meta)
}

Expand Down
5 changes: 3 additions & 2 deletions core/services/fluxmonitorv2/orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/utils/jsonserializable"
commontxmmocks "github.com/smartcontractkit/chainlink/v2/common/txmgr/types/mocks"
"github.com/smartcontractkit/chainlink/v2/core/bridges"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr"
Expand Down Expand Up @@ -116,12 +117,12 @@ func TestORM_UpdateFluxMonitorRoundStats(t *testing.T) {
FinishedAt: null.TimeFrom(f),
AllErrors: pipeline.RunErrors{null.String{}},
FatalErrors: pipeline.RunErrors{null.String{}},
Outputs: pipeline.JSONSerializable{Val: []interface{}{10}, Valid: true},
Outputs: jsonserializable.JSONSerializable{Val: []interface{}{10}, Valid: true},
PipelineTaskRuns: []pipeline.TaskRun{
{
ID: uuid.New(),
Type: pipeline.TaskTypeHTTP,
Output: pipeline.JSONSerializable{Val: 10, Valid: true},
Output: jsonserializable.JSONSerializable{Val: 10, Valid: true},
CreatedAt: f,
FinishedAt: null.TimeFrom(f),
},
Expand Down
3 changes: 2 additions & 1 deletion core/services/job/job_orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"gopkg.in/guregu/null.v4"

"github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink-common/pkg/utils/jsonserializable"

"github.com/smartcontractkit/chainlink/v2/core/bridges"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets"
Expand Down Expand Up @@ -1725,7 +1726,7 @@ func mustInsertPipelineRun(t *testing.T, orm pipeline.ORM, j job.Job) pipeline.R
run := pipeline.Run{
PipelineSpecID: j.PipelineSpecID,
State: pipeline.RunStatusRunning,
Outputs: pipeline.JSONSerializable{Valid: false},
Outputs: jsonserializable.JSONSerializable{Valid: false},
AllErrors: pipeline.RunErrors{},
CreatedAt: time.Now(),
FinishedAt: null.Time{},
Expand Down
5 changes: 3 additions & 2 deletions core/services/job/runner_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config"
"github.com/smartcontractkit/chainlink-common/pkg/services/servicetest"
"github.com/smartcontractkit/chainlink-common/pkg/utils/jsonserializable"
"github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox/mailboxtest"

"github.com/smartcontractkit/chainlink/v2/core/auth"
Expand Down Expand Up @@ -908,7 +909,7 @@ func TestRunner_Success_Callback_AsyncJob(t *testing.T) {
require.Empty(t, run.PipelineTaskRuns[1].Error)
require.Empty(t, run.PipelineTaskRuns[2].Error)
require.Empty(t, run.PipelineTaskRuns[3].Error)
require.Equal(t, pipeline.JSONSerializable{Val: []interface{}{"123450000000000000000"}, Valid: true}, run.Outputs)
require.Equal(t, jsonserializable.JSONSerializable{Val: []interface{}{"123450000000000000000"}, Valid: true}, run.Outputs)
require.Equal(t, pipeline.RunErrors{null.String{NullString: sql.NullString{String: "", Valid: false}}}, run.FatalErrors)
})
// Delete the job
Expand Down Expand Up @@ -1086,7 +1087,7 @@ func TestRunner_Error_Callback_AsyncJob(t *testing.T) {
assert.Equal(t, "something exploded in EA", run.PipelineTaskRuns[1].Error.String)
assert.True(t, run.PipelineTaskRuns[2].Error.Valid)
assert.True(t, run.PipelineTaskRuns[3].Error.Valid)
require.Equal(t, pipeline.JSONSerializable{Val: []interface{}{interface{}(nil)}, Valid: true}, run.Outputs)
require.Equal(t, jsonserializable.JSONSerializable{Val: []interface{}{interface{}(nil)}, Valid: true}, run.Outputs)
require.Equal(t, pipeline.RunErrors{null.String{NullString: sql.NullString{String: "task inputs: too many errors", Valid: true}}}, run.FatalErrors)
})
// Delete the job
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,8 @@ func (p *PipelineRunnerAdapter) ExecuteRun(ctx context.Context, spec string, var
ID: trr.ID.String(),
Type: string(trr.Task.Type()),
Index: int(trr.Task.OutputIndex()),

TaskValue: types.TaskValue{
Value: trr.Result.Value,
Value: trr.Result.OutputDB(),
Error: trr.Result.Error,
IsTerminal: len(trr.Task.Outputs()) == 0,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestAdapter_Integration(t *testing.T) {
results, err := pra.ExecuteRun(testutils.Context(t), spec, types.Vars{Vars: map[string]interface{}{"val": 1}}, types.Options{})
require.NoError(t, err)

finalResult := results[0].Value.(decimal.Decimal)
finalResult := results[0].Value.Val.(decimal.Decimal)

assert.True(t, decimal.NewFromInt(3).Equal(finalResult))
}
Expand Down
Loading

0 comments on commit 7992ed8

Please sign in to comment.