Skip to content

Commit

Permalink
[CAPPL-300] Implement SecretsFor
Browse files Browse the repository at this point in the history
  • Loading branch information
cedric-cordenier committed Nov 28, 2024
1 parent 3816e15 commit 595ed9f
Show file tree
Hide file tree
Showing 10 changed files with 548 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/jonboulle/clockwork"
"github.com/stretchr/testify/assert"

"github.com/smartcontractkit/chainlink-common/pkg/custmsg"
Expand All @@ -20,6 +21,7 @@ import (
coretestutils "github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/workflowkey"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/capabilities/testutils"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer"
Expand Down Expand Up @@ -223,7 +225,7 @@ func Test_SecretsWorker(t *testing.T) {
require.Equal(t, contents, giveContents)

handler := syncer.NewEventHandler(lggr, orm, fetcherFn, nil, nil,
emitter, nil)
emitter, clockwork.NewFakeClock(), workflowkey.Key{})

worker := syncer.NewWorkflowRegistry(lggr, contractReader, wfRegistryAddr.Hex(),
syncer.WorkflowEventPollerConfig{
Expand Down
4 changes: 2 additions & 2 deletions core/services/workflows/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (sucm *stepUpdateManager) len() int64 {
}

type secretsFetcher interface {
SecretsFor(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error)
SecretsFor(ctx context.Context, workflowOwner, workflowName, workflowID string) (map[string]string, error)
}

// Engine handles the lifecycle of a single workflow and its executions.
Expand Down Expand Up @@ -868,7 +868,7 @@ func (e *Engine) interpolateEnvVars(config map[string]any, env exec.Env) (*value
// registry (for capability-level configuration). It doesn't perform any caching of the config values, since
// the two registries perform their own caching.
func (e *Engine) configForStep(ctx context.Context, lggr logger.Logger, step *step) (*values.Map, error) {
secrets, err := e.secretsFetcher.SecretsFor(ctx, e.workflow.owner, e.workflow.hexName)
secrets, err := e.secretsFetcher.SecretsFor(ctx, e.workflow.owner, e.workflow.hexName, e.workflow.id)
if err != nil {
return nil, fmt.Errorf("failed to fetch secrets: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions core/services/workflows/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func newTestEngineWithYAMLSpec(t *testing.T, reg *coreCap.Registry, spec string,

type mockSecretsFetcher struct{}

func (s mockSecretsFetcher) SecretsFor(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error) {
func (s mockSecretsFetcher) SecretsFor(ctx context.Context, workflowOwner, workflowName, workflowID string) (map[string]string, error) {
return map[string]string{}, nil
}

Expand Down Expand Up @@ -1606,7 +1606,7 @@ type mockFetcher struct {
retval map[string]string
}

func (m *mockFetcher) SecretsFor(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error) {
func (m *mockFetcher) SecretsFor(ctx context.Context, workflowOwner, workflowName, workflowID string) (map[string]string, error) {
return m.retval, nil
}

Expand Down
157 changes: 131 additions & 26 deletions core/services/workflows/syncer/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,22 @@ import (
"context"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"sync"
"time"

"github.com/jonboulle/clockwork"

"github.com/smartcontractkit/chainlink-common/pkg/custmsg"
"github.com/smartcontractkit/chainlink-common/pkg/types/core"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/secrets"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/platform"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/workflowkey"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/store"
)
Expand Down Expand Up @@ -93,28 +100,58 @@ type WorkflowRegistryWorkflowDeletedV1 struct {
WorkflowName string
}

type lastFetchedAtMap struct {
m map[string]time.Time
sync.RWMutex
}

func (l *lastFetchedAtMap) Set(url string, at time.Time) {
l.Lock()
defer l.Unlock()
l.m[url] = at
}

func (l *lastFetchedAtMap) Get(url string) (time.Time, bool) {
l.RLock()
defer l.RUnlock()
got, ok := l.m[url]
return got, ok
}

func newLastFetchedAtMap() *lastFetchedAtMap {
return &lastFetchedAtMap{
m: map[string]time.Time{},
}
}

type secretsFetcher interface {
SecretsFor(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error)
SecretsFor(ctx context.Context, workflowOwner, workflowName, workflowID string) (map[string]string, error)
}

// eventHandler is a handler for WorkflowRegistryEvent events. Each event type has a corresponding
// method that handles the event.
type eventHandler struct {
lggr logger.Logger
orm WorkflowRegistryDS
fetcher FetcherFunc
workflowStore store.Store
capRegistry core.CapabilitiesRegistry
engineRegistry *engineRegistry
emitter custmsg.MessageEmitter
secretsFetcher secretsFetcher
lggr logger.Logger
orm WorkflowRegistryDS
fetcher FetcherFunc
workflowStore store.Store
capRegistry core.CapabilitiesRegistry
engineRegistry *engineRegistry
emitter custmsg.MessageEmitter
secretsFetcher secretsFetcher
lastFetchedAtMap *lastFetchedAtMap
clock clockwork.Clock
secretsFreshnessDuration time.Duration
encryptionKey workflowkey.Key
}

type Event interface {
GetEventType() WorkflowRegistryEventType
GetData() any
}

var defaultSecretsFreshnessDuration = 24 * time.Hour

// NewEventHandler returns a new eventHandler instance.
func NewEventHandler(
lggr logger.Logger,
Expand All @@ -123,20 +160,86 @@ func NewEventHandler(
workflowStore store.Store,
capRegistry core.CapabilitiesRegistry,
emitter custmsg.MessageEmitter,
secretsFetcher secretsFetcher,
clock clockwork.Clock,
encryptionKey workflowkey.Key,
) *eventHandler {
return &eventHandler{
lggr: lggr,
orm: orm,
fetcher: gateway,
workflowStore: workflowStore,
capRegistry: capRegistry,
engineRegistry: newEngineRegistry(),
emitter: emitter,
secretsFetcher: secretsFetcher,
lggr: lggr,
orm: orm,
fetcher: gateway,
workflowStore: workflowStore,
capRegistry: capRegistry,
engineRegistry: newEngineRegistry(),
emitter: emitter,
lastFetchedAtMap: newLastFetchedAtMap(),
clock: clock,
secretsFreshnessDuration: defaultSecretsFreshnessDuration,
encryptionKey: encryptionKey,
}
}

func (h *eventHandler) refreshSecrets(ctx context.Context, workflowOwner, workflowName, workflowID, secretsURLHash string) (string, error) {
owner, err := hex.DecodeString(workflowOwner)
if err != nil {
return "", err
}

decodedHash, err := hex.DecodeString(secretsURLHash)
if err != nil {
return "", err
}

updatedSecrets, err := h.forceUpdateSecretsEvent(
ctx,
WorkflowRegistryForceUpdateSecretsRequestedV1{
SecretsURLHash: decodedHash,
Owner: owner,
WorkflowName: name,
},
)
if err != nil {
return "", err
}

return updatedSecrets, nil
}

func (h *eventHandler) SecretsFor(ctx context.Context, workflowOwner, workflowName, workflowID string) (map[string]string, error) {
secretsURLHash, secretsPayload, err := h.orm.GetContentsByWorkflowID(ctx, workflowID)
if err != nil {
// The workflow record was found, but secrets_id was empty.
// Let's just stub out the response.
if errors.Is(err, EmptySecrets) {
return map[string]string{}, nil
}

return nil, fmt.Errorf("failed to fetch secrets by workflow ID: %w", err)
}

lastFetchedAt, ok := h.lastFetchedAtMap.Get(secretsURLHash)
if !ok || h.clock.Now().Sub(lastFetchedAt) > h.secretsFreshnessDuration {
updatedSecrets, err := h.refreshSecrets(ctx, workflowOwner, workflowName, workflowID, secretsURLHash)
if err != nil {
h.lggr.Errorf("could not refresh secrets: proceeding with stale secrets: %w", err)
// TODO: log custom message
} else {
secretsPayload = updatedSecrets
}
}

res := secrets.EncryptedSecretsResult{}
err = json.Unmarshal([]byte(secretsPayload), &res)
if err != nil {
return nil, fmt.Errorf("could not unmarshal secrets: %w", err)
}

return secrets.DecryptSecretsForNode(
res,
h.encryptionKey,
workflowOwner,
)
}

func (h *eventHandler) Handle(ctx context.Context, event Event) error {
switch event.GetEventType() {
case ForceUpdateSecretsEvent:
Expand All @@ -150,7 +253,7 @@ func (h *eventHandler) Handle(ctx context.Context, event Event) error {
platform.KeyWorkflowOwner, hex.EncodeToString(payload.Owner),
)

if err := h.forceUpdateSecretsEvent(ctx, payload); err != nil {
if _, err := h.forceUpdateSecretsEvent(ctx, payload); err != nil {
logCustMsg(ctx, cma, fmt.Sprintf("failed to handle force update secrets event: %v", err), h.lggr)
return err
}
Expand Down Expand Up @@ -331,13 +434,13 @@ func (h *eventHandler) workflowRegisteredEvent(
Lggr: h.lggr,
Workflow: *sdkSpec,
WorkflowID: wfID,
WorkflowOwner: hex.EncodeToString(payload.Owner),
WorkflowOwner: string(payload.Owner), // this gets hex encoded in the engine.
WorkflowName: payload.WorkflowName,
Registry: h.capRegistry,
Store: h.workflowStore,
Config: config,
Binary: binary,
SecretsFetcher: h.secretsFetcher,
SecretsFetcher: h,
}
e, err := workflows.NewEngine(ctx, cfg)
if err != nil {
Expand Down Expand Up @@ -460,27 +563,29 @@ func (h *eventHandler) workflowDeletedEvent(
func (h *eventHandler) forceUpdateSecretsEvent(
ctx context.Context,
payload WorkflowRegistryForceUpdateSecretsRequestedV1,
) error {
) (string, error) {
// Get the URL of the secrets file from the event data
hash := hex.EncodeToString(payload.SecretsURLHash)

url, err := h.orm.GetSecretsURLByHash(ctx, hash)
if err != nil {
return fmt.Errorf("failed to get URL by hash %s : %w", hash, err)
return "", fmt.Errorf("failed to get URL by hash %s : %w", hash, err)
}

// Fetch the contents of the secrets file from the url via the fetcher
secrets, err := h.fetcher(ctx, url)
if err != nil {
return fmt.Errorf("failed to fetch secrets from url %s : %w", url, err)
return "", err
}

h.lastFetchedAtMap.Set(hash, h.clock.Now())

// Update the secrets in the ORM
if _, err := h.orm.Update(ctx, hash, string(secrets)); err != nil {
return fmt.Errorf("failed to update secrets: %w", err)
return "", fmt.Errorf("failed to update secrets: %w", err)
}

return nil
return string(secrets), nil
}

// tryEngineCleanup attempts to stop the workflow engine for the given workflow ID. Does nothing if the
Expand Down
Loading

0 comments on commit 595ed9f

Please sign in to comment.