Skip to content

Commit

Permalink
[CAPPL-394] Some database changes
Browse files Browse the repository at this point in the history
- Allow multiple workflows to reference the same secrets file
- Change workflow spec upsert methods so it deletes old workflows
  • Loading branch information
cedric-cordenier committed Dec 13, 2024
1 parent 8f1b956 commit 479f15f
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 61 deletions.
134 changes: 75 additions & 59 deletions core/services/workflows/syncer/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,65 +210,73 @@ func (orm *orm) GetSecretsURLHash(owner, secretsURL []byte) ([]byte, error) {

func (orm *orm) UpsertWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec) (int64, error) {
var id int64
err := sqlutil.TransactDataSource(ctx, orm.ds, nil, func(tx sqlutil.DataSource) error {
txErr := tx.QueryRowxContext(
ctx,
`DELETE FROM workflow_specs WHERE workflow_owner = $1 AND workflow_name = $2 AND workflow_id != $3`,
spec.WorkflowOwner,
spec.WorkflowName,
spec.WorkflowID,
).Scan(nil)
if txErr != nil && !errors.Is(txErr, sql.ErrNoRows) {
return fmt.Errorf("failed to clean up previous workflow specs: %w", txErr)
}

query := `
INSERT INTO workflow_specs (
workflow,
config,
workflow_id,
workflow_owner,
workflow_name,
status,
binary_url,
config_url,
secrets_id,
created_at,
updated_at,
spec_type
) VALUES (
:workflow,
:config,
:workflow_id,
:workflow_owner,
:workflow_name,
:status,
:binary_url,
:config_url,
:secrets_id,
:created_at,
:updated_at,
:spec_type
) ON CONFLICT (workflow_owner, workflow_name) DO UPDATE
SET
workflow = EXCLUDED.workflow,
config = EXCLUDED.config,
workflow_id = EXCLUDED.workflow_id,
workflow_owner = EXCLUDED.workflow_owner,
workflow_name = EXCLUDED.workflow_name,
status = EXCLUDED.status,
binary_url = EXCLUDED.binary_url,
config_url = EXCLUDED.config_url,
secrets_id = EXCLUDED.secrets_id,
created_at = EXCLUDED.created_at,
updated_at = EXCLUDED.updated_at,
spec_type = EXCLUDED.spec_type
RETURNING id
`

stmt, err := orm.ds.PrepareNamedContext(ctx, query)
if err != nil {
return 0, err
}
defer stmt.Close()
query := `
INSERT INTO workflow_specs (
workflow,
config,
workflow_id,
workflow_owner,
workflow_name,
status,
binary_url,
config_url,
secrets_id,
created_at,
updated_at,
spec_type
) VALUES (
:workflow,
:config,
:workflow_id,
:workflow_owner,
:workflow_name,
:status,
:binary_url,
:config_url,
:secrets_id,
:created_at,
:updated_at,
:spec_type
) ON CONFLICT (workflow_owner, workflow_name) DO UPDATE
SET
workflow = EXCLUDED.workflow,
config = EXCLUDED.config,
workflow_id = EXCLUDED.workflow_id,
workflow_owner = EXCLUDED.workflow_owner,
workflow_name = EXCLUDED.workflow_name,
status = EXCLUDED.status,
binary_url = EXCLUDED.binary_url,
config_url = EXCLUDED.config_url,
secrets_id = EXCLUDED.secrets_id,
created_at = EXCLUDED.created_at,
updated_at = EXCLUDED.updated_at,
spec_type = EXCLUDED.spec_type
RETURNING id
`

spec.UpdatedAt = time.Now()
err = stmt.QueryRowxContext(ctx, spec).Scan(&id)
stmt, err := orm.ds.PrepareNamedContext(ctx, query)
if err != nil {
return err
}
defer stmt.Close()

if err != nil {
return 0, err
}
spec.UpdatedAt = time.Now()
return stmt.QueryRowxContext(ctx, spec).Scan(&id)
})

return id, nil
return id, err
}

