Skip to content

Commit

Permalink
feat(handler): implements handle workflow registered event
Browse files Browse the repository at this point in the history
  • Loading branch information
MStreet3 committed Nov 22, 2024
1 parent ff19883 commit 3a06e53
Show file tree
Hide file tree
Showing 5 changed files with 460 additions and 11 deletions.
160 changes: 155 additions & 5 deletions core/services/workflows/syncer/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,19 @@ package syncer

import (
"context"
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
"sync"

"github.com/smartcontractkit/chainlink-common/pkg/custmsg"
"github.com/smartcontractkit/chainlink-common/pkg/types/core"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/platform"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/store"
)

Expand Down Expand Up @@ -87,15 +94,30 @@ type WorkflowRegistryWorkflowDeletedV1 struct {
WorkflowName string
}

type secretsFetcher interface {
SecretsFor(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error)
}

// secretsFetcherFunc implements the secretsFetcher interface for a function.
type secretsFetcherFunc func(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error)

Check failure on line 102 in core/services/workflows/syncer/handler.go

View workflow job for this annotation

GitHub Actions / lint

type `secretsFetcherFunc` is unused (unused)

func (f secretsFetcherFunc) SecretsFor(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error) {

Check failure on line 104 in core/services/workflows/syncer/handler.go

View workflow job for this annotation

GitHub Actions / lint

func `secretsFetcherFunc.SecretsFor` is unused (unused)
return f(ctx, workflowOwner, workflowName)
}

// eventHandler is a handler for WorkflowRegistryEvent events. Each event type has a corresponding
// method that handles the event.
type eventHandler struct {
lggr logger.Logger
orm WorkflowSecretsDS
orm WorkflowRegistryDS
fetcher FetcherFunc
workflowStore store.Store
capRegistry core.CapabilitiesRegistry
engineRegistry *engineRegistry
emitter custmsg.MessageEmitter
secretsFetcher secretsFetcher

wg sync.WaitGroup

Check failure on line 120 in core/services/workflows/syncer/handler.go

View workflow job for this annotation

GitHub Actions / lint

field `wg` is unused (unused)
}

// newEventHandler returns a new eventHandler instance.
Expand All @@ -106,6 +128,8 @@ func newEventHandler(
workflowStore store.Store,
capRegistry core.CapabilitiesRegistry,
engineRegistry *engineRegistry,
emitter custmsg.MessageEmitter,
secretsFetcher secretsFetcher,
) *eventHandler {
return &eventHandler{
lggr: lggr,
Expand All @@ -114,6 +138,8 @@ func newEventHandler(
workflowStore: workflowStore,
capRegistry: capRegistry,
engineRegistry: engineRegistry,
emitter: emitter,
secretsFetcher: secretsFetcher,
}
}

Expand All @@ -135,12 +161,119 @@ func (h *eventHandler) Handle(ctx context.Context, event WorkflowRegistryEvent)
}

// workflowRegisteredEvent handles the WorkflowRegisteredEvent event type.
// TODO: Implement this method
func (h *eventHandler) workflowRegisteredEvent(
_ context.Context,
_ WorkflowRegistryEvent,
ctx context.Context,
event WorkflowRegistryEvent,
) error {
return ErrNotImplemented
payload, ok := event.Data.(WorkflowRegistryWorkflowRegisteredV1)
if !ok {
return fmt.Errorf("invalid data type %T for event", event.Data)
}

wfID := hex.EncodeToString(payload.WorkflowID[:])

cma := h.emitter.With(
platform.KeyWorkflowID, wfID,
platform.KeyWorkflowName, payload.WorkflowName,
platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner),
)

// Download the contents of binaryURL, configURL and secretsURL and cache them locally.
binary, err := h.fetcher(ctx, payload.BinaryURL)
if err != nil {
logCustMsg(ctx, cma, fmt.Sprintf("failed to fetch binary: %v", err), h.lggr)
return err
}

config, err := h.fetcher(ctx, payload.ConfigURL)
if err != nil {
logCustMsg(ctx, cma, fmt.Sprintf("failed to fetch config: %v", err), h.lggr)
return err
}

secrets, err := h.fetcher(ctx, payload.SecretsURL)
if err != nil {
logCustMsg(ctx, cma, fmt.Sprintf("failed to fetch secrets: %v", err), h.lggr)
return err
}

// Calculate the hash of the binary and config files
hash := sha(binary, config, []byte(payload.SecretsURL))

