diff --git a/.changeset/thin-cats-try.md b/.changeset/thin-cats-try.md new file mode 100644 index 00000000000..e7934fe279a --- /dev/null +++ b/.changeset/thin-cats-try.md @@ -0,0 +1,5 @@ +--- +"chainlink": minor +--- + +Add support for Mercury LLO streams to feeds service. #added diff --git a/core/services/feeds/service.go b/core/services/feeds/service.go index 61b2d53f2d5..c411cb2e096 100644 --- a/core/services/feeds/service.go +++ b/core/services/feeds/service.go @@ -19,9 +19,11 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" ccip "github.com/smartcontractkit/chainlink/v2/core/capabilities/ccip/validate" + "github.com/smartcontractkit/chainlink/v2/core/services/streams" "github.com/smartcontractkit/chainlink/v2/plugins" pb "github.com/smartcontractkit/chainlink-protos/orchestrator/feedsmanager" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big" "github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm" @@ -859,6 +861,13 @@ func (s *service) ApproveSpec(ctx context.Context, id int64, force bool) error { if txerr != nil && !errors.Is(txerr, sql.ErrNoRows) { return fmt.Errorf("failed while checking for existing ccip job: %w", txerr) } + case job.Stream: + existingJobID, txerr = tx.jobORM.FindJobIDByStreamID(ctx, *j.StreamID) + // Return an error if the repository errors. If there is a not found + // error we want to continue with approving the job. + if txerr != nil && !errors.Is(txerr, sql.ErrNoRows) { + return fmt.Errorf("failed while checking for existing stream job: %w", txerr) + } default: return errors.Errorf("unsupported job type when approving job proposal specs: %s", j.Type) } @@ -1249,6 +1258,8 @@ func (s *service) generateJob(ctx context.Context, spec string) (*job.Job, error js, err = workflows.ValidatedWorkflowJobSpec(ctx, spec) case job.CCIP: js, err = ccip.ValidatedCCIPSpec(spec) + case job.Stream: + js, err = streams.ValidatedStreamSpec(spec) default: return nil, errors.Errorf("unknown job type: %s", jobType) } diff --git a/core/services/feeds/service_test.go b/core/services/feeds/service_test.go index 5369d645c4e..3eed9c80d64 100644 --- a/core/services/feeds/service_test.go +++ b/core/services/feeds/service_test.go @@ -21,6 +21,7 @@ import ( commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config" "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" proto "github.com/smartcontractkit/chainlink-protos/orchestrator/feedsmanager" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/headtracker" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" evmbig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big" @@ -135,6 +136,41 @@ answer1 [type=median index=0]; [pluginConfig.juelsPerFeeCoinCache] updateInterval = "1m" ` + +const StreamTestSpecTemplate = ` +name = '%s' +type = 'stream' +schemaVersion = 1 +externalJobID = '%s' +streamID = %d +observationSource = """ +ds1_payload [type=bridge name=\"bridge-ncfx\" timeout=\"50s\" requestData=\"{\\\"data\\\":{\\\"endpoint\\\":\\\"cryptolwba\\\",\\\"from\\\":\\\"SEI\\\",\\\"to\\\":\\\"USD\\\"}}\"]; +ds1_benchmark [type=jsonparse path=\"data,mid\"]; +ds1_bid [type=jsonparse path=\"data,bid\"]; +ds1_ask [type=jsonparse path=\"data,ask\"]; +ds2_payload [type=bridge name=\"bridge-tiingo\" timeout=\"50s\" requestData=\"{\\\"data\\\":{\\\"endpoint\\\":\\\"cryptolwba\\\",\\\"from\\\":\\\"SEI\\\",\\\"to\\\":\\\"USD\\\"}}\"]; +ds2_benchmark [type=jsonparse path=\"data,mid\"]; +ds2_bid [type=jsonparse path=\"data,bid\"]; +ds2_ask [type=jsonparse path=\"data,ask\"]; +ds3_payload [type=bridge name=\"bridge-gsr\" timeout=\"50s\" requestData=\"{\\\"data\\\":{\\\"endpoint\\\":\\\"cryptolwba\\\",\\\"from\\\":\\\"SEI\\\",\\\"to\\\":\\\"USD\\\"}}\"]; +ds3_benchmark [type=jsonparse path=\"data,mid\"]; +ds3_bid [type=jsonparse path=\"data,bid\"]; +ds3_ask [type=jsonparse path=\"data,ask\"]; +ds1_payload -> ds1_benchmark -> benchmark_price; +ds2_payload -> ds2_benchmark -> benchmark_price; +ds3_payload -> ds3_benchmark -> benchmark_price; +benchmark_price [type=median allowedFaults=2 index=0]; +ds1_payload -> ds1_bid -> bid_price; +ds2_payload -> ds2_bid -> bid_price; +ds3_payload -> ds3_bid -> bid_price; +bid_price [type=median allowedFaults=2 index=1]; +ds1_payload -> ds1_ask -> ask_price; +ds2_payload -> ds2_ask -> ask_price; +ds3_payload -> ds3_ask -> ask_price; +ask_price [type=median allowedFaults=2 index=2]; +""" +` + const BootstrapTestSpecTemplate = ` type = "bootstrap" schemaVersion = 1 @@ -3269,6 +3305,514 @@ updateInterval = "20m" } } +func Test_Service_ApproveSpec_Stream(t *testing.T) { + externalJobID := uuid.New() + streamName := "LINK / ETH | version 3 | contract 0x0000000000000000000000000000000000000000" + streamID := uint32(1009001032) + + var ( + ctx = testutils.Context(t) + + jp = &feeds.JobProposal{ + ID: 1, + FeedsManagerID: 100, + } + spec = &feeds.JobProposalSpec{ + ID: 20, + Status: feeds.SpecStatusPending, + JobProposalID: jp.ID, + Version: 1, + Definition: fmt.Sprintf(StreamTestSpecTemplate, streamName, externalJobID.String(), streamID), + } + rejectedSpec = &feeds.JobProposalSpec{ + ID: 20, + Status: feeds.SpecStatusRejected, + JobProposalID: jp.ID, + Version: 1, + Definition: fmt.Sprintf(StreamTestSpecTemplate, streamName, externalJobID.String(), streamID), + } + cancelledSpec = &feeds.JobProposalSpec{ + ID: 20, + Status: feeds.SpecStatusCancelled, + JobProposalID: jp.ID, + Version: 1, + Definition: fmt.Sprintf(StreamTestSpecTemplate, streamName, externalJobID.String(), streamID), + } + j = job.Job{ + ID: 1, + ExternalJobID: externalJobID, + } + ) + + testCases := []struct { + name string + httpTimeout *commonconfig.Duration + before func(svc *TestService) + id int64 + force bool + wantErr string + }{ + { + name: "pending job success", + httpTimeout: commonconfig.MustNewDuration(1 * time.Minute), + before: func(svc *TestService) { + svc.orm.On("GetSpec", mock.Anything, spec.ID).Return(spec, nil) + svc.orm.On("GetJobProposal", mock.Anything, jp.ID).Return(jp, nil) + svc.connMgr.On("GetClient", jp.FeedsManagerID).Return(svc.fmsClient, nil) + svc.jobORM.On("AssertBridgesExist", mock.Anything, mock.IsType(pipeline.Pipeline{})).Return(nil) + svc.jobORM.On("FindJobByExternalJobID", mock.Anything, externalJobID).Return(job.Job{}, sql.ErrNoRows) + svc.jobORM.On("FindJobIDByStreamID", mock.Anything, mock.Anything).Return(int32(0), sql.ErrNoRows) + + svc.spawner. + On("CreateJob", + mock.Anything, + mock.Anything, + mock.MatchedBy(func(j *job.Job) bool { + return j.Name.String == streamName + }), + ). + Run(func(args mock.Arguments) { (args.Get(2).(*job.Job)).ID = 1 }). + Return(nil) + svc.orm.On("ApproveSpec", + mock.Anything, + spec.ID, + externalJobID, + ).Return(nil) + svc.fmsClient.On("ApprovedJob", + mock.MatchedBy(func(ctx context.Context) bool { return true }), + &proto.ApprovedJobRequest{ + Uuid: jp.RemoteUUID.String(), + Version: int64(spec.Version), + }, + ).Return(&proto.ApprovedJobResponse{}, nil) + svc.orm.On("CountJobProposalsByStatus", mock.Anything).Return(&feeds.JobProposalCounts{}, nil) + svc.orm.On("WithDataSource", mock.Anything).Return(feeds.ORM(svc.orm)) + svc.jobORM.On("WithDataSource", mock.Anything).Return(job.ORM(svc.jobORM)) + }, + id: spec.ID, + force: false, + }, + { + name: "cancelled spec success when it is the latest spec", + httpTimeout: commonconfig.MustNewDuration(1 * time.Minute), + before: func(svc *TestService) { + svc.connMgr.On("GetClient", jp.FeedsManagerID).Return(svc.fmsClient, nil) + svc.orm.On("GetSpec", mock.Anything, cancelledSpec.ID, mock.Anything).Return(cancelledSpec, nil) + svc.orm.On("GetJobProposal", mock.Anything, jp.ID).Return(jp, nil) + svc.orm.On("GetLatestSpec", mock.Anything, cancelledSpec.JobProposalID).Return(cancelledSpec, nil) + svc.jobORM.On("AssertBridgesExist", mock.Anything, mock.IsType(pipeline.Pipeline{})).Return(nil) + + svc.jobORM.On("FindJobByExternalJobID", mock.Anything, externalJobID).Return(job.Job{}, sql.ErrNoRows) + svc.jobORM.On("FindJobIDByStreamID", mock.Anything, mock.Anything).Return(int32(0), sql.ErrNoRows) + + svc.spawner. + On("CreateJob", + mock.Anything, + mock.Anything, + mock.MatchedBy(func(j *job.Job) bool { + return j.Name.String == streamName + }), + ). + Run(func(args mock.Arguments) { (args.Get(2).(*job.Job)).ID = 1 }). + Return(nil) + svc.orm.On("ApproveSpec", + mock.Anything, + cancelledSpec.ID, + externalJobID, + ).Return(nil) + svc.fmsClient.On("ApprovedJob", + mock.MatchedBy(func(ctx context.Context) bool { return true }), + &proto.ApprovedJobRequest{ + Uuid: jp.RemoteUUID.String(), + Version: int64(spec.Version), + }, + ).Return(&proto.ApprovedJobResponse{}, nil) + svc.orm.On("CountJobProposalsByStatus", mock.Anything).Return(&feeds.JobProposalCounts{}, nil) + svc.orm.On("WithDataSource", mock.Anything).Return(feeds.ORM(svc.orm)) + svc.jobORM.On("WithDataSource", mock.Anything).Return(job.ORM(svc.jobORM)) + }, + id: cancelledSpec.ID, + force: false, + }, + { + name: "cancelled spec failed not latest spec", + before: func(svc *TestService) { + svc.orm.On("GetSpec", mock.Anything, cancelledSpec.ID, mock.Anything).Return(cancelledSpec, nil) + svc.orm.On("GetJobProposal", mock.Anything, jp.ID).Return(jp, nil) + svc.orm.On("GetLatestSpec", mock.Anything, cancelledSpec.JobProposalID).Return(&feeds.JobProposalSpec{ + ID: 21, + Status: feeds.SpecStatusPending, + JobProposalID: jp.ID, + Version: 2, + Definition: StreamTestSpecTemplate, + }, nil) + }, + id: cancelledSpec.ID, + force: false, + wantErr: "cannot approve a cancelled spec", + }, + { + name: "rejected spec failed cannot be approved", + before: func(svc *TestService) { + svc.orm.On("GetSpec", mock.Anything, cancelledSpec.ID, mock.Anything).Return(rejectedSpec, nil) + svc.orm.On("GetJobProposal", mock.Anything, jp.ID).Return(jp, nil) + }, + id: rejectedSpec.ID, + force: false, + wantErr: "cannot approve a rejected spec", + }, + { + name: "already existing job replacement error", + httpTimeout: commonconfig.MustNewDuration(1 * time.Minute), + before: func(svc *TestService) { + svc.connMgr.On("GetClient", jp.FeedsManagerID).Return(svc.fmsClient, nil) + svc.orm.On("GetSpec", mock.Anything, spec.ID).Return(spec, nil) + svc.orm.On("GetJobProposal", mock.Anything, jp.ID).Return(jp, nil) + svc.jobORM.On("AssertBridgesExist", mock.Anything, mock.IsType(pipeline.Pipeline{})).Return(nil) + svc.jobORM.On("FindJobByExternalJobID", mock.Anything, externalJobID).Return(job.Job{}, sql.ErrNoRows) + svc.jobORM.On("FindJobIDByStreamID", mock.Anything, mock.Anything).Return(j.ID, nil) + svc.orm.On("WithDataSource", mock.Anything).Return(feeds.ORM(svc.orm)) + svc.jobORM.On("WithDataSource", mock.Anything).Return(job.ORM(svc.jobORM)) + }, + id: spec.ID, + force: false, + wantErr: "could not approve job proposal: a job for this contract address already exists - please use the 'force' option to replace it", + }, + { + name: "already existing self managed job replacement success if forced without feedID", + httpTimeout: commonconfig.MustNewDuration(1 * time.Minute), + before: func(svc *TestService) { + svc.connMgr.On("GetClient", jp.FeedsManagerID).Return(svc.fmsClient, nil) + svc.orm.On("GetSpec", mock.Anything, spec.ID).Return(spec, nil) + svc.orm.On("GetJobProposal", mock.Anything, jp.ID).Return(jp, nil) + svc.jobORM.On("AssertBridgesExist", mock.Anything, mock.IsType(pipeline.Pipeline{})).Return(nil) + svc.orm.EXPECT().GetApprovedSpec(mock.Anything, jp.ID).Return(nil, sql.ErrNoRows) + svc.jobORM.On("FindJobByExternalJobID", mock.Anything, externalJobID).Return(job.Job{}, sql.ErrNoRows) + svc.jobORM.On("FindJobIDByStreamID", mock.Anything, mock.Anything).Return(j.ID, nil) + svc.spawner.On("DeleteJob", mock.Anything, mock.Anything, j.ID).Return(nil) + + svc.spawner. + On("CreateJob", + mock.Anything, + mock.Anything, + mock.MatchedBy(func(j *job.Job) bool { + return j.Name.String == streamName + }), + ). + Run(func(args mock.Arguments) { (args.Get(2).(*job.Job)).ID = 1 }). + Return(nil) + svc.orm.On("ApproveSpec", + mock.Anything, + spec.ID, + externalJobID, + ).Return(nil) + svc.fmsClient.On("ApprovedJob", + mock.MatchedBy(func(ctx context.Context) bool { return true }), + &proto.ApprovedJobRequest{ + Uuid: jp.RemoteUUID.String(), + Version: int64(spec.Version), + }, + ).Return(&proto.ApprovedJobResponse{}, nil) + svc.orm.On("CountJobProposalsByStatus", mock.Anything).Return(&feeds.JobProposalCounts{}, nil) + svc.orm.On("WithDataSource", mock.Anything).Return(feeds.ORM(svc.orm)) + svc.jobORM.On("WithDataSource", mock.Anything).Return(job.ORM(svc.jobORM)) + }, + id: spec.ID, + force: true, + }, + { + name: "already existing self managed job replacement success if forced with feedID", + httpTimeout: commonconfig.MustNewDuration(1 * time.Minute), + before: func(svc *TestService) { + svc.connMgr.On("GetClient", jp.FeedsManagerID).Return(svc.fmsClient, nil) + svc.orm.On("GetSpec", mock.Anything, spec.ID).Return(&feeds.JobProposalSpec{ + ID: 20, + Status: feeds.SpecStatusPending, + JobProposalID: jp.ID, + Version: 1, + Definition: fmt.Sprintf(StreamTestSpecTemplate, streamName, externalJobID.String(), streamID), + }, nil) + svc.orm.On("GetJobProposal", mock.Anything, jp.ID).Return(jp, nil) + svc.jobORM.On("AssertBridgesExist", mock.Anything, mock.IsType(pipeline.Pipeline{})).Return(nil) + svc.orm.EXPECT().GetApprovedSpec(mock.Anything, jp.ID).Return(nil, sql.ErrNoRows) + svc.jobORM.On("FindJobByExternalJobID", mock.Anything, externalJobID).Return(job.Job{}, sql.ErrNoRows) + svc.jobORM.On("FindJobIDByStreamID", mock.Anything, mock.Anything).Return(j.ID, nil) + svc.spawner.On("DeleteJob", mock.Anything, mock.Anything, j.ID).Return(nil) + + svc.spawner. + On("CreateJob", + mock.Anything, + mock.Anything, + mock.MatchedBy(func(j *job.Job) bool { + return j.Name.String == streamName + }), + ). + Run(func(args mock.Arguments) { (args.Get(2).(*job.Job)).ID = 1 }). + Return(nil) + svc.orm.On("ApproveSpec", + mock.Anything, + spec.ID, + externalJobID, + ).Return(nil) + svc.fmsClient.On("ApprovedJob", + mock.MatchedBy(func(ctx context.Context) bool { return true }), + &proto.ApprovedJobRequest{ + Uuid: jp.RemoteUUID.String(), + Version: int64(spec.Version), + }, + ).Return(&proto.ApprovedJobResponse{}, nil) + svc.orm.On("CountJobProposalsByStatus", mock.Anything).Return(&feeds.JobProposalCounts{}, nil) + svc.orm.On("WithDataSource", mock.Anything).Return(feeds.ORM(svc.orm)) + svc.jobORM.On("WithDataSource", mock.Anything).Return(job.ORM(svc.jobORM)) + }, + id: spec.ID, + force: true, + }, + { + name: "already existing FMS managed job replacement success if forced", + httpTimeout: commonconfig.MustNewDuration(1 * time.Minute), + before: func(svc *TestService) { + svc.connMgr.On("GetClient", jp.FeedsManagerID).Return(svc.fmsClient, nil) + svc.orm.On("GetSpec", mock.Anything, spec.ID).Return(spec, nil) + svc.orm.On("GetJobProposal", mock.Anything, jp.ID).Return(jp, nil) + svc.jobORM.On("AssertBridgesExist", mock.Anything, mock.IsType(pipeline.Pipeline{})).Return(nil) + svc.orm.EXPECT().GetApprovedSpec(mock.Anything, jp.ID).Return(&feeds.JobProposalSpec{ID: 100}, nil) + svc.orm.EXPECT().CancelSpec(mock.Anything, int64(100)).Return(nil) + svc.jobORM.On("FindJobByExternalJobID", mock.Anything, externalJobID).Return(job.Job{}, sql.ErrNoRows) + svc.jobORM.On("FindJobIDByStreamID", mock.Anything, mock.Anything).Return(j.ID, nil) + svc.spawner.On("DeleteJob", mock.Anything, mock.Anything, j.ID).Return(nil) + + svc.spawner. + On("CreateJob", + mock.Anything, + mock.Anything, + mock.MatchedBy(func(j *job.Job) bool { + return j.Name.String == streamName + }), + ). + Run(func(args mock.Arguments) { (args.Get(2).(*job.Job)).ID = 1 }). + Return(nil) + svc.orm.On("ApproveSpec", + mock.Anything, + spec.ID, + externalJobID, + ).Return(nil) + svc.fmsClient.On("ApprovedJob", + mock.MatchedBy(func(ctx context.Context) bool { return true }), + &proto.ApprovedJobRequest{ + Uuid: jp.RemoteUUID.String(), + Version: int64(spec.Version), + }, + ).Return(&proto.ApprovedJobResponse{}, nil) + svc.orm.On("CountJobProposalsByStatus", mock.Anything).Return(&feeds.JobProposalCounts{}, nil) + svc.orm.On("WithDataSource", mock.Anything).Return(feeds.ORM(svc.orm)) + svc.jobORM.On("WithDataSource", mock.Anything).Return(job.ORM(svc.jobORM)) + }, + id: spec.ID, + force: true, + }, + { + name: "spec does not exist", + before: func(svc *TestService) { + svc.orm.On("GetSpec", mock.Anything, spec.ID).Return(nil, errors.New("Not Found")) + }, + id: spec.ID, + force: false, + wantErr: "orm: job proposal spec: Not Found", + }, + { + name: "cannot approve an approved spec", + before: func(svc *TestService) { + aspec := &feeds.JobProposalSpec{ + ID: spec.ID, + JobProposalID: jp.ID, + Status: feeds.SpecStatusApproved, + } + svc.orm.On("GetSpec", mock.Anything, spec.ID).Return(aspec, nil) + svc.orm.On("GetJobProposal", mock.Anything, jp.ID).Return(jp, nil) + }, + id: spec.ID, + force: false, + wantErr: "cannot approve an approved spec", + }, + { + name: "cannot approved a rejected spec", + before: func(svc *TestService) { + rspec := &feeds.JobProposalSpec{ + ID: spec.ID, + JobProposalID: jp.ID, + Status: feeds.SpecStatusRejected, + } + svc.orm.On("GetSpec", mock.Anything, rspec.ID, mock.Anything).Return(rspec, nil) + svc.orm.On("GetJobProposal", mock.Anything, jp.ID).Return(jp, nil) + }, + id: spec.ID, + force: false, + wantErr: "cannot approve a rejected spec", + }, + { + name: "job proposal does not exist", + before: func(svc *TestService) { + svc.orm.On("GetSpec", mock.Anything, spec.ID).Return(spec, nil) + svc.orm.On("GetJobProposal", mock.Anything, jp.ID).Return(nil, errors.New("Not Found")) + }, + id: spec.ID, + wantErr: "orm: job proposal: Not Found", + }, + { + name: "bridges do not exist", + httpTimeout: commonconfig.MustNewDuration(1 * time.Minute), + before: func(svc *TestService) { + svc.connMgr.On("GetClient", jp.FeedsManagerID).Return(svc.fmsClient, nil) + svc.orm.On("GetSpec", mock.Anything, spec.ID).Return(spec, nil) + svc.orm.On("GetJobProposal", mock.Anything, jp.ID).Return(jp, nil) + svc.jobORM.On("AssertBridgesExist", mock.Anything, mock.IsType(pipeline.Pipeline{})).Return(errors.New("bridges do not exist")) + }, + id: spec.ID, + wantErr: "failed to approve job spec due to bridge check: bridges do not exist", + }, + { + name: "rpc client not connected", + before: func(svc *TestService) { + svc.orm.On("GetSpec", mock.Anything, spec.ID).Return(spec, nil) + svc.orm.On("GetJobProposal", mock.Anything, jp.ID).Return(jp, nil) + svc.connMgr.On("GetClient", jp.FeedsManagerID).Return(nil, errors.New("Not Connected")) + }, + id: spec.ID, + force: false, + wantErr: "fms rpc client: Not Connected", + }, + { + name: "create job error", + httpTimeout: commonconfig.MustNewDuration(1 * time.Minute), + before: func(svc *TestService) { + svc.orm.On("GetSpec", mock.Anything, spec.ID).Return(spec, nil) + svc.orm.On("GetJobProposal", mock.Anything, jp.ID).Return(jp, nil) + svc.connMgr.On("GetClient", jp.FeedsManagerID).Return(svc.fmsClient, nil) + svc.jobORM.On("AssertBridgesExist", mock.Anything, mock.IsType(pipeline.Pipeline{})).Return(nil) + + svc.jobORM.On("FindJobByExternalJobID", mock.Anything, externalJobID).Return(job.Job{}, sql.ErrNoRows) + svc.jobORM.On("FindJobIDByStreamID", mock.Anything, mock.Anything).Return(int32(0), sql.ErrNoRows) + + svc.spawner. + On("CreateJob", + mock.Anything, + mock.Anything, + mock.MatchedBy(func(j *job.Job) bool { + return j.Name.String == streamName + }), + ). + Return(errors.New("could not save")) + svc.orm.On("WithDataSource", mock.Anything).Return(feeds.ORM(svc.orm)) + svc.jobORM.On("WithDataSource", mock.Anything).Return(job.ORM(svc.jobORM)) + }, + id: spec.ID, + force: false, + wantErr: "could not approve job proposal: could not save", + }, + { + name: "approve spec orm error", + httpTimeout: commonconfig.MustNewDuration(1 * time.Minute), + before: func(svc *TestService) { + svc.orm.On("GetSpec", mock.Anything, spec.ID).Return(spec, nil) + svc.orm.On("GetJobProposal", mock.Anything, jp.ID).Return(jp, nil) + svc.connMgr.On("GetClient", jp.FeedsManagerID).Return(svc.fmsClient, nil) + svc.jobORM.On("AssertBridgesExist", mock.Anything, mock.IsType(pipeline.Pipeline{})).Return(nil) + + svc.jobORM.On("FindJobByExternalJobID", mock.Anything, externalJobID).Return(job.Job{}, sql.ErrNoRows) + svc.jobORM.On("FindJobIDByStreamID", mock.Anything, mock.Anything).Return(int32(0), sql.ErrNoRows) + + svc.spawner. + On("CreateJob", + mock.Anything, + mock.Anything, + mock.MatchedBy(func(j *job.Job) bool { + return j.Name.String == streamName + }), + ). + Run(func(args mock.Arguments) { (args.Get(2).(*job.Job)).ID = 1 }). + Return(nil) + svc.orm.On("ApproveSpec", + mock.Anything, + spec.ID, + externalJobID, + ).Return(errors.New("failure")) + svc.orm.On("WithDataSource", mock.Anything).Return(feeds.ORM(svc.orm)) + svc.jobORM.On("WithDataSource", mock.Anything).Return(job.ORM(svc.jobORM)) + }, + id: spec.ID, + force: false, + wantErr: "could not approve job proposal: failure", + }, + { + name: "fms call error", + httpTimeout: commonconfig.MustNewDuration(1 * time.Minute), + before: func(svc *TestService) { + svc.orm.On("GetSpec", mock.Anything, spec.ID).Return(spec, nil) + svc.orm.On("GetJobProposal", mock.Anything, jp.ID).Return(jp, nil) + svc.connMgr.On("GetClient", jp.FeedsManagerID).Return(svc.fmsClient, nil) + svc.jobORM.On("AssertBridgesExist", mock.Anything, mock.IsType(pipeline.Pipeline{})).Return(nil) + + svc.jobORM.On("FindJobByExternalJobID", mock.Anything, externalJobID).Return(job.Job{}, sql.ErrNoRows) + svc.jobORM.On("FindJobIDByStreamID", mock.Anything, mock.Anything).Return(int32(0), sql.ErrNoRows) + + svc.spawner. + On("CreateJob", + mock.Anything, + mock.Anything, + mock.MatchedBy(func(j *job.Job) bool { + return j.Name.String == streamName + }), + ). + Run(func(args mock.Arguments) { (args.Get(2).(*job.Job)).ID = 1 }). + Return(nil) + svc.orm.On("ApproveSpec", + mock.Anything, + spec.ID, + externalJobID, + ).Return(nil) + svc.fmsClient.On("ApprovedJob", + mock.MatchedBy(func(ctx context.Context) bool { return true }), + &proto.ApprovedJobRequest{ + Uuid: jp.RemoteUUID.String(), + Version: int64(spec.Version), + }, + ).Return(nil, errors.New("failure")) + svc.orm.On("WithDataSource", mock.Anything).Return(feeds.ORM(svc.orm)) + svc.jobORM.On("WithDataSource", mock.Anything).Return(job.ORM(svc.jobORM)) + }, + id: spec.ID, + force: false, + wantErr: "could not approve job proposal: failure", + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + svc := setupTestServiceCfg(t, func(c *chainlink.Config, s *chainlink.Secrets) { + c.OCR2.Enabled = testutils.Ptr(true) + if tc.httpTimeout != nil { + c.JobPipeline.HTTPRequest.DefaultTimeout = tc.httpTimeout + } + }) + + if tc.before != nil { + tc.before(svc) + } + + err := svc.ApproveSpec(ctx, tc.id, tc.force) + + if tc.wantErr != "" { + require.Error(t, err) + assert.EqualError(t, err, tc.wantErr) + } else { + require.NoError(t, err) + } + }) + } +} + func Test_Service_ApproveSpec_Bootstrap(t *testing.T) { address := "0x613a38AC1659769640aaE063C651F48E0250454C" feedIDHex := "0x0000000000000000000000000000000000000000000000000000000000000001" diff --git a/core/services/job/mocks/orm.go b/core/services/job/mocks/orm.go index 89426b55a21..96513866f37 100644 --- a/core/services/job/mocks/orm.go +++ b/core/services/job/mocks/orm.go @@ -601,6 +601,63 @@ func (_c *ORM_FindJobIDByCapabilityNameAndVersion_Call) RunAndReturn(run func(co return _c } +// FindJobIDByStreamID provides a mock function with given fields: ctx, streamID +func (_m *ORM) FindJobIDByStreamID(ctx context.Context, streamID uint32) (int32, error) { + ret := _m.Called(ctx, streamID) + + if len(ret) == 0 { + panic("no return value specified for FindJobIDByStreamID") + } + + var r0 int32 + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, uint32) (int32, error)); ok { + return rf(ctx, streamID) + } + if rf, ok := ret.Get(0).(func(context.Context, uint32) int32); ok { + r0 = rf(ctx, streamID) + } else { + r0 = ret.Get(0).(int32) + } + + if rf, ok := ret.Get(1).(func(context.Context, uint32) error); ok { + r1 = rf(ctx, streamID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// ORM_FindJobIDByStreamID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'FindJobIDByStreamID' +type ORM_FindJobIDByStreamID_Call struct { + *mock.Call +} + +// FindJobIDByStreamID is a helper method to define mock.On call +// - ctx context.Context +// - streamID uint32 +func (_e *ORM_Expecter) FindJobIDByStreamID(ctx interface{}, streamID interface{}) *ORM_FindJobIDByStreamID_Call { + return &ORM_FindJobIDByStreamID_Call{Call: _e.mock.On("FindJobIDByStreamID", ctx, streamID)} +} + +func (_c *ORM_FindJobIDByStreamID_Call) Run(run func(ctx context.Context, streamID uint32)) *ORM_FindJobIDByStreamID_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(uint32)) + }) + return _c +} + +func (_c *ORM_FindJobIDByStreamID_Call) Return(_a0 int32, _a1 error) *ORM_FindJobIDByStreamID_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *ORM_FindJobIDByStreamID_Call) RunAndReturn(run func(context.Context, uint32) (int32, error)) *ORM_FindJobIDByStreamID_Call { + _c.Call.Return(run) + return _c +} + // FindJobIDByWorkflow provides a mock function with given fields: ctx, spec func (_m *ORM) FindJobIDByWorkflow(ctx context.Context, spec job.WorkflowSpec) (int32, error) { ret := _m.Called(ctx, spec) diff --git a/core/services/job/orm.go b/core/services/job/orm.go index 92ec9b2e83c..38e3fa492ce 100644 --- a/core/services/job/orm.go +++ b/core/services/job/orm.go @@ -78,6 +78,8 @@ type ORM interface { FindJobIDByWorkflow(ctx context.Context, spec WorkflowSpec) (int32, error) FindJobIDByCapabilityNameAndVersion(ctx context.Context, spec CCIPSpec) (int32, error) + + FindJobIDByStreamID(ctx context.Context, streamID uint32) (int32, error) } type ORMConfig interface { @@ -1334,6 +1336,20 @@ func (o *orm) FindJobsByPipelineSpecIDs(ctx context.Context, ids []int32) ([]Job return jbs, errors.Wrap(err, "FindJobsByPipelineSpecIDs failed") } +func (o *orm) FindJobIDByStreamID(ctx context.Context, streamID uint32) (jobID int32, err error) { + stmt := `SELECT id FROM jobs WHERE type = 'stream' AND stream_id = $1` + err = o.ds.GetContext(ctx, &jobID, stmt, streamID) + if err != nil { + if !errors.Is(err, sql.ErrNoRows) { + err = errors.Wrap(err, "error searching for job by stream id") + } + err = errors.Wrap(err, "FindJobIDByStreamID failed") + return + } + + return +} + // PipelineRuns returns pipeline runs for a job, with spec and taskruns loaded, latest first // If jobID is nil, returns all pipeline runs func (o *orm) PipelineRuns(ctx context.Context, jobID *int32, offset, size int) (runs []pipeline.Run, count int, err error) {