Skip to content

Commit

Permalink
Merge branch 'develop' into ccipreader-tests2
Browse files Browse the repository at this point in the history
  • Loading branch information
asoliman92 committed Nov 29, 2024
2 parents e67d90e + fcc8d3c commit c73fd83
Show file tree
Hide file tree
Showing 24 changed files with 1,694 additions and 154 deletions.
5 changes: 5 additions & 0 deletions .changeset/thin-cats-try.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

Add support for Mercury LLO streams to feeds service. #added
2 changes: 1 addition & 1 deletion .github/e2e-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -942,7 +942,7 @@ runner-test-matrix:
triggers:
- PR E2E Core Tests
- Nightly E2E Tests
test_cmd: cd integration-tests/smoke/ccip && go test ccip_test.go -timeout 12m -test.parallel=2 -count=1 -json
test_cmd: cd integration-tests/smoke/ccip && go test ccip_test.go -timeout 25m -test.parallel=2 -count=1 -json
pyroscope_env: ci-smoke-ccipv1_6-evm-simulated
test_env_vars:
E2E_TEST_SELECTED_NETWORK: SIMULATED_1,SIMULATED_2
Expand Down
8 changes: 8 additions & 0 deletions .github/integration-in-memory-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,14 @@
runner-test-matrix:
# START: CCIPv1.6 tests

- id: smoke/ccip/ccip_fees_test.go:*
path: integration-tests/smoke/ccip/ccip_fees_test.go
test_env_type: in-memory
runs_on: ubuntu-latest
triggers:
- PR Integration CCIP Tests
test_cmd: cd integration-tests/smoke/ccip && go test ccip_fees_test.go -timeout 12m -test.parallel=2 -count=1 -json

- id: smoke/ccip/ccip_messaging_test.go:*
path: integration-tests/smoke/ccip/ccip_messaging_test.go
test_env_type: in-memory
Expand Down
54 changes: 0 additions & 54 deletions core/capabilities/remote/executable/endtoend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,60 +63,6 @@ func Test_RemoteExecutableCapability_TransmissionSchedules(t *testing.T) {
testRemoteExecutableCapability(ctx, t, capability, 10, 9, timeOut, 10, 9, timeOut, method)
}

func Test_RemoteExecutionCapability_DonTopologies(t *testing.T) {
ctx := testutils.Context(t)

responseTest := func(t *testing.T, response commoncap.CapabilityResponse, responseError error) {
require.NoError(t, responseError)
mp, err := response.Value.Unwrap()
require.NoError(t, err)
assert.Equal(t, "aValue1", mp.(map[string]any)["response"].(string))
}

transmissionSchedule, err := values.NewMap(map[string]any{
"schedule": transmission.Schedule_OneAtATime,
"deltaStage": "10ms",
})
require.NoError(t, err)

timeOut := 10 * time.Minute

capability := &TestCapability{}

var methods []func(ctx context.Context, caller commoncap.ExecutableCapability)

methods = append(methods, func(ctx context.Context, caller commoncap.ExecutableCapability) {
executeCapability(ctx, t, caller, transmissionSchedule, responseTest)
})

methods = append(methods, func(ctx context.Context, caller commoncap.ExecutableCapability) {
registerWorkflow(ctx, t, caller, transmissionSchedule, func(t *testing.T, responseError error) {
require.NoError(t, responseError)
})
})

methods = append(methods, func(ctx context.Context, caller commoncap.ExecutableCapability) {
unregisterWorkflow(ctx, t, caller, transmissionSchedule, func(t *testing.T, responseError error) {
require.NoError(t, responseError)
})
})

for _, method := range methods {
// Test scenarios where the number of submissions is greater than or equal to F + 1
testRemoteExecutableCapability(ctx, t, capability, 1, 0, timeOut, 1, 0, timeOut, method)
testRemoteExecutableCapability(ctx, t, capability, 4, 3, timeOut, 1, 0, timeOut, method)
testRemoteExecutableCapability(ctx, t, capability, 10, 3, timeOut, 1, 0, timeOut, method)

testRemoteExecutableCapability(ctx, t, capability, 1, 0, timeOut, 1, 0, timeOut, method)
testRemoteExecutableCapability(ctx, t, capability, 1, 0, timeOut, 4, 3, timeOut, method)
testRemoteExecutableCapability(ctx, t, capability, 1, 0, timeOut, 10, 3, timeOut, method)

testRemoteExecutableCapability(ctx, t, capability, 4, 3, timeOut, 4, 3, timeOut, method)
testRemoteExecutableCapability(ctx, t, capability, 10, 3, timeOut, 10, 3, timeOut, method)
testRemoteExecutableCapability(ctx, t, capability, 10, 9, timeOut, 10, 9, timeOut, method)
}
}