// Pre-check: verify that the workflowID matches; if it doesn’t abort and log an error via Beholder.
if hash != wfID {
logCustMsg(ctx, cma, fmt.Sprintf("workflowID mismatch: %s != %s", hash, wfID), h.lggr)
return fmt.Errorf("workflowID mismatch: %s != %s", hash, wfID)
}

// Save the workflow secrets
urlHash, err := h.orm.GetSecretsURLHash(payload.WorkflowOwner, []byte(payload.SecretsURL))
if err != nil {
logCustMsg(ctx, cma, fmt.Sprintf("failed to get secrets URL hash: %v", err), h.lggr)
return fmt.Errorf("failed to get secrets URL hash: %w", err)
}

// Create a new entry in the workflow_spec table corresponding for the new workflow, with the contents of the binaryURL + configURL in the table
status := job.WorkflowSpecStatusActive
if payload.Status == 1 {
status = job.WorkflowSpecStatusPaused
}

entry := &job.WorkflowSpec{
Workflow: hex.EncodeToString(binary),
Config: string(config),
WorkflowID: wfID,
Status: status,
WorkflowOwner: hex.EncodeToString(payload.WorkflowOwner),
WorkflowName: payload.WorkflowName,
SpecType: job.WASMFile,
BinaryURL: payload.BinaryURL,
ConfigURL: payload.ConfigURL,
}
if _, err := h.orm.UpsertWorkflowSpecWithSecrets(ctx, entry, payload.SecretsURL, hex.EncodeToString(urlHash), string(secrets)); err != nil {

Check failure on line 233 in core/services/workflows/syncer/handler.go

View workflow job for this annotation

GitHub Actions / lint

shadow: declaration of "err" shadows declaration at line 182 (govet)
logCustMsg(ctx, cma, fmt.Sprintf("failed to upsert workflow spec with secrets: %v", err), h.lggr)
return fmt.Errorf("failed to upsert workflow spec with secrets: %w", err)
}

if status != job.WorkflowSpecStatusActive {
return nil
}

// If status == active, start a new WorkflowEngine instance, and add it to local engine registry
moduleConfig := &host.ModuleConfig{Logger: h.lggr, Labeler: h.emitter}
sdkSpec, err := host.GetWorkflowSpec(ctx, moduleConfig, binary, config)
if err != nil {
logCustMsg(ctx, cma, fmt.Sprintf("failed to start workflow engine: failed to get workflow sdk spec: %v", err), h.lggr)
return fmt.Errorf("failed to get workflow sdk spec: %w", err)
}

cfg := workflows.Config{
Lggr: h.lggr,
Workflow: *sdkSpec,
WorkflowID: wfID,
WorkflowOwner: hex.EncodeToString(payload.WorkflowOwner),
WorkflowName: payload.WorkflowName,
Registry: h.capRegistry,
Store: h.workflowStore,
Config: config,
Binary: binary,
SecretsFetcher: h.secretsFetcher,
}
e, err := workflows.NewEngine(ctx, cfg)
if err != nil {
logCustMsg(ctx, cma, fmt.Sprintf("failed to create workflow engine: %v", err), h.lggr)
return fmt.Errorf("failed to create workflow engine: %w", err)
}

err = e.Start(ctx)
if err != nil {
logCustMsg(ctx, cma, fmt.Sprintf("failed to start workflow engine: %v", err), h.lggr)
return fmt.Errorf("failed to start workflow engine: %w", err)
}

h.engineRegistry.Add(wfID, e)
logCustMsg(ctx, cma, fmt.Sprintf("workflow engine started: %x", wfID), h.lggr)
return nil
}

// workflowUpdatedEvent handles the WorkflowUpdatedEvent event type.
Expand Down Expand Up @@ -199,3 +332,20 @@ func (h *eventHandler) forceUpdateSecretsEvent(

return nil
}

// sha calculates the sha256 hash of the wasm, config and secretsURL to determine the workflow ID.
func sha(wasm, config, secretsURL []byte) string {
sum := sha256.New()
sum.Write(wasm)
sum.Write(config)
sum.Write(secretsURL)
return hex.EncodeToString(sum.Sum(nil))
}

// logCustMsg emits a custom message to the external sink and logs an error if that fails.
func logCustMsg(ctx context.Context, cma custmsg.MessageEmitter, msg string, log logger.Logger) {
err := cma.Emit(ctx, msg)
if err != nil {
log.Errorf("failed to send custom message with msg: %s, err: %v", msg, err)
}
}
Loading

0 comments on commit 3a06e53

Please sign in to comment.