diff --git a/.github/tracing/README.md b/.github/tracing/README.md index feba31feb65..04f0216e25f 100644 --- a/.github/tracing/README.md +++ b/.github/tracing/README.md @@ -48,4 +48,55 @@ This folder contains the following config files: These config files are for an OTEL collector, grafana Tempo, and a grafana UI instance to run as containers on the same network. `otel-collector-dev.yaml` is the configuration for dev (i.e. your local machine) environments, and forwards traces from the otel collector to the grafana tempo instance on the same network. -`otel-collector-ci.yaml` is the configuration for the CI runs, and exports the trace data to the artifact from the github run. \ No newline at end of file +`otel-collector-ci.yaml` is the configuration for the CI runs, and exports the trace data to the artifact from the github run. + +## Adding Traces to Plugins and to core + +Adding traces requires identifying an observability gap in a related group of code executions or a critical path in your application. This is intuitive for the developer: + +- "What's the flow of component interaction in this distributed system?" +- "What's the behavior of the JobProcessorOne component when jobs with [x, y, z] attributes are processed?" +- "Is this critical path workflow behaving the way we expect?" + +The developer will measure a flow of execution from end to end in one trace. Each logically separate measure of this flow is called a span. Spans have either one or no parent span and multiple child spans. The relationship between parent and child spans in aggregate will form a directed acyclic graph. The trace begins at the root of this graph. + +The most trivial application of a span is measuring top-level performance in one critical path. 
There is much more you can do, including creating human readable and timestamped events within a span (useful for monitoring concurrent access to resources), recording errors, linking parent and child spans through large parts of an application, and even extending a span beyond a single process. + +Spans are created by `tracers` and passed through Go applications by `Context`s. A tracer must be initialized first. Both core and plugin developers will initialize a tracer from the globally registered trace provider: + +``` +tracer := otel.GetTracerProvider().Tracer("example.com/foo") +``` + +The globally registered tracer provider is available for plugins after they are initialized, and available in core after configuration is processed (`initGlobals`). + +Add spans by: +``` + func interestingFunc() { + // Assuming there is an appropriate parentContext + ctx, span := tracer.Start(parentContext, "hello-span") + defer span.End() + + // do some work to track with hello-span + } +``` +As implied by the example, `span` is a child of its parent span captured by `parentContext`. + + +Note that in certain situations, there are 3rd party libraries that will set up spans. 
For instance: + +``` +import ( + "github.com/gin-gonic/gin" + "go.opentelemetry.io/contrib/instrumentation/github.com/gin-gonic/gin/otelgin" +) + +router := gin.Default() +router.Use(otelgin.Middleware("service-name")) +``` + +The developer aligns with best practices when they: +- Start with critical paths +- Measure paths from end to end (Context is wired all the way through) +- Emphasize broadness of measurement over depth +- Use automatic instrumentation if possible \ No newline at end of file diff --git a/.github/workflows/integration-tests.yml b/.github/workflows/integration-tests.yml index ab8ae8eeb2e..75184a47965 100644 --- a/.github/workflows/integration-tests.yml +++ b/.github/workflows/integration-tests.yml @@ -326,18 +326,12 @@ jobs: nodes: 1 os: ubuntu-latest file: ocr - pyroscope_env: ci-smoke-ocr-evm-simulated + pyroscope_env: ci-smoke-ocr-evm-simulated - name: ocr2 nodes: 1 os: ubuntu-latest file: ocr2 - pyroscope_env: ci-smoke-ocr2-evm-simulated - - name: ocr2 - nodes: 1 - os: ubuntu-latest - run: -run TestOCRv2Request - file: ocr2 - pyroscope_env: ci-smoke-ocr2-evm-simulated + pyroscope_env: ci-smoke-ocr2-evm-simulated - name: ocr2 nodes: 1 os: ubuntu-latest @@ -358,7 +352,7 @@ jobs: - name: vrfv2plus nodes: 1 os: ubuntu-latest - pyroscope_env: ci-smoke-vrf2plus-evm-simulated + pyroscope_env: ci-smoke-vrf2plus-evm-simulated - name: forwarder_ocr nodes: 1 os: ubuntu-latest diff --git a/core/cmd/app.go b/core/cmd/app.go index 17a4da85002..8e61380156c 100644 --- a/core/cmd/app.go +++ b/core/cmd/app.go @@ -162,6 +162,17 @@ func NewApp(s *Shell) *cli.App { Usage: "Commands for the node's configuration", Subcommands: initRemoteConfigSubCmds(s), }, + { + Name: "health", + Usage: "Prints a health report", + Action: s.Health, + Flags: []cli.Flag{ + cli.BoolFlag{ + Name: "json, j", + Usage: "json output", + }, + }, + }, { Name: "jobs", Usage: "Commands for managing Jobs", diff --git a/core/cmd/shell_local.go b/core/cmd/shell_local.go index 
d4fb796c3e2..69b7373ed70 100644 --- a/core/cmd/shell_local.go +++ b/core/cmd/shell_local.go @@ -51,6 +51,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/utils" "github.com/smartcontractkit/chainlink/v2/core/web" webPresenters "github.com/smartcontractkit/chainlink/v2/core/web/presenters" + "github.com/smartcontractkit/chainlink/v2/internal/testdb" ) var ErrProfileTooLong = errors.New("requested profile duration too large") @@ -258,13 +259,6 @@ func initLocalSubCmds(s *Shell, safe bool) []cli.Command { // ownerPermsMask are the file permission bits reserved for owner. const ownerPermsMask = os.FileMode(0o700) -// PristineDBName is a clean copy of test DB with migrations. -// Used by heavyweight.FullTestDB* functions. -const ( - PristineDBName = "chainlink_test_pristine" - TestDBNamePrefix = "chainlink_test_" -) - // RunNode starts the Chainlink core. func (s *Shell) RunNode(c *cli.Context) error { if err := s.runNode(c); err != nil { @@ -815,7 +809,7 @@ func dropDanglingTestDBs(lggr logger.Logger, db *sqlx.DB) (err error) { }() } for _, dbname := range dbs { - if strings.HasPrefix(dbname, TestDBNamePrefix) && !strings.HasSuffix(dbname, "_pristine") { + if strings.HasPrefix(dbname, testdb.TestDBNamePrefix) && !strings.HasSuffix(dbname, "_pristine") { ch <- dbname } } @@ -1085,11 +1079,11 @@ func dropAndCreateDB(parsed url.URL) (err error) { } func dropAndCreatePristineDB(db *sqlx.DB, template string) (err error) { - _, err = db.Exec(fmt.Sprintf(`DROP DATABASE IF EXISTS "%s"`, PristineDBName)) + _, err = db.Exec(fmt.Sprintf(`DROP DATABASE IF EXISTS "%s"`, testdb.PristineDBName)) if err != nil { return fmt.Errorf("unable to drop postgres database: %v", err) } - _, err = db.Exec(fmt.Sprintf(`CREATE DATABASE "%s" WITH TEMPLATE "%s"`, PristineDBName, template)) + _, err = db.Exec(fmt.Sprintf(`CREATE DATABASE "%s" WITH TEMPLATE "%s"`, testdb.PristineDBName, template)) if err != nil { return fmt.Errorf("unable to create postgres database: %v", err) } diff --git 
a/core/cmd/shell_remote.go b/core/cmd/shell_remote.go index fa72d21ee6f..bc4620d0732 100644 --- a/core/cmd/shell_remote.go +++ b/core/cmd/shell_remote.go @@ -11,6 +11,7 @@ import ( "strconv" "strings" + "github.com/gin-gonic/gin" "github.com/manyminds/api2go/jsonapi" "github.com/mitchellh/go-homedir" "github.com/pelletier/go-toml" @@ -511,6 +512,23 @@ func (s *Shell) checkRemoteBuildCompatibility(lggr logger.Logger, onlyWarn bool, return nil } +func (s *Shell) Health(c *cli.Context) error { + mime := gin.MIMEPlain + if c.Bool("json") { + mime = gin.MIMEJSON + } + resp, err := s.HTTP.Get("/health", map[string]string{"Accept": mime}) + if err != nil { + return s.errorOut(err) + } + b, err := parseResponse(resp) + if err != nil { + return s.errorOut(err) + } + fmt.Println(string(b)) + return nil +} + // ErrIncompatible is returned when the cli and remote versions are not compatible. type ErrIncompatible struct { CLIVersion, CLISha string diff --git a/core/internal/cltest/heavyweight/orm.go b/core/internal/cltest/heavyweight/orm.go index 5df28a49778..f49a94be05b 100644 --- a/core/internal/cltest/heavyweight/orm.go +++ b/core/internal/cltest/heavyweight/orm.go @@ -1,13 +1,8 @@ -package heavyweight - -// The heavyweight package contains cltest items that are costly and you should +// Package heavyweight contains test helpers that are costly and you should // think **real carefully** before using in your tests. 
+package heavyweight import ( - "database/sql" - "errors" - "fmt" - "net/url" "os" "path" "runtime" @@ -20,41 +15,45 @@ import ( "github.com/jmoiron/sqlx" - "github.com/smartcontractkit/chainlink/v2/core/cmd" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/configtest" "github.com/smartcontractkit/chainlink/v2/core/services/chainlink" "github.com/smartcontractkit/chainlink/v2/core/services/pg" "github.com/smartcontractkit/chainlink/v2/core/store/dialects" "github.com/smartcontractkit/chainlink/v2/core/store/models" + "github.com/smartcontractkit/chainlink/v2/internal/testdb" ) // FullTestDBV2 creates a pristine DB which runs in a separate database than the normal // unit tests, so you can do things like use other Postgres connection types with it. func FullTestDBV2(t testing.TB, overrideFn func(c *chainlink.Config, s *chainlink.Secrets)) (chainlink.GeneralConfig, *sqlx.DB) { - return prepareFullTestDBV2(t, false, true, overrideFn) + return KindFixtures.PrepareDB(t, overrideFn) } // FullTestDBNoFixturesV2 is the same as FullTestDB, but it does not load fixtures. func FullTestDBNoFixturesV2(t testing.TB, overrideFn func(c *chainlink.Config, s *chainlink.Secrets)) (chainlink.GeneralConfig, *sqlx.DB) { - return prepareFullTestDBV2(t, false, false, overrideFn) + return KindTemplate.PrepareDB(t, overrideFn) } // FullTestDBEmptyV2 creates an empty DB (without migrations). 
func FullTestDBEmptyV2(t testing.TB, overrideFn func(c *chainlink.Config, s *chainlink.Secrets)) (chainlink.GeneralConfig, *sqlx.DB) { - return prepareFullTestDBV2(t, true, false, overrideFn) + return KindEmpty.PrepareDB(t, overrideFn) } func generateName() string { return strings.ReplaceAll(uuid.New().String(), "-", "") } -func prepareFullTestDBV2(t testing.TB, empty bool, loadFixtures bool, overrideFn func(c *chainlink.Config, s *chainlink.Secrets)) (chainlink.GeneralConfig, *sqlx.DB) { - testutils.SkipShort(t, "FullTestDB") +type Kind int - if empty && loadFixtures { - t.Fatal("could not load fixtures into an empty DB") - } +const ( + KindEmpty Kind = iota + KindTemplate + KindFixtures +) + +func (c Kind) PrepareDB(t testing.TB, overrideFn func(c *chainlink.Config, s *chainlink.Secrets)) (chainlink.GeneralConfig, *sqlx.DB) { + testutils.SkipShort(t, "FullTestDB") gcfg := configtest.NewGeneralConfigSimulated(t, func(c *chainlink.Config, s *chainlink.Secrets) { c.Database.Dialect = dialects.Postgres @@ -64,7 +63,7 @@ func prepareFullTestDBV2(t testing.TB, empty bool, loadFixtures bool, overrideFn }) require.NoError(t, os.MkdirAll(gcfg.RootDir(), 0700)) - migrationTestDBURL, err := dropAndCreateThrowawayTestDB(gcfg.Database().URL(), generateName(), empty) + migrationTestDBURL, err := testdb.CreateOrReplace(gcfg.Database().URL(), generateName(), c != KindEmpty) require.NoError(t, err) db, err := pg.NewConnection(migrationTestDBURL, dialects.Postgres, gcfg.Database()) require.NoError(t, err) @@ -81,7 +80,7 @@ func prepareFullTestDBV2(t testing.TB, empty bool, loadFixtures bool, overrideFn } }) - if loadFixtures { + if c == KindFixtures { _, filename, _, ok := runtime.Caller(1) if !ok { t.Fatal("could not get runtime.Caller(1)") @@ -95,39 +94,3 @@ func prepareFullTestDBV2(t testing.TB, empty bool, loadFixtures bool, overrideFn return gcfg, db } - -func dropAndCreateThrowawayTestDB(parsed url.URL, postfix string, empty bool) (string, error) { - if parsed.Path == "" { - 
return "", errors.New("path missing from database URL") - } - - // Match the naming schema that our dangling DB cleanup methods expect - dbname := cmd.TestDBNamePrefix + postfix - if l := len(dbname); l > 63 { - return "", fmt.Errorf("dbname %v too long (%d), max is 63 bytes. Try a shorter postfix", dbname, l) - } - // Cannot drop test database if we are connected to it, so we must connect - // to a different one. 'postgres' should be present on all postgres installations - parsed.Path = "/postgres" - db, err := sql.Open(string(dialects.Postgres), parsed.String()) - if err != nil { - return "", fmt.Errorf("In order to drop the test database, we need to connect to a separate database"+ - " called 'postgres'. But we are unable to open 'postgres' database: %+v\n", err) - } - defer db.Close() - - _, err = db.Exec(fmt.Sprintf("DROP DATABASE IF EXISTS %s", dbname)) - if err != nil { - return "", fmt.Errorf("unable to drop postgres migrations test database: %v", err) - } - if empty { - _, err = db.Exec(fmt.Sprintf("CREATE DATABASE %s", dbname)) - } else { - _, err = db.Exec(fmt.Sprintf("CREATE DATABASE %s WITH TEMPLATE %s", dbname, cmd.PristineDBName)) - } - if err != nil { - return "", fmt.Errorf("unable to create postgres test database with name '%s': %v", dbname, err) - } - parsed.Path = fmt.Sprintf("/%s", dbname) - return parsed.String(), nil -} diff --git a/core/scripts/gateway/run_gateway.go b/core/scripts/gateway/run_gateway.go index e30c43bb8af..5dbcd02bf56 100644 --- a/core/scripts/gateway/run_gateway.go +++ b/core/scripts/gateway/run_gateway.go @@ -48,7 +48,7 @@ func main() { lggr, _ := logger.NewLogger() - handlerFactory := gateway.NewHandlerFactory(nil, lggr) + handlerFactory := gateway.NewHandlerFactory(nil, nil, nil, lggr) gw, err := gateway.NewGatewayFromConfig(&cfg, handlerFactory, lggr) if err != nil { fmt.Println("error creating Gateway object:", err) diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index 
ee109db2119..e94f51abdf5 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -344,6 +344,8 @@ func NewApplication(opts ApplicationOpts) (Application, error) { job.Gateway: gateway.NewDelegate( legacyEVMChains, keyStore.Eth(), + db, + cfg.Database(), globalLogger), } webhookJobRunner = delegates[job.Webhook].(*webhook.Delegate).WebhookJobRunner() diff --git a/core/services/gateway/delegate.go b/core/services/gateway/delegate.go index d4180184aca..8a97f68d1ea 100644 --- a/core/services/gateway/delegate.go +++ b/core/services/gateway/delegate.go @@ -4,6 +4,7 @@ import ( "encoding/json" "github.com/google/uuid" + "github.com/jmoiron/sqlx" "github.com/pelletier/go-toml" "github.com/pkg/errors" @@ -18,13 +19,21 @@ import ( type Delegate struct { legacyChains legacyevm.LegacyChainContainer ks keystore.Eth + db *sqlx.DB + cfg pg.QConfig lggr logger.Logger } var _ job.Delegate = (*Delegate)(nil) -func NewDelegate(legacyChains legacyevm.LegacyChainContainer, ks keystore.Eth, lggr logger.Logger) *Delegate { - return &Delegate{legacyChains: legacyChains, ks: ks, lggr: lggr} +func NewDelegate(legacyChains legacyevm.LegacyChainContainer, ks keystore.Eth, db *sqlx.DB, cfg pg.QConfig, lggr logger.Logger) *Delegate { + return &Delegate{ + legacyChains: legacyChains, + ks: ks, + db: db, + cfg: cfg, + lggr: lggr, + } } func (d *Delegate) JobType() job.Type { @@ -47,7 +56,7 @@ func (d *Delegate) ServicesForSpec(spec job.Job) (services []job.ServiceCtx, err if err2 != nil { return nil, errors.Wrap(err2, "unmarshal gateway config") } - handlerFactory := NewHandlerFactory(d.legacyChains, d.lggr) + handlerFactory := NewHandlerFactory(d.legacyChains, d.db, d.cfg, d.lggr) gateway, err := NewGatewayFromConfig(&gatewayConfig, handlerFactory, d.lggr) if err != nil { return nil, err diff --git a/core/services/gateway/gateway_test.go b/core/services/gateway/gateway_test.go index 74d689fffe1..1c31a643c80 100644 --- 
a/core/services/gateway/gateway_test.go +++ b/core/services/gateway/gateway_test.go @@ -57,7 +57,7 @@ Address = "0x0001020304050607080900010203040506070809" `) lggr := logger.TestLogger(t) - _, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, tomlConfig), gateway.NewHandlerFactory(nil, lggr), lggr) + _, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, tomlConfig), gateway.NewHandlerFactory(nil, nil, nil, lggr), lggr) require.NoError(t, err) } @@ -75,7 +75,7 @@ HandlerName = "dummy" `) lggr := logger.TestLogger(t) - _, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, tomlConfig), gateway.NewHandlerFactory(nil, lggr), lggr) + _, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, tomlConfig), gateway.NewHandlerFactory(nil, nil, nil, lggr), lggr) require.Error(t, err) } @@ -89,7 +89,7 @@ HandlerName = "no_such_handler" `) lggr := logger.TestLogger(t) - _, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, tomlConfig), gateway.NewHandlerFactory(nil, lggr), lggr) + _, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, tomlConfig), gateway.NewHandlerFactory(nil, nil, nil, lggr), lggr) require.Error(t, err) } @@ -103,7 +103,7 @@ SomeOtherField = "abcd" `) lggr := logger.TestLogger(t) - _, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, tomlConfig), gateway.NewHandlerFactory(nil, lggr), lggr) + _, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, tomlConfig), gateway.NewHandlerFactory(nil, nil, nil, lggr), lggr) require.Error(t, err) } @@ -121,7 +121,7 @@ Address = "0xnot_an_address" `) lggr := logger.TestLogger(t) - _, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, tomlConfig), gateway.NewHandlerFactory(nil, lggr), lggr) + _, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, tomlConfig), gateway.NewHandlerFactory(nil, nil, nil, lggr), lggr) require.Error(t, err) } @@ -129,7 +129,7 @@ func TestGateway_CleanStartAndClose(t *testing.T) { t.Parallel() lggr := logger.TestLogger(t) - gateway, err := 
gateway.NewGatewayFromConfig(parseTOMLConfig(t, buildConfig("")), gateway.NewHandlerFactory(nil, lggr), lggr) + gateway, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, buildConfig("")), gateway.NewHandlerFactory(nil, nil, nil, lggr), lggr) require.NoError(t, err) servicetest.Run(t, gateway) } diff --git a/core/services/gateway/handler_factory.go b/core/services/gateway/handler_factory.go index 368bc64c872..8ccae8c7c4b 100644 --- a/core/services/gateway/handler_factory.go +++ b/core/services/gateway/handler_factory.go @@ -4,11 +4,14 @@ import ( "encoding/json" "fmt" + "github.com/jmoiron/sqlx" + "github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/gateway/config" "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers" "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/functions" + "github.com/smartcontractkit/chainlink/v2/core/services/pg" ) const ( @@ -18,19 +21,21 @@ const ( type handlerFactory struct { legacyChains legacyevm.LegacyChainContainer + db *sqlx.DB + cfg pg.QConfig lggr logger.Logger } var _ HandlerFactory = (*handlerFactory)(nil) -func NewHandlerFactory(legacyChains legacyevm.LegacyChainContainer, lggr logger.Logger) HandlerFactory { - return &handlerFactory{legacyChains, lggr} +func NewHandlerFactory(legacyChains legacyevm.LegacyChainContainer, db *sqlx.DB, cfg pg.QConfig, lggr logger.Logger) HandlerFactory { + return &handlerFactory{legacyChains, db, cfg, lggr} } func (hf *handlerFactory) NewHandler(handlerType HandlerType, handlerConfig json.RawMessage, donConfig *config.DONConfig, don handlers.DON) (handlers.Handler, error) { switch handlerType { case FunctionsHandlerType: - return functions.NewFunctionsHandlerFromConfig(handlerConfig, donConfig, don, hf.legacyChains, hf.lggr) + return functions.NewFunctionsHandlerFromConfig(handlerConfig, donConfig, don, hf.legacyChains, hf.db, 
hf.cfg, hf.lggr) case DummyHandlerType: return handlers.NewDummyHandler(donConfig, don, hf.lggr) default: diff --git a/core/services/gateway/handlers/functions/handler.functions.go b/core/services/gateway/handlers/functions/handler.functions.go index 5277f9789d6..2872c72f761 100644 --- a/core/services/gateway/handlers/functions/handler.functions.go +++ b/core/services/gateway/handlers/functions/handler.functions.go @@ -10,6 +10,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/jmoiron/sqlx" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "go.uber.org/multierr" @@ -22,6 +23,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/gateway/config" "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers" hc "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/common" + "github.com/smartcontractkit/chainlink/v2/core/services/pg" ) var ( @@ -96,7 +98,7 @@ type PendingRequest struct { var _ handlers.Handler = (*functionsHandler)(nil) -func NewFunctionsHandlerFromConfig(handlerConfig json.RawMessage, donConfig *config.DONConfig, don handlers.DON, legacyChains legacyevm.LegacyChainContainer, lggr logger.Logger) (handlers.Handler, error) { +func NewFunctionsHandlerFromConfig(handlerConfig json.RawMessage, donConfig *config.DONConfig, don handlers.DON, legacyChains legacyevm.LegacyChainContainer, db *sqlx.DB, qcfg pg.QConfig, lggr logger.Logger) (handlers.Handler, error) { var cfg FunctionsHandlerConfig err := json.Unmarshal(handlerConfig, &cfg) if err != nil { @@ -133,7 +135,13 @@ func NewFunctionsHandlerFromConfig(handlerConfig json.RawMessage, donConfig *con if err2 != nil { return nil, err2 } - subscriptions, err2 = NewOnchainSubscriptions(chain.Client(), *cfg.OnchainSubscriptions, lggr) + + orm, err2 := NewORM(db, lggr, qcfg, cfg.OnchainSubscriptions.ContractAddress) + if err2 != nil { + return nil, err2 + } + + subscriptions, err2 = 
NewOnchainSubscriptions(chain.Client(), *cfg.OnchainSubscriptions, orm, lggr) if err2 != nil { return nil, err2 } diff --git a/core/services/gateway/handlers/functions/handler.functions_test.go b/core/services/gateway/handlers/functions/handler.functions_test.go index 1d6dd109625..7e055815c88 100644 --- a/core/services/gateway/handlers/functions/handler.functions_test.go +++ b/core/services/gateway/handlers/functions/handler.functions_test.go @@ -83,7 +83,7 @@ func sendNodeReponses(t *testing.T, handler handlers.Handler, userRequestMsg api func TestFunctionsHandler_Minimal(t *testing.T) { t.Parallel() - handler, err := functions.NewFunctionsHandlerFromConfig(json.RawMessage("{}"), &config.DONConfig{}, nil, nil, logger.TestLogger(t)) + handler, err := functions.NewFunctionsHandlerFromConfig(json.RawMessage("{}"), &config.DONConfig{}, nil, nil, nil, nil, logger.TestLogger(t)) require.NoError(t, err) // empty message should always error out @@ -95,7 +95,7 @@ func TestFunctionsHandler_Minimal(t *testing.T) { func TestFunctionsHandler_CleanStartAndClose(t *testing.T) { t.Parallel() - handler, err := functions.NewFunctionsHandlerFromConfig(json.RawMessage("{}"), &config.DONConfig{}, nil, nil, logger.TestLogger(t)) + handler, err := functions.NewFunctionsHandlerFromConfig(json.RawMessage("{}"), &config.DONConfig{}, nil, nil, nil, nil, logger.TestLogger(t)) require.NoError(t, err) servicetest.Run(t, handler) diff --git a/core/services/gateway/handlers/functions/mocks/orm.go b/core/services/gateway/handlers/functions/mocks/orm.go new file mode 100644 index 00000000000..110675c8b55 --- /dev/null +++ b/core/services/gateway/handlers/functions/mocks/orm.go @@ -0,0 +1,91 @@ +// Code generated by mockery v2.38.0. DO NOT EDIT. 
+ +package mocks + +import ( + functions "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/functions" + mock "github.com/stretchr/testify/mock" + + pg "github.com/smartcontractkit/chainlink/v2/core/services/pg" +) + +// ORM is an autogenerated mock type for the ORM type +type ORM struct { + mock.Mock +} + +// GetSubscriptions provides a mock function with given fields: offset, limit, qopts +func (_m *ORM) GetSubscriptions(offset uint, limit uint, qopts ...pg.QOpt) ([]functions.CachedSubscription, error) { + _va := make([]interface{}, len(qopts)) + for _i := range qopts { + _va[_i] = qopts[_i] + } + var _ca []interface{} + _ca = append(_ca, offset, limit) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + if len(ret) == 0 { + panic("no return value specified for GetSubscriptions") + } + + var r0 []functions.CachedSubscription + var r1 error + if rf, ok := ret.Get(0).(func(uint, uint, ...pg.QOpt) ([]functions.CachedSubscription, error)); ok { + return rf(offset, limit, qopts...) + } + if rf, ok := ret.Get(0).(func(uint, uint, ...pg.QOpt) []functions.CachedSubscription); ok { + r0 = rf(offset, limit, qopts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]functions.CachedSubscription) + } + } + + if rf, ok := ret.Get(1).(func(uint, uint, ...pg.QOpt) error); ok { + r1 = rf(offset, limit, qopts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// UpsertSubscription provides a mock function with given fields: subscription, qopts +func (_m *ORM) UpsertSubscription(subscription functions.CachedSubscription, qopts ...pg.QOpt) error { + _va := make([]interface{}, len(qopts)) + for _i := range qopts { + _va[_i] = qopts[_i] + } + var _ca []interface{} + _ca = append(_ca, subscription) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) 
+ + if len(ret) == 0 { + panic("no return value specified for UpsertSubscription") + } + + var r0 error + if rf, ok := ret.Get(0).(func(functions.CachedSubscription, ...pg.QOpt) error); ok { + r0 = rf(subscription, qopts...) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// NewORM creates a new instance of ORM. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewORM(t interface { + mock.TestingT + Cleanup(func()) +}) *ORM { + mock := &ORM{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/core/services/gateway/handlers/functions/orm.go b/core/services/gateway/handlers/functions/orm.go new file mode 100644 index 00000000000..b7ec8d865d1 --- /dev/null +++ b/core/services/gateway/handlers/functions/orm.go @@ -0,0 +1,132 @@ +package functions + +import ( + "fmt" + "math/big" + + "github.com/ethereum/go-ethereum/common" + "github.com/lib/pq" + "github.com/pkg/errors" + + "github.com/jmoiron/sqlx" + + "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/functions/generated/functions_router" + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/pg" +) + +//go:generate mockery --quiet --name ORM --output ./mocks/ --case=underscore + +type ORM interface { + GetSubscriptions(offset, limit uint, qopts ...pg.QOpt) ([]CachedSubscription, error) + UpsertSubscription(subscription CachedSubscription, qopts ...pg.QOpt) error +} + +type orm struct { + q pg.Q + routerContractAddress common.Address +} + +var _ ORM = (*orm)(nil) +var ( + ErrInvalidParameters = errors.New("invalid parameters provided to create a subscription cache ORM") +) + +const ( + tableName = "functions_subscriptions" +) + +type cachedSubscriptionRow struct { + SubscriptionID uint64 + Owner common.Address + Balance int64 + BlockedBalance int64 + ProposedOwner 
common.Address + Consumers pq.ByteaArray + Flags []uint8 + RouterContractAddress common.Address +} + +func NewORM(db *sqlx.DB, lggr logger.Logger, cfg pg.QConfig, routerContractAddress common.Address) (ORM, error) { + if db == nil || cfg == nil || lggr == nil || routerContractAddress == (common.Address{}) { + return nil, ErrInvalidParameters + } + + return &orm{ + q: pg.NewQ(db, lggr, cfg), + routerContractAddress: routerContractAddress, + }, nil +} + +func (o *orm) GetSubscriptions(offset, limit uint, qopts ...pg.QOpt) ([]CachedSubscription, error) { + var cacheSubscriptions []CachedSubscription + var cacheSubscriptionRows []cachedSubscriptionRow + stmt := fmt.Sprintf(` + SELECT subscription_id, owner, balance, blocked_balance, proposed_owner, consumers, flags, router_contract_address + FROM %s + WHERE router_contract_address = $1 + ORDER BY subscription_id ASC + OFFSET $2 + LIMIT $3; + `, tableName) + err := o.q.WithOpts(qopts...).Select(&cacheSubscriptionRows, stmt, o.routerContractAddress, offset, limit) + if err != nil { + return cacheSubscriptions, err + } + + for _, cs := range cacheSubscriptionRows { + cacheSubscriptions = append(cacheSubscriptions, cs.encode()) + } + + return cacheSubscriptions, nil +} + +// UpsertSubscription will update if a subscription exists or create if it does not. +// In case a subscription gets deleted we will update it with an owner address equal to 0x0. 
+func (o *orm) UpsertSubscription(subscription CachedSubscription, qopts ...pg.QOpt) error { + stmt := fmt.Sprintf(` + INSERT INTO %s (subscription_id, owner, balance, blocked_balance, proposed_owner, consumers, flags, router_contract_address) + VALUES ($1,$2,$3,$4,$5,$6,$7,$8) ON CONFLICT (subscription_id, router_contract_address) DO UPDATE + SET owner=$2, balance=$3, blocked_balance=$4, proposed_owner=$5, consumers=$6, flags=$7, router_contract_address=$8;`, tableName) + + if subscription.Balance == nil { + subscription.Balance = big.NewInt(0) + } + + if subscription.BlockedBalance == nil { + subscription.BlockedBalance = big.NewInt(0) + } + + _, err := o.q.WithOpts(qopts...).Exec( + stmt, + subscription.SubscriptionID, + subscription.Owner, + subscription.Balance.Int64(), + subscription.BlockedBalance.Int64(), + subscription.ProposedOwner, + subscription.Consumers, + subscription.Flags[:], + o.routerContractAddress, + ) + + return err +} + +func (cs *cachedSubscriptionRow) encode() CachedSubscription { + consumers := make([]common.Address, 0) + for _, csc := range cs.Consumers { + consumers = append(consumers, common.BytesToAddress(csc)) + } + + return CachedSubscription{ + SubscriptionID: cs.SubscriptionID, + IFunctionsSubscriptionsSubscription: functions_router.IFunctionsSubscriptionsSubscription{ + Balance: big.NewInt(cs.Balance), + Owner: cs.Owner, + BlockedBalance: big.NewInt(cs.BlockedBalance), + ProposedOwner: cs.ProposedOwner, + Consumers: consumers, + Flags: [32]byte(cs.Flags), + }, + } +} diff --git a/core/services/gateway/handlers/functions/orm_test.go b/core/services/gateway/handlers/functions/orm_test.go new file mode 100644 index 00000000000..37ec24ea0b1 --- /dev/null +++ b/core/services/gateway/handlers/functions/orm_test.go @@ -0,0 +1,246 @@ +package functions_test + +import ( + "math/big" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/stretchr/testify/require" + + 
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets" + "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/functions/generated/functions_router" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/functions" +) + +var ( + defaultFlags = [32]byte{0x1, 0x2, 0x3} +) + +func setupORM(t *testing.T) (functions.ORM, error) { + t.Helper() + + var ( + db = pgtest.NewSqlxDB(t) + lggr = logger.TestLogger(t) + ) + + return functions.NewORM(db, lggr, pgtest.NewQConfig(true), testutils.NewAddress()) +} + +func createSubscriptions(t *testing.T, orm functions.ORM, amount int) []functions.CachedSubscription { + cachedSubscriptions := make([]functions.CachedSubscription, 0) + for i := amount; i > 0; i-- { + cs := functions.CachedSubscription{ + SubscriptionID: uint64(i), + IFunctionsSubscriptionsSubscription: functions_router.IFunctionsSubscriptionsSubscription{ + Balance: big.NewInt(10), + Owner: testutils.NewAddress(), + BlockedBalance: big.NewInt(20), + ProposedOwner: common.Address{}, + Consumers: []common.Address{}, + Flags: defaultFlags, + }, + } + cachedSubscriptions = append(cachedSubscriptions, cs) + err := orm.UpsertSubscription(cs) + require.NoError(t, err) + } + return cachedSubscriptions +} + +func TestORM_GetSubscriptions(t *testing.T) { + t.Parallel() + t.Run("fetch first page", func(t *testing.T) { + orm, err := setupORM(t) + require.NoError(t, err) + cachedSubscriptions := createSubscriptions(t, orm, 2) + results, err := orm.GetSubscriptions(0, 1) + require.NoError(t, err) + require.Equal(t, 1, len(results), "incorrect results length") + require.Equal(t, cachedSubscriptions[1], results[0]) + }) + + t.Run("fetch second page", func(t *testing.T) { + orm, err := setupORM(t) + require.NoError(t, err) + cachedSubscriptions := 
createSubscriptions(t, orm, 2) + results, err := orm.GetSubscriptions(1, 5) + require.NoError(t, err) + require.Equal(t, 1, len(results), "incorrect results length") + require.Equal(t, cachedSubscriptions[0], results[0]) + }) +} + +func TestORM_UpsertSubscription(t *testing.T) { + t.Parallel() + + t.Run("create a subscription", func(t *testing.T) { + orm, err := setupORM(t) + require.NoError(t, err) + expected := functions.CachedSubscription{ + SubscriptionID: uint64(1), + IFunctionsSubscriptionsSubscription: functions_router.IFunctionsSubscriptionsSubscription{ + Balance: big.NewInt(10), + Owner: testutils.NewAddress(), + BlockedBalance: big.NewInt(20), + ProposedOwner: common.Address{}, + Consumers: []common.Address{}, + Flags: defaultFlags, + }, + } + err = orm.UpsertSubscription(expected) + require.NoError(t, err) + + results, err := orm.GetSubscriptions(0, 1) + require.NoError(t, err) + require.Equal(t, 1, len(results), "incorrect results length") + require.Equal(t, expected, results[0]) + }) + + t.Run("update a subscription", func(t *testing.T) { + orm, err := setupORM(t) + require.NoError(t, err) + + expectedUpdated := functions.CachedSubscription{ + SubscriptionID: uint64(1), + IFunctionsSubscriptionsSubscription: functions_router.IFunctionsSubscriptionsSubscription{ + Balance: big.NewInt(10), + Owner: testutils.NewAddress(), + BlockedBalance: big.NewInt(20), + ProposedOwner: common.Address{}, + Consumers: []common.Address{}, + Flags: defaultFlags, + }, + } + err = orm.UpsertSubscription(expectedUpdated) + require.NoError(t, err) + + expectedNotUpdated := functions.CachedSubscription{ + SubscriptionID: uint64(2), + IFunctionsSubscriptionsSubscription: functions_router.IFunctionsSubscriptionsSubscription{ + Balance: big.NewInt(10), + Owner: testutils.NewAddress(), + BlockedBalance: big.NewInt(20), + ProposedOwner: common.Address{}, + Consumers: []common.Address{}, + Flags: defaultFlags, + }, + } + err = orm.UpsertSubscription(expectedNotUpdated) + 
require.NoError(t, err) + + // update the balance value + expectedUpdated.Balance = big.NewInt(20) + err = orm.UpsertSubscription(expectedUpdated) + require.NoError(t, err) + + results, err := orm.GetSubscriptions(0, 5) + require.NoError(t, err) + require.Equal(t, 2, len(results), "incorrect results length") + require.Equal(t, expectedNotUpdated, results[1]) + require.Equal(t, expectedUpdated, results[0]) + }) + + t.Run("update a deleted subscription", func(t *testing.T) { + orm, err := setupORM(t) + require.NoError(t, err) + + subscription := functions.CachedSubscription{ + SubscriptionID: uint64(1), + IFunctionsSubscriptionsSubscription: functions_router.IFunctionsSubscriptionsSubscription{ + Balance: big.NewInt(10), + Owner: testutils.NewAddress(), + BlockedBalance: big.NewInt(20), + ProposedOwner: common.Address{}, + Consumers: []common.Address{}, + Flags: defaultFlags, + }, + } + err = orm.UpsertSubscription(subscription) + require.NoError(t, err) + + // empty subscription + subscription.IFunctionsSubscriptionsSubscription = functions_router.IFunctionsSubscriptionsSubscription{ + Balance: big.NewInt(0), + Owner: common.Address{}, + BlockedBalance: big.NewInt(0), + ProposedOwner: common.Address{}, + Consumers: []common.Address{}, + Flags: [32]byte{}, + } + + err = orm.UpsertSubscription(subscription) + require.NoError(t, err) + + results, err := orm.GetSubscriptions(0, 5) + require.NoError(t, err) + require.Equal(t, 1, len(results), "incorrect results length") + require.Equal(t, subscription, results[0]) + }) + + t.Run("create a subscription with same id but different router address", func(t *testing.T) { + var ( + db = pgtest.NewSqlxDB(t) + lggr = logger.TestLogger(t) + ) + + orm1, err := functions.NewORM(db, lggr, pgtest.NewQConfig(true), testutils.NewAddress()) + require.NoError(t, err) + orm2, err := functions.NewORM(db, lggr, pgtest.NewQConfig(true), testutils.NewAddress()) + require.NoError(t, err) + + subscription := functions.CachedSubscription{ + 
SubscriptionID: uint64(1), + IFunctionsSubscriptionsSubscription: functions_router.IFunctionsSubscriptionsSubscription{ + Balance: assets.Ether(10).ToInt(), + Owner: testutils.NewAddress(), + BlockedBalance: assets.Ether(20).ToInt(), + ProposedOwner: common.Address{}, + Consumers: []common.Address{}, + Flags: defaultFlags, + }, + } + + err = orm1.UpsertSubscription(subscription) + require.NoError(t, err) + + // should update the existing subscription + subscription.Balance = assets.Ether(12).ToInt() + err = orm1.UpsertSubscription(subscription) + require.NoError(t, err) + + results, err := orm1.GetSubscriptions(0, 10) + require.NoError(t, err) + require.Equal(t, 1, len(results), "incorrect results length") + + // should create a new subscription because it comes from a different router contract + err = orm2.UpsertSubscription(subscription) + require.NoError(t, err) + + results, err = orm1.GetSubscriptions(0, 10) + require.NoError(t, err) + require.Equal(t, 1, len(results), "incorrect results length") + + results, err = orm2.GetSubscriptions(0, 10) + require.NoError(t, err) + require.Equal(t, 1, len(results), "incorrect results length") + }) +} + +func Test_NewORM(t *testing.T) { + t.Run("OK-create_ORM", func(t *testing.T) { + _, err := functions.NewORM(pgtest.NewSqlxDB(t), logger.TestLogger(t), pgtest.NewQConfig(true), testutils.NewAddress()) + require.NoError(t, err) + }) + t.Run("NOK-create_ORM_with_nil_fields", func(t *testing.T) { + _, err := functions.NewORM(nil, nil, nil, common.Address{}) + require.Error(t, err) + }) + t.Run("NOK-create_ORM_with_empty_address", func(t *testing.T) { + _, err := functions.NewORM(pgtest.NewSqlxDB(t), logger.TestLogger(t), pgtest.NewQConfig(true), common.Address{}) + require.Error(t, err) + }) +} diff --git a/core/services/gateway/handlers/functions/subscriptions.go b/core/services/gateway/handlers/functions/subscriptions.go index ebffbbdd206..8bc9cf09e7b 100644 --- a/core/services/gateway/handlers/functions/subscriptions.go +++ 
b/core/services/gateway/handlers/functions/subscriptions.go @@ -19,12 +19,15 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/utils" ) +const defaultCacheBatchSize = 100 + type OnchainSubscriptionsConfig struct { ContractAddress common.Address `json:"contractAddress"` BlockConfirmations uint `json:"blockConfirmations"` UpdateFrequencySec uint `json:"updateFrequencySec"` UpdateTimeoutSec uint `json:"updateTimeoutSec"` UpdateRangeSize uint `json:"updateRangeSize"` + CacheBatchSize uint `json:"cacheBatchSize"` } // OnchainSubscriptions maintains a mirror of all subscriptions fetched from the blockchain (EVM-only). @@ -43,6 +46,7 @@ type onchainSubscriptions struct { config OnchainSubscriptionsConfig subscriptions UserSubscriptions + orm ORM client evmclient.Client router *functions_router.FunctionsRouter blockConfirmations *big.Int @@ -52,7 +56,7 @@ type onchainSubscriptions struct { stopCh services.StopChan } -func NewOnchainSubscriptions(client evmclient.Client, config OnchainSubscriptionsConfig, lggr logger.Logger) (OnchainSubscriptions, error) { +func NewOnchainSubscriptions(client evmclient.Client, config OnchainSubscriptionsConfig, orm ORM, lggr logger.Logger) (OnchainSubscriptions, error) { if client == nil { return nil, errors.New("client is nil") } @@ -63,9 +67,17 @@ func NewOnchainSubscriptions(client evmclient.Client, config OnchainSubscription if err != nil { return nil, fmt.Errorf("unexpected error during functions_router.NewFunctionsRouter: %s", err) } + + // if CacheBatchSize is not specified use the default value + if config.CacheBatchSize == 0 { + lggr.Info("CacheBatchSize not specified, using default size: ", defaultCacheBatchSize) + config.CacheBatchSize = defaultCacheBatchSize + } + return &onchainSubscriptions{ config: config, subscriptions: NewUserSubscriptions(), + orm: orm, client: client, router: router, blockConfirmations: big.NewInt(int64(config.BlockConfirmations)), @@ -87,6 +99,8 @@ func (s *onchainSubscriptions) Start(ctx 
context.Context) error { return errors.New("OnchainSubscriptionsConfig.UpdateRangeSize must be greater than 0") } + s.loadCachedSubscriptions() + s.closeWait.Add(1) go s.queryLoop() @@ -190,7 +204,15 @@ func (s *onchainSubscriptions) querySubscriptionsRange(ctx context.Context, bloc for i, subscription := range subscriptions { subscriptionId := start + uint64(i) subscription := subscription - s.subscriptions.UpdateSubscription(subscriptionId, &subscription) + updated := s.subscriptions.UpdateSubscription(subscriptionId, &subscription) + if updated { + if err = s.orm.UpsertSubscription(CachedSubscription{ + SubscriptionID: subscriptionId, + IFunctionsSubscriptionsSubscription: subscription, + }); err != nil { + s.lggr.Errorf("unexpected error updating subscription in the cache: %w", err) + } + } } return nil @@ -203,3 +225,30 @@ func (s *onchainSubscriptions) getSubscriptionsCount(ctx context.Context, blockN Context: ctx, }) } + +func (s *onchainSubscriptions) loadCachedSubscriptions() { + offset := uint(0) + for { + csBatch, err := s.orm.GetSubscriptions(offset, s.config.CacheBatchSize) + if err != nil { + break + } + + for _, cs := range csBatch { + _ = s.subscriptions.UpdateSubscription(cs.SubscriptionID, &functions_router.IFunctionsSubscriptionsSubscription{ + Balance: cs.Balance, + Owner: cs.Owner, + BlockedBalance: cs.BlockedBalance, + ProposedOwner: cs.ProposedOwner, + Consumers: cs.Consumers, + Flags: cs.Flags, + }) + } + s.lggr.Debugw("Loading cached subscriptions", "offset", offset, "batch_length", len(csBatch)) + + if len(csBatch) != int(s.config.CacheBatchSize) { + break + } + offset += s.config.CacheBatchSize + } +} diff --git a/core/services/gateway/handlers/functions/subscriptions_test.go b/core/services/gateway/handlers/functions/subscriptions_test.go index adbf637ad73..782dcc2332d 100644 --- a/core/services/gateway/handlers/functions/subscriptions_test.go +++ b/core/services/gateway/handlers/functions/subscriptions_test.go @@ -15,14 +15,17 @@ import 
( "github.com/stretchr/testify/require" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client/mocks" + "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/functions/generated/functions_router" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/functions" + fmocks "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/functions/mocks" ) const ( validUser = "0x9ED925d8206a4f88a2f643b28B3035B315753Cd6" invalidUser = "0x6E2dc0F9DB014aE19888F539E59285D2Ea04244C" + cachedUser = "0x3E2dc0F9DB014aE19888F539E59285D2Ea04233G" ) func TestSubscriptions_OnePass(t *testing.T) { @@ -47,7 +50,10 @@ func TestSubscriptions_OnePass(t *testing.T) { UpdateTimeoutSec: 1, UpdateRangeSize: 3, } - subscriptions, err := functions.NewOnchainSubscriptions(client, config, logger.TestLogger(t)) + orm := fmocks.NewORM(t) + orm.On("GetSubscriptions", uint(0), uint(100)).Return([]functions.CachedSubscription{}, nil) + orm.On("UpsertSubscription", mock.Anything).Return(nil) + subscriptions, err := functions.NewOnchainSubscriptions(client, config, orm, logger.TestLogger(t)) require.NoError(t, err) err = subscriptions.Start(ctx) @@ -95,7 +101,10 @@ func TestSubscriptions_MultiPass(t *testing.T) { UpdateTimeoutSec: 1, UpdateRangeSize: 3, } - subscriptions, err := functions.NewOnchainSubscriptions(client, config, logger.TestLogger(t)) + orm := fmocks.NewORM(t) + orm.On("GetSubscriptions", uint(0), uint(100)).Return([]functions.CachedSubscription{}, nil) + orm.On("UpsertSubscription", mock.Anything).Return(nil) + subscriptions, err := functions.NewOnchainSubscriptions(client, config, orm, logger.TestLogger(t)) require.NoError(t, err) err = subscriptions.Start(ctx) @@ -108,3 +117,57 @@ func TestSubscriptions_MultiPass(t *testing.T) { return currentCycle.Load() == ncycles }, testutils.WaitTimeout(t), 
time.Second).Should(gomega.BeTrue()) } + +func TestSubscriptions_Cached(t *testing.T) { + getSubscriptionCount := hexutil.MustDecode("0x0000000000000000000000000000000000000000000000000000000000000003") + getSubscriptionsInRange := hexutil.MustDecode("0x00000000000000000000000000000000000000000000000000000000000000200000000000000000000000000000000000000000000000000000000000000003000000000000000000000000000000000000000000000000000000000000006000000000000000000000000000000000000000000000000000000000000001600000000000000000000000000000000000000000000000000000000000000240000000000000000000000000000000000000000000000000de0b6b3a76400000000000000000000000000000109e6e1b12098cc8f3a1e9719a817ec53ab9b35c000000000000000000000000000000000000000000000000000034e23f515cb0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000c000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001000000000000000000000000f5340f0968ee8b7dfd97e3327a6139273cc2c4fa000000000000000000000000000000000000000000000001158e460913d000000000000000000000000000009ed925d8206a4f88a2f643b28b3035b315753cd60000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000c0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001bc14b92364c75e20000000000000000000000009ed925d8206a4f88a2f643b28b3035b315753cd60000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000c000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001000000000000000000
0000005439e5881a529f3ccbffc0e82d49f9db3950aefe") + + ctx := testutils.Context(t) + client := mocks.NewClient(t) + client.On("LatestBlockHeight", mock.Anything).Return(big.NewInt(42), nil) + client.On("CallContract", mock.Anything, ethereum.CallMsg{ // getSubscriptionCount + To: &common.Address{}, + Data: hexutil.MustDecode("0x66419970"), + }, mock.Anything).Return(getSubscriptionCount, nil) + client.On("CallContract", mock.Anything, ethereum.CallMsg{ // GetSubscriptionsInRange + To: &common.Address{}, + Data: hexutil.MustDecode("0xec2454e500000000000000000000000000000000000000000000000000000000000000010000000000000000000000000000000000000000000000000000000000000003"), + }, mock.Anything).Return(getSubscriptionsInRange, nil) + config := functions.OnchainSubscriptionsConfig{ + ContractAddress: common.Address{}, + BlockConfirmations: 1, + UpdateFrequencySec: 1, + UpdateTimeoutSec: 1, + UpdateRangeSize: 3, + CacheBatchSize: 1, + } + + expectedBalance := big.NewInt(5) + orm := fmocks.NewORM(t) + orm.On("GetSubscriptions", uint(0), uint(1)).Return([]functions.CachedSubscription{ + { + SubscriptionID: 1, + IFunctionsSubscriptionsSubscription: functions_router.IFunctionsSubscriptionsSubscription{ + Balance: expectedBalance, + Owner: common.HexToAddress(cachedUser), + BlockedBalance: big.NewInt(10), + }, + }, + }, nil) + orm.On("GetSubscriptions", uint(1), uint(1)).Return([]functions.CachedSubscription{}, nil) + orm.On("UpsertSubscription", mock.Anything).Return(nil) + + subscriptions, err := functions.NewOnchainSubscriptions(client, config, orm, logger.TestLogger(t)) + require.NoError(t, err) + + err = subscriptions.Start(ctx) + require.NoError(t, err) + t.Cleanup(func() { + assert.NoError(t, subscriptions.Close()) + }) + + gomega.NewGomegaWithT(t).Eventually(func() bool { + actualBalance, err := subscriptions.GetMaxUserBalance(common.HexToAddress(cachedUser)) + return err == nil && assert.Equal(t, expectedBalance, actualBalance) + }, testutils.WaitTimeout(t), 
time.Second).Should(gomega.BeTrue()) +} diff --git a/core/services/gateway/handlers/functions/user_subscriptions.go b/core/services/gateway/handlers/functions/user_subscriptions.go index ff3dd753995..7e63720da71 100644 --- a/core/services/gateway/handlers/functions/user_subscriptions.go +++ b/core/services/gateway/handlers/functions/user_subscriptions.go @@ -12,8 +12,10 @@ import ( // Methods are NOT thread-safe. +var ErrUserHasNoSubscription = errors.New("user has no subscriptions") + type UserSubscriptions interface { - UpdateSubscription(subscriptionId uint64, subscription *functions_router.IFunctionsSubscriptionsSubscription) + UpdateSubscription(subscriptionId uint64, subscription *functions_router.IFunctionsSubscriptionsSubscription) bool GetMaxUserBalance(user common.Address) (*big.Int, error) } @@ -29,29 +31,45 @@ func NewUserSubscriptions() UserSubscriptions { } } -func (us *userSubscriptions) UpdateSubscription(subscriptionId uint64, subscription *functions_router.IFunctionsSubscriptionsSubscription) { +// CachedSubscription is used to populate the user subscription maps from a persistent layer like postgres. +type CachedSubscription struct { + SubscriptionID uint64 + functions_router.IFunctionsSubscriptionsSubscription +} + +// UpdateSubscription updates a subscription returning false in case there was no variation to the current state. 
+func (us *userSubscriptions) UpdateSubscription(subscriptionId uint64, subscription *functions_router.IFunctionsSubscriptionsSubscription) bool { if subscription == nil || subscription.Owner == utils.ZeroAddress { user, ok := us.subscriptionIdsMap[subscriptionId] - if ok { - delete(us.userSubscriptionsMap[user], subscriptionId) - if len(us.userSubscriptionsMap[user]) == 0 { - delete(us.userSubscriptionsMap, user) - } + if !ok { + return false } + + delete(us.userSubscriptionsMap[user], subscriptionId) delete(us.subscriptionIdsMap, subscriptionId) - } else { - us.subscriptionIdsMap[subscriptionId] = subscription.Owner - if _, ok := us.userSubscriptionsMap[subscription.Owner]; !ok { - us.userSubscriptionsMap[subscription.Owner] = make(map[uint64]*functions_router.IFunctionsSubscriptionsSubscription) + if len(us.userSubscriptionsMap[user]) == 0 { + delete(us.userSubscriptionsMap, user) } - us.userSubscriptionsMap[subscription.Owner][subscriptionId] = subscription + return true + } + + // there is no change to the subscription + if us.userSubscriptionsMap[subscription.Owner][subscriptionId] == subscription { + return false + } + + us.subscriptionIdsMap[subscriptionId] = subscription.Owner + if _, ok := us.userSubscriptionsMap[subscription.Owner]; !ok { + us.userSubscriptionsMap[subscription.Owner] = make(map[uint64]*functions_router.IFunctionsSubscriptionsSubscription) } + us.userSubscriptionsMap[subscription.Owner][subscriptionId] = subscription + return true } func (us *userSubscriptions) GetMaxUserBalance(user common.Address) (*big.Int, error) { subs, exists := us.userSubscriptionsMap[user] if !exists { - return nil, errors.New("user has no subscriptions") + return nil, ErrUserHasNoSubscription } maxBalance := big.NewInt(0) diff --git a/core/services/gateway/handlers/functions/user_subscriptions_test.go b/core/services/gateway/handlers/functions/user_subscriptions_test.go index e86399eb609..53827e07e1b 100644 --- 
a/core/services/gateway/handlers/functions/user_subscriptions_test.go +++ b/core/services/gateway/handlers/functions/user_subscriptions_test.go @@ -28,18 +28,23 @@ func TestUserSubscriptions(t *testing.T) { user2Balance1 := big.NewInt(50) user2Balance2 := big.NewInt(70) - us.UpdateSubscription(5, &functions_router.IFunctionsSubscriptionsSubscription{ + updated := us.UpdateSubscription(5, &functions_router.IFunctionsSubscriptionsSubscription{ Owner: user1, Balance: user1Balance, }) - us.UpdateSubscription(3, &functions_router.IFunctionsSubscriptionsSubscription{ + assert.True(t, updated) + + updated = us.UpdateSubscription(3, &functions_router.IFunctionsSubscriptionsSubscription{ Owner: user2, Balance: user2Balance1, }) - us.UpdateSubscription(10, &functions_router.IFunctionsSubscriptionsSubscription{ + assert.True(t, updated) + + updated = us.UpdateSubscription(10, &functions_router.IFunctionsSubscriptionsSubscription{ Owner: user2, Balance: user2Balance2, }) + assert.True(t, updated) balance, err := us.GetMaxUserBalance(user1) assert.NoError(t, err) @@ -49,29 +54,96 @@ func TestUserSubscriptions(t *testing.T) { assert.NoError(t, err) assert.Zero(t, balance.Cmp(user2Balance2)) }) +} - t.Run("UpdateSubscription to remove subscriptions", func(t *testing.T) { +func TestUserSubscriptions_UpdateSubscription(t *testing.T) { + t.Parallel() + + t.Run("update balance", func(t *testing.T) { + us := functions.NewUserSubscriptions() + owner := utils.RandomAddress() + + updated := us.UpdateSubscription(1, &functions_router.IFunctionsSubscriptionsSubscription{ + Owner: owner, + Balance: big.NewInt(10), + }) + assert.True(t, updated) + + updated = us.UpdateSubscription(1, &functions_router.IFunctionsSubscriptionsSubscription{ + Owner: owner, + Balance: big.NewInt(100), + }) + assert.True(t, updated) + }) + + t.Run("updated proposed owner", func(t *testing.T) { + us := functions.NewUserSubscriptions() + owner := utils.RandomAddress() + + updated := us.UpdateSubscription(1, 
&functions_router.IFunctionsSubscriptionsSubscription{ + Owner: owner, + Balance: big.NewInt(10), + }) + assert.True(t, updated) + + updated = us.UpdateSubscription(1, &functions_router.IFunctionsSubscriptionsSubscription{ + Owner: owner, + Balance: big.NewInt(10), + ProposedOwner: utils.RandomAddress(), + }) + assert.True(t, updated) + }) + t.Run("remove subscriptions", func(t *testing.T) { + us := functions.NewUserSubscriptions() user2 := utils.RandomAddress() user2Balance1 := big.NewInt(50) user2Balance2 := big.NewInt(70) - us.UpdateSubscription(3, &functions_router.IFunctionsSubscriptionsSubscription{ + updated := us.UpdateSubscription(3, &functions_router.IFunctionsSubscriptionsSubscription{ Owner: user2, Balance: user2Balance1, }) - us.UpdateSubscription(10, &functions_router.IFunctionsSubscriptionsSubscription{ + assert.True(t, updated) + + updated = us.UpdateSubscription(10, &functions_router.IFunctionsSubscriptionsSubscription{ Owner: user2, Balance: user2Balance2, }) + assert.True(t, updated) - us.UpdateSubscription(3, &functions_router.IFunctionsSubscriptionsSubscription{ + updated = us.UpdateSubscription(3, &functions_router.IFunctionsSubscriptionsSubscription{ Owner: utils.ZeroAddress, }) - us.UpdateSubscription(10, &functions_router.IFunctionsSubscriptionsSubscription{ + assert.True(t, updated) + + updated = us.UpdateSubscription(10, &functions_router.IFunctionsSubscriptionsSubscription{ Owner: utils.ZeroAddress, }) + assert.True(t, updated) _, err := us.GetMaxUserBalance(user2) assert.Error(t, err) }) + + t.Run("remove a non existing subscription", func(t *testing.T) { + us := functions.NewUserSubscriptions() + updated := us.UpdateSubscription(3, &functions_router.IFunctionsSubscriptionsSubscription{ + Owner: utils.ZeroAddress, + }) + assert.False(t, updated) + }) + + t.Run("no actual changes", func(t *testing.T) { + us := functions.NewUserSubscriptions() + subscription := &functions_router.IFunctionsSubscriptionsSubscription{ + Owner: 
utils.RandomAddress(), + Balance: big.NewInt(25), + BlockedBalance: big.NewInt(25), + } + updated := us.UpdateSubscription(5, subscription) + assert.True(t, updated) + + updated = us.UpdateSubscription(5, subscription) + assert.False(t, updated) + }) } diff --git a/core/services/gateway/integration_tests/gateway_integration_test.go b/core/services/gateway/integration_tests/gateway_integration_test.go index 415a8f67cf8..9e4900efeee 100644 --- a/core/services/gateway/integration_tests/gateway_integration_test.go +++ b/core/services/gateway/integration_tests/gateway_integration_test.go @@ -118,7 +118,7 @@ func TestIntegration_Gateway_NoFullNodes_BasicConnectionAndMessage(t *testing.T) // Launch Gateway lggr := logger.TestLogger(t) gatewayConfig := fmt.Sprintf(gatewayConfigTemplate, nodeKeys.Address) - gateway, err := gateway.NewGatewayFromConfig(parseGatewayConfig(t, gatewayConfig), gateway.NewHandlerFactory(nil, lggr), lggr) + gateway, err := gateway.NewGatewayFromConfig(parseGatewayConfig(t, gatewayConfig), gateway.NewHandlerFactory(nil, nil, nil, lggr), lggr) require.NoError(t, err) servicetest.Run(t, gateway) userPort, nodePort := gateway.GetUserPort(), gateway.GetNodePort() diff --git a/core/services/ocr2/plugins/functions/plugin.go b/core/services/ocr2/plugins/functions/plugin.go index 7e2b15bdccf..c6cfa946aba 100644 --- a/core/services/ocr2/plugins/functions/plugin.go +++ b/core/services/ocr2/plugins/functions/plugin.go @@ -143,7 +143,11 @@ func NewFunctionsServices(functionsOracleArgs, thresholdOracleArgs, s4OracleArgs if err2 != nil { return nil, errors.Wrap(err, "failed to create a RateLimiter") } - subscriptions, err2 := gwFunctions.NewOnchainSubscriptions(conf.Chain.Client(), *pluginConfig.OnchainSubscriptions, conf.Logger) + gwFunctionsORM, err := gwFunctions.NewORM(conf.DB, conf.Logger, conf.QConfig, pluginConfig.OnchainSubscriptions.ContractAddress) + if err != nil { + return nil, errors.Wrap(err, "failed to create functions ORM") + } + subscriptions, 
err2 := gwFunctions.NewOnchainSubscriptions(conf.Chain.Client(), *pluginConfig.OnchainSubscriptions, gwFunctionsORM, conf.Logger) if err2 != nil { return nil, errors.Wrap(err, "failed to create a OnchainSubscriptions") } diff --git a/core/services/relay/evm/mercury/wsrpc/client_test.go b/core/services/relay/evm/mercury/wsrpc/client_test.go index 21accbf6d28..b4a3dae733d 100644 --- a/core/services/relay/evm/mercury/wsrpc/client_test.go +++ b/core/services/relay/evm/mercury/wsrpc/client_test.go @@ -120,97 +120,59 @@ func Test_Client_Transmit(t *testing.T) { func Test_Client_LatestReport(t *testing.T) { lggr := logger.TestLogger(t) ctx := testutils.Context(t) + cacheReads := 5 + + tests := []struct { + name string + ttl time.Duration + expectedCalls int + }{ + { + name: "with cache disabled", + ttl: 0, + expectedCalls: 5, + }, + { + name: "with cache enabled", + ttl: 1000 * time.Hour, //some large value that will never expire during a test + expectedCalls: 1, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + req := &pb.LatestReportRequest{} + + cacheSet := cache.NewCacheSet(lggr, cache.Config{LatestReportTTL: tt.ttl}) + + resp := &pb.LatestReportResponse{} + + var calls int + wsrpcClient := &mocks.MockWSRPCClient{ + LatestReportF: func(ctx context.Context, in *pb.LatestReportRequest) (*pb.LatestReportResponse, error) { + calls++ + assert.Equal(t, req, in) + return resp, nil + }, + } - t.Run("with nil cache", func(t *testing.T) { - req := &pb.LatestReportRequest{} - noopCacheSet := newNoopCacheSet() - resp := &pb.LatestReportResponse{} - - wsrpcClient := &mocks.MockWSRPCClient{ - LatestReportF: func(ctx context.Context, in *pb.LatestReportRequest) (*pb.LatestReportResponse, error) { - assert.Equal(t, req, in) - return resp, nil - }, - } - - conn := &mocks.MockConn{ - Ready: true, - } - c := newClient(lggr, csakey.KeyV2{}, nil, "", noopCacheSet) - c.conn = conn - c.rawClient = wsrpcClient - require.NoError(t, c.StartOnce("Mock WSRPC 
Client", func() error { return nil })) - - r, err := c.LatestReport(ctx, req) - - require.NoError(t, err) - assert.Equal(t, resp, r) - }) - - t.Run("with cache disabled", func(t *testing.T) { - req := &pb.LatestReportRequest{} - cacheSet := cache.NewCacheSet(lggr, cache.Config{LatestReportTTL: 0}) - resp := &pb.LatestReportResponse{} - - var calls int - wsrpcClient := &mocks.MockWSRPCClient{ - LatestReportF: func(ctx context.Context, in *pb.LatestReportRequest) (*pb.LatestReportResponse, error) { - calls++ - assert.Equal(t, req, in) - return resp, nil - }, - } - - conn := &mocks.MockConn{ - Ready: true, - } - c := newClient(lggr, csakey.KeyV2{}, nil, "", cacheSet) - c.conn = conn - c.rawClient = wsrpcClient - - servicetest.Run(t, cacheSet) - simulateStart(ctx, t, c) - - for i := 0; i < 5; i++ { - r, err := c.LatestReport(ctx, req) - - require.NoError(t, err) - assert.Equal(t, resp, r) - } - assert.Equal(t, 5, calls, "expected 5 calls to LatestReport but it was called %d times", calls) - }) - - t.Run("with caching", func(t *testing.T) { - req := &pb.LatestReportRequest{} - const neverExpireTTL = 1000 * time.Hour // some massive value that will never expire during a test - cacheSet := cache.NewCacheSet(lggr, cache.Config{LatestReportTTL: neverExpireTTL}) - resp := &pb.LatestReportResponse{} - - var calls int - wsrpcClient := &mocks.MockWSRPCClient{ - LatestReportF: func(ctx context.Context, in *pb.LatestReportRequest) (*pb.LatestReportResponse, error) { - calls++ - assert.Equal(t, req, in) - return resp, nil - }, - } - - conn := &mocks.MockConn{ - Ready: true, - } - c := newClient(lggr, csakey.KeyV2{}, nil, "", cacheSet) - c.conn = conn - c.rawClient = wsrpcClient + conn := &mocks.MockConn{ + Ready: true, + } + c := newClient(lggr, csakey.KeyV2{}, nil, "", cacheSet) + c.conn = conn + c.rawClient = wsrpcClient - servicetest.Run(t, cacheSet) - simulateStart(ctx, t, c) + servicetest.Run(t, cacheSet) + simulateStart(ctx, t, c) - for i := 0; i < 5; i++ { - r, err := 
c.LatestReport(ctx, req) + for i := 0; i < cacheReads; i++ { + r, err := c.LatestReport(ctx, req) - require.NoError(t, err) - assert.Equal(t, resp, r) - } - assert.Equal(t, 1, calls, "expected only 1 call to LatestReport but it was called %d times", calls) - }) + require.NoError(t, err) + assert.Equal(t, resp, r) + } + assert.Equal(t, tt.expectedCalls, calls, "expected %d calls to LatestReport but it was called %d times", tt.expectedCalls, calls) + }) + } } diff --git a/core/store/migrate/migrations/0215_functions_subscriptions.sql b/core/store/migrate/migrations/0215_functions_subscriptions.sql new file mode 100644 index 00000000000..c3859d42f63 --- /dev/null +++ b/core/store/migrate/migrations/0215_functions_subscriptions.sql @@ -0,0 +1,19 @@ +-- +goose Up +-- +goose StatementBegin +CREATE TABLE functions_subscriptions( + router_contract_address bytea, + subscription_id bigint, + owner bytea CHECK (octet_length(owner) = 20) NOT NULL, + balance bigint, + blocked_balance bigint, + proposed_owner bytea, + consumers bytea[], + flags bytea, + PRIMARY KEY(router_contract_address, subscription_id) +); +-- +goose StatementEnd + +-- +goose Down +-- +goose StatementBegin +DROP TABLE IF EXISTS functions_subscriptions; +-- +goose StatementEnd diff --git a/core/web/health_controller.go b/core/web/health_controller.go index c8489fd6325..7ab07291b58 100644 --- a/core/web/health_controller.go +++ b/core/web/health_controller.go @@ -76,7 +76,7 @@ func (hc *HealthController) Health(c *gin.Context) { healthy, errors := checker.IsHealthy() if !healthy { - status = http.StatusServiceUnavailable + status = http.StatusMultiStatus } c.Status(status) diff --git a/core/web/health_controller_test.go b/core/web/health_controller_test.go index ae40a66bca9..547186e33fb 100644 --- a/core/web/health_controller_test.go +++ b/core/web/health_controller_test.go @@ -62,7 +62,7 @@ func TestHealthController_Health_status(t *testing.T) { { name: "not ready", ready: false, - status: 
http.StatusServiceUnavailable, + status: http.StatusMultiStatus, }, { name: "ready", @@ -118,7 +118,7 @@ func TestHealthController_Health_body(t *testing.T) { client := app.NewHTTPClient(nil) resp, cleanup := client.Get(tc.path, tc.headers) t.Cleanup(cleanup) - assert.Equal(t, http.StatusServiceUnavailable, resp.StatusCode) + assert.Equal(t, http.StatusMultiStatus, resp.StatusCode) body, err := io.ReadAll(resp.Body) require.NoError(t, err) if tc.expBody == bodyJSON { diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index f18fe5369af..36ee68412ca 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [dev] +### Added + +- `chainlink health` CLI command and HTML `/health` endpoint, to provide human-readable views of the underlying JSON health data. + ### Fixed - Fixed the encoding used for transactions when resending in batches diff --git a/docs/Mercury.md b/docs/Mercury.md new file mode 100644 index 00000000000..610605ddbc6 --- /dev/null +++ b/docs/Mercury.md @@ -0,0 +1,350 @@ +# Mercury Documentation + +## Contracts + +Use this tool to configure contracts: + +https://github.com/smartcontractkit/the-most-amazing-mercury-contract-configuration-tool + +TODO: updated process here @Austin Born + +[Reference contract](https://github.com/smartcontractkit/reference-data-directory/blob/master/ethereum-testnet-goerli-arbitrum-1/contracts/0x535051166466D159da8742167c9CA1eFe9e82613.json) + +[OCR2 config documentation](https://github.com/smartcontractkit/libocr/blob/master/offchainreporting2/internal/config/public_config.go) + +**🚨 Important config** + +`s` - transmission schedule. This should be set to the number of oracles on the feed - meaning that every oracle will attempt to transmit to the mercury server in the first stage of transmission. e.g. `[4]` if there are 4 nodes in the DON, excluding the bootstrap node. 
+ +`f` - set this to `n//3` (where `//` denotes integer division), e.g. if you have 16 oracles, set `f` to 5. + +`deltaRound` - report generation frequency. This determines how frequently a new round should be started at most (if rounds take longer than this due to network latency, there will be fewer rounds per second than this parameter would suggest). `100ms` is a good starting point (10 rounds/s). + +`reportingPluginConfig.alphaAccept` - set this to `0`, because our mercury ContractTransmitter doesn't know the latest report that's been sent to the mercury server and we therefore always have a "pending" report which we compare against before accepting a report for transmission. + +`reportingPluginConfig.deltaC` - set this to `0` so every round will result in a report. + +
Example `verifier/<0xaddress>.json` + +```json +{ + "contractVersion": 1001, + "digests": { + "0x0006c67c0374ab0dcfa45c63b37df2ea8d16fb903c043caa98065033e9c15666": { + "feedId": "0x14e044f932bb959cc2aa8dc1ba110c09224e639aae00264c1ffc2a0830904a3c", + "proxyEnabled": true, + "status": "active" + } + }, + "feeds": { + "0x14e044f932bb959cc2aa8dc1ba110c09224e639aae00264c1ffc2a0830904a3c": { + "digests": [ + "0x0006c67c0374ab0dcfa45c63b37df2ea8d16fb903c043caa98065033e9c15666" + ], + "docs": { + "assetName": "Chainlink", + "feedCategory": "verified", + "feedType": "Crypto", + "hidden": true + }, + "externalAdapterRequestParams": { + "endpoint": "cryptolwba", + "from": "LINK", + "to": "USD" + }, + "feedId": "0x14e044f932bb959cc2aa8dc1ba110c09224e639aae00264c1ffc2a0830904a3c", + "latestConfig": { + "offchainConfig": { + "deltaGrace": "0", + "deltaProgress": "2s", + "deltaResend": "20s", + "deltaRound": "250ms", + "deltaStage": "60s", + "f": 1, + "maxDurationObservation": "250ms", + "maxDurationQuery": "0s", + "maxDurationReport": "250ms", + "maxDurationShouldAcceptFinalizedReport": "250ms", + "maxDurationShouldTransmitAcceptedReport": "250ms", + "rMax": 100, + "reportingPluginConfig": { + "alphaAcceptInfinite": false, + "alphaAcceptPpb": "0", + "alphaReportInfinite": false, + "alphaReportPpb": "0", + "deltaC": "0s" + }, + "s": [ + 4 + ] + }, + "offchainConfigVersion": 30, + "onchainConfig": { + "max": "99999999999999999999999999999", + "min": "1" + }, + "onchainConfigVersion": 1, + "oracles": [ + { + "api": [ + "coinmetrics", + "ncfx", + "tiingo-test" + ], + "operator": "clc-ocr-mercury-arbitrum-goerli-nodes-0" + }, + { + "api": [ + "coinmetrics", + "ncfx", + "tiingo-test" + ], + "operator": "clc-ocr-mercury-arbitrum-goerli-nodes-1" + }, + { + "api": [ + "coinmetrics", + "ncfx", + "tiingo-test" + ], + "operator": "clc-ocr-mercury-arbitrum-goerli-nodes-2" + }, + { + "api": [ + "coinmetrics", + "ncfx", + "tiingo-test" + ], + "operator": 
"clc-ocr-mercury-arbitrum-goerli-nodes-3" + } + ] + }, + "marketing": { + "category": "crypto", + "history": true, + "pair": [ + "LINK", + "USD" + ], + "path": "link-usd-verifier" + }, + "name": "LINK/USD-RefPricePlus-ArbitrumGoerli-002", + "reportFields": { + "ask": { + "decimals": 8, + "maxSubmissionValue": "99999999999999999999999999999", + "minSubmissionValue": "1", + "resultPath": "data,ask" + }, + "bid": { + "decimals": 8, + "maxSubmissionValue": "99999999999999999999999999999", + "minSubmissionValue": "1", + "resultPath": "data,bid" + }, + "median": { + "decimals": 8, + "maxSubmissionValue": "99999999999999999999999999999", + "minSubmissionValue": "1", + "resultPath": "data,mid" + } + }, + "status": "testing" + } + }, + "name": "Mercury v0.2 - Production Testnet Verifier (v1.0.0)", + "status": "testing" +} +``` +
+ +## Jobs + +### Bootstrap + +**🚨 Important config** + +`relayConfig.chainID` - target chain id. (the chain we pull block numbers from) + +`contractID` - the contract address of the verifier contract. + +
Example bootstrap TOML + +```toml +type = "bootstrap" +relay = "evm" +schemaVersion = 1 +name = "$feed_name" +contractID = "$verifier_contract_address" +feedID = "$feed_id" # IMPORTANT - DON'T FORGET THIS OR IT WON'T WORK +contractConfigTrackerPollInterval = "15s" + +[relayConfig] +chainID = $evm_chain_id +fromBlock = $from_block +``` +
+ +### OCR2 + +
+p2pv2Bootstrappers = [
+  "$bootstrapper_address"
+]
benchmark_price; + ds3_payload -> ds3_median -> ds3_median_multiply -> benchmark_price; + + benchmark_price [type=median allowedFaults=2 index=0]; + + ds1_payload -> ds1_bid -> ds1_bid_multiply -> bid_price; + ds2_payload -> ds2_bid -> ds2_bid_multiply -> bid_price; + ds3_payload -> ds3_bid -> ds3_bid_multiply -> bid_price; + + bid_price [type=median allowedFaults=2 index=1]; + + ds1_payload -> ds1_ask -> ds1_ask_multiply -> ask_price; + ds2_payload -> ds2_ask -> ds2_ask_multiply -> ask_price; + ds3_payload -> ds3_ask -> ds3_ask_multiply -> ask_price; + + ask_price [type=median allowedFaults=2 index=2]; +""" + +[pluginConfig] +serverURL = "$mercury_server_url" +serverPubKey = "$mercury_server_public_key" + +[relayConfig] +chainID = $evm_chain_id +fromBlock = $from_block +``` +
+ +## Nodes + +**🚨 Important config** + +`OCR2.Enabled` - must be `true` - Mercury uses OCR2. + +`P2P.V2.Enabled` - required in order for OCR2 to work. + +`Feature.LogPoller` - required in order for OCR2 to work. You will get fatal errors if not set. + +`JobPipeline.MaxSuccessfulRuns` - set to `0` to disable saving pipeline runs to reduce load on the db. Obviously this means you won’t see anything in the UI. + +`TelemetryIngress.SendInterval` - How frequently to send telemetry batches. Mercury generates a lot of telemetry data due to the throughput. `100ms` has been tested for a single feed with 5 nodes - this will need to be monitored (along with relevant config) as we add more feeds to a node. + +`Database` - **must** increase connection limits above the standard defaults + +
Example node config TOML + +```toml +RootDir = '$ROOT_DIR' + +[JobPipeline] +MaxSuccessfulRuns = 0 # you may set to some small value like '10' or similar if you like looking at job runs in the UI + +[Feature] +UICSAKeys = true # required +LogPoller = true # required + +[Log] +Level = 'info' # this should be 'debug' for chainlink internal deployments, nops may use 'info' to reduce log volume + +[Log.File] +< standard values > + +[WebServer] +< standard values > + +[WebServer.TLS] +< standard values > + +[[EVM]] +ChainID = '42161' # change as needed based on target chain + +[OCR] +Enabled = false # turn off OCR 1 + +[P2P] +TraceLogging = false # this should be 'true' for chainlink internal deployments, we may ask nops to set this to true for debugging +PeerID = '$PEERID' + +[P2P.V2] +Enabled = true # required +DefaultBootstrappers = < mercury bootstrap nodes > # Note that this should ideally be set in the job spec, this is just a fallback +# Make sure these IPs are properly configured in the firewall. May not be necessary for internal nodes +AnnounceAddresses = ['$EXTERNAL_IP:$EXTERNAL_PORT'] # Use whichever port you like, pls randomize, MAKE SURE ITS CONFIGURED IN THE FIREWALL +ListenAddresses = ['0.0.0.0:$INTERNAL_PORT'] # Use whichever port you like, pls randomize, MAKE SURE ITS CONFIGURED IN THE FIREWALL + +[OCR2] +Enabled = true # required +KeyBundleID = '$KEY_BUNDLE_ID' # Note that this should ideally be set in the job spec, this is just a fallback +CaptureEATelemetry = true + +[TelemetryIngress] +UniConn = false +SendInterval = '250ms' +BufferSize = 300 +MaxBatchSize = 100 + +[[TelemetryIngress.Endpoints]] +Network = 'EVM' +ChainID = '42161' # change as needed based on target chain +URL = '$TELEMETRY_ENDPOINT_URL' # Provided by Chainlink Labs RSTP team +ServerPubKey = '$TELEMETRY_PUB_KEY' # Provided by Chainlink Labs RSTP team + +[Database] +MaxIdleConns = 100 # should equal or greater than total number of mercury jobs +MaxOpenConns = 400 # caution! 
ensure postgres is configured to support this + +[[EVM.Nodes]] +< put RPC nodes here > +``` +
diff --git a/integration-tests/Makefile b/integration-tests/Makefile index 5e34b059b68..d1e9bdd89ef 100644 --- a/integration-tests/Makefile +++ b/integration-tests/Makefile @@ -74,12 +74,12 @@ build_test_image: #Build a chainlink docker image for local testing and push to k3d registry .PHONY: build_push_docker_image build_push_docker_image: - docker build -f ../core/chainlink.Dockerfile --build-arg COMMIT_SHA=$(git rev-parse HEAD) --build-arg CHAINLINK_USER=chainlink -t localhost:5000/chainlink:develop ../ ; docker push localhost:5000/chainlink:develop + docker build -f ../core/chainlink.Dockerfile --build-arg COMMIT_SHA=$(git rev-parse HEAD) --build-arg CHAINLINK_USER=chainlink -t 127.0.0.1:5000/chainlink:develop ../ ; docker push 127.0.0.1:5000/chainlink:develop #Build a chainlink docker image in plugin mode for local testing and push to k3d registry .PHONY: build_push_plugin_docker_image build_push_plugin_docker_image: - docker build -f ../plugins/chainlink.Dockerfile --build-arg COMMIT_SHA=$(git rev-parse HEAD) --build-arg CHAINLINK_USER=chainlink -t localhost:5000/chainlink:develop ../ ; docker push localhost:5000/chainlink:develop + docker build -f ../plugins/chainlink.Dockerfile --build-arg COMMIT_SHA=$(git rev-parse HEAD) --build-arg CHAINLINK_USER=chainlink -t 127.0.0.1:5000/chainlink:develop ../ ; docker push 127.0.0.1:5000/chainlink:develop # Spins up containers needed to collect traces for local testing .PHONY: run_tracing @@ -197,7 +197,7 @@ build_docker_image: # example usage: make build_docker_image image=chainlink tag=latest .PHONY: build_plugin_docker_image build_plugin_docker_image: - docker build -f ../plugins/chainlink.Dockerfile --build-arg COMMIT_SHA=$(git rev-parse HEAD) --build-arg CHAINLINK_USER=chainlink -t localhost:5000/chainlink:develop ../ + docker build -f ../plugins/chainlink.Dockerfile --build-arg COMMIT_SHA=$(git rev-parse HEAD) --build-arg CHAINLINK_USER=chainlink -t 127.0.0.1:5000/chainlink:develop ../ # image: the name for the 
chainlink image being built, example: image=chainlink # tag: the tag for the chainlink image being built, example: tag=latest diff --git a/integration-tests/actions/ocr_helpers.go b/integration-tests/actions/ocr_helpers.go index 17e536ec839..dd0e6606e43 100644 --- a/integration-tests/actions/ocr_helpers.go +++ b/integration-tests/actions/ocr_helpers.go @@ -343,6 +343,25 @@ func StartNewRound( return nil } +// WatchNewRound watches for a new OCR round, similarly to StartNewRound, but it does not explicitly request a new +// round from the contract, as this can cause some odd behavior in some cases +func WatchNewRound( + roundNumber int64, + ocrInstances []contracts.OffchainAggregator, + client blockchain.EVMClient, + logger zerolog.Logger, +) error { + for i := 0; i < len(ocrInstances); i++ { + ocrRound := contracts.NewOffchainAggregatorRoundConfirmer(ocrInstances[i], big.NewInt(roundNumber), client.GetNetworkConfig().Timeout.Duration, logger) + client.AddHeaderEventSubscription(ocrInstances[i].Address(), ocrRound) + err := client.WaitForEvents() + if err != nil { + return fmt.Errorf("failed to wait for event subscriptions of OCR instance %d: %w", i+1, err) + } + } + return nil +} + // SetAdapterResponse sets a single adapter response that correlates with an ocr contract and a chainlink node func SetAdapterResponse( response int, diff --git a/integration-tests/docker/test_env/cl_node.go b/integration-tests/docker/test_env/cl_node.go index d7228b1ce8f..c28959dadc9 100644 --- a/integration-tests/docker/test_env/cl_node.go +++ b/integration-tests/docker/test_env/cl_node.go @@ -3,10 +3,12 @@ package test_env import ( "context" "fmt" + "io" "maps" "math/big" "net/url" "os" + "regexp" "strings" "testing" "time" @@ -15,6 +17,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/google/uuid" "github.com/pelletier/go-toml/v2" + "github.com/pkg/errors" "github.com/rs/zerolog" "github.com/rs/zerolog/log" tc "github.com/testcontainers/testcontainers-go" @@ -361,6 
+	return "", errors.Errorf("could not find chainlink version in command output '%s'", outputString)
+}
@@ -34,4 +25,4 @@ go test -v -run TestOCRLoad go test -v -run TestOCRVolume ``` -Check test configuration [here](config.toml) +Check test configuration [here](config.toml) \ No newline at end of file diff --git a/integration-tests/scripts/buildTestMatrixList.sh b/integration-tests/scripts/buildTestMatrixList.sh index 7f058b5b659..f02362aa47a 100755 --- a/integration-tests/scripts/buildTestMatrixList.sh +++ b/integration-tests/scripts/buildTestMatrixList.sh @@ -40,24 +40,26 @@ jq -c '.tests[]' ${JSONFILE} | while read -r test; do effective_node_count=${node_count:-$NODE_COUNT} subTests=$(echo ${test} | jq -r '.run[]?.name // empty') output="" + + if [ $COUNTER -ne 1 ]; then + echo -n "," + fi # Loop through subtests, if any, and print in the desired format if [ -n "$subTests" ]; then + subTestString="" + subTestCounter=1 for subTest in $subTests; do - if [ $COUNTER -ne 1 ]; then - echo -n "," + if [ $subTestCounter -ne 1 ]; then + subTestString+="|" fi - matrix_output $COUNTER $MATRIX_JOB_NAME "${testName}/${subTest}" ${effective_node_label} ${effective_node_count} - ((COUNTER++)) + subTestString+="${testName}\/${subTest}" + ((subTestCounter++)) done - else - if [ $COUNTER -ne 1 ]; then - echo -n "," - fi - matrix_output $COUNTER $MATRIX_JOB_NAME "${testName}" ${effective_node_label} ${effective_node_count} - ((COUNTER++)) + testName="${subTestString}" fi - + matrix_output $COUNTER $MATRIX_JOB_NAME "${testName}" ${effective_node_label} ${effective_node_count} + ((COUNTER++)) done > "./tmpout.json" OUTPUT=$(cat ./tmpout.json) echo "[${OUTPUT}]" diff --git a/integration-tests/smoke/automation_test.go b/integration-tests/smoke/automation_test.go index 1dbfc78ec87..cb631eb8278 100644 --- a/integration-tests/smoke/automation_test.go +++ b/integration-tests/smoke/automation_test.go @@ -90,9 +90,9 @@ func SetupAutomationBasic(t *testing.T, nodeUpgrade bool) { "registry_2_1_with_logtrigger_and_mercury_v02": ethereum.RegistryVersion_2_1, } - for n, rv := range 
registryVersions { - name := n - registryVersion := rv + for name, registryVersion := range registryVersions { + name := name + registryVersion := registryVersion t.Run(name, func(t *testing.T) { t.Parallel() l := logging.GetTestLogger(t) @@ -168,7 +168,7 @@ func SetupAutomationBasic(t *testing.T, nodeUpgrade bool) { g.Expect(counter.Int64()).Should(gomega.BeNumerically(">=", int64(expect)), "Expected consumer counter to be greater than %d, but got %d", expect, counter.Int64()) } - }, "5m", "1s").Should(gomega.Succeed()) // ~1m for cluster setup, ~2m for performing each upkeep 5 times, ~2m buffer + }, "10m", "1s").Should(gomega.Succeed()) // ~1m for cluster setup, ~2m for performing each upkeep 5 times, ~2m buffer l.Info().Msgf("Total time taken to get 5 performs for each upkeep: %s", time.Since(startTime)) @@ -400,9 +400,9 @@ func TestAutomationAddFunds(t *testing.T) { "registry_2_1": ethereum.RegistryVersion_2_1, } - for n, rv := range registryVersions { - name := n - registryVersion := rv + for name, registryVersion := range registryVersions { + name := name + registryVersion := registryVersion t.Run(name, func(t *testing.T) { t.Parallel() a := setupAutomationTestDocker( @@ -557,9 +557,9 @@ func TestAutomationRegisterUpkeep(t *testing.T) { "registry_2_1": ethereum.RegistryVersion_2_1, } - for n, rv := range registryVersions { - name := n - registryVersion := rv + for name, registryVersion := range registryVersions { + name := name + registryVersion := registryVersion t.Run(name, func(t *testing.T) { t.Parallel() l := logging.GetTestLogger(t) @@ -641,9 +641,9 @@ func TestAutomationPauseRegistry(t *testing.T) { "registry_2_1": ethereum.RegistryVersion_2_1, } - for n, rv := range registryVersions { - name := n - registryVersion := rv + for name, registryVersion := range registryVersions { + name := name + registryVersion := registryVersion t.Run(name, func(t *testing.T) { t.Parallel() a := setupAutomationTestDocker( @@ -710,9 +710,9 @@ func 
TestAutomationKeeperNodesDown(t *testing.T) { "registry_2_1": ethereum.RegistryVersion_2_1, } - for n, rv := range registryVersions { - name := n - registryVersion := rv + for name, registryVersion := range registryVersions { + name := name + registryVersion := registryVersion t.Run(name, func(t *testing.T) { t.Parallel() l := logging.GetTestLogger(t) @@ -810,9 +810,9 @@ func TestAutomationPerformSimulation(t *testing.T) { "registry_2_1": ethereum.RegistryVersion_2_1, } - for n, rv := range registryVersions { - name := n - registryVersion := rv + for name, registryVersion := range registryVersions { + name := name + registryVersion := registryVersion t.Run(name, func(t *testing.T) { t.Parallel() a := setupAutomationTestDocker( @@ -873,9 +873,9 @@ func TestAutomationCheckPerformGasLimit(t *testing.T) { "registry_2_1": ethereum.RegistryVersion_2_1, } - for n, rv := range registryVersions { - name := n - registryVersion := rv + for name, registryVersion := range registryVersions { + name := name + registryVersion := registryVersion t.Run(name, func(t *testing.T) { t.Parallel() l := logging.GetTestLogger(t) @@ -987,9 +987,9 @@ func TestUpdateCheckData(t *testing.T) { "registry_2_1": ethereum.RegistryVersion_2_1, } - for n, rv := range registryVersions { - name := n - registryVersion := rv + for name, registryVersion := range registryVersions { + name := name + registryVersion := registryVersion t.Run(name, func(t *testing.T) { t.Parallel() l := logging.GetTestLogger(t) diff --git a/integration-tests/smoke/automation_test.go_test_list.json b/integration-tests/smoke/automation_test.go_test_list.json index da214a0d282..b88684599c7 100644 --- a/integration-tests/smoke/automation_test.go_test_list.json +++ b/integration-tests/smoke/automation_test.go_test_list.json @@ -3,7 +3,29 @@ { "name": "TestAutomationBasic", "label": "ubuntu-latest", - "nodes": 6 + "nodes": 2, + "run":[ + {"name":"registry_2_0"}, + {"name":"registry_2_1_conditional"} + ] + }, + { + "name": 
"TestAutomationBasic", + "label": "ubuntu-latest", + "nodes": 2, + "run":[ + {"name":"registry_2_1_logtrigger"}, + {"name":"registry_2_1_with_mercury_v02"} + ] + }, + { + "name": "TestAutomationBasic", + "label": "ubuntu-latest", + "nodes": 2, + "run":[ + {"name":"registry_2_1_with_mercury_v03"}, + {"name":"registry_2_1_with_logtrigger_and_mercury_v02"} + ] }, { "name": "TestSetUpkeepTriggerConfig" diff --git a/integration-tests/smoke/forwarder_ocr_test.go b/integration-tests/smoke/forwarder_ocr_test.go index c71c6e31516..64128ed4a8c 100644 --- a/integration-tests/smoke/forwarder_ocr_test.go +++ b/integration-tests/smoke/forwarder_ocr_test.go @@ -68,7 +68,7 @@ func TestForwarderOCRBasic(t *testing.T) { err = actions.CreateOCRJobsWithForwarderLocal(ocrInstances, bootstrapNode, workerNodes, 5, env.MockAdapter, env.EVMClient.GetChainID().String()) require.NoError(t, err, "failed to setup forwarder jobs") - err = actions.StartNewRound(1, ocrInstances, env.EVMClient, l) + err = actions.WatchNewRound(1, ocrInstances, env.EVMClient, l) require.NoError(t, err) err = env.EVMClient.WaitForEvents() require.NoError(t, err, "Error waiting for events") @@ -79,7 +79,7 @@ func TestForwarderOCRBasic(t *testing.T) { err = actions.SetAllAdapterResponsesToTheSameValueLocal(10, ocrInstances, workerNodes, env.MockAdapter) require.NoError(t, err) - err = actions.StartNewRound(2, ocrInstances, env.EVMClient, l) + err = actions.WatchNewRound(2, ocrInstances, env.EVMClient, l) require.NoError(t, err) err = env.EVMClient.WaitForEvents() require.NoError(t, err, "Error waiting for events") diff --git a/integration-tests/smoke/ocr_test.go b/integration-tests/smoke/ocr_test.go index ba158923812..ebfbf698e98 100644 --- a/integration-tests/smoke/ocr_test.go +++ b/integration-tests/smoke/ocr_test.go @@ -47,7 +47,7 @@ func TestOCRBasic(t *testing.T) { err = actions.CreateOCRJobsLocal(ocrInstances, bootstrapNode, workerNodes, 5, env.MockAdapter, env.EVMClient.GetChainID()) require.NoError(t, err) - 
err = actions.StartNewRound(1, ocrInstances, env.EVMClient, l) + err = actions.WatchNewRound(1, ocrInstances, env.EVMClient, l) require.NoError(t, err) answer, err := ocrInstances[0].GetLatestAnswer(testcontext.Get(t)) @@ -56,7 +56,7 @@ func TestOCRBasic(t *testing.T) { err = actions.SetAllAdapterResponsesToTheSameValueLocal(10, ocrInstances, workerNodes, env.MockAdapter) require.NoError(t, err) - err = actions.StartNewRound(2, ocrInstances, env.EVMClient, l) + err = actions.WatchNewRound(2, ocrInstances, env.EVMClient, l) require.NoError(t, err) answer, err = ocrInstances[0].GetLatestAnswer(testcontext.Get(t)) @@ -94,7 +94,7 @@ func TestOCRJobReplacement(t *testing.T) { err = actions.CreateOCRJobsLocal(ocrInstances, bootstrapNode, workerNodes, 5, env.MockAdapter, env.EVMClient.GetChainID()) require.NoError(t, err) - err = actions.StartNewRound(1, ocrInstances, env.EVMClient, l) + err = actions.WatchNewRound(1, ocrInstances, env.EVMClient, l) require.NoError(t, err) answer, err := ocrInstances[0].GetLatestAnswer(testcontext.Get(t)) @@ -103,7 +103,7 @@ func TestOCRJobReplacement(t *testing.T) { err = actions.SetAllAdapterResponsesToTheSameValueLocal(10, ocrInstances, workerNodes, env.MockAdapter) require.NoError(t, err) - err = actions.StartNewRound(2, ocrInstances, env.EVMClient, l) + err = actions.WatchNewRound(2, ocrInstances, env.EVMClient, l) require.NoError(t, err) answer, err = ocrInstances[0].GetLatestAnswer(testcontext.Get(t)) @@ -120,7 +120,7 @@ func TestOCRJobReplacement(t *testing.T) { err = actions.CreateOCRJobsLocal(ocrInstances, bootstrapNode, workerNodes, 5, env.MockAdapter, env.EVMClient.GetChainID()) require.NoError(t, err) - err = actions.StartNewRound(1, ocrInstances, env.EVMClient, l) + err = actions.WatchNewRound(1, ocrInstances, env.EVMClient, l) require.NoError(t, err) answer, err = ocrInstances[0].GetLatestAnswer(testcontext.Get(t)) diff --git a/internal/testdb/testdb.go b/internal/testdb/testdb.go new file mode 100644 index 
00000000000..9b531166113 --- /dev/null +++ b/internal/testdb/testdb.go @@ -0,0 +1,56 @@ +package testdb + +import ( + "database/sql" + "errors" + "fmt" + "net/url" + + "github.com/smartcontractkit/chainlink/v2/core/store/dialects" +) + +const ( + // PristineDBName is a clean copy of test DB with migrations. + PristineDBName = "chainlink_test_pristine" + // TestDBNamePrefix is a common prefix that will be auto-removed by the dangling DB cleanup process. + TestDBNamePrefix = "chainlink_test_" +) + +// CreateOrReplace creates a database named with a common prefix and the given suffix, and returns the URL. +// If the database already exists, it will be dropped and re-created. +// If withTemplate is true, the pristine DB will be used as a template. +func CreateOrReplace(parsed url.URL, suffix string, withTemplate bool) (string, error) { + if parsed.Path == "" { + return "", errors.New("path missing from database URL") + } + + // Match the naming schema that our dangling DB cleanup methods expect + dbname := TestDBNamePrefix + suffix + if l := len(dbname); l > 63 { + return "", fmt.Errorf("dbname %v too long (%d), max is 63 bytes. Try a shorter suffix", dbname, l) + } + // Cannot drop test database if we are connected to it, so we must connect + // to a different one. 'postgres' should be present on all postgres installations + parsed.Path = "/postgres" + db, err := sql.Open(string(dialects.Postgres), parsed.String()) + if err != nil { + return "", fmt.Errorf("in order to drop the test database, we need to connect to a separate database"+ + " called 'postgres'. 
But we are unable to open 'postgres' database: %+v", err)
+ // + // -- testport.txt -- + // PORT + testPortName = "testport.txt" +) + func TestMain(m *testing.M) { os.Exit(testscript.RunMain(m, map[string]func() int{ "chainlink": core.Main, @@ -22,11 +46,14 @@ func TestScripts(t *testing.T) { t.Parallel() visitor := txtar.NewDirVisitor("testdata/scripts", txtar.Recurse, func(path string) error { - t.Run(path, func(t *testing.T) { + t.Run(strings.TrimPrefix(path, "testdata/scripts/"), func(t *testing.T) { t.Parallel() + testscript.Run(t, testscript.Params{ - Dir: path, - Setup: commonEnv, + Dir: path, + Setup: commonEnv, + ContinueOnError: true, + //UpdateScripts: true, // uncomment to update golden files }) }) return nil @@ -35,9 +62,62 @@ func TestScripts(t *testing.T) { require.NoError(t, visitor.Walk()) } -func commonEnv(env *testscript.Env) error { - env.Setenv("HOME", "$WORK/home") - env.Setenv("VERSION", static.Version) - env.Setenv("COMMIT_SHA", static.Sha) +func commonEnv(te *testscript.Env) error { + te.Setenv("HOME", "$WORK/home") + te.Setenv("VERSION", static.Version) + te.Setenv("COMMIT_SHA", static.Sha) + + b, err := os.ReadFile(filepath.Join(te.WorkDir, testPortName)) + if err != nil && !os.IsNotExist(err) { + return fmt.Errorf("failed to read file %s: %w", testPortName, err) + } else if err == nil { + envVarName := strings.TrimSpace(string(b)) + te.T().Log("test port requested:", envVarName) + + port, ret, err2 := takeFreePort() + if err2 != nil { + return err2 + } + te.Defer(ret) + + te.Setenv(envVarName, strconv.Itoa(port)) + } + + b, err = os.ReadFile(filepath.Join(te.WorkDir, testDBName)) + if err != nil && !os.IsNotExist(err) { + return fmt.Errorf("failed to read file %s: %w", testDBName, err) + } else if err == nil { + envVarName := strings.TrimSpace(string(b)) + te.T().Log("test database requested:", envVarName) + + u2, err2 := initDB() + if err2 != nil { + return err2 + } + + te.Setenv(envVarName, u2) + } return nil } + +func takeFreePort() (int, func(), error) { + ports, err := freeport.Take(1) + 
if err != nil { + return 0, nil, fmt.Errorf("failed to get free port: %w", err) + } + return ports[0], func() { freeport.Return(ports) }, nil +} + +func initDB() (string, error) { + u, err := url.Parse(string(env.DatabaseURL.Get())) + if err != nil { + return "", fmt.Errorf("failed to parse url: %w", err) + } + + name := strings.ReplaceAll(uuid.NewString(), "-", "_") + "_test" + u2, err := testdb.CreateOrReplace(*u, name, true) + if err != nil { + return "", fmt.Errorf("failed to create DB: %w", err) + } + return u2, nil +} diff --git a/testdata/scripts/health/default.txtar b/testdata/scripts/health/default.txtar new file mode 100644 index 00000000000..c80faadfd13 --- /dev/null +++ b/testdata/scripts/health/default.txtar @@ -0,0 +1,129 @@ +# start node +exec sh -c 'eval "echo \"$(cat config.toml.tmpl)\" > config.toml"' +exec chainlink node -c config.toml start -p password -a creds & + +# initialize client +env NODEURL=http://localhost:$PORT +exec curl --retry 10 --retry-max-time 60 --retry-connrefused $NODEURL +exec chainlink --remote-node-url $NODEURL admin login -file creds --bypass-version-check + +exec chainlink --remote-node-url $NODEURL health +cmp stdout out.txt + +exec chainlink --remote-node-url $NODEURL health -json +cp stdout compact.json +exec jq . 
compact.json +cmp stdout out.json + +-- testdb.txt -- +CL_DATABASE_URL +-- testport.txt -- +PORT + +-- password -- +T.tLHkcmwePT/p,]sYuntjwHKAsrhm#4eRs4LuKHwvHejWYAC2JP4M8HimwgmbaZ +-- creds -- +notreal@fakeemail.ch +fj293fbBnlQ!f9vNs + +-- config.toml.tmpl -- +[Webserver] +HTTPPort = $PORT + +-- out.txt -- +-EventBroadcaster +-JobSpawner +-Mailbox.Monitor +-Mercury.WSRPCPool +-Mercury.WSRPCPool.CacheSet +-PipelineORM +-PipelineRunner +-PromReporter +-TelemetryManager + +-- out.json -- +{ + "data": [ + { + "type": "checks", + "id": "EventBroadcaster", + "attributes": { + "name": "EventBroadcaster", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "JobSpawner", + "attributes": { + "name": "JobSpawner", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "Mailbox.Monitor", + "attributes": { + "name": "Mailbox.Monitor", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "Mercury.WSRPCPool", + "attributes": { + "name": "Mercury.WSRPCPool", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "Mercury.WSRPCPool.CacheSet", + "attributes": { + "name": "Mercury.WSRPCPool.CacheSet", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "PipelineORM", + "attributes": { + "name": "PipelineORM", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "PipelineRunner", + "attributes": { + "name": "PipelineRunner", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "PromReporter", + "attributes": { + "name": "PromReporter", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "TelemetryManager", + "attributes": { + "name": "TelemetryManager", + "status": "passing", + "output": "" + } + } + ] +} diff --git a/testdata/scripts/health/help.txtar b/testdata/scripts/health/help.txtar new file mode 100644 index 00000000000..07eb0509e73 --- /dev/null +++ 
b/testdata/scripts/health/help.txtar @@ -0,0 +1,13 @@ +exec chainlink health --help +cmp stdout out.txt + +-- out.txt -- +NAME: + chainlink health - Prints a health report + +USAGE: + chainlink health [command options] [arguments...] + +OPTIONS: + --json, -j json output + diff --git a/testdata/scripts/health/multi-chain.txtar b/testdata/scripts/health/multi-chain.txtar new file mode 100644 index 00000000000..72c5fd8e3f6 --- /dev/null +++ b/testdata/scripts/health/multi-chain.txtar @@ -0,0 +1,309 @@ +# start node +exec sh -c 'eval "echo \"$(cat config.toml.tmpl)\" > config.toml"' +exec chainlink node -c config.toml start -p password -a creds & + +# initialize client +env NODEURL=http://localhost:$PORT +exec curl --retry 10 --retry-max-time 60 --retry-connrefused $NODEURL +exec chainlink --remote-node-url $NODEURL admin login -file creds --bypass-version-check + +exec chainlink --remote-node-url $NODEURL health +cmp stdout out.txt + +exec chainlink --remote-node-url $NODEURL health -json +cp stdout compact.json +exec jq . 
compact.json +cmp stdout out.json + +-- testdb.txt -- +CL_DATABASE_URL +-- testport.txt -- +PORT + +-- password -- +T.tLHkcmwePT/p,]sYuntjwHKAsrhm#4eRs4LuKHwvHejWYAC2JP4M8HimwgmbaZ +-- creds -- +notreal@fakeemail.ch +fj293fbBnlQ!f9vNs + +-- config.toml.tmpl -- +[Webserver] +HTTPPort = $PORT + +[[Cosmos]] +ChainID = 'Foo' + +[[Cosmos.Nodes]] +Name = 'primary' +TendermintURL = 'http://tender.mint' + +[[EVM]] +ChainID = '1' + +[[EVM.Nodes]] +Name = 'fake' +WSURL = 'wss://foo.bar/ws' +HTTPURL = 'https://foo.bar' + +[[Solana]] +ChainID = 'Bar' + +[[Solana.Nodes]] +Name = 'primary' +URL = 'http://solana.web' + +[[Starknet]] +ChainID = 'Baz' + +[[Starknet.Nodes]] +Name = 'primary' +URL = 'http://stark.node' + +-- out.txt -- +-Cosmos.Foo.Chain +-Cosmos.Foo.Txm +-EVM.1 +-EVM.1.BalanceMonitor +-EVM.1.HeadBroadcaster +-EVM.1.HeadTracker +!EVM.1.HeadTracker.HeadListener + Listener is not connected +-EVM.1.LogBroadcaster +-EVM.1.Txm +-EVM.1.Txm.BlockHistoryEstimator +-EVM.1.Txm.Broadcaster +-EVM.1.Txm.Confirmer +-EVM.1.Txm.WrappedEvmEstimator +-EventBroadcaster +-JobSpawner +-Mailbox.Monitor +-Mercury.WSRPCPool +-Mercury.WSRPCPool.CacheSet +-PipelineORM +-PipelineRunner +-PromReporter +-Solana.Bar +-StarkNet.Baz +-TelemetryManager + +-- out.json -- +{ + "data": [ + { + "type": "checks", + "id": "Cosmos.Foo.Chain", + "attributes": { + "name": "Cosmos.Foo.Chain", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "Cosmos.Foo.Txm", + "attributes": { + "name": "Cosmos.Foo.Txm", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "EVM.1", + "attributes": { + "name": "EVM.1", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "EVM.1.BalanceMonitor", + "attributes": { + "name": "EVM.1.BalanceMonitor", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "EVM.1.HeadBroadcaster", + "attributes": { + "name": "EVM.1.HeadBroadcaster", + "status": "passing", + "output": "" + } + }, + 
{ + "type": "checks", + "id": "EVM.1.HeadTracker", + "attributes": { + "name": "EVM.1.HeadTracker", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "EVM.1.HeadTracker.HeadListener", + "attributes": { + "name": "EVM.1.HeadTracker.HeadListener", + "status": "failing", + "output": "Listener is not connected" + } + }, + { + "type": "checks", + "id": "EVM.1.LogBroadcaster", + "attributes": { + "name": "EVM.1.LogBroadcaster", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "EVM.1.Txm", + "attributes": { + "name": "EVM.1.Txm", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "EVM.1.Txm.BlockHistoryEstimator", + "attributes": { + "name": "EVM.1.Txm.BlockHistoryEstimator", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "EVM.1.Txm.Broadcaster", + "attributes": { + "name": "EVM.1.Txm.Broadcaster", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "EVM.1.Txm.Confirmer", + "attributes": { + "name": "EVM.1.Txm.Confirmer", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "EVM.1.Txm.WrappedEvmEstimator", + "attributes": { + "name": "EVM.1.Txm.WrappedEvmEstimator", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "EventBroadcaster", + "attributes": { + "name": "EventBroadcaster", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "JobSpawner", + "attributes": { + "name": "JobSpawner", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "Mailbox.Monitor", + "attributes": { + "name": "Mailbox.Monitor", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "Mercury.WSRPCPool", + "attributes": { + "name": "Mercury.WSRPCPool", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "Mercury.WSRPCPool.CacheSet", + "attributes": { + "name": "Mercury.WSRPCPool.CacheSet", + 
"status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "PipelineORM", + "attributes": { + "name": "PipelineORM", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "PipelineRunner", + "attributes": { + "name": "PipelineRunner", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "PromReporter", + "attributes": { + "name": "PromReporter", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "Solana.Bar", + "attributes": { + "name": "Solana.Bar", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "StarkNet.Baz", + "attributes": { + "name": "StarkNet.Baz", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "TelemetryManager", + "attributes": { + "name": "TelemetryManager", + "status": "passing", + "output": "" + } + } + ] +} diff --git a/testdata/scripts/help.txtar b/testdata/scripts/help.txtar index 1484aceb5df..e4c19f3987d 100644 --- a/testdata/scripts/help.txtar +++ b/testdata/scripts/help.txtar @@ -17,6 +17,7 @@ COMMANDS: blocks Commands for managing blocks bridges Commands for Bridges communicating with External Adapters config Commands for the node's configuration + health Prints a health report jobs Commands for managing Jobs keys Commands for managing various types of keys used by the Chainlink node node, local Commands for admin actions that must be run locally diff --git a/testdata/scripts/node/db/migrate/db.txtar b/testdata/scripts/node/db/migrate/db.txtar new file mode 100644 index 00000000000..f040a937fd0 --- /dev/null +++ b/testdata/scripts/node/db/migrate/db.txtar @@ -0,0 +1,6 @@ +exec chainlink node db migrate +! stdout . +stderr 'goose: no migrations to run. current version:' + +-- testdb.txt -- +CL_DATABASE_URL