diff --git a/core/scripts/gateway/run_gateway.go b/core/scripts/gateway/run_gateway.go index e30c43bb8af..5dbcd02bf56 100644 --- a/core/scripts/gateway/run_gateway.go +++ b/core/scripts/gateway/run_gateway.go @@ -48,7 +48,7 @@ func main() { lggr, _ := logger.NewLogger() - handlerFactory := gateway.NewHandlerFactory(nil, lggr) + handlerFactory := gateway.NewHandlerFactory(nil, nil, nil, lggr) gw, err := gateway.NewGatewayFromConfig(&cfg, handlerFactory, lggr) if err != nil { fmt.Println("error creating Gateway object:", err) diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index ee109db2119..e94f51abdf5 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -344,6 +344,8 @@ func NewApplication(opts ApplicationOpts) (Application, error) { job.Gateway: gateway.NewDelegate( legacyEVMChains, keyStore.Eth(), + db, + cfg.Database(), globalLogger), } webhookJobRunner = delegates[job.Webhook].(*webhook.Delegate).WebhookJobRunner() diff --git a/core/services/gateway/delegate.go b/core/services/gateway/delegate.go index d4180184aca..8a97f68d1ea 100644 --- a/core/services/gateway/delegate.go +++ b/core/services/gateway/delegate.go @@ -4,6 +4,7 @@ import ( "encoding/json" "github.com/google/uuid" + "github.com/jmoiron/sqlx" "github.com/pelletier/go-toml" "github.com/pkg/errors" @@ -18,13 +19,21 @@ import ( type Delegate struct { legacyChains legacyevm.LegacyChainContainer ks keystore.Eth + db *sqlx.DB + cfg pg.QConfig lggr logger.Logger } var _ job.Delegate = (*Delegate)(nil) -func NewDelegate(legacyChains legacyevm.LegacyChainContainer, ks keystore.Eth, lggr logger.Logger) *Delegate { - return &Delegate{legacyChains: legacyChains, ks: ks, lggr: lggr} +func NewDelegate(legacyChains legacyevm.LegacyChainContainer, ks keystore.Eth, db *sqlx.DB, cfg pg.QConfig, lggr logger.Logger) *Delegate { + return &Delegate{ + legacyChains: legacyChains, + ks: ks, + db: db, + cfg: cfg, + lggr: lggr, + } } func (d *Delegate) JobType() job.Type { @@ -47,7 +56,7 @@ func (d *Delegate) ServicesForSpec(spec job.Job) (services []job.ServiceCtx, err if err2 != nil { return nil, errors.Wrap(err2, "unmarshal gateway config") } - handlerFactory := NewHandlerFactory(d.legacyChains, d.lggr) + handlerFactory := NewHandlerFactory(d.legacyChains, d.db, d.cfg, d.lggr) gateway, err := NewGatewayFromConfig(&gatewayConfig, handlerFactory, d.lggr) if err != nil { return nil, err diff --git a/core/services/gateway/gateway_test.go b/core/services/gateway/gateway_test.go index 74d689fffe1..1c31a643c80 100644 --- a/core/services/gateway/gateway_test.go +++ b/core/services/gateway/gateway_test.go @@ -57,7 +57,7 @@ Address = "0x0001020304050607080900010203040506070809" `) lggr := logger.TestLogger(t) - _, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, tomlConfig), gateway.NewHandlerFactory(nil, lggr), lggr) + _, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, tomlConfig), gateway.NewHandlerFactory(nil, nil, nil, lggr), lggr) require.NoError(t, err) } @@ -75,7 +75,7 @@ HandlerName = "dummy" `) lggr := logger.TestLogger(t) - _, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, tomlConfig), gateway.NewHandlerFactory(nil, lggr), lggr) + _, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, tomlConfig), gateway.NewHandlerFactory(nil, nil, nil, lggr), lggr) require.Error(t, err) } @@ -89,7 +89,7 @@ HandlerName = "no_such_handler" `) lggr := logger.TestLogger(t) - _, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, tomlConfig), gateway.NewHandlerFactory(nil, lggr), lggr) + _, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, tomlConfig), gateway.NewHandlerFactory(nil, nil, nil, lggr), lggr) require.Error(t, err) } @@ -103,7 +103,7 @@ SomeOtherField = "abcd" `) lggr := logger.TestLogger(t) - _, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, tomlConfig), gateway.NewHandlerFactory(nil, lggr), lggr) + _, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, tomlConfig), gateway.NewHandlerFactory(nil, nil, nil, lggr), lggr) require.Error(t, err) } @@ -121,7 +121,7 @@ Address = "0xnot_an_address" `) lggr := logger.TestLogger(t) - _, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, tomlConfig), gateway.NewHandlerFactory(nil, lggr), lggr) + _, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, tomlConfig), gateway.NewHandlerFactory(nil, nil, nil, lggr), lggr) require.Error(t, err) } @@ -129,7 +129,7 @@ func TestGateway_CleanStartAndClose(t *testing.T) { t.Parallel() lggr := logger.TestLogger(t) - gateway, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, buildConfig("")), gateway.NewHandlerFactory(nil, lggr), lggr) + gateway, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, buildConfig("")), gateway.NewHandlerFactory(nil, nil, nil, lggr), lggr) require.NoError(t, err) servicetest.Run(t, gateway) } diff --git a/core/services/gateway/handler_factory.go b/core/services/gateway/handler_factory.go index 368bc64c872..8ccae8c7c4b 100644 --- a/core/services/gateway/handler_factory.go +++ b/core/services/gateway/handler_factory.go @@ -4,11 +4,14 @@ import ( "encoding/json" "fmt" + "github.com/jmoiron/sqlx" + "github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/gateway/config" "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers" "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/functions" + "github.com/smartcontractkit/chainlink/v2/core/services/pg" ) const ( @@ -18,19 +21,21 @@ const ( type handlerFactory struct { legacyChains legacyevm.LegacyChainContainer + db *sqlx.DB + cfg pg.QConfig lggr logger.Logger } var _ HandlerFactory = (*handlerFactory)(nil) -func NewHandlerFactory(legacyChains legacyevm.LegacyChainContainer, lggr logger.Logger) HandlerFactory { - return &handlerFactory{legacyChains, lggr} +func NewHandlerFactory(legacyChains legacyevm.LegacyChainContainer, db *sqlx.DB, cfg pg.QConfig, lggr logger.Logger) HandlerFactory { + return &handlerFactory{legacyChains, db, cfg, lggr} } func (hf *handlerFactory) NewHandler(handlerType HandlerType, handlerConfig json.RawMessage, donConfig *config.DONConfig, don handlers.DON) (handlers.Handler, error) { switch handlerType { case FunctionsHandlerType: - return functions.NewFunctionsHandlerFromConfig(handlerConfig, donConfig, don, hf.legacyChains, hf.lggr) + return functions.NewFunctionsHandlerFromConfig(handlerConfig, donConfig, don, hf.legacyChains, hf.db, hf.cfg, hf.lggr) case DummyHandlerType: return handlers.NewDummyHandler(donConfig, don, hf.lggr) default: diff --git a/core/services/gateway/handlers/functions/handler.functions.go b/core/services/gateway/handlers/functions/handler.functions.go index 5277f9789d6..2872c72f761 100644 --- a/core/services/gateway/handlers/functions/handler.functions.go +++ b/core/services/gateway/handlers/functions/handler.functions.go @@ -10,6 +10,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/jmoiron/sqlx" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "go.uber.org/multierr" @@ -22,6 +23,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/gateway/config" "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers" hc "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/common" + "github.com/smartcontractkit/chainlink/v2/core/services/pg" ) var ( @@ -96,7 +98,7 @@ type PendingRequest struct { var _ handlers.Handler = (*functionsHandler)(nil) -func NewFunctionsHandlerFromConfig(handlerConfig json.RawMessage, donConfig *config.DONConfig, don handlers.DON, legacyChains legacyevm.LegacyChainContainer, lggr logger.Logger) (handlers.Handler, error) { +func NewFunctionsHandlerFromConfig(handlerConfig json.RawMessage, donConfig *config.DONConfig, don handlers.DON, legacyChains legacyevm.LegacyChainContainer, db *sqlx.DB, qcfg pg.QConfig, lggr logger.Logger) (handlers.Handler, error) { var cfg FunctionsHandlerConfig err := json.Unmarshal(handlerConfig, &cfg) if err != nil { @@ -133,7 +135,13 @@ func NewFunctionsHandlerFromConfig(handlerConfig json.RawMessage, donConfig *con if err2 != nil { return nil, err2 } - subscriptions, err2 = NewOnchainSubscriptions(chain.Client(), *cfg.OnchainSubscriptions, lggr) + + orm, err2 := NewORM(db, lggr, qcfg, cfg.OnchainSubscriptions.ContractAddress) + if err2 != nil { + return nil, err2 + } + + subscriptions, err2 = NewOnchainSubscriptions(chain.Client(), *cfg.OnchainSubscriptions, orm, lggr) if err2 != nil { return nil, err2 } diff --git a/core/services/gateway/handlers/functions/handler.functions_test.go b/core/services/gateway/handlers/functions/handler.functions_test.go index 1d6dd109625..7e055815c88 100644 --- a/core/services/gateway/handlers/functions/handler.functions_test.go +++ b/core/services/gateway/handlers/functions/handler.functions_test.go @@ -83,7 +83,7 @@ func sendNodeReponses(t *testing.T, handler handlers.Handler, userRequestMsg api func TestFunctionsHandler_Minimal(t *testing.T) { t.Parallel() - handler, err := functions.NewFunctionsHandlerFromConfig(json.RawMessage("{}"), &config.DONConfig{}, nil, nil, logger.TestLogger(t)) + handler, err := functions.NewFunctionsHandlerFromConfig(json.RawMessage("{}"), &config.DONConfig{}, nil, nil, nil, nil, logger.TestLogger(t)) require.NoError(t, err) // empty message should always error out @@ -95,7 +95,7 @@ func TestFunctionsHandler_Minimal(t *testing.T) { func TestFunctionsHandler_CleanStartAndClose(t *testing.T) { t.Parallel() - handler, err := functions.NewFunctionsHandlerFromConfig(json.RawMessage("{}"), &config.DONConfig{}, nil, nil, logger.TestLogger(t)) + handler, err := functions.NewFunctionsHandlerFromConfig(json.RawMessage("{}"), &config.DONConfig{}, nil, nil, nil, nil, logger.TestLogger(t)) require.NoError(t, err) servicetest.Run(t, handler) diff --git a/core/services/gateway/handlers/functions/mocks/orm.go b/core/services/gateway/handlers/functions/mocks/orm.go new file mode 100644 index 00000000000..110675c8b55 --- /dev/null +++ b/core/services/gateway/handlers/functions/mocks/orm.go @@ -0,0 +1,91 @@ +// Code generated by mockery v2.38.0. DO NOT EDIT. + +package mocks + +import ( + functions "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/functions" + mock "github.com/stretchr/testify/mock" + + pg "github.com/smartcontractkit/chainlink/v2/core/services/pg" +) + +// ORM is an autogenerated mock type for the ORM type +type ORM struct { + mock.Mock +} + +// GetSubscriptions provides a mock function with given fields: offset, limit, qopts +func (_m *ORM) GetSubscriptions(offset uint, limit uint, qopts ...pg.QOpt) ([]functions.CachedSubscription, error) { + _va := make([]interface{}, len(qopts)) + for _i := range qopts { + _va[_i] = qopts[_i] + } + var _ca []interface{} + _ca = append(_ca, offset, limit) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + if len(ret) == 0 { + panic("no return value specified for GetSubscriptions") + } + + var r0 []functions.CachedSubscription + var r1 error + if rf, ok := ret.Get(0).(func(uint, uint, ...pg.QOpt) ([]functions.CachedSubscription, error)); ok { + return rf(offset, limit, qopts...) + } + if rf, ok := ret.Get(0).(func(uint, uint, ...pg.QOpt) []functions.CachedSubscription); ok { + r0 = rf(offset, limit, qopts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]functions.CachedSubscription) + } + } + + if rf, ok := ret.Get(1).(func(uint, uint, ...pg.QOpt) error); ok { + r1 = rf(offset, limit, qopts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// UpsertSubscription provides a mock function with given fields: subscription, qopts +func (_m *ORM) UpsertSubscription(subscription functions.CachedSubscription, qopts ...pg.QOpt) error { + _va := make([]interface{}, len(qopts)) + for _i := range qopts { + _va[_i] = qopts[_i] + } + var _ca []interface{} + _ca = append(_ca, subscription) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + if len(ret) == 0 { + panic("no return value specified for UpsertSubscription") + } + + var r0 error + if rf, ok := ret.Get(0).(func(functions.CachedSubscription, ...pg.QOpt) error); ok { + r0 = rf(subscription, qopts...) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// NewORM creates a new instance of ORM. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewORM(t interface { + mock.TestingT + Cleanup(func()) +}) *ORM { + mock := &ORM{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/core/services/gateway/handlers/functions/orm.go b/core/services/gateway/handlers/functions/orm.go new file mode 100644 index 00000000000..b7ec8d865d1 --- /dev/null +++ b/core/services/gateway/handlers/functions/orm.go @@ -0,0 +1,132 @@ +package functions + +import ( + "fmt" + "math/big" + + "github.com/ethereum/go-ethereum/common" + "github.com/lib/pq" + "github.com/pkg/errors" + + "github.com/jmoiron/sqlx" + + "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/functions/generated/functions_router" + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/pg" +) + +//go:generate mockery --quiet --name ORM --output ./mocks/ --case=underscore + +type ORM interface { + GetSubscriptions(offset, limit uint, qopts ...pg.QOpt) ([]CachedSubscription, error) + UpsertSubscription(subscription CachedSubscription, qopts ...pg.QOpt) error +} + +type orm struct { + q pg.Q + routerContractAddress common.Address +} + +var _ ORM = (*orm)(nil) +var ( + ErrInvalidParameters = errors.New("invalid parameters provided to create a subscription cache ORM") +) + +const ( + tableName = "functions_subscriptions" +) + +type cachedSubscriptionRow struct { + SubscriptionID uint64 + Owner common.Address + Balance int64 + BlockedBalance int64 + ProposedOwner common.Address + Consumers pq.ByteaArray + Flags []uint8 + RouterContractAddress common.Address +} + +func NewORM(db *sqlx.DB, lggr logger.Logger, cfg pg.QConfig, routerContractAddress common.Address) (ORM, error) { + if db == nil || cfg == nil || lggr == nil || routerContractAddress == (common.Address{}) { + return nil, ErrInvalidParameters + } + + return &orm{ + q: pg.NewQ(db, lggr, cfg), + routerContractAddress: routerContractAddress, + }, nil +} + +func (o *orm) GetSubscriptions(offset, limit uint, qopts ...pg.QOpt) ([]CachedSubscription, error) { + var cacheSubscriptions []CachedSubscription + var cacheSubscriptionRows []cachedSubscriptionRow + stmt := fmt.Sprintf(` + SELECT subscription_id, owner, balance, blocked_balance, proposed_owner, consumers, flags, router_contract_address + FROM %s + WHERE router_contract_address = $1 + ORDER BY subscription_id ASC + OFFSET $2 + LIMIT $3; + `, tableName) + err := o.q.WithOpts(qopts...).Select(&cacheSubscriptionRows, stmt, o.routerContractAddress, offset, limit) + if err != nil { + return cacheSubscriptions, err + } + + for _, cs := range cacheSubscriptionRows { + cacheSubscriptions = append(cacheSubscriptions, cs.encode()) + } + + return cacheSubscriptions, nil +} + +// UpsertSubscription will update if a subscription exists or create if it does not. +// In case a subscription gets deleted we will update it with an owner address equal to 0x0. +func (o *orm) UpsertSubscription(subscription CachedSubscription, qopts ...pg.QOpt) error { + stmt := fmt.Sprintf(` + INSERT INTO %s (subscription_id, owner, balance, blocked_balance, proposed_owner, consumers, flags, router_contract_address) + VALUES ($1,$2,$3,$4,$5,$6,$7,$8) ON CONFLICT (subscription_id, router_contract_address) DO UPDATE + SET owner=$2, balance=$3, blocked_balance=$4, proposed_owner=$5, consumers=$6, flags=$7, router_contract_address=$8;`, tableName) + + if subscription.Balance == nil { + subscription.Balance = big.NewInt(0) + } + + if subscription.BlockedBalance == nil { + subscription.BlockedBalance = big.NewInt(0) + } + + _, err := o.q.WithOpts(qopts...).Exec( + stmt, + subscription.SubscriptionID, + subscription.Owner, + subscription.Balance.Int64(), + subscription.BlockedBalance.Int64(), + subscription.ProposedOwner, + subscription.Consumers, + subscription.Flags[:], + o.routerContractAddress, + ) + + return err +} + +func (cs *cachedSubscriptionRow) encode() CachedSubscription { + consumers := make([]common.Address, 0) + for _, csc := range cs.Consumers { + consumers = append(consumers, common.BytesToAddress(csc)) + } + + return CachedSubscription{ + SubscriptionID: cs.SubscriptionID, + IFunctionsSubscriptionsSubscription: functions_router.IFunctionsSubscriptionsSubscription{ + Balance: big.NewInt(cs.Balance), + Owner: cs.Owner, + BlockedBalance: big.NewInt(cs.BlockedBalance), + ProposedOwner: cs.ProposedOwner, + Consumers: consumers, + Flags: [32]byte(cs.Flags), + }, + } +} diff --git a/core/services/gateway/handlers/functions/orm_test.go b/core/services/gateway/handlers/functions/orm_test.go new file mode 100644 index 00000000000..37ec24ea0b1 --- /dev/null +++ b/core/services/gateway/handlers/functions/orm_test.go @@ -0,0 +1,246 @@ +package functions_test + +import ( + "math/big" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets" + "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/functions/generated/functions_router" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/functions" +) + +var ( + defaultFlags = [32]byte{0x1, 0x2, 0x3} +) + +func setupORM(t *testing.T) (functions.ORM, error) { + t.Helper() + + var ( + db = pgtest.NewSqlxDB(t) + lggr = logger.TestLogger(t) + ) + + return functions.NewORM(db, lggr, pgtest.NewQConfig(true), testutils.NewAddress()) +} + +func createSubscriptions(t *testing.T, orm functions.ORM, amount int) []functions.CachedSubscription { + cachedSubscriptions := make([]functions.CachedSubscription, 0) + for i := amount; i > 0; i-- { + cs := functions.CachedSubscription{ + SubscriptionID: uint64(i), + IFunctionsSubscriptionsSubscription: functions_router.IFunctionsSubscriptionsSubscription{ + Balance: big.NewInt(10), + Owner: testutils.NewAddress(), + BlockedBalance: big.NewInt(20), + ProposedOwner: common.Address{}, + Consumers: []common.Address{}, + Flags: defaultFlags, + }, + } + cachedSubscriptions = append(cachedSubscriptions, cs) + err := orm.UpsertSubscription(cs) + require.NoError(t, err) + } + return cachedSubscriptions +} + +func TestORM_GetSubscriptions(t *testing.T) { + t.Parallel() + t.Run("fetch first page", func(t *testing.T) { + orm, err := setupORM(t) + require.NoError(t, err) + cachedSubscriptions := createSubscriptions(t, orm, 2) + results, err := orm.GetSubscriptions(0, 1) + require.NoError(t, err) + require.Equal(t, 1, len(results), "incorrect results length") + require.Equal(t, cachedSubscriptions[1], results[0]) + }) + + t.Run("fetch second page", func(t *testing.T) { + orm, err := setupORM(t) + require.NoError(t, err) + cachedSubscriptions := createSubscriptions(t, orm, 2) + results, err := orm.GetSubscriptions(1, 5) + require.NoError(t, err) + require.Equal(t, 1, len(results), "incorrect results length") + require.Equal(t, cachedSubscriptions[0], results[0]) + }) +} + +func TestORM_UpsertSubscription(t *testing.T) { + t.Parallel() + + t.Run("create a subscription", func(t *testing.T) { + orm, err := setupORM(t) + require.NoError(t, err) + expected := functions.CachedSubscription{ + SubscriptionID: uint64(1), + IFunctionsSubscriptionsSubscription: functions_router.IFunctionsSubscriptionsSubscription{ + Balance: big.NewInt(10), + Owner: testutils.NewAddress(), + BlockedBalance: big.NewInt(20), + ProposedOwner: common.Address{}, + Consumers: []common.Address{}, + Flags: defaultFlags, + }, + } + err = orm.UpsertSubscription(expected) + require.NoError(t, err) + + results, err := orm.GetSubscriptions(0, 1) + require.NoError(t, err) + require.Equal(t, 1, len(results), "incorrect results length") + require.Equal(t, expected, results[0]) + }) + + t.Run("update a subscription", func(t *testing.T) { + orm, err := setupORM(t) + require.NoError(t, err) + + expectedUpdated := functions.CachedSubscription{ + SubscriptionID: uint64(1), + IFunctionsSubscriptionsSubscription: functions_router.IFunctionsSubscriptionsSubscription{ + Balance: big.NewInt(10), + Owner: testutils.NewAddress(), + BlockedBalance: big.NewInt(20), + ProposedOwner: common.Address{}, + Consumers: []common.Address{}, + Flags: defaultFlags, + }, + } + err = orm.UpsertSubscription(expectedUpdated) + require.NoError(t, err) + + expectedNotUpdated := functions.CachedSubscription{ + SubscriptionID: uint64(2), + IFunctionsSubscriptionsSubscription: functions_router.IFunctionsSubscriptionsSubscription{ + Balance: big.NewInt(10), + Owner: testutils.NewAddress(), + BlockedBalance: big.NewInt(20), + ProposedOwner: common.Address{}, + Consumers: []common.Address{}, + Flags: defaultFlags, + }, + } + err = orm.UpsertSubscription(expectedNotUpdated) + require.NoError(t, err) + + // update the balance value + expectedUpdated.Balance = big.NewInt(20) + err = orm.UpsertSubscription(expectedUpdated) + require.NoError(t, err) + + results, err := orm.GetSubscriptions(0, 5) + require.NoError(t, err) + require.Equal(t, 2, len(results), "incorrect results length") + require.Equal(t, expectedNotUpdated, results[1]) + require.Equal(t, expectedUpdated, results[0]) + }) + + t.Run("update a deleted subscription", func(t *testing.T) { + orm, err := setupORM(t) + require.NoError(t, err) + + subscription := functions.CachedSubscription{ + SubscriptionID: uint64(1), + IFunctionsSubscriptionsSubscription: functions_router.IFunctionsSubscriptionsSubscription{ + Balance: big.NewInt(10), + Owner: testutils.NewAddress(), + BlockedBalance: big.NewInt(20), + ProposedOwner: common.Address{}, + Consumers: []common.Address{}, + Flags: defaultFlags, + }, + } + err = orm.UpsertSubscription(subscription) + require.NoError(t, err) + + // empty subscription + subscription.IFunctionsSubscriptionsSubscription = functions_router.IFunctionsSubscriptionsSubscription{ + Balance: big.NewInt(0), + Owner: common.Address{}, + BlockedBalance: big.NewInt(0), + ProposedOwner: common.Address{}, + Consumers: []common.Address{}, + Flags: [32]byte{}, + } + + err = orm.UpsertSubscription(subscription) + require.NoError(t, err) + + results, err := orm.GetSubscriptions(0, 5) + require.NoError(t, err) + require.Equal(t, 1, len(results), "incorrect results length") + require.Equal(t, subscription, results[0]) + }) + + t.Run("create a subscription with same id but different router address", func(t *testing.T) { + var ( + db = pgtest.NewSqlxDB(t) + lggr = logger.TestLogger(t) + ) + + orm1, err := functions.NewORM(db, lggr, pgtest.NewQConfig(true), testutils.NewAddress()) + require.NoError(t, err) + orm2, err := functions.NewORM(db, lggr, pgtest.NewQConfig(true), testutils.NewAddress()) + require.NoError(t, err) + + subscription := functions.CachedSubscription{ + SubscriptionID: uint64(1), + IFunctionsSubscriptionsSubscription: functions_router.IFunctionsSubscriptionsSubscription{ + Balance: assets.Ether(10).ToInt(), + Owner: testutils.NewAddress(), + BlockedBalance: assets.Ether(20).ToInt(), + ProposedOwner: common.Address{}, + Consumers: []common.Address{}, + Flags: defaultFlags, + }, + } + + err = orm1.UpsertSubscription(subscription) + require.NoError(t, err) + + // should update the existing subscription + subscription.Balance = assets.Ether(12).ToInt() + err = orm1.UpsertSubscription(subscription) + require.NoError(t, err) + + results, err := orm1.GetSubscriptions(0, 10) + require.NoError(t, err) + require.Equal(t, 1, len(results), "incorrect results length") + + // should create a new subscription because it comes from a different router contract + err = orm2.UpsertSubscription(subscription) + require.NoError(t, err) + + results, err = orm1.GetSubscriptions(0, 10) + require.NoError(t, err) + require.Equal(t, 1, len(results), "incorrect results length") + + results, err = orm2.GetSubscriptions(0, 10) + require.NoError(t, err) + require.Equal(t, 1, len(results), "incorrect results length") + }) +} + +func Test_NewORM(t *testing.T) { + t.Run("OK-create_ORM", func(t *testing.T) { + _, err := functions.NewORM(pgtest.NewSqlxDB(t), logger.TestLogger(t), pgtest.NewQConfig(true), testutils.NewAddress()) + require.NoError(t, err) + }) + t.Run("NOK-create_ORM_with_nil_fields", func(t *testing.T) { + _, err := functions.NewORM(nil, nil, nil, common.Address{}) + require.Error(t, err) + }) + t.Run("NOK-create_ORM_with_empty_address", func(t *testing.T) { + _, err := functions.NewORM(pgtest.NewSqlxDB(t), logger.TestLogger(t), pgtest.NewQConfig(true), common.Address{}) + require.Error(t, err) + }) +} diff --git a/core/services/gateway/handlers/functions/subscriptions.go b/core/services/gateway/handlers/functions/subscriptions.go index ebffbbdd206..8bc9cf09e7b 100644 --- a/core/services/gateway/handlers/functions/subscriptions.go +++ b/core/services/gateway/handlers/functions/subscriptions.go @@ -19,12 +19,15 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/utils" ) +const defaultCacheBatchSize = 100 + type OnchainSubscriptionsConfig struct { ContractAddress common.Address `json:"contractAddress"` BlockConfirmations uint `json:"blockConfirmations"` UpdateFrequencySec uint `json:"updateFrequencySec"` UpdateTimeoutSec uint `json:"updateTimeoutSec"` UpdateRangeSize uint `json:"updateRangeSize"` + CacheBatchSize uint `json:"cacheBatchSize"` } // OnchainSubscriptions maintains a mirror of all subscriptions fetched from the blockchain (EVM-only). @@ -43,6 +46,7 @@ type onchainSubscriptions struct { config OnchainSubscriptionsConfig subscriptions UserSubscriptions + orm ORM client evmclient.Client router *functions_router.FunctionsRouter blockConfirmations *big.Int @@ -52,7 +56,7 @@ type onchainSubscriptions struct { stopCh services.StopChan } -func NewOnchainSubscriptions(client evmclient.Client, config OnchainSubscriptionsConfig, lggr logger.Logger) (OnchainSubscriptions, error) { +func NewOnchainSubscriptions(client evmclient.Client, config OnchainSubscriptionsConfig, orm ORM, lggr logger.Logger) (OnchainSubscriptions, error) { if client == nil { return nil, errors.New("client is nil") } @@ -63,9 +67,17 @@ func NewOnchainSubscriptions(client evmclient.Client, config OnchainSubscription if err != nil { return nil, fmt.Errorf("unexpected error during functions_router.NewFunctionsRouter: %s", err) } + + // if CacheBatchSize is not specified use the default value + if config.CacheBatchSize == 0 { + lggr.Info("CacheBatchSize not specified, using default size: ", defaultCacheBatchSize) + config.CacheBatchSize = defaultCacheBatchSize + } + return &onchainSubscriptions{ config: config, subscriptions: NewUserSubscriptions(), + orm: orm, client: client, router: router, blockConfirmations: big.NewInt(int64(config.BlockConfirmations)), @@ -87,6 +99,8 @@ func (s *onchainSubscriptions) Start(ctx context.Context) error { return errors.New("OnchainSubscriptionsConfig.UpdateRangeSize must be greater than 0") } + s.loadCachedSubscriptions() + s.closeWait.Add(1) go s.queryLoop() @@ -190,7 +204,15 @@ func (s *onchainSubscriptions) querySubscriptionsRange(ctx context.Context, bloc for i, subscription := range subscriptions { subscriptionId := start + uint64(i) subscription := subscription - s.subscriptions.UpdateSubscription(subscriptionId, &subscription) + updated := s.subscriptions.UpdateSubscription(subscriptionId, &subscription) + if updated { + if err = s.orm.UpsertSubscription(CachedSubscription{ + SubscriptionID: subscriptionId, + IFunctionsSubscriptionsSubscription: subscription, + }); err != nil { + s.lggr.Errorf("unexpected error updating subscription in the cache: %w", err) + } + } } return nil @@ -203,3 +225,30 @@ func (s *onchainSubscriptions) getSubscriptionsCount(ctx context.Context, blockN Context: ctx, }) } + +func (s *onchainSubscriptions) loadCachedSubscriptions() { + offset := uint(0) + for { + csBatch, err := s.orm.GetSubscriptions(offset, s.config.CacheBatchSize) + if err != nil { + break + } + + for _, cs := range csBatch { + _ = s.subscriptions.UpdateSubscription(cs.SubscriptionID, &functions_router.IFunctionsSubscriptionsSubscription{ + Balance: cs.Balance, + Owner: cs.Owner, + BlockedBalance: cs.BlockedBalance, + ProposedOwner: cs.ProposedOwner, + Consumers: cs.Consumers, + Flags: cs.Flags, + }) + } + s.lggr.Debugw("Loading cached subscriptions", "offset", offset, "batch_length", len(csBatch)) + + if len(csBatch) != int(s.config.CacheBatchSize) { + break + } + offset += s.config.CacheBatchSize + } +} diff --git a/core/services/gateway/handlers/functions/subscriptions_test.go b/core/services/gateway/handlers/functions/subscriptions_test.go index adbf637ad73..782dcc2332d 100644 --- a/core/services/gateway/handlers/functions/subscriptions_test.go +++ b/core/services/gateway/handlers/functions/subscriptions_test.go @@ -15,14 +15,17 @@ import ( "github.com/stretchr/testify/require" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client/mocks" + "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/functions/generated/functions_router" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/functions" + fmocks "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/functions/mocks" ) const ( validUser = "0x9ED925d8206a4f88a2f643b28B3035B315753Cd6" invalidUser = "0x6E2dc0F9DB014aE19888F539E59285D2Ea04244C" + cachedUser = "0x3E2dc0F9DB014aE19888F539E59285D2Ea04233G" ) func TestSubscriptions_OnePass(t *testing.T) { @@ -47,7 +50,10 @@ func TestSubscriptions_OnePass(t *testing.T) { UpdateTimeoutSec: 1, UpdateRangeSize: 3, } - subscriptions, err := functions.NewOnchainSubscriptions(client, config, logger.TestLogger(t)) + orm := fmocks.NewORM(t) + orm.On("GetSubscriptions", uint(0), uint(100)).Return([]functions.CachedSubscription{}, nil) + orm.On("UpsertSubscription", mock.Anything).Return(nil) + subscriptions, err := functions.NewOnchainSubscriptions(client, config, orm, logger.TestLogger(t)) require.NoError(t, err) err = subscriptions.Start(ctx) @@ -95,7 +101,10 @@ func TestSubscriptions_MultiPass(t *testing.T) { UpdateTimeoutSec: 1, UpdateRangeSize: 3, } - subscriptions, err := functions.NewOnchainSubscriptions(client, config, logger.TestLogger(t)) + orm := fmocks.NewORM(t) + orm.On("GetSubscriptions", uint(0), uint(100)).Return([]functions.CachedSubscription{}, nil) + orm.On("UpsertSubscription", mock.Anything).Return(nil) + subscriptions, err := functions.NewOnchainSubscriptions(client, config, orm, logger.TestLogger(t)) require.NoError(t, err) err = subscriptions.Start(ctx) @@ -108,3 +117,57 @@ func TestSubscriptions_MultiPass(t *testing.T) { return currentCycle.Load() == ncycles }, testutils.WaitTimeout(t), time.Second).Should(gomega.BeTrue()) } + +func TestSubscriptions_Cached(t *testing.T) { + getSubscriptionCount := hexutil.MustDecode("0x0000000000000000000000000000000000000000000000000000000000000003") + getSubscriptionsInRange := hexutil.MustDecode("0x00000000000000000000000000000000000000000000000000000000000000200000000000000000000000000000000000000000000000000000000000000003000000000000000000000000000000000000000000000000000000000000006000000000000000000000000000000000000000000000000000000000000001600000000000000000000000000000000000000000000000000000000000000240000000000000000000000000000000000000000000000000de0b6b3a76400000000000000000000000000000109e6e1b12098cc8f3a1e9719a817ec53ab9b35c000000000000000000000000000000000000000000000000000034e23f515cb0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000c000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001000000000000000000000000f5340f0968ee8b7dfd97e3327a6139273cc2c4fa000000000000000000000000000000000000000000000001158e460913d000000000000000000000000000009ed925d8206a4f88a2f643b28b3035b315753cd60000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000c0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001bc14b92364c75e20000000000000000000000009ed925d8206a4f88a2f643b28b3035b315753cd60000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000c0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000010000000000000000000000005439e5881a529f3ccbffc0e82d49f9db3950aefe") + + ctx := testutils.Context(t) + client := mocks.NewClient(t) + client.On("LatestBlockHeight", mock.Anything).Return(big.NewInt(42), nil) + client.On("CallContract", mock.Anything, ethereum.CallMsg{ // getSubscriptionCount + To: &common.Address{}, + Data: hexutil.MustDecode("0x66419970"), + }, mock.Anything).Return(getSubscriptionCount, nil) + client.On("CallContract", mock.Anything, ethereum.CallMsg{ // GetSubscriptionsInRange + To: &common.Address{}, + Data: hexutil.MustDecode("0xec2454e500000000000000000000000000000000000000000000000000000000000000010000000000000000000000000000000000000000000000000000000000000003"), + }, mock.Anything).Return(getSubscriptionsInRange, nil) + config := functions.OnchainSubscriptionsConfig{ + ContractAddress: common.Address{}, + BlockConfirmations: 1, + UpdateFrequencySec: 1, + UpdateTimeoutSec: 1, + UpdateRangeSize: 3, + CacheBatchSize: 1, + } + + expectedBalance := big.NewInt(5) + orm := fmocks.NewORM(t) + orm.On("GetSubscriptions", uint(0), uint(1)).Return([]functions.CachedSubscription{ + { + SubscriptionID: 1, + IFunctionsSubscriptionsSubscription: functions_router.IFunctionsSubscriptionsSubscription{ + Balance: expectedBalance, + Owner: common.HexToAddress(cachedUser), + BlockedBalance: big.NewInt(10), + }, + }, + }, nil) + orm.On("GetSubscriptions", uint(1), uint(1)).Return([]functions.CachedSubscription{}, nil) + orm.On("UpsertSubscription", mock.Anything).Return(nil) + + subscriptions, err := functions.NewOnchainSubscriptions(client, config, orm, logger.TestLogger(t)) + require.NoError(t, err) + + err = subscriptions.Start(ctx) + require.NoError(t, err) + t.Cleanup(func() { + assert.NoError(t, subscriptions.Close()) + }) + + gomega.NewGomegaWithT(t).Eventually(func() bool { + actualBalance, err := subscriptions.GetMaxUserBalance(common.HexToAddress(cachedUser)) + return err == nil && assert.Equal(t, expectedBalance, actualBalance) + }, testutils.WaitTimeout(t), time.Second).Should(gomega.BeTrue()) +} diff --git a/core/services/gateway/handlers/functions/user_subscriptions.go b/core/services/gateway/handlers/functions/user_subscriptions.go index ff3dd753995..7e63720da71 100644 --- a/core/services/gateway/handlers/functions/user_subscriptions.go +++ b/core/services/gateway/handlers/functions/user_subscriptions.go @@ -12,8 +12,10 @@ import ( // Methods are NOT thread-safe. +var ErrUserHasNoSubscription = errors.New("user has no subscriptions") + type UserSubscriptions interface { - UpdateSubscription(subscriptionId uint64, subscription *functions_router.IFunctionsSubscriptionsSubscription) + UpdateSubscription(subscriptionId uint64, subscription *functions_router.IFunctionsSubscriptionsSubscription) bool GetMaxUserBalance(user common.Address) (*big.Int, error) } @@ -29,29 +31,45 @@ func NewUserSubscriptions() UserSubscriptions { } } -func (us *userSubscriptions) UpdateSubscription(subscriptionId uint64, subscription *functions_router.IFunctionsSubscriptionsSubscription) { +// CachedSubscription is used to populate the user subscription maps from a persistent layer like postgres. +type CachedSubscription struct { + SubscriptionID uint64 + functions_router.IFunctionsSubscriptionsSubscription +} + +// UpdateSubscription updates a subscription returning false in case there was no variation to the current state. +func (us *userSubscriptions) UpdateSubscription(subscriptionId uint64, subscription *functions_router.IFunctionsSubscriptionsSubscription) bool { if subscription == nil || subscription.Owner == utils.ZeroAddress { user, ok := us.subscriptionIdsMap[subscriptionId] - if ok { - delete(us.userSubscriptionsMap[user], subscriptionId) - if len(us.userSubscriptionsMap[user]) == 0 { - delete(us.userSubscriptionsMap, user) - } + if !ok { + return false } + + delete(us.userSubscriptionsMap[user], subscriptionId) delete(us.subscriptionIdsMap, subscriptionId) - } else { - us.subscriptionIdsMap[subscriptionId] = subscription.Owner - if _, ok := us.userSubscriptionsMap[subscription.Owner]; !ok { - us.userSubscriptionsMap[subscription.Owner] = make(map[uint64]*functions_router.IFunctionsSubscriptionsSubscription) + if len(us.userSubscriptionsMap[user]) == 0 { + delete(us.userSubscriptionsMap, user) } - us.userSubscriptionsMap[subscription.Owner][subscriptionId] = subscription + return true + } + + // there is no change to the subscription + if us.userSubscriptionsMap[subscription.Owner][subscriptionId] == subscription { + return false + } + + us.subscriptionIdsMap[subscriptionId] = subscription.Owner + if _, ok := us.userSubscriptionsMap[subscription.Owner]; !ok { + us.userSubscriptionsMap[subscription.Owner] = make(map[uint64]*functions_router.IFunctionsSubscriptionsSubscription) } + us.userSubscriptionsMap[subscription.Owner][subscriptionId] = subscription + return true } func (us *userSubscriptions) GetMaxUserBalance(user common.Address) (*big.Int, error) { subs, exists := us.userSubscriptionsMap[user] if !exists { - return nil, errors.New("user has no subscriptions") + return nil, ErrUserHasNoSubscription } maxBalance := big.NewInt(0) diff --git a/core/services/gateway/handlers/functions/user_subscriptions_test.go b/core/services/gateway/handlers/functions/user_subscriptions_test.go index e86399eb609..53827e07e1b 100644 --- a/core/services/gateway/handlers/functions/user_subscriptions_test.go +++ b/core/services/gateway/handlers/functions/user_subscriptions_test.go @@ -28,18 +28,23 @@ func TestUserSubscriptions(t *testing.T) { user2Balance1 := big.NewInt(50) user2Balance2 := big.NewInt(70) - us.UpdateSubscription(5, &functions_router.IFunctionsSubscriptionsSubscription{ + updated := us.UpdateSubscription(5, &functions_router.IFunctionsSubscriptionsSubscription{ Owner: user1, Balance: user1Balance, }) - us.UpdateSubscription(3, &functions_router.IFunctionsSubscriptionsSubscription{ + assert.True(t, updated) + + updated = us.UpdateSubscription(3, &functions_router.IFunctionsSubscriptionsSubscription{ Owner: user2, Balance: user2Balance1, }) - us.UpdateSubscription(10, &functions_router.IFunctionsSubscriptionsSubscription{ + assert.True(t, updated) + + updated = us.UpdateSubscription(10, &functions_router.IFunctionsSubscriptionsSubscription{ Owner: user2, Balance: user2Balance2, }) + assert.True(t, updated) balance, err := us.GetMaxUserBalance(user1) assert.NoError(t, err) @@ -49,29 +54,96 @@ func TestUserSubscriptions(t *testing.T) { assert.NoError(t, err) assert.Zero(t, balance.Cmp(user2Balance2)) }) +} - t.Run("UpdateSubscription to remove subscriptions", func(t *testing.T) { +func TestUserSubscriptions_UpdateSubscription(t *testing.T) { + t.Parallel() + + t.Run("update balance", func(t *testing.T) { + us := functions.NewUserSubscriptions() + owner := utils.RandomAddress() + + updated := us.UpdateSubscription(1, &functions_router.IFunctionsSubscriptionsSubscription{ + Owner: owner, + Balance: big.NewInt(10), + }) + assert.True(t, updated) + + updated = us.UpdateSubscription(1, &functions_router.IFunctionsSubscriptionsSubscription{ + Owner: owner, + Balance: big.NewInt(100), + }) + assert.True(t, updated) + }) + + t.Run("updated proposed owner", func(t *testing.T) { + us := functions.NewUserSubscriptions() + owner := utils.RandomAddress() + + updated := us.UpdateSubscription(1, &functions_router.IFunctionsSubscriptionsSubscription{ + Owner: owner, + Balance: big.NewInt(10), + }) + assert.True(t, updated) + + updated = us.UpdateSubscription(1, &functions_router.IFunctionsSubscriptionsSubscription{ + Owner: owner, + Balance: big.NewInt(10), + ProposedOwner: utils.RandomAddress(), + }) + assert.True(t, updated) + }) + t.Run("remove subscriptions", func(t *testing.T) { + us := functions.NewUserSubscriptions() user2 := utils.RandomAddress() user2Balance1 := big.NewInt(50) user2Balance2 := big.NewInt(70) - us.UpdateSubscription(3, &functions_router.IFunctionsSubscriptionsSubscription{ + updated := us.UpdateSubscription(3, &functions_router.IFunctionsSubscriptionsSubscription{ Owner: user2, Balance: user2Balance1, }) - us.UpdateSubscription(10, &functions_router.IFunctionsSubscriptionsSubscription{ + assert.True(t, updated) + + updated = us.UpdateSubscription(10, &functions_router.IFunctionsSubscriptionsSubscription{ Owner: user2, Balance: user2Balance2, }) + assert.True(t, updated) - us.UpdateSubscription(3, &functions_router.IFunctionsSubscriptionsSubscription{ + updated = us.UpdateSubscription(3, &functions_router.IFunctionsSubscriptionsSubscription{ Owner: utils.ZeroAddress, }) - us.UpdateSubscription(10, &functions_router.IFunctionsSubscriptionsSubscription{ + assert.True(t, updated) + + updated = us.UpdateSubscription(10, &functions_router.IFunctionsSubscriptionsSubscription{ Owner: utils.ZeroAddress, }) + assert.True(t, updated) _, err := us.GetMaxUserBalance(user2) assert.Error(t, err) }) + + t.Run("remove a non existing subscription", func(t *testing.T) { + us := functions.NewUserSubscriptions() + updated := us.UpdateSubscription(3, &functions_router.IFunctionsSubscriptionsSubscription{ + Owner: utils.ZeroAddress, + }) + assert.False(t, updated) + }) + + t.Run("no actual changes", func(t *testing.T) { + us := functions.NewUserSubscriptions() + subscription := &functions_router.IFunctionsSubscriptionsSubscription{ + Owner: utils.RandomAddress(), + Balance: big.NewInt(25), + BlockedBalance: big.NewInt(25), + } + updated := us.UpdateSubscription(5, subscription) + assert.True(t, updated) + + updated = us.UpdateSubscription(5, subscription) + assert.False(t, updated) + }) } diff --git a/core/services/gateway/integration_tests/gateway_integration_test.go b/core/services/gateway/integration_tests/gateway_integration_test.go index 415a8f67cf8..9e4900efeee 100644 --- a/core/services/gateway/integration_tests/gateway_integration_test.go +++ b/core/services/gateway/integration_tests/gateway_integration_test.go @@ -118,7 +118,7 @@ func TestIntegration_Gateway_NoFullNodes_BasicConnectionAndMessage(t *testing.T) // Launch Gateway lggr := logger.TestLogger(t) gatewayConfig := fmt.Sprintf(gatewayConfigTemplate, nodeKeys.Address) - gateway, err := gateway.NewGatewayFromConfig(parseGatewayConfig(t, gatewayConfig), gateway.NewHandlerFactory(nil, lggr), lggr) + gateway, err := gateway.NewGatewayFromConfig(parseGatewayConfig(t, gatewayConfig), gateway.NewHandlerFactory(nil, nil, nil, lggr), lggr) require.NoError(t, err) servicetest.Run(t, gateway) userPort, nodePort := gateway.GetUserPort(), gateway.GetNodePort() diff --git a/core/services/ocr2/plugins/functions/plugin.go b/core/services/ocr2/plugins/functions/plugin.go index 7e2b15bdccf..c6cfa946aba 100644 --- a/core/services/ocr2/plugins/functions/plugin.go +++ b/core/services/ocr2/plugins/functions/plugin.go @@ -143,7 +143,11 @@ func NewFunctionsServices(functionsOracleArgs, thresholdOracleArgs, s4OracleArgs if err2 != nil { return nil, errors.Wrap(err, "failed to create a RateLimiter") } - subscriptions, err2 := gwFunctions.NewOnchainSubscriptions(conf.Chain.Client(), *pluginConfig.OnchainSubscriptions, conf.Logger) + gwFunctionsORM, err := gwFunctions.NewORM(conf.DB, conf.Logger, conf.QConfig, pluginConfig.OnchainSubscriptions.ContractAddress) + if err != nil { + return nil, errors.Wrap(err, "failed to create functions ORM") + } + subscriptions, err2 := gwFunctions.NewOnchainSubscriptions(conf.Chain.Client(), *pluginConfig.OnchainSubscriptions, gwFunctionsORM, conf.Logger) if err2 != nil { return nil, errors.Wrap(err, "failed to create a OnchainSubscriptions") } diff --git a/core/store/migrate/migrations/0215_functions_subscriptions.sql b/core/store/migrate/migrations/0215_functions_subscriptions.sql new file mode 100644 index 00000000000..c3859d42f63 --- /dev/null +++ b/core/store/migrate/migrations/0215_functions_subscriptions.sql @@ -0,0 +1,19 @@ +-- +goose Up +-- +goose StatementBegin +CREATE TABLE functions_subscriptions( + router_contract_address bytea, + subscription_id bigint, + owner bytea CHECK (octet_length(owner) = 20) NOT NULL, + balance bigint, + blocked_balance bigint, + proposed_owner bytea, + consumers bytea[], + flags bytea, + PRIMARY KEY(router_contract_address, subscription_id) +); +-- +goose StatementEnd + +-- +goose Down +-- +goose StatementBegin +DROP TABLE IF EXISTS functions_subscriptions; +-- +goose StatementEnd