Skip to content

Commit

Permalink
[CAPPL-300] Implement SecretsFor (#15439)
Browse files Browse the repository at this point in the history
* [CAPPL-300] Implement SecretsFor

* Fix lint errors
  • Loading branch information
cedric-cordenier authored Nov 28, 2024
1 parent 19393cf commit b70c932
Show file tree
Hide file tree
Showing 12 changed files with 564 additions and 97 deletions.
7 changes: 0 additions & 7 deletions core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/webhook"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows"
workflowstore "github.com/smartcontractkit/chainlink/v2/core/services/workflows/store"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer"
"github.com/smartcontractkit/chainlink/v2/core/sessions"
"github.com/smartcontractkit/chainlink/v2/core/sessions/ldapauth"
"github.com/smartcontractkit/chainlink/v2/core/sessions/localauth"
Expand Down Expand Up @@ -213,11 +212,6 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
opts.CapabilitiesRegistry = capabilities.NewRegistry(globalLogger)
}

// TODO: wire this up to config so we only instantiate it
// if a workflow registry address is provided.
workflowRegistrySyncer := syncer.NewNullWorkflowRegistrySyncer()
srvcs = append(srvcs, workflowRegistrySyncer)

var externalPeerWrapper p2ptypes.PeerWrapper
if cfg.Capabilities().Peering().Enabled() {
var dispatcher remotetypes.Dispatcher
Expand Down Expand Up @@ -478,7 +472,6 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
delegates[job.Workflow] = workflows.NewDelegate(
globalLogger,
opts.CapabilitiesRegistry,
workflowRegistrySyncer,
workflowORM,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/jonboulle/clockwork"
"github.com/stretchr/testify/assert"

"github.com/smartcontractkit/chainlink-common/pkg/custmsg"
Expand All @@ -20,6 +21,7 @@ import (
coretestutils "github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/workflowkey"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/capabilities/testutils"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer"
Expand Down Expand Up @@ -223,7 +225,7 @@ func Test_SecretsWorker(t *testing.T) {
require.Equal(t, contents, giveContents)

handler := syncer.NewEventHandler(lggr, orm, fetcherFn, nil, nil,
emitter, nil)
emitter, clockwork.NewFakeClock(), workflowkey.Key{})

worker := syncer.NewWorkflowRegistry(lggr, contractReader, wfRegistryAddr.Hex(),
syncer.WorkflowEventPollerConfig{
Expand Down
13 changes: 11 additions & 2 deletions core/services/workflows/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,22 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.Ser
return []job.ServiceCtx{engine}, nil
}

type noopSecretsFetcher struct{}

func (n *noopSecretsFetcher) SecretsFor(ctx context.Context, workflowOwner, workflowName, workflowID string) (map[string]string, error) {
return map[string]string{}, nil
}

func newNoopSecretsFetcher() *noopSecretsFetcher {
return &noopSecretsFetcher{}
}

func NewDelegate(
logger logger.Logger,
registry core.CapabilitiesRegistry,
secretsFetcher secretsFetcher,
store store.Store,
) *Delegate {
return &Delegate{logger: logger, registry: registry, secretsFetcher: secretsFetcher, store: store}
return &Delegate{logger: logger, registry: registry, secretsFetcher: newNoopSecretsFetcher(), store: store}
}

func ValidatedWorkflowJobSpec(ctx context.Context, tomlString string) (job.Job, error) {
Expand Down
4 changes: 2 additions & 2 deletions core/services/workflows/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (sucm *stepUpdateManager) len() int64 {
}

type secretsFetcher interface {
SecretsFor(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error)
SecretsFor(ctx context.Context, workflowOwner, workflowName, workflowID string) (map[string]string, error)
}

// Engine handles the lifecycle of a single workflow and its executions.
Expand Down Expand Up @@ -868,7 +868,7 @@ func (e *Engine) interpolateEnvVars(config map[string]any, env exec.Env) (*value
// registry (for capability-level configuration). It doesn't perform any caching of the config values, since
// the two registries perform their own caching.
func (e *Engine) configForStep(ctx context.Context, lggr logger.Logger, step *step) (*values.Map, error) {
secrets, err := e.secretsFetcher.SecretsFor(ctx, e.workflow.owner, e.workflow.hexName)
secrets, err := e.secretsFetcher.SecretsFor(ctx, e.workflow.owner, e.workflow.hexName, e.workflow.id)
if err != nil {
return nil, fmt.Errorf("failed to fetch secrets: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions core/services/workflows/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func newTestEngineWithYAMLSpec(t *testing.T, reg *coreCap.Registry, spec string,

type mockSecretsFetcher struct{}

func (s mockSecretsFetcher) SecretsFor(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error) {
func (s mockSecretsFetcher) SecretsFor(ctx context.Context, workflowOwner, workflowName, workflowID string) (map[string]string, error) {
return map[string]string{}, nil
}

Expand Down Expand Up @@ -1606,7 +1606,7 @@ type mockFetcher struct {
retval map[string]string
}

func (m *mockFetcher) SecretsFor(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error) {
func (m *mockFetcher) SecretsFor(ctx context.Context, workflowOwner, workflowName, workflowID string) (map[string]string, error) {
return m.retval, nil
}

Expand Down
164 changes: 137 additions & 27 deletions core/services/workflows/syncer/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,22 @@ import (
"context"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"sync"
"time"

"github.com/jonboulle/clockwork"

"github.com/smartcontractkit/chainlink-common/pkg/custmsg"
"github.com/smartcontractkit/chainlink-common/pkg/types/core"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/secrets"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/platform"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/workflowkey"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/store"
)
Expand Down Expand Up @@ -93,28 +100,53 @@ type WorkflowRegistryWorkflowDeletedV1 struct {
WorkflowName string
}

type secretsFetcher interface {
SecretsFor(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error)
type lastFetchedAtMap struct {
m map[string]time.Time
sync.RWMutex
}

func (l *lastFetchedAtMap) Set(url string, at time.Time) {
l.Lock()
defer l.Unlock()
l.m[url] = at
}

func (l *lastFetchedAtMap) Get(url string) (time.Time, bool) {
l.RLock()
defer l.RUnlock()
got, ok := l.m[url]
return got, ok
}

func newLastFetchedAtMap() *lastFetchedAtMap {
return &lastFetchedAtMap{
m: map[string]time.Time{},
}
}

// eventHandler is a handler for WorkflowRegistryEvent events. Each event type has a corresponding
// method that handles the event.
type eventHandler struct {
lggr logger.Logger
orm WorkflowRegistryDS
fetcher FetcherFunc
workflowStore store.Store
capRegistry core.CapabilitiesRegistry
engineRegistry *engineRegistry
emitter custmsg.MessageEmitter
secretsFetcher secretsFetcher
lggr logger.Logger
orm WorkflowRegistryDS
fetcher FetcherFunc
workflowStore store.Store
capRegistry core.CapabilitiesRegistry
engineRegistry *engineRegistry
emitter custmsg.MessageEmitter
lastFetchedAtMap *lastFetchedAtMap
clock clockwork.Clock
secretsFreshnessDuration time.Duration
encryptionKey workflowkey.Key
}

type Event interface {
GetEventType() WorkflowRegistryEventType
GetData() any
}

var defaultSecretsFreshnessDuration = 24 * time.Hour

// NewEventHandler returns a new eventHandler instance.
func NewEventHandler(
lggr logger.Logger,
Expand All @@ -123,18 +155,94 @@ func NewEventHandler(
workflowStore store.Store,
capRegistry core.CapabilitiesRegistry,
emitter custmsg.MessageEmitter,
secretsFetcher secretsFetcher,
clock clockwork.Clock,
encryptionKey workflowkey.Key,
) *eventHandler {
return &eventHandler{
lggr: lggr,
orm: orm,
fetcher: gateway,
workflowStore: workflowStore,
capRegistry: capRegistry,
engineRegistry: newEngineRegistry(),
emitter: emitter,
secretsFetcher: secretsFetcher,
lggr: lggr,
orm: orm,
fetcher: gateway,
workflowStore: workflowStore,
capRegistry: capRegistry,
engineRegistry: newEngineRegistry(),
emitter: emitter,
lastFetchedAtMap: newLastFetchedAtMap(),
clock: clock,
secretsFreshnessDuration: defaultSecretsFreshnessDuration,
encryptionKey: encryptionKey,
}
}

func (h *eventHandler) refreshSecrets(ctx context.Context, workflowOwner, workflowName, workflowID, secretsURLHash string) (string, error) {
owner, err := hex.DecodeString(workflowOwner)
if err != nil {
return "", err
}

decodedHash, err := hex.DecodeString(secretsURLHash)
if err != nil {
return "", err
}

updatedSecrets, err := h.forceUpdateSecretsEvent(
ctx,
WorkflowRegistryForceUpdateSecretsRequestedV1{
SecretsURLHash: decodedHash,
Owner: owner,
WorkflowName: name,
},
)
if err != nil {
return "", err
}

return updatedSecrets, nil
}

func (h *eventHandler) SecretsFor(ctx context.Context, workflowOwner, workflowName, workflowID string) (map[string]string, error) {
secretsURLHash, secretsPayload, err := h.orm.GetContentsByWorkflowID(ctx, workflowID)
if err != nil {
// The workflow record was found, but secrets_id was empty.
// Let's just stub out the response.
if errors.Is(err, ErrEmptySecrets) {
return map[string]string{}, nil
}

return nil, fmt.Errorf("failed to fetch secrets by workflow ID: %w", err)
}

lastFetchedAt, ok := h.lastFetchedAtMap.Get(secretsURLHash)
if !ok || h.clock.Now().Sub(lastFetchedAt) > h.secretsFreshnessDuration {
updatedSecrets, innerErr := h.refreshSecrets(ctx, workflowOwner, workflowName, workflowID, secretsURLHash)
if innerErr != nil {
msg := fmt.Sprintf("could not refresh secrets: proceeding with stale secrets for workflowID %s: %s", workflowID, innerErr)
h.lggr.Error(msg)
logCustMsg(
ctx,
h.emitter.With(
platform.KeyWorkflowID, workflowID,
platform.KeyWorkflowName, workflowName,
platform.KeyWorkflowOwner, workflowOwner,
),
msg,
h.lggr,
)
} else {
secretsPayload = updatedSecrets
}
}

res := secrets.EncryptedSecretsResult{}
err = json.Unmarshal([]byte(secretsPayload), &res)
if err != nil {
return nil, fmt.Errorf("could not unmarshal secrets: %w", err)
}

return secrets.DecryptSecretsForNode(
res,
h.encryptionKey,
workflowOwner,
)
}

func (h *eventHandler) Handle(ctx context.Context, event Event) error {
Expand All @@ -150,7 +258,7 @@ func (h *eventHandler) Handle(ctx context.Context, event Event) error {
platform.KeyWorkflowOwner, hex.EncodeToString(payload.Owner),
)

if err := h.forceUpdateSecretsEvent(ctx, payload); err != nil {
if _, err := h.forceUpdateSecretsEvent(ctx, payload); err != nil {
logCustMsg(ctx, cma, fmt.Sprintf("failed to handle force update secrets event: %v", err), h.lggr)
return err
}
Expand Down Expand Up @@ -331,13 +439,13 @@ func (h *eventHandler) workflowRegisteredEvent(
Lggr: h.lggr,
Workflow: *sdkSpec,
WorkflowID: wfID,
WorkflowOwner: hex.EncodeToString(payload.Owner),
WorkflowOwner: string(payload.Owner), // this gets hex encoded in the engine.
WorkflowName: payload.WorkflowName,
Registry: h.capRegistry,
Store: h.workflowStore,
Config: config,
Binary: binary,
SecretsFetcher: h.secretsFetcher,
SecretsFetcher: h,
}
e, err := workflows.NewEngine(ctx, cfg)
if err != nil {
Expand Down Expand Up @@ -460,27 +568,29 @@ func (h *eventHandler) workflowDeletedEvent(
func (h *eventHandler) forceUpdateSecretsEvent(
ctx context.Context,
payload WorkflowRegistryForceUpdateSecretsRequestedV1,
) error {
) (string, error) {
// Get the URL of the secrets file from the event data
hash := hex.EncodeToString(payload.SecretsURLHash)

url, err := h.orm.GetSecretsURLByHash(ctx, hash)
if err != nil {
return fmt.Errorf("failed to get URL by hash %s : %w", hash, err)
return "", fmt.Errorf("failed to get URL by hash %s : %w", hash, err)
}

// Fetch the contents of the secrets file from the url via the fetcher
secrets, err := h.fetcher(ctx, url)
if err != nil {
return fmt.Errorf("failed to fetch secrets from url %s : %w", url, err)
return "", err
}

h.lastFetchedAtMap.Set(hash, h.clock.Now())

// Update the secrets in the ORM
if _, err := h.orm.Update(ctx, hash, string(secrets)); err != nil {
return fmt.Errorf("failed to update secrets: %w", err)
return "", fmt.Errorf("failed to update secrets: %w", err)
}

return nil
return string(secrets), nil
}

// tryEngineCleanup attempts to stop the workflow engine for the given workflow ID. Does nothing if the
Expand Down
Loading

0 comments on commit b70c932

Please sign in to comment.