Skip to content

Commit

Permalink
[CAPPL-300] Implement SecretsFor
Browse files Browse the repository at this point in the history
  • Loading branch information
cedric-cordenier committed Nov 27, 2024
1 parent d77db32 commit e1530b1
Show file tree
Hide file tree
Showing 10 changed files with 570 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
coretestutils "github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/workflowkey"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/capabilities/testutils"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer"
Expand Down Expand Up @@ -122,6 +123,7 @@ func Test_SecretsWorker(t *testing.T) {
nil,
nil,
emitter,
workflowkey.Key{},
syncer.WithTicker(giveTicker.C),
)

Expand Down
4 changes: 2 additions & 2 deletions core/services/workflows/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (sucm *stepUpdateManager) len() int64 {
}

type secretsFetcher interface {
SecretsFor(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error)
SecretsFor(ctx context.Context, workflowOwner, workflowName, workflowID string) (map[string]string, error)
}

// Engine handles the lifecycle of a single workflow and its executions.
Expand Down Expand Up @@ -850,7 +850,7 @@ func (e *Engine) interpolateEnvVars(config map[string]any, env exec.Env) (*value
// registry (for capability-level configuration). It doesn't perform any caching of the config values, since
// the two registries perform their own caching.
func (e *Engine) configForStep(ctx context.Context, lggr logger.Logger, step *step) (*values.Map, error) {
secrets, err := e.secretsFetcher.SecretsFor(ctx, e.workflow.owner, e.workflow.name)
secrets, err := e.secretsFetcher.SecretsFor(ctx, e.workflow.owner, e.workflow.name, e.workflow.id)
if err != nil {
return nil, fmt.Errorf("failed to fetch secrets: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions core/services/workflows/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func newTestEngineWithYAMLSpec(t *testing.T, reg *coreCap.Registry, spec string,

type mockSecretsFetcher struct{}

func (s mockSecretsFetcher) SecretsFor(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error) {
func (s mockSecretsFetcher) SecretsFor(ctx context.Context, workflowOwner, workflowName, workflowID string) (map[string]string, error) {
return map[string]string{}, nil
}

Expand Down Expand Up @@ -1606,7 +1606,7 @@ type mockFetcher struct {
retval map[string]string
}

func (m *mockFetcher) SecretsFor(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error) {
func (m *mockFetcher) SecretsFor(ctx context.Context, workflowOwner, workflowName, workflowID string) (map[string]string, error) {
return m.retval, nil
}

Expand Down
65 changes: 34 additions & 31 deletions core/services/workflows/syncer/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"errors"
"fmt"

"github.com/jonboulle/clockwork"

"github.com/smartcontractkit/chainlink-common/pkg/custmsg"
"github.com/smartcontractkit/chainlink-common/pkg/types/core"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host"
Expand Down Expand Up @@ -94,27 +96,22 @@ type WorkflowRegistryWorkflowDeletedV1 struct {
}

type secretsFetcher interface {
SecretsFor(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error)
}

// secretsFetcherFunc implements the secretsFetcher interface for a function.
type secretsFetcherFunc func(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error)

func (f secretsFetcherFunc) SecretsFor(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error) {
return f(ctx, workflowOwner, workflowName)
SecretsFor(ctx context.Context, workflowOwner, workflowName, workflowID string) (map[string]string, error)
}

// eventHandler is a handler for WorkflowRegistryEvent events. Each event type has a corresponding
// method that handles the event.
type eventHandler struct {
lggr logger.Logger
orm WorkflowRegistryDS
fetcher FetcherFunc
workflowStore store.Store
capRegistry core.CapabilitiesRegistry
engineRegistry *engineRegistry
emitter custmsg.MessageEmitter
secretsFetcher secretsFetcher
lggr logger.Logger
orm WorkflowRegistryDS
fetcher FetcherFunc
workflowStore store.Store
capRegistry core.CapabilitiesRegistry
engineRegistry *engineRegistry
emitter custmsg.MessageEmitter
secretsFetcher secretsFetcher
lastFetchedAtMap *lastFetchedAtMap
clock clockwork.Clock
}

// newEventHandler returns a new eventHandler instance.
Expand All @@ -127,16 +124,20 @@ func newEventHandler(
engineRegistry *engineRegistry,
emitter custmsg.MessageEmitter,
secretsFetcher secretsFetcher,
lastFetchedAtMap *lastFetchedAtMap,
clock clockwork.Clock,
) *eventHandler {
return &eventHandler{
lggr: lggr,
orm: orm,
fetcher: gateway,
workflowStore: workflowStore,
capRegistry: capRegistry,
engineRegistry: engineRegistry,
emitter: emitter,
secretsFetcher: secretsFetcher,
lggr: lggr,
orm: orm,
fetcher: gateway,
workflowStore: workflowStore,
capRegistry: capRegistry,
engineRegistry: engineRegistry,
emitter: emitter,
secretsFetcher: secretsFetcher,
lastFetchedAtMap: lastFetchedAtMap,
clock: clock,
}
}

Expand All @@ -153,7 +154,7 @@ func (h *eventHandler) Handle(ctx context.Context, event WorkflowRegistryEvent)
platform.KeyWorkflowOwner, hex.EncodeToString(payload.Owner),
)

if err := h.forceUpdateSecretsEvent(ctx, payload); err != nil {
if _, err := h.forceUpdateSecretsEvent(ctx, payload); err != nil {
logCustMsg(ctx, cma, fmt.Sprintf("failed to handle force update secrets event: %v", err), h.lggr)
return err
}
Expand Down Expand Up @@ -263,7 +264,7 @@ func (h *eventHandler) workflowRegisteredEvent(
Lggr: h.lggr,
Workflow: *sdkSpec,
WorkflowID: wfID,
WorkflowOwner: hex.EncodeToString(payload.WorkflowOwner),
WorkflowOwner: string(payload.WorkflowOwner), // this gets hex encoded in the engine.
WorkflowName: payload.WorkflowName,
Registry: h.capRegistry,
Store: h.workflowStore,
Expand Down Expand Up @@ -312,27 +313,29 @@ func (h *eventHandler) workflowActivatedEvent(
func (h *eventHandler) forceUpdateSecretsEvent(
ctx context.Context,
payload WorkflowRegistryForceUpdateSecretsRequestedV1,
) error {
) (string, error) {
// Get the URL of the secrets file from the event data
hash := hex.EncodeToString(payload.SecretsURLHash)

url, err := h.orm.GetSecretsURLByHash(ctx, hash)
if err != nil {
return fmt.Errorf("failed to get URL by hash %s : %w", hash, err)
return "", fmt.Errorf("failed to get URL by hash %s : %w", hash, err)
}

// Fetch the contents of the secrets file from the url via the fetcher
secrets, err := h.fetcher(ctx, url)
if err != nil {
return fmt.Errorf("failed to fetch secrets from url %s : %w", url, err)
return "", err
}

h.lastFetchedAtMap.Set(hash, h.clock.Now())

// Update the secrets in the ORM
if _, err := h.orm.Update(ctx, hash, string(secrets)); err != nil {
return fmt.Errorf("failed to update secrets: %w", err)
return "", fmt.Errorf("failed to update secrets: %w", err)
}

return nil
return string(secrets), nil
}

// workflowID returns a hex encoded sha256 hash of the wasm, config and secretsURL.
Expand Down
10 changes: 5 additions & 5 deletions core/services/workflows/syncer/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func Test_Handler(t *testing.T) {
}
mockORM.EXPECT().GetSecretsURLByHash(matches.AnyContext, giveHash).Return(giveURL, nil)
mockORM.EXPECT().Update(matches.AnyContext, giveHash, "contents").Return(int64(1), nil)
h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil, emitter, nil)
h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil, emitter, nil, newLastFetchedAtMap(), clockwork.NewFakeClock())
err = h.Handle(ctx, giveEvent)
require.NoError(t, err)
})
Expand All @@ -77,7 +77,7 @@ func Test_Handler(t *testing.T) {
return []byte("contents"), nil
}

h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil, emitter, nil)
h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil, emitter, nil, newLastFetchedAtMap(), clockwork.NewFakeClock())
err := h.Handle(ctx, giveEvent)
require.Error(t, err)
require.Contains(t, err.Error(), "event type unsupported")
Expand All @@ -86,7 +86,7 @@ func Test_Handler(t *testing.T) {
t.Run("fails to get secrets url", func(t *testing.T) {
mockORM := mocks.NewORM(t)
ctx := testutils.Context(t)
h := newEventHandler(lggr, mockORM, nil, nil, nil, nil, emitter, nil)
h := newEventHandler(lggr, mockORM, nil, nil, nil, nil, emitter, nil, newLastFetchedAtMap(), clockwork.NewFakeClock())
giveURL := "https://original-url.com"
giveBytes, err := crypto.Keccak256([]byte(giveURL))
require.NoError(t, err)
Expand Down Expand Up @@ -126,7 +126,7 @@ func Test_Handler(t *testing.T) {
return nil, assert.AnError
}
mockORM.EXPECT().GetSecretsURLByHash(matches.AnyContext, giveHash).Return(giveURL, nil)
h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil, emitter, nil)
h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil, emitter, nil, newLastFetchedAtMap(), clockwork.NewFakeClock())
err = h.Handle(ctx, giveEvent)
require.Error(t, err)
require.ErrorIs(t, err, assert.AnError)
Expand All @@ -153,7 +153,7 @@ func Test_Handler(t *testing.T) {
}
mockORM.EXPECT().GetSecretsURLByHash(matches.AnyContext, giveHash).Return(giveURL, nil)
mockORM.EXPECT().Update(matches.AnyContext, giveHash, "contents").Return(0, assert.AnError)
h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil, emitter, nil)
h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil, emitter, nil, newLastFetchedAtMap(), clockwork.NewFakeClock())
err = h.Handle(ctx, giveEvent)
require.Error(t, err)
require.ErrorIs(t, err, assert.AnError)
Expand Down
64 changes: 64 additions & 0 deletions core/services/workflows/syncer/mocks/orm.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

41 changes: 41 additions & 0 deletions core/services/workflows/syncer/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package syncer
import (
"context"
"database/sql"
"errors"
"fmt"
"time"

Expand All @@ -25,6 +26,9 @@ type WorkflowSecretsDS interface {
// GetContentsByHash returns the contents of the secret at the given hashed URL.
GetContentsByHash(ctx context.Context, hash string) (string, error)

// GetContentsByWorkflowID returns the contents and secrets_url of the secret for the given workflow.
GetContentsByWorkflowID(ctx context.Context, workflowID string) (string, string, error)

// GetSecretsURLHash returns the keccak256 hash of the owner and secrets URL.
GetSecretsURLHash(owner, secretsURL []byte) ([]byte, error)

Expand Down Expand Up @@ -123,6 +127,43 @@ func (orm *orm) GetContents(ctx context.Context, url string) (string, error) {
return contents, nil // Return the populated Artifact struct
}

type Int struct {
sql.NullInt64
}

type joinRecord struct {
SecretsID sql.NullString `db:"wspec_secrets_id"`
SecretsURLHash sql.NullString `db:"wsec_secrets_url_hash"`
Contents sql.NullString `db:"wsec_contents"`
}

var EmptySecrets = errors.New("secrets field is empty")

Check failure on line 140 in core/services/workflows/syncer/orm.go

View workflow job for this annotation

GitHub Actions / lint

error-naming: error var EmptySecrets should have name of the form ErrFoo (revive)

// GetContentsByWorkflowID joins the workflow_secrets on the workflow_specs table and gets
// the associated secrets contents.
func (orm *orm) GetContentsByWorkflowID(ctx context.Context, workflowID string) (string, string, error) {
var jr joinRecord
err := orm.ds.GetContext(
ctx,
&jr,
`SELECT wsec.secrets_url_hash AS wsec_secrets_url_hash, wsec.contents AS wsec_contents, wspec.secrets_id AS wspec_secrets_id
FROM workflow_specs AS wspec
LEFT JOIN
workflow_secrets AS wsec ON wspec.secrets_id = wsec.id
WHERE wspec.workflow_id = $1`,
workflowID,
)
if err != nil {
return "", "", err
}

if !jr.SecretsID.Valid {
return "", "", EmptySecrets
}

return jr.SecretsURLHash.String, jr.Contents.String, nil
}

// Update updates the secrets content at the given hash or inserts a new record if not found.
func (orm *orm) Update(ctx context.Context, hash, contents string) (int64, error) {
var id int64
Expand Down
Loading

0 comments on commit e1530b1

Please sign in to comment.