Skip to content

Commit

Permalink
feat(workflows): adds registry syncer
Browse files Browse the repository at this point in the history
  • Loading branch information
MStreet3 committed Nov 19, 2024
1 parent 5a68674 commit 32b5582
Show file tree
Hide file tree
Showing 17 changed files with 1,837 additions and 17 deletions.
15 changes: 15 additions & 0 deletions .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,21 @@ packages:
github.com/smartcontractkit/chainlink/v2/core/services/registrysyncer:
interfaces:
ORM:
github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer:
interfaces:
ORM:
ContractReader:
config:
mockname: "Mock{{ .InterfaceName }}"
filename: contract_reader_mock.go
inpackage: true
dir: "{{ .InterfaceDir }}"
Handler:
config:
mockname: "Mock{{ .InterfaceName }}"
filename: handler_mock.go
inpackage: true
dir: "{{ .InterfaceDir }}"
github.com/smartcontractkit/chainlink/v2/core/capabilities/targets:
interfaces:
ContractValueGetter:
2 changes: 1 addition & 1 deletion core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) {

// TODO: wire this up to config so we only instantiate it
// if a workflow registry address is provided.
workflowRegistrySyncer := syncer.NewWorkflowRegistry()
workflowRegistrySyncer := syncer.NewNullWorkflowRegistrySyncer()
srvcs = append(srvcs, workflowRegistrySyncer)

var externalPeerWrapper p2ptypes.PeerWrapper
Expand Down
4 changes: 2 additions & 2 deletions core/services/workflows/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (sucm *stepUpdateManager) len() int64 {
}

type secretsFetcher interface {
SecretsFor(workflowOwner, workflowName string) (map[string]string, error)
SecretsFor(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error)
}

// Engine handles the lifecycle of a single workflow and its executions.
Expand Down Expand Up @@ -849,7 +849,7 @@ func (e *Engine) interpolateEnvVars(config map[string]any, env exec.Env) (*value
// registry (for capability-level configuration). It doesn't perform any caching of the config values, since
// the two registries perform their own caching.
func (e *Engine) configForStep(ctx context.Context, lggr logger.Logger, step *step) (*values.Map, error) {
secrets, err := e.secretsFetcher.SecretsFor(e.workflow.owner, e.workflow.name)
secrets, err := e.secretsFetcher.SecretsFor(ctx, e.workflow.owner, e.workflow.name)
if err != nil {
return nil, fmt.Errorf("failed to fetch secrets: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions core/services/workflows/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func newTestEngineWithYAMLSpec(t *testing.T, reg *coreCap.Registry, spec string,

type mockSecretsFetcher struct{}

func (s mockSecretsFetcher) SecretsFor(workflowOwner, workflowName string) (map[string]string, error) {
func (s mockSecretsFetcher) SecretsFor(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error) {
return map[string]string{}, nil
}

Expand Down Expand Up @@ -1605,7 +1605,7 @@ type mockFetcher struct {
retval map[string]string
}

func (m *mockFetcher) SecretsFor(workflowOwner, workflowName string) (map[string]string, error) {
func (m *mockFetcher) SecretsFor(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error) {
return m.retval, nil
}

Expand Down
148 changes: 148 additions & 0 deletions core/services/workflows/syncer/contract_reader_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

72 changes: 72 additions & 0 deletions core/services/workflows/syncer/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package syncer

import (
"context"
"encoding/hex"
"fmt"

"github.com/smartcontractkit/chainlink/v2/core/logger"
)

// eventHandler is a handler for WorkflowRegistryEvent events. Each event type has a corresponding
// method that handles the event.
type eventHandler struct {
lggr logger.Logger
orm ORM
fetcher FetcherFunc
}

// newEventHandler returns a new eventHandler instance.
func newEventHandler(
lggr logger.Logger,
orm ORM,
gateway FetcherFunc,
) *eventHandler {
return &eventHandler{
lggr: lggr,
orm: orm,
fetcher: gateway,
}
}

func (h *eventHandler) Handle(ctx context.Context, event WorkflowRegistryEvent) error {
switch event.EventType {
case ForceUpdateSecretsEvent:
return h.forceUpdateSecretsEvent(ctx, event)
default:
return fmt.Errorf("event type unsupported: %v", event.EventType)
}
}

// forceUpdateSecretsEvent handles the ForceUpdateSecretsEvent event type.
func (h *eventHandler) forceUpdateSecretsEvent(
ctx context.Context,
event WorkflowRegistryEvent,
) error {
// Get the URL of the secrets file from the event data
data, ok := event.Data.(WorkflowRegistryForceUpdateSecretsRequestedV1)
if !ok {
return fmt.Errorf("invalid data type %T for event", event.Data)
}

hash := hex.EncodeToString(data.SecretsURLHash)

url, err := h.orm.GetSecretsURLByHash(ctx, hash)
if err != nil {
h.lggr.Errorf("failed to get URL by hash %s : %s", hash, err)
return err
}

// Fetch the contents of the secrets file from the url via the fetcher
secrets, err := h.fetcher(ctx, url)
if err != nil {
return err
}

// Update the secrets in the ORM
if _, err := h.orm.Update(ctx, hash, string(secrets)); err != nil {
return err
}

return nil
}
Loading

0 comments on commit 32b5582

Please sign in to comment.