func Test_RemoteExecutionCapability_CapabilityError(t *testing.T) {
ctx := testutils.Context(t)

Expand Down
7 changes: 0 additions & 7 deletions core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/webhook"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows"
workflowstore "github.com/smartcontractkit/chainlink/v2/core/services/workflows/store"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer"
"github.com/smartcontractkit/chainlink/v2/core/sessions"
"github.com/smartcontractkit/chainlink/v2/core/sessions/ldapauth"
"github.com/smartcontractkit/chainlink/v2/core/sessions/localauth"
Expand Down Expand Up @@ -213,11 +212,6 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
opts.CapabilitiesRegistry = capabilities.NewRegistry(globalLogger)
}

// TODO: wire this up to config so we only instantiate it
// if a workflow registry address is provided.
workflowRegistrySyncer := syncer.NewNullWorkflowRegistrySyncer()
srvcs = append(srvcs, workflowRegistrySyncer)

var externalPeerWrapper p2ptypes.PeerWrapper
if cfg.Capabilities().Peering().Enabled() {
var dispatcher remotetypes.Dispatcher
Expand Down Expand Up @@ -478,7 +472,6 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
delegates[job.Workflow] = workflows.NewDelegate(
globalLogger,
opts.CapabilitiesRegistry,
workflowRegistrySyncer,
workflowORM,
)

Expand Down
11 changes: 11 additions & 0 deletions core/services/feeds/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"

ccip "github.com/smartcontractkit/chainlink/v2/core/capabilities/ccip/validate"
"github.com/smartcontractkit/chainlink/v2/core/services/streams"
"github.com/smartcontractkit/chainlink/v2/plugins"

pb "github.com/smartcontractkit/chainlink-protos/orchestrator/feedsmanager"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
"github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm"
Expand Down Expand Up @@ -859,6 +861,13 @@ func (s *service) ApproveSpec(ctx context.Context, id int64, force bool) error {
if txerr != nil && !errors.Is(txerr, sql.ErrNoRows) {
return fmt.Errorf("failed while checking for existing ccip job: %w", txerr)
}
case job.Stream:
existingJobID, txerr = tx.jobORM.FindJobIDByStreamID(ctx, *j.StreamID)
// Return an error if the repository errors. If there is a not found
// error we want to continue with approving the job.
if txerr != nil && !errors.Is(txerr, sql.ErrNoRows) {
return fmt.Errorf("failed while checking for existing stream job: %w", txerr)
}
default:
return errors.Errorf("unsupported job type when approving job proposal specs: %s", j.Type)
}
Expand Down Expand Up @@ -1249,6 +1258,8 @@ func (s *service) generateJob(ctx context.Context, spec string) (*job.Job, error
js, err = workflows.ValidatedWorkflowJobSpec(ctx, spec)
case job.CCIP:
js, err = ccip.ValidatedCCIPSpec(spec)
case job.Stream:
js, err = streams.ValidatedStreamSpec(spec)
default:
return nil, errors.Errorf("unknown job type: %s", jobType)
}
Expand Down
Loading

0 comments on commit c73fd83

Please sign in to comment.