func (orm *orm) UpsertWorkflowSpecWithSecrets(
Expand All @@ -293,6 +301,17 @@ func (orm *orm) UpsertWorkflowSpecWithSecrets(
return fmt.Errorf("failed to create workflow secrets: %w", txErr)
}

txErr = tx.QueryRowxContext(
ctx,
`DELETE FROM workflow_specs WHERE workflow_owner = $1 AND workflow_name = $2 AND workflow_id != $3`,
spec.WorkflowOwner,
spec.WorkflowName,
spec.WorkflowID,
).Scan(nil)
if txErr != nil && !errors.Is(txErr, sql.ErrNoRows) {
return fmt.Errorf("failed to clean up previous workflow specs: %w", txErr)
}

spec.SecretsID = sql.NullInt64{Int64: sid, Valid: true}

query := `
Expand Down Expand Up @@ -335,10 +354,7 @@ func (orm *orm) UpsertWorkflowSpecWithSecrets(
created_at = EXCLUDED.created_at,
updated_at = EXCLUDED.updated_at,
spec_type = EXCLUDED.spec_type,
secrets_id = CASE
WHEN workflow_specs.secrets_id IS NULL THEN EXCLUDED.secrets_id
ELSE workflow_specs.secrets_id
END
secrets_id = EXCLUDED.secrets_id
RETURNING id
`

Expand Down
42 changes: 42 additions & 0 deletions core/services/workflows/syncer/orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"testing"
"time"

"github.com/google/uuid"

"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
"github.com/smartcontractkit/chainlink/v2/core/logger"
Expand Down Expand Up @@ -372,4 +374,44 @@ func Test_UpsertWorkflowSpecWithSecrets(t *testing.T) {
require.NoError(t, err)
require.Equal(t, "new contents", contents)
})

t.Run("updates existing spec and secrets if spec has executions", func(t *testing.T) {
giveURL := "https://example.com"
giveBytes, err := crypto.Keccak256([]byte(giveURL))
require.NoError(t, err)
giveHash := hex.EncodeToString(giveBytes)
giveContent := "some contents"

spec := &job.WorkflowSpec{
Workflow: "test_workflow",
Config: "test_config",
WorkflowID: "cid-123",
WorkflowOwner: "owner-123",
WorkflowName: "Test Workflow",
Status: job.WorkflowSpecStatusActive,
BinaryURL: "http://example.com/binary",
ConfigURL: "http://example.com/config",
CreatedAt: time.Now(),
SpecType: job.WASMFile,
}

_, err = orm.UpsertWorkflowSpecWithSecrets(ctx, spec, giveURL, giveHash, giveContent)
require.NoError(t, err)

_, err = db.ExecContext(
ctx,
`INSERT INTO workflow_executions (id, workflow_id, status, created_at) VALUES ($1, $2, $3, $4)`,
uuid.New().String(),
"cid-123",
"started",
time.Now(),
)
require.NoError(t, err)

// Update the status
spec.WorkflowID = "cid-456"

_, err = orm.UpsertWorkflowSpecWithSecrets(ctx, spec, giveURL, giveHash, "new contents")
require.NoError(t, err)
})
}
2 changes: 1 addition & 1 deletion core/services/workflows/syncer/workflow_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ func (w *workflowRegistry) readRegistryEvents(ctx context.Context, reader Contra
for _, event := range events {
err := w.handler.Handle(ctx, event.Event)
if err != nil {
w.lggr.Errorw("failed to handle event", "err", err)
w.lggr.Errorw("failed to handle event", "err", err, "type", event.Event.EventType)
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion core/store/migrate/migrations/0259_add_workflow_secrets.sql
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,5 @@ DROP INDEX IF EXISTS idx_secrets_url_hash;

-- Drop the workflow_artifacts table
DROP TABLE IF EXISTS workflow_secrets;
-- +goose StatementEnd
-- +goose StatementEnd

Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
-- +goose Up
-- +goose StatementBegin
-- unique constraint on workflow_owner and workflow_name
ALTER TABLE workflow_specs DROP CONSTRAINT workflow_specs_secrets_id_key;
-- +goose StatementEnd

-- +goose Down
-- +goose StatementBegin
ALTER TABLE workflow_specs ADD CONSTRAINT workflow_specs_secrets_id_key unique (secrets_id);
-- +goose StatementEnd

0 comments on commit 479f15f

Please sign in to comment.