From 9d2a67d4e07c53cd503d6eaccb5bc9e7f821cd7c Mon Sep 17 00:00:00 2001 From: Michael Street <5597260+MStreet3@users.noreply.github.com> Date: Thu, 21 Nov 2024 11:05:49 +0200 Subject: [PATCH] feat(workflows): adds orm methods for managing specs --- core/services/job/models.go | 26 +-- core/services/workflows/syncer/handler.go | 19 ++- core/services/workflows/syncer/mocks/orm.go | 108 +++++++++++++ core/services/workflows/syncer/orm.go | 93 ++++++++++- core/services/workflows/syncer/orm_test.go | 151 ++++++++++++++++++ .../0260_add_status_workflow_spec.sql | 9 ++ 6 files changed, 395 insertions(+), 11 deletions(-) create mode 100644 core/store/migrate/migrations/0260_add_status_workflow_spec.sql diff --git a/core/services/job/models.go b/core/services/job/models.go index 423a297c8da..91108f7304e 100644 --- a/core/services/job/models.go +++ b/core/services/job/models.go @@ -869,21 +869,29 @@ const ( DefaultSpecType = "" ) +type WorkflowSpecStatus uint8 + +const ( + WorkflowSpecStatusActive WorkflowSpecStatus = iota + WorkflowSpecStatusPaused +) + type WorkflowSpec struct { ID int32 `toml:"-"` Workflow string `toml:"workflow"` // the raw representation of the workflow Config string `toml:"config" db:"config"` // the raw representation of the config // fields derived from the yaml spec, used for indexing the database // note: i tried to make these private, but translating them to the database seems to require them to be public - WorkflowID string `toml:"-" db:"workflow_id"` // Derived. Do not modify. the CID of the workflow. - WorkflowOwner string `toml:"-" db:"workflow_owner"` // Derived. Do not modify. the owner of the workflow. - WorkflowName string `toml:"-" db:"workflow_name"` // Derived. Do not modify. the name of the workflow. - BinaryURL string `db:"binary_url"` - ConfigURL string `db:"config_url"` - SecretsID sql.NullInt64 `db:"secrets_id"` - CreatedAt time.Time `toml:"-"` - UpdatedAt time.Time `toml:"-"` - SpecType WorkflowSpecType `toml:"spec_type" db:"spec_type"` + WorkflowID string `toml:"-" db:"workflow_id"` // Derived. Do not modify. the CID of the workflow. + WorkflowOwner string `toml:"-" db:"workflow_owner"` // Derived. Do not modify. the owner of the workflow. + WorkflowName string `toml:"-" db:"workflow_name"` // Derived. Do not modify. the name of the workflow. + Status WorkflowSpecStatus `db:"status"` + BinaryURL string `db:"binary_url"` + ConfigURL string `db:"config_url"` + SecretsID sql.NullInt64 `db:"secrets_id"` + CreatedAt time.Time `toml:"-"` + UpdatedAt time.Time `toml:"-"` + SpecType WorkflowSpecType `toml:"spec_type" db:"spec_type"` sdkWorkflow *sdk.WorkflowSpec rawSpec []byte config []byte diff --git a/core/services/workflows/syncer/handler.go b/core/services/workflows/syncer/handler.go index f5e2f3aea9b..6ac2a5ba2fa 100644 --- a/core/services/workflows/syncer/handler.go +++ b/core/services/workflows/syncer/handler.go @@ -138,8 +138,25 @@ func (h *eventHandler) Handle(ctx context.Context, event WorkflowRegistryEvent) // TODO: Implement this method func (h *eventHandler) workflowRegisteredEvent( _ context.Context, - _ WorkflowRegistryEvent, + event WorkflowRegistryEvent, ) error { + payload, ok := event.Data.(WorkflowRegistryWorkflowRegisteredV1) + if !ok { + return fmt.Errorf("invalid data type %T for event", event.Data) + } + + _ = payload + + // Pre-check: verify that the workflowID matches; if it doesn’t abort and log an error via Beholder. + + // Download the contents of binaryURL, configURL and secretsURL and cache them locally. + + // Create a new entry in the workflow_spec table corresponding for the new workflow, with the contents of the binaryURL + configURL in the table + + // If status == active, start a new WorkflowEngine instance, and add it to local map of instances. + + // Spin off task to refresh the secretsURL periodically. + return ErrNotImplemented } diff --git a/core/services/workflows/syncer/mocks/orm.go b/core/services/workflows/syncer/mocks/orm.go index 19c459fa0ee..cf26f1c7260 100644 --- a/core/services/workflows/syncer/mocks/orm.go +++ b/core/services/workflows/syncer/mocks/orm.go @@ -138,6 +138,54 @@ func (_c *ORM_CreateWorkflowSpec_Call) RunAndReturn(run func(context.Context, *j return _c } +// DeleteWorkflowSpec provides a mock function with given fields: ctx, owner, name +func (_m *ORM) DeleteWorkflowSpec(ctx context.Context, owner string, name string) error { + ret := _m.Called(ctx, owner, name) + + if len(ret) == 0 { + panic("no return value specified for DeleteWorkflowSpec") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string, string) error); ok { + r0 = rf(ctx, owner, name) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// ORM_DeleteWorkflowSpec_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DeleteWorkflowSpec' +type ORM_DeleteWorkflowSpec_Call struct { + *mock.Call +} + +// DeleteWorkflowSpec is a helper method to define mock.On call +// - ctx context.Context +// - owner string +// - name string +func (_e *ORM_Expecter) DeleteWorkflowSpec(ctx interface{}, owner interface{}, name interface{}) *ORM_DeleteWorkflowSpec_Call { + return &ORM_DeleteWorkflowSpec_Call{Call: _e.mock.On("DeleteWorkflowSpec", ctx, owner, name)} +} + +func (_c *ORM_DeleteWorkflowSpec_Call) Run(run func(ctx context.Context, owner string, name string)) *ORM_DeleteWorkflowSpec_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string), args[2].(string)) + }) + return _c +} + +func (_c *ORM_DeleteWorkflowSpec_Call) Return(_a0 error) *ORM_DeleteWorkflowSpec_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *ORM_DeleteWorkflowSpec_Call) RunAndReturn(run func(context.Context, string, string) error) *ORM_DeleteWorkflowSpec_Call { + _c.Call.Return(run) + return _c +} + // GetContents provides a mock function with given fields: ctx, url func (_m *ORM) GetContents(ctx context.Context, url string) (string, error) { ret := _m.Called(ctx, url) @@ -425,6 +473,66 @@ func (_c *ORM_GetSecretsURLHash_Call) RunAndReturn(run func([]byte, []byte) ([]b return _c } +// GetWorkflowSpec provides a mock function with given fields: ctx, owner, name +func (_m *ORM) GetWorkflowSpec(ctx context.Context, owner string, name string) (*job.WorkflowSpec, error) { + ret := _m.Called(ctx, owner, name) + + if len(ret) == 0 { + panic("no return value specified for GetWorkflowSpec") + } + + var r0 *job.WorkflowSpec + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string, string) (*job.WorkflowSpec, error)); ok { + return rf(ctx, owner, name) + } + if rf, ok := ret.Get(0).(func(context.Context, string, string) *job.WorkflowSpec); ok { + r0 = rf(ctx, owner, name) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*job.WorkflowSpec) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, string, string) error); ok { + r1 = rf(ctx, owner, name) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// ORM_GetWorkflowSpec_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetWorkflowSpec' +type ORM_GetWorkflowSpec_Call struct { + *mock.Call +} + +// GetWorkflowSpec is a helper method to define mock.On call +// - ctx context.Context +// - owner string +// - name string +func (_e *ORM_Expecter) GetWorkflowSpec(ctx interface{}, owner interface{}, name interface{}) *ORM_GetWorkflowSpec_Call { + return &ORM_GetWorkflowSpec_Call{Call: _e.mock.On("GetWorkflowSpec", ctx, owner, name)} +} + +func (_c *ORM_GetWorkflowSpec_Call) Run(run func(ctx context.Context, owner string, name string)) *ORM_GetWorkflowSpec_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string), args[2].(string)) + }) + return _c +} + +func (_c *ORM_GetWorkflowSpec_Call) Return(_a0 *job.WorkflowSpec, _a1 error) *ORM_GetWorkflowSpec_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *ORM_GetWorkflowSpec_Call) RunAndReturn(run func(context.Context, string, string) (*job.WorkflowSpec, error)) *ORM_GetWorkflowSpec_Call { + _c.Call.Return(run) + return _c +} + // Update provides a mock function with given fields: ctx, secretsURL, contents func (_m *ORM) Update(ctx context.Context, secretsURL string, contents string) (int64, error) { ret := _m.Called(ctx, secretsURL, contents) diff --git a/core/services/workflows/syncer/orm.go b/core/services/workflows/syncer/orm.go index 9bf68c88782..27c255a648f 100644 --- a/core/services/workflows/syncer/orm.go +++ b/core/services/workflows/syncer/orm.go @@ -2,7 +2,9 @@ package syncer import ( "context" + "database/sql" "errors" + "time" "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" "github.com/smartcontractkit/chainlink/v2/core/logger" @@ -34,6 +36,8 @@ type WorkflowSecretsDS interface { type WorkflowSpecsDS interface { CreateWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec) (int64, error) + GetWorkflowSpec(ctx context.Context, owner, name string) (*job.WorkflowSpec, error) + DeleteWorkflowSpec(ctx context.Context, owner, name string) error } type ORM interface { @@ -150,5 +154,92 @@ func (orm *orm) GetSecretsURLHash(owner, secretsURL []byte) ([]byte, error) { } func (orm *orm) CreateWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec) (int64, error) { - return 0, errors.New("not implemented") + var id int64 + + query := ` + INSERT INTO workflow_specs ( + workflow, + config, + workflow_id, + workflow_owner, + workflow_name, + status, + binary_url, + config_url, + secrets_id, + created_at, + updated_at, + spec_type + ) VALUES ( + :workflow, + :config, + :workflow_id, + :workflow_owner, + :workflow_name, + :status, + :binary_url, + :config_url, + :secrets_id, + :created_at, + :updated_at, + :spec_type + ) RETURNING id + ` + + stmt, err := orm.ds.PrepareNamedContext(ctx, query) + if err != nil { + return 0, err + } + defer stmt.Close() + + spec.UpdatedAt = time.Now() + err = stmt.QueryRowxContext(ctx, spec).Scan(&id) + + if err != nil { + return 0, err + } + + return id, nil +} + +func (orm *orm) GetWorkflowSpec(ctx context.Context, owner, name string) (*job.WorkflowSpec, error) { + query := ` + SELECT * + FROM workflow_specs + WHERE workflow_owner = $1 AND workflow_name = $2 + ` + + var spec job.WorkflowSpec + err := orm.ds.GetContext(ctx, &spec, query, owner, name) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, nil // No spec found + } + return nil, err + } + + return &spec, nil +} + +func (orm *orm) DeleteWorkflowSpec(ctx context.Context, owner, name string) error { + query := ` + DELETE FROM workflow_specs + WHERE workflow_owner = $1 AND workflow_name = $2 + ` + + result, err := orm.ds.ExecContext(ctx, query, owner, name) + if err != nil { + return err + } + + rowsAffected, err := result.RowsAffected() + if err != nil { + return err + } + + if rowsAffected == 0 { + return sql.ErrNoRows // No spec deleted + } + + return nil } diff --git a/core/services/workflows/syncer/orm_test.go b/core/services/workflows/syncer/orm_test.go index 8b9f685bb52..e7c990cee87 100644 --- a/core/services/workflows/syncer/orm_test.go +++ b/core/services/workflows/syncer/orm_test.go @@ -1,12 +1,15 @@ package syncer import ( + "database/sql" "encoding/hex" "testing" + "time" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/job" "github.com/smartcontractkit/chainlink/v2/core/utils/crypto" "github.com/stretchr/testify/assert" @@ -51,3 +54,151 @@ func TestWorkflowArtifactsORM_GetAndUpdate(t *testing.T) { require.NoError(t, err) assert.Equal(t, "new contents", contents) } + +func Test_CreateWorkflowSpec(t *testing.T) { + db := pgtest.NewSqlxDB(t) + ctx := testutils.Context(t) + lggr := logger.TestLogger(t) + orm := &orm{ds: db, lggr: lggr} + + t.Run("creates a workflow spec without workflow secrets", func(t *testing.T) { + spec := &job.WorkflowSpec{ + Workflow: "test_workflow", + Config: "test_config", + WorkflowID: "cid-123", + WorkflowOwner: "owner-123", + WorkflowName: "Test Workflow", + Status: job.WorkflowSpecStatusActive, + BinaryURL: "http://example.com/binary", + ConfigURL: "http://example.com/config", + CreatedAt: time.Now(), + SpecType: job.WASMFile, + } + + id, err := orm.CreateWorkflowSpec(ctx, spec) + require.NoError(t, err) + require.NotZero(t, id) + + // Verify the record exists in the database + var dbSpec job.WorkflowSpec + err = db.Get(&dbSpec, `SELECT * FROM workflow_specs WHERE id = $1`, id) + require.NoError(t, err) + require.Equal(t, spec.Workflow, dbSpec.Workflow) + }) + + t.Run("creates a workflow spec with workflow secrets", func(t *testing.T) { + giveURL := "https://example.com" + giveBytes, err := crypto.Keccak256([]byte(giveURL)) + require.NoError(t, err) + giveHash := hex.EncodeToString(giveBytes) + giveContent := "some contents" + + gotID, err := orm.Create(ctx, giveURL, giveHash, giveContent) + require.NoError(t, err) + + spec := &job.WorkflowSpec{ + Workflow: "test_workflow", + Config: "test_config", + WorkflowID: "cid-456", + WorkflowOwner: "owner-123", + WorkflowName: "Test Workflow with Secrets", + SecretsID: sql.NullInt64{Int64: gotID, Valid: true}, + Status: job.WorkflowSpecStatusActive, + BinaryURL: "http://example.com/binary", + ConfigURL: "http://example.com/config", + CreatedAt: time.Now(), + SpecType: job.WASMFile, + } + + id, err := orm.CreateWorkflowSpec(ctx, spec) + require.NoError(t, err) + require.NotZero(t, id) + + // Verify the record exists in the database + var dbSpec job.WorkflowSpec + err = db.Get(&dbSpec, `SELECT * FROM workflow_specs WHERE id = $1`, id) + require.NoError(t, err) + require.Equal(t, spec.Workflow, dbSpec.Workflow) + require.Equal(t, gotID, dbSpec.SecretsID.Int64) + }) +} + +func Test_DeleteWorkflowSpec(t *testing.T) { + db := pgtest.NewSqlxDB(t) + ctx := testutils.Context(t) + lggr := logger.TestLogger(t) + orm := &orm{ds: db, lggr: lggr} + + t.Run("deletes a workflow spec", func(t *testing.T) { + spec := &job.WorkflowSpec{ + Workflow: "test_workflow", + Config: "test_config", + WorkflowID: "cid-123", + WorkflowOwner: "owner-123", + WorkflowName: "Test Workflow", + Status: job.WorkflowSpecStatusActive, + BinaryURL: "http://example.com/binary", + ConfigURL: "http://example.com/config", + CreatedAt: time.Now(), + SpecType: job.WASMFile, + } + + id, err := orm.CreateWorkflowSpec(ctx, spec) + require.NoError(t, err) + require.NotZero(t, id) + + err = orm.DeleteWorkflowSpec(ctx, spec.WorkflowOwner, spec.WorkflowName) + require.NoError(t, err) + + // Verify the record is deleted from the database + var dbSpec job.WorkflowSpec + err = db.Get(&dbSpec, `SELECT * FROM workflow_specs WHERE id = $1`, id) + require.Error(t, err) + require.Equal(t, sql.ErrNoRows, err) + }) + + t.Run("fails if no workflow spec exists", func(t *testing.T) { + err := orm.DeleteWorkflowSpec(ctx, "owner-123", "Test Workflow") + require.Error(t, err) + require.Equal(t, sql.ErrNoRows, err) + }) +} + +func Test_GetWorkflowSpec(t *testing.T) { + db := pgtest.NewSqlxDB(t) + ctx := testutils.Context(t) + lggr := logger.TestLogger(t) + orm := &orm{ds: db, lggr: lggr} + + t.Run("gets a workflow spec", func(t *testing.T) { + spec := &job.WorkflowSpec{ + Workflow: "test_workflow", + Config: "test_config", + WorkflowID: "cid-123", + WorkflowOwner: "owner-123", + WorkflowName: "Test Workflow", + Status: job.WorkflowSpecStatusActive, + BinaryURL: "http://example.com/binary", + ConfigURL: "http://example.com/config", + CreatedAt: time.Now(), + SpecType: job.WASMFile, + } + + id, err := orm.CreateWorkflowSpec(ctx, spec) + require.NoError(t, err) + require.NotZero(t, id) + + dbSpec, err := orm.GetWorkflowSpec(ctx, spec.WorkflowOwner, spec.WorkflowName) + require.NoError(t, err) + require.Equal(t, spec.Workflow, dbSpec.Workflow) + + err = orm.DeleteWorkflowSpec(ctx, spec.WorkflowOwner, spec.WorkflowName) + require.NoError(t, err) + }) + + t.Run("fails if no workflow spec exists", func(t *testing.T) { + dbSpec, err := orm.GetWorkflowSpec(ctx, "owner-123", "Test Workflow") + require.NoError(t, err) + require.Nil(t, dbSpec) + }) +} diff --git a/core/store/migrate/migrations/0260_add_status_workflow_spec.sql b/core/store/migrate/migrations/0260_add_status_workflow_spec.sql new file mode 100644 index 00000000000..1b70eac8537 --- /dev/null +++ b/core/store/migrate/migrations/0260_add_status_workflow_spec.sql @@ -0,0 +1,9 @@ +-- +goose Up +-- Add a `status` column to the `workflow_specs` table. +ALTER TABLE workflow_specs +ADD COLUMN status smallint DEFAULT 0 NOT NULL; + +-- +goose Down +-- Remove the `status` column from the `workflow_specs` table. +ALTER TABLE workflow_specs +DROP COLUMN status; \ No newline at end of file