From bc8e921b5fed721c10c9b184d03885dc6da0938e Mon Sep 17 00:00:00 2001 From: Ivaylo Novakov Date: Mon, 25 Nov 2024 23:56:12 +0100 Subject: [PATCH] Add support for stream jobs to the feeds service. --- core/services/feeds/service.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/core/services/feeds/service.go b/core/services/feeds/service.go index 61b2d53f2d5..da7fb5be234 100644 --- a/core/services/feeds/service.go +++ b/core/services/feeds/service.go @@ -19,9 +19,11 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" ccip "github.com/smartcontractkit/chainlink/v2/core/capabilities/ccip/validate" + "github.com/smartcontractkit/chainlink/v2/core/services/streams" "github.com/smartcontractkit/chainlink/v2/plugins" pb "github.com/smartcontractkit/chainlink-protos/orchestrator/feedsmanager" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big" "github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm" @@ -859,6 +861,15 @@ func (s *service) ApproveSpec(ctx context.Context, id int64, force bool) error { if txerr != nil && !errors.Is(txerr, sql.ErrNoRows) { return fmt.Errorf("failed while checking for existing ccip job: %w", txerr) } + case job.Stream: + var existingJob job.Job + existingJob, txerr = tx.jobORM.FindJobByExternalJobID(ctx, j.ExternalJobID) + // Return an error if the repository errors. If there is a not found + // error we want to continue with approving the job. + if txerr != nil && !errors.Is(txerr, sql.ErrNoRows) { + return fmt.Errorf("failed while checking for existing ccip job: %w", txerr) + } + existingJobID = existingJob.ID default: return errors.Errorf("unsupported job type when approving job proposal specs: %s", j.Type) } @@ -1249,6 +1260,8 @@ func (s *service) generateJob(ctx context.Context, spec string) (*job.Job, error js, err = workflows.ValidatedWorkflowJobSpec(ctx, spec) case job.CCIP: js, err = ccip.ValidatedCCIPSpec(spec) + case job.Stream: + js, err = streams.ValidatedStreamSpec(spec) default: return nil, errors.Errorf("unknown job type: %s", jobType) }