Skip to content

Commit

Permalink
MERC-6389: Add support for stream jobs to the feeds service. (#15412)
Browse files Browse the repository at this point in the history
* Add support for stream jobs to the feeds service.

* Add ORM method FindJobIDByStreamID and cover approving stream specs with a test.

* Fix changeset.
  • Loading branch information
ro-tex authored Nov 29, 2024
1 parent 86f83f4 commit bd43e27
Show file tree
Hide file tree
Showing 5 changed files with 633 additions and 0 deletions.
5 changes: 5 additions & 0 deletions .changeset/thin-cats-try.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

Add support for Mercury LLO streams to feeds service. #added
11 changes: 11 additions & 0 deletions core/services/feeds/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"

ccip "github.com/smartcontractkit/chainlink/v2/core/capabilities/ccip/validate"
"github.com/smartcontractkit/chainlink/v2/core/services/streams"
"github.com/smartcontractkit/chainlink/v2/plugins"

pb "github.com/smartcontractkit/chainlink-protos/orchestrator/feedsmanager"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
"github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm"
Expand Down Expand Up @@ -859,6 +861,13 @@ func (s *service) ApproveSpec(ctx context.Context, id int64, force bool) error {
if txerr != nil && !errors.Is(txerr, sql.ErrNoRows) {
return fmt.Errorf("failed while checking for existing ccip job: %w", txerr)
}
case job.Stream:
existingJobID, txerr = tx.jobORM.FindJobIDByStreamID(ctx, *j.StreamID)
// Return an error if the repository errors. If there is a not found
// error we want to continue with approving the job.
if txerr != nil && !errors.Is(txerr, sql.ErrNoRows) {
return fmt.Errorf("failed while checking for existing stream job: %w", txerr)
}
default:
return errors.Errorf("unsupported job type when approving job proposal specs: %s", j.Type)
}
Expand Down Expand Up @@ -1249,6 +1258,8 @@ func (s *service) generateJob(ctx context.Context, spec string) (*job.Job, error
js, err = workflows.ValidatedWorkflowJobSpec(ctx, spec)
case job.CCIP:
js, err = ccip.ValidatedCCIPSpec(spec)
case job.Stream:
js, err = streams.ValidatedStreamSpec(spec)
default:
return nil, errors.Errorf("unknown job type: %s", jobType)
}
Expand Down
Loading

0 comments on commit bd43e27

Please sign in to comment.