diff --git a/core/services/workflows/syncer/orm.go b/core/services/workflows/syncer/orm.go index bd0501795e6..c04b1864923 100644 --- a/core/services/workflows/syncer/orm.go +++ b/core/services/workflows/syncer/orm.go @@ -210,65 +210,73 @@ func (orm *orm) GetSecretsURLHash(owner, secretsURL []byte) ([]byte, error) { func (orm *orm) UpsertWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec) (int64, error) { var id int64 + err := sqlutil.TransactDataSource(ctx, orm.ds, nil, func(tx sqlutil.DataSource) error { + txErr := tx.QueryRowxContext( + ctx, + `DELETE FROM workflow_specs WHERE workflow_owner = $1 AND workflow_name = $2 AND workflow_id != $3`, + spec.WorkflowOwner, + spec.WorkflowName, + spec.WorkflowID, + ).Scan(nil) + if txErr != nil && !errors.Is(txErr, sql.ErrNoRows) { + return fmt.Errorf("failed to clean up previous workflow specs: %w", txErr) + } - query := ` - INSERT INTO workflow_specs ( - workflow, - config, - workflow_id, - workflow_owner, - workflow_name, - status, - binary_url, - config_url, - secrets_id, - created_at, - updated_at, - spec_type - ) VALUES ( - :workflow, - :config, - :workflow_id, - :workflow_owner, - :workflow_name, - :status, - :binary_url, - :config_url, - :secrets_id, - :created_at, - :updated_at, - :spec_type - ) ON CONFLICT (workflow_owner, workflow_name) DO UPDATE - SET - workflow = EXCLUDED.workflow, - config = EXCLUDED.config, - workflow_id = EXCLUDED.workflow_id, - workflow_owner = EXCLUDED.workflow_owner, - workflow_name = EXCLUDED.workflow_name, - status = EXCLUDED.status, - binary_url = EXCLUDED.binary_url, - config_url = EXCLUDED.config_url, - secrets_id = EXCLUDED.secrets_id, - created_at = EXCLUDED.created_at, - updated_at = EXCLUDED.updated_at, - spec_type = EXCLUDED.spec_type - RETURNING id - ` - - stmt, err := orm.ds.PrepareNamedContext(ctx, query) - if err != nil { - return 0, err - } - defer stmt.Close() + query := ` + INSERT INTO workflow_specs ( + workflow, + config, + workflow_id, + workflow_owner, + workflow_name, + status, + binary_url, + config_url, + secrets_id, + created_at, + updated_at, + spec_type + ) VALUES ( + :workflow, + :config, + :workflow_id, + :workflow_owner, + :workflow_name, + :status, + :binary_url, + :config_url, + :secrets_id, + :created_at, + :updated_at, + :spec_type + ) ON CONFLICT (workflow_owner, workflow_name) DO UPDATE + SET + workflow = EXCLUDED.workflow, + config = EXCLUDED.config, + workflow_id = EXCLUDED.workflow_id, + workflow_owner = EXCLUDED.workflow_owner, + workflow_name = EXCLUDED.workflow_name, + status = EXCLUDED.status, + binary_url = EXCLUDED.binary_url, + config_url = EXCLUDED.config_url, + secrets_id = EXCLUDED.secrets_id, + created_at = EXCLUDED.created_at, + updated_at = EXCLUDED.updated_at, + spec_type = EXCLUDED.spec_type + RETURNING id + ` - spec.UpdatedAt = time.Now() - err = stmt.QueryRowxContext(ctx, spec).Scan(&id) + stmt, err := orm.ds.PrepareNamedContext(ctx, query) + if err != nil { + return err + } + defer stmt.Close() - if err != nil { - return 0, err - } + spec.UpdatedAt = time.Now() + return stmt.QueryRowxContext(ctx, spec).Scan(&id) + }) - return id, nil + return id, err } func (orm *orm) UpsertWorkflowSpecWithSecrets( @@ -293,6 +301,17 @@ func (orm *orm) UpsertWorkflowSpecWithSecrets( return fmt.Errorf("failed to create workflow secrets: %w", txErr) } + txErr = tx.QueryRowxContext( + ctx, + `DELETE FROM workflow_specs WHERE workflow_owner = $1 AND workflow_name = $2 AND workflow_id != $3`, + spec.WorkflowOwner, + spec.WorkflowName, + spec.WorkflowID, + ).Scan(nil) + if txErr != nil && !errors.Is(txErr, sql.ErrNoRows) { + return fmt.Errorf("failed to clean up previous workflow specs: %w", txErr) + } + spec.SecretsID = sql.NullInt64{Int64: sid, Valid: true} query := ` @@ -335,10 +354,7 @@ func (orm *orm) UpsertWorkflowSpecWithSecrets( created_at = EXCLUDED.created_at, updated_at = EXCLUDED.updated_at, spec_type = EXCLUDED.spec_type, - secrets_id = CASE - WHEN workflow_specs.secrets_id IS NULL THEN EXCLUDED.secrets_id - ELSE workflow_specs.secrets_id - END + secrets_id = EXCLUDED.secrets_id RETURNING id ` diff --git a/core/services/workflows/syncer/orm_test.go b/core/services/workflows/syncer/orm_test.go index a94233e78a1..b6a705752bc 100644 --- a/core/services/workflows/syncer/orm_test.go +++ b/core/services/workflows/syncer/orm_test.go @@ -6,6 +6,8 @@ import ( "testing" "time" + "github.com/google/uuid" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" "github.com/smartcontractkit/chainlink/v2/core/logger" @@ -372,4 +374,44 @@ func Test_UpsertWorkflowSpecWithSecrets(t *testing.T) { require.NoError(t, err) require.Equal(t, "new contents", contents) }) + + t.Run("updates existing spec and secrets if spec has executions", func(t *testing.T) { + giveURL := "https://example.com" + giveBytes, err := crypto.Keccak256([]byte(giveURL)) + require.NoError(t, err) + giveHash := hex.EncodeToString(giveBytes) + giveContent := "some contents" + + spec := &job.WorkflowSpec{ + Workflow: "test_workflow", + Config: "test_config", + WorkflowID: "cid-123", + WorkflowOwner: "owner-123", + WorkflowName: "Test Workflow", + Status: job.WorkflowSpecStatusActive, + BinaryURL: "http://example.com/binary", + ConfigURL: "http://example.com/config", + CreatedAt: time.Now(), + SpecType: job.WASMFile, + } + + _, err = orm.UpsertWorkflowSpecWithSecrets(ctx, spec, giveURL, giveHash, giveContent) + require.NoError(t, err) + + _, err = db.ExecContext( + ctx, + `INSERT INTO workflow_executions (id, workflow_id, status, created_at) VALUES ($1, $2, $3, $4)`, + uuid.New().String(), + "cid-123", + "started", + time.Now(), + ) + require.NoError(t, err) + + // Update the status + spec.WorkflowID = "cid-456" + + _, err = orm.UpsertWorkflowSpecWithSecrets(ctx, spec, giveURL, giveHash, "new contents") + require.NoError(t, err) + }) } diff --git a/core/services/workflows/syncer/workflow_registry.go b/core/services/workflows/syncer/workflow_registry.go index 223fbe8e758..b80eab03b4a 100644 --- a/core/services/workflows/syncer/workflow_registry.go +++ b/core/services/workflows/syncer/workflow_registry.go @@ -333,7 +333,7 @@ func (w *workflowRegistry) readRegistryEvents(ctx context.Context, reader Contra for _, event := range events { err := w.handler.Handle(ctx, event.Event) if err != nil { - w.lggr.Errorw("failed to handle event", "err", err) + w.lggr.Errorw("failed to handle event", "err", err, "type", event.Event.EventType) } } } diff --git a/core/store/migrate/migrations/0259_add_workflow_secrets.sql b/core/store/migrate/migrations/0259_add_workflow_secrets.sql index fb76d945571..420f7ed6e49 100644 --- a/core/store/migrate/migrations/0259_add_workflow_secrets.sql +++ b/core/store/migrate/migrations/0259_add_workflow_secrets.sql @@ -38,4 +38,5 @@ DROP INDEX IF EXISTS idx_secrets_url_hash; -- Drop the workflow_artifacts table DROP TABLE IF EXISTS workflow_secrets; --- +goose StatementEnd \ No newline at end of file +-- +goose StatementEnd + diff --git a/core/store/migrate/migrations/0261_remove_unique_constraint_secrets.sql b/core/store/migrate/migrations/0261_remove_unique_constraint_secrets.sql new file mode 100644 index 00000000000..15f59d8ae28 --- /dev/null +++ b/core/store/migrate/migrations/0261_remove_unique_constraint_secrets.sql @@ -0,0 +1,10 @@ +-- +goose Up +-- +goose StatementBegin +-- unique constraint on workflow_owner and workflow_name +ALTER TABLE workflow_specs DROP CONSTRAINT workflow_specs_secrets_id_key; +-- +goose StatementEnd + +-- +goose Down +-- +goose StatementBegin +ALTER TABLE workflow_specs ADD CONSTRAINT workflow_specs_secrets_id_key unique (secrets_id); +-- +goose StatementEnd