Skip to content

Commit

Permalink
[CAPPL-300] Implement SecretsFor
Browse files Browse the repository at this point in the history
  • Loading branch information
cedric-cordenier committed Nov 27, 2024
1 parent d77db32 commit a463c6b
Show file tree
Hide file tree
Showing 8 changed files with 492 additions and 32 deletions.
4 changes: 2 additions & 2 deletions core/services/workflows/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (sucm *stepUpdateManager) len() int64 {
}

type secretsFetcher interface {
SecretsFor(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error)
SecretsFor(ctx context.Context, workflowOwner, workflowName, workflowID string) (map[string]string, error)
}

// Engine handles the lifecycle of a single workflow and its executions.
Expand Down Expand Up @@ -850,7 +850,7 @@ func (e *Engine) interpolateEnvVars(config map[string]any, env exec.Env) (*value
// registry (for capability-level configuration). It doesn't perform any caching of the config values, since
// the two registries perform their own caching.
func (e *Engine) configForStep(ctx context.Context, lggr logger.Logger, step *step) (*values.Map, error) {
secrets, err := e.secretsFetcher.SecretsFor(ctx, e.workflow.owner, e.workflow.name)
secrets, err := e.secretsFetcher.SecretsFor(ctx, e.workflow.owner, e.workflow.name, e.workflow.id)
if err != nil {
return nil, fmt.Errorf("failed to fetch secrets: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions core/services/workflows/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func newTestEngineWithYAMLSpec(t *testing.T, reg *coreCap.Registry, spec string,

type mockSecretsFetcher struct{}

func (s mockSecretsFetcher) SecretsFor(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error) {
func (s mockSecretsFetcher) SecretsFor(ctx context.Context, workflowOwner, workflowName, workflowID string) (map[string]string, error) {
return map[string]string{}, nil
}

Expand Down Expand Up @@ -1606,7 +1606,7 @@ type mockFetcher struct {
retval map[string]string
}

func (m *mockFetcher) SecretsFor(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error) {
func (m *mockFetcher) SecretsFor(ctx context.Context, workflowOwner, workflowName, workflowID string) (map[string]string, error) {
return m.retval, nil
}

Expand Down
23 changes: 8 additions & 15 deletions core/services/workflows/syncer/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,7 @@ type WorkflowRegistryWorkflowDeletedV1 struct {
}

type secretsFetcher interface {
SecretsFor(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error)
}

// secretsFetcherFunc implements the secretsFetcher interface for a function.
type secretsFetcherFunc func(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error)

func (f secretsFetcherFunc) SecretsFor(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error) {
return f(ctx, workflowOwner, workflowName)
SecretsFor(ctx context.Context, workflowOwner, workflowName, workflowID string) (map[string]string, error)
}

// eventHandler is a handler for WorkflowRegistryEvent events. Each event type has a corresponding
Expand Down Expand Up @@ -153,7 +146,7 @@ func (h *eventHandler) Handle(ctx context.Context, event WorkflowRegistryEvent)
platform.KeyWorkflowOwner, hex.EncodeToString(payload.Owner),
)

if err := h.forceUpdateSecretsEvent(ctx, payload); err != nil {
if _, err := h.forceUpdateSecretsEvent(ctx, payload); err != nil {
logCustMsg(ctx, cma, fmt.Sprintf("failed to handle force update secrets event: %v", err), h.lggr)
return err
}
Expand Down Expand Up @@ -263,7 +256,7 @@ func (h *eventHandler) workflowRegisteredEvent(
Lggr: h.lggr,
Workflow: *sdkSpec,
WorkflowID: wfID,
WorkflowOwner: hex.EncodeToString(payload.WorkflowOwner),
WorkflowOwner: string(payload.WorkflowOwner), // this gets hex encoded in the engine.
WorkflowName: payload.WorkflowName,
Registry: h.capRegistry,
Store: h.workflowStore,
Expand Down Expand Up @@ -312,27 +305,27 @@ func (h *eventHandler) workflowActivatedEvent(
func (h *eventHandler) forceUpdateSecretsEvent(
ctx context.Context,
payload WorkflowRegistryForceUpdateSecretsRequestedV1,
) error {
) (string, error) {
// Get the URL of the secrets file from the event data
hash := hex.EncodeToString(payload.SecretsURLHash)

url, err := h.orm.GetSecretsURLByHash(ctx, hash)
if err != nil {
return fmt.Errorf("failed to get URL by hash %s : %w", hash, err)
return "", fmt.Errorf("failed to get URL by hash %s : %w", hash, err)
}

// Fetch the contents of the secrets file from the url via the fetcher
secrets, err := h.fetcher(ctx, url)
if err != nil {
return fmt.Errorf("failed to fetch secrets from url %s : %w", url, err)
return "", err
}

// Update the secrets in the ORM
if _, err := h.orm.Update(ctx, hash, string(secrets)); err != nil {
return fmt.Errorf("failed to update secrets: %w", err)
return "", fmt.Errorf("failed to update secrets: %w", err)
}

return nil
return string(secrets), nil
}

// workflowID returns a hex encoded sha256 hash of the wasm, config and secretsURL.
Expand Down
64 changes: 64 additions & 0 deletions core/services/workflows/syncer/mocks/orm.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

41 changes: 41 additions & 0 deletions core/services/workflows/syncer/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package syncer
import (
"context"
"database/sql"
"errors"
"fmt"
"time"

Expand All @@ -25,6 +26,9 @@ type WorkflowSecretsDS interface {
// GetContentsByHash returns the contents of the secret at the given hashed URL.
GetContentsByHash(ctx context.Context, hash string) (string, error)

// GetContentsByWorkflowID returns the contents and secrets_url of the secret for the given workflow.
GetContentsByWorkflowID(ctx context.Context, workflowID string) (string, string, error)

// GetSecretsURLHash returns the keccak256 hash of the owner and secrets URL.
GetSecretsURLHash(owner, secretsURL []byte) ([]byte, error)

Expand Down Expand Up @@ -123,6 +127,43 @@ func (orm *orm) GetContents(ctx context.Context, url string) (string, error) {
return contents, nil // Return the populated Artifact struct
}

type Int struct {
sql.NullInt64
}

type joinRecord struct {
SecretsID sql.NullString `db:"wspec_secrets_id"`
SecretsURLHash sql.NullString `db:"wsec_secrets_url_hash"`
Contents sql.NullString `db:"wsec_contents"`
}

var EmptySecrets = errors.New("secrets field is empty")

// GetContentsByWorkflowID joins the workflow_secrets on the workflow_specs table and gets
// the associated secrets contents.
func (orm *orm) GetContentsByWorkflowID(ctx context.Context, workflowID string) (string, string, error) {
var jr joinRecord
err := orm.ds.GetContext(
ctx,
&jr,
`SELECT wsec.secrets_url_hash AS wsec_secrets_url_hash, wsec.contents AS wsec_contents, wspec.secrets_id AS wspec_secrets_id
FROM workflow_specs AS wspec
LEFT JOIN
workflow_secrets AS wsec ON wspec.secrets_id = wsec.id
WHERE wspec.workflow_id = $1`,
workflowID,
)
if err != nil {
return "", "", err
}

if !jr.SecretsID.Valid {
return "", "", EmptySecrets
}

return jr.SecretsURLHash.String, jr.Contents.String, nil
}

// Update updates the secrets content at the given hash or inserts a new record if not found.
func (orm *orm) Update(ctx context.Context, hash, contents string) (int64, error) {
var id int64
Expand Down
60 changes: 60 additions & 0 deletions core/services/workflows/syncer/orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,3 +196,63 @@ func Test_GetWorkflowSpec(t *testing.T) {
require.Nil(t, dbSpec)
})
}

func Test_GetContentsByWorkflowID(t *testing.T) {
db := pgtest.NewSqlxDB(t)
ctx := testutils.Context(t)
lggr := logger.TestLogger(t)
orm := &orm{ds: db, lggr: lggr}

// workflow_id is missing
_, _, err := orm.GetContentsByWorkflowID(ctx, "doesnt-exist")
assert.ErrorContains(t, err, "no rows in result set")

// secrets_id is nil; should return EmptySecrets
workflowID := "aWorkflowID"
_, err = orm.UpsertWorkflowSpec(ctx, &job.WorkflowSpec{
Workflow: "",
Config: "",
WorkflowID: workflowID,
WorkflowOwner: "aWorkflowOwner",
WorkflowName: "aWorkflowName",
BinaryURL: "",
ConfigURL: "",
CreatedAt: time.Now(),
SpecType: job.DefaultSpecType,
})
require.NoError(t, err)

_, _, err = orm.GetContentsByWorkflowID(ctx, workflowID)
assert.ErrorIs(t, err, EmptySecrets)

// retrieves the artifact if provided
giveURL := "https://example.com"
giveBytes, err := crypto.Keccak256([]byte(giveURL))
require.NoError(t, err)
giveHash := hex.EncodeToString(giveBytes)
giveContent := "some contents"

secretsID, err := orm.Create(ctx, giveURL, giveHash, giveContent)
require.NoError(t, err)

_, err = orm.UpsertWorkflowSpec(ctx, &job.WorkflowSpec{
Workflow: "",
Config: "",
SecretsID: sql.NullInt64{Int64: secretsID, Valid: true},
WorkflowID: workflowID,
WorkflowOwner: "aWorkflowOwner",
WorkflowName: "aWorkflowName",
BinaryURL: "",
ConfigURL: "",
CreatedAt: time.Now(),
SpecType: job.DefaultSpecType,
})
require.NoError(t, err)
_, err = orm.GetWorkflowSpec(ctx, "aWorkflowOwner", "aWorkflowName")
require.NoError(t, err)

gotHash, gotContent, err := orm.GetContentsByWorkflowID(ctx, workflowID)
require.NoError(t, err)
assert.Equal(t, giveHash, gotHash)
assert.Equal(t, giveContent, gotContent)
}
Loading

0 comments on commit a463c6b

Please sign in to comment.