Skip to content

Commit

Permalink
[CAPPL-338] Move to a common method to generate a workflowID (#15513)
Browse files Browse the repository at this point in the history
* [CAPPL-338] Harmonize generate workflow ID implementations

* Linting
  • Loading branch information
cedric-cordenier authored Dec 5, 2024
1 parent 90a82cb commit c3c926f
Show file tree
Hide file tree
Showing 15 changed files with 80 additions and 78 deletions.
4 changes: 3 additions & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ module github.com/smartcontractkit/chainlink/core/scripts

go 1.23.3

toolchain go1.23.4

// Make sure we're working with the latest chainlink libs
replace github.com/smartcontractkit/chainlink/v2 => ../../

Expand All @@ -24,7 +26,7 @@ require (
github.com/prometheus/client_golang v1.20.5
github.com/shopspring/decimal v1.4.0
github.com/smartcontractkit/chainlink-automation v0.8.1
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241202172404-26d4a0b45b23
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241204184525-29871ced7b4d
github.com/smartcontractkit/chainlink/deployment v0.0.0-00010101000000-000000000000
github.com/smartcontractkit/chainlink/v2 v2.14.0-mercury-20240807.0.20241106193309-5560cd76211a
github.com/smartcontractkit/libocr v0.0.0-20241007185508-adbe57025f12
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1142,8 +1142,8 @@ github.com/smartcontractkit/chainlink-automation v0.8.1 h1:sTc9LKpBvcKPc1JDYAmgB
github.com/smartcontractkit/chainlink-automation v0.8.1/go.mod h1:Iij36PvWZ6blrdC5A/nrQUBuf3MH3JvsBB9sSyc9W08=
github.com/smartcontractkit/chainlink-ccip v0.0.0-20241204015713-8956bb614e9e h1:GnM6ZWV6vlk2+n6c6o+v/R1LtXzBGVVx7r37nt/h6Uc=
github.com/smartcontractkit/chainlink-ccip v0.0.0-20241204015713-8956bb614e9e/go.mod h1:80vGBbOfertJig0xFKsRfm+i17FkjdKkk1dAaGE45Os=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241202172404-26d4a0b45b23 h1:eFXguUq9e4ETuSteII8ge3mJKyyVIt5pIUxuvyuUuTA=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241202172404-26d4a0b45b23/go.mod h1:bQktEJf7sJ0U3SmIcXvbGUox7SmXcnSEZ4kUbT8R5Nk=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241204184525-29871ced7b4d h1:5XKarlliHXVVAhpCeOAx/TRU7eWsJ3tkqRI3I6Cc0SU=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241204184525-29871ced7b4d/go.mod h1:bQktEJf7sJ0U3SmIcXvbGUox7SmXcnSEZ4kUbT8R5Nk=
github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241202195413-82468150ac1e h1:PRoeby6ZlTuTkv2f+7tVU4+zboTfRzI+beECynF4JQ0=
github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241202195413-82468150ac1e/go.mod h1:mUh5/woemsVaHgTorA080hrYmO3syBCmPdnWc/5dOqk=
github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241202141438-a90db35252db h1:N1RH1hSr2ACzOFc9hkCcjE8pRBTdcU3p8nsTJByaLes=
Expand Down
11 changes: 6 additions & 5 deletions core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,18 +300,19 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
return nil, fmt.Errorf("expected 1 key, got %d", len(keys))
}

fetcher := syncer.NewFetcherService(globalLogger, gatewayConnectorWrapper)
lggr := globalLogger.Named("WorkflowRegistrySyncer")
fetcher := syncer.NewFetcherService(lggr, gatewayConnectorWrapper)

eventHandler := syncer.NewEventHandler(globalLogger, syncer.NewWorkflowRegistryDS(opts.DS, globalLogger),
fetcher.Fetch, workflowstore.NewDBStore(opts.DS, globalLogger, clockwork.NewRealClock()), opts.CapabilitiesRegistry,
eventHandler := syncer.NewEventHandler(lggr, syncer.NewWorkflowRegistryDS(opts.DS, globalLogger),
fetcher.Fetch, workflowstore.NewDBStore(opts.DS, lggr, clockwork.NewRealClock()), opts.CapabilitiesRegistry,
custmsg.NewLabeler(), clockwork.NewRealClock(), keys[0])

loader := syncer.NewWorkflowRegistryContractLoader(globalLogger, cfg.Capabilities().WorkflowRegistry().Address(), func(ctx context.Context, bytes []byte) (syncer.ContractReader, error) {
loader := syncer.NewWorkflowRegistryContractLoader(lggr, cfg.Capabilities().WorkflowRegistry().Address(), func(ctx context.Context, bytes []byte) (syncer.ContractReader, error) {
return relayer.NewContractReader(ctx, bytes)
}, eventHandler)

globalLogger.Debugw("Creating WorkflowRegistrySyncer")
wfSyncer := syncer.NewWorkflowRegistry(globalLogger, func(ctx context.Context, bytes []byte) (syncer.ContractReader, error) {
wfSyncer := syncer.NewWorkflowRegistry(lggr, func(ctx context.Context, bytes []byte) (syncer.ContractReader, error) {
return relayer.NewContractReader(ctx, bytes)
}, cfg.Capabilities().WorkflowRegistry().Address(),
syncer.WorkflowEventPollerConfig{
Expand Down
2 changes: 1 addition & 1 deletion core/services/workflows/syncer/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (s *FetcherService) Start(ctx context.Context) error {
return s.StartOnce("FetcherService", func() error {
connector := s.wrapper.GetGatewayConnector()

outgoingConnectorLggr := s.lggr.Named("WorkflowSyncer")
outgoingConnectorLggr := s.lggr.Named("OutgoingConnectorHandler")

webAPIConfig := webapi.ServiceConfig{
RateLimiter: common.RateLimiterConfig{
Expand Down
32 changes: 16 additions & 16 deletions core/services/workflows/syncer/handler.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package syncer

import (
"bytes"
"context"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"errors"
Expand All @@ -14,6 +14,7 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/custmsg"
"github.com/smartcontractkit/chainlink-common/pkg/types/core"
pkgworkflows "github.com/smartcontractkit/chainlink-common/pkg/workflows"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/secrets"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host"
"github.com/smartcontractkit/chainlink/v2/core/logger"
Expand Down Expand Up @@ -263,6 +264,7 @@ func (h *eventHandler) Handle(ctx context.Context, event Event) error {
return err
}

h.lggr.Debugw("handled force update secrets events for URL hash", "urlHash", payload.SecretsURLHash)
return nil
case WorkflowRegisteredEvent:
payload, ok := event.GetData().(WorkflowRegistryWorkflowRegisteredV1)
Expand All @@ -282,7 +284,7 @@ func (h *eventHandler) Handle(ctx context.Context, event Event) error {
return err
}

h.lggr.Debugf("workflow 0x%x registered and started", wfID)
h.lggr.Debugw("handled workflow registration event", "workflowID", wfID)
return nil
case WorkflowUpdatedEvent:
payload, ok := event.GetData().(WorkflowRegistryWorkflowUpdatedV1)
Expand All @@ -302,6 +304,7 @@ func (h *eventHandler) Handle(ctx context.Context, event Event) error {
return err
}

h.lggr.Debugw("handled workflow updated event", "workflowID", newWorkflowID)
return nil
case WorkflowPausedEvent:
payload, ok := event.GetData().(WorkflowRegistryWorkflowPausedV1)
Expand All @@ -321,6 +324,7 @@ func (h *eventHandler) Handle(ctx context.Context, event Event) error {
logCustMsg(ctx, cma, fmt.Sprintf("failed to handle workflow paused event: %v", err), h.lggr)
return err
}
h.lggr.Debugw("handled workflow paused event", "workflowID", wfID)
return nil
case WorkflowActivatedEvent:
payload, ok := event.GetData().(WorkflowRegistryWorkflowActivatedV1)
Expand All @@ -340,6 +344,7 @@ func (h *eventHandler) Handle(ctx context.Context, event Event) error {
return err
}

h.lggr.Debugw("handled workflow activated event", "workflowID", wfID)
return nil
case WorkflowDeletedEvent:
payload, ok := event.GetData().(WorkflowRegistryWorkflowDeletedV1)
Expand All @@ -360,6 +365,7 @@ func (h *eventHandler) Handle(ctx context.Context, event Event) error {
return err
}

h.lggr.Debugw("handled workflow deleted event", "workflowID", wfID)
return nil
default:
return fmt.Errorf("event type unsupported: %v", event.GetEventType())
Expand All @@ -371,8 +377,6 @@ func (h *eventHandler) workflowRegisteredEvent(
ctx context.Context,
payload WorkflowRegistryWorkflowRegisteredV1,
) error {
wfID := hex.EncodeToString(payload.WorkflowID[:])

// Download the contents of binaryURL, configURL and secretsURL and cache them locally.
binary, err := h.fetcher(ctx, payload.BinaryURL)
if err != nil {
Expand All @@ -390,11 +394,14 @@ func (h *eventHandler) workflowRegisteredEvent(
}

// Calculate the hash of the binary and config files
hash := workflowID(binary, config, []byte(payload.SecretsURL))
hash, err := pkgworkflows.GenerateWorkflowID(payload.Owner, binary, config, payload.SecretsURL)
if err != nil {
return fmt.Errorf("failed to generate workflow id: %w", err)
}

// Pre-check: verify that the workflowID matches; if it doesn’t abort and log an error via Beholder.
if hash != wfID {
return fmt.Errorf("workflowID mismatch: %s != %s", hash, wfID)
if !bytes.Equal(hash[:], payload.WorkflowID[:]) {
return fmt.Errorf("workflowID mismatch: %x != %x", hash, payload.WorkflowID)
}

// Save the workflow secrets
Expand All @@ -409,6 +416,7 @@ func (h *eventHandler) workflowRegisteredEvent(
status = job.WorkflowSpecStatusPaused
}

wfID := hex.EncodeToString(payload.WorkflowID[:])
entry := &job.WorkflowSpec{
Workflow: hex.EncodeToString(binary),
Config: string(config),
Expand All @@ -425,6 +433,7 @@ func (h *eventHandler) workflowRegisteredEvent(
}

if status != job.WorkflowSpecStatusActive {
h.lggr.Debugw("workflow is marked as paused, so not starting it", "workflow", wfID)
return nil
}

Expand Down Expand Up @@ -611,15 +620,6 @@ func (h *eventHandler) tryEngineCleanup(wfID string) error {
return nil
}

// workflowID returns a hex encoded sha256 hash of the wasm, config and secretsURL.
func workflowID(wasm, config, secretsURL []byte) string {
sum := sha256.New()
sum.Write(wasm)
sum.Write(config)
sum.Write(secretsURL)
return hex.EncodeToString(sum.Sum(nil))
}

// logCustMsg emits a custom message to the external sink and logs an error if that fails.
func logCustMsg(ctx context.Context, cma custmsg.MessageEmitter, msg string, log logger.Logger) {
err := cma.Emit(ctx, msg)
Expand Down
69 changes: 30 additions & 39 deletions core/services/workflows/syncer/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/smartcontractkit/chainlink-common/pkg/custmsg"
pkgworkflows "github.com/smartcontractkit/chainlink-common/pkg/workflows"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/secrets"
"github.com/smartcontractkit/chainlink/v2/core/capabilities"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
Expand Down Expand Up @@ -194,16 +195,12 @@ func Test_workflowRegisteredHandler(t *testing.T) {
})
)

giveWFID := workflowID(binary, config, []byte(secretsURL))

b, err := hex.DecodeString(giveWFID)
giveWFID, err := pkgworkflows.GenerateWorkflowID(wfOwner, binary, config, secretsURL)
require.NoError(t, err)
wfID := make([]byte, 32)
copy(wfID, b)

paused := WorkflowRegistryWorkflowRegisteredV1{
Status: uint8(1),
WorkflowID: [32]byte(wfID),
WorkflowID: giveWFID,
Owner: wfOwner,
WorkflowName: "workflow-name",
BinaryURL: binaryURL,
Expand Down Expand Up @@ -250,16 +247,14 @@ func Test_workflowRegisteredHandler(t *testing.T) {
})
)

giveWFID := workflowID(binary, config, []byte(secretsURL))
giveWFID, err := pkgworkflows.GenerateWorkflowID(wfOwner, binary, config, secretsURL)
require.NoError(t, err)

b, err := hex.DecodeString(giveWFID)
require.NoError(t, err)
wfID := make([]byte, 32)
copy(wfID, b)

active := WorkflowRegistryWorkflowRegisteredV1{
Status: uint8(0),
WorkflowID: [32]byte(wfID),
WorkflowID: giveWFID,
Owner: wfOwner,
WorkflowName: "workflow-name",
BinaryURL: binaryURL,
Expand Down Expand Up @@ -291,7 +286,7 @@ func Test_workflowRegisteredHandler(t *testing.T) {
require.Equal(t, job.WorkflowSpecStatusActive, dbSpec.Status)

// Verify the engine is started
engine, err := h.engineRegistry.Get(giveWFID)
engine, err := h.engineRegistry.Get(hex.EncodeToString(giveWFID[:]))
require.NoError(t, err)
err = engine.Ready()
require.NoError(t, err)
Expand Down Expand Up @@ -321,16 +316,14 @@ func Test_workflowDeletedHandler(t *testing.T) {
})
)

giveWFID := workflowID(binary, config, []byte(secretsURL))
giveWFID, err := pkgworkflows.GenerateWorkflowID(wfOwner, binary, config, secretsURL)

b, err := hex.DecodeString(giveWFID)
require.NoError(t, err)
wfID := make([]byte, 32)
copy(wfID, b)
wfIDs := hex.EncodeToString(giveWFID[:])

active := WorkflowRegistryWorkflowRegisteredV1{
Status: uint8(0),
WorkflowID: [32]byte(wfID),
WorkflowID: giveWFID,
Owner: wfOwner,
WorkflowName: "workflow-name",
BinaryURL: binaryURL,
Expand Down Expand Up @@ -362,13 +355,13 @@ func Test_workflowDeletedHandler(t *testing.T) {
require.Equal(t, job.WorkflowSpecStatusActive, dbSpec.Status)

// Verify the engine is started
engine, err := h.engineRegistry.Get(giveWFID)
engine, err := h.engineRegistry.Get(wfIDs)
require.NoError(t, err)
err = engine.Ready()
require.NoError(t, err)

deleteEvent := WorkflowRegistryWorkflowDeletedV1{
WorkflowID: [32]byte(wfID),
WorkflowID: giveWFID,
WorkflowOwner: wfOwner,
WorkflowName: "workflow-name",
DonID: 1,
Expand All @@ -381,7 +374,7 @@ func Test_workflowDeletedHandler(t *testing.T) {
require.Error(t, err)

// Verify the engine is deleted
_, err = h.engineRegistry.Get(giveWFID)
_, err = h.engineRegistry.Get(wfIDs)
require.Error(t, err)
})
}
Expand Down Expand Up @@ -412,22 +405,20 @@ func Test_workflowPausedActivatedUpdatedHandler(t *testing.T) {
})
)

giveWFID := workflowID(binary, config, []byte(secretsURL))
updatedWFID := workflowID(binary, updateConfig, []byte(secretsURL))
giveWFID, err := pkgworkflows.GenerateWorkflowID(wfOwner, binary, config, secretsURL)
require.NoError(t, err)
updatedWFID, err := pkgworkflows.GenerateWorkflowID(wfOwner, binary, updateConfig, secretsURL)
require.NoError(t, err)

b, err := hex.DecodeString(giveWFID)
require.NoError(t, err)
wfID := make([]byte, 32)
copy(wfID, b)
wfIDs := hex.EncodeToString(giveWFID[:])

b, err = hex.DecodeString(updatedWFID)
require.NoError(t, err)
newWFID := make([]byte, 32)
copy(newWFID, b)
newWFIDs := hex.EncodeToString(updatedWFID[:])

active := WorkflowRegistryWorkflowRegisteredV1{
Status: uint8(0),
WorkflowID: [32]byte(wfID),
WorkflowID: giveWFID,
Owner: wfOwner,
WorkflowName: "workflow-name",
BinaryURL: binaryURL,
Expand Down Expand Up @@ -459,14 +450,14 @@ func Test_workflowPausedActivatedUpdatedHandler(t *testing.T) {
require.Equal(t, job.WorkflowSpecStatusActive, dbSpec.Status)

// Verify the engine is started
engine, err := h.engineRegistry.Get(giveWFID)
engine, err := h.engineRegistry.Get(wfIDs)
require.NoError(t, err)
err = engine.Ready()
require.NoError(t, err)

// create a paused event
pauseEvent := WorkflowRegistryWorkflowPausedV1{
WorkflowID: [32]byte(wfID),
WorkflowID: giveWFID,
WorkflowOwner: wfOwner,
WorkflowName: "workflow-name",
DonID: 1,
Expand All @@ -482,12 +473,12 @@ func Test_workflowPausedActivatedUpdatedHandler(t *testing.T) {
require.Equal(t, job.WorkflowSpecStatusPaused, dbSpec.Status)

// Verify the engine is removed
_, err = h.engineRegistry.Get(giveWFID)
_, err = h.engineRegistry.Get(wfIDs)
require.Error(t, err)

// create an activated workflow event
activatedEvent := WorkflowRegistryWorkflowActivatedV1{
WorkflowID: [32]byte(wfID),
WorkflowID: giveWFID,
WorkflowOwner: wfOwner,
WorkflowName: "workflow-name",
DonID: 1,
Expand All @@ -504,15 +495,15 @@ func Test_workflowPausedActivatedUpdatedHandler(t *testing.T) {
require.Equal(t, job.WorkflowSpecStatusActive, dbSpec.Status)

// Verify the engine is started
engine, err = h.engineRegistry.Get(giveWFID)
engine, err = h.engineRegistry.Get(wfIDs)
require.NoError(t, err)
err = engine.Ready()
require.NoError(t, err)

// create an updated event
updatedEvent := WorkflowRegistryWorkflowUpdatedV1{
OldWorkflowID: [32]byte(wfID),
NewWorkflowID: [32]byte(newWFID),
OldWorkflowID: giveWFID,
NewWorkflowID: updatedWFID,
WorkflowOwner: wfOwner,
WorkflowName: "workflow-name",
BinaryURL: binaryURL,
Expand All @@ -529,16 +520,16 @@ func Test_workflowPausedActivatedUpdatedHandler(t *testing.T) {
require.Equal(t, hex.EncodeToString(wfOwner), dbSpec.WorkflowOwner)
require.Equal(t, "workflow-name", dbSpec.WorkflowName)
require.Equal(t, job.WorkflowSpecStatusActive, dbSpec.Status)
require.Equal(t, hex.EncodeToString(newWFID), dbSpec.WorkflowID)
require.Equal(t, newWFIDs, dbSpec.WorkflowID)
require.Equal(t, newConfigURL, dbSpec.ConfigURL)
require.Equal(t, string(updateConfig), dbSpec.Config)

// old engine is no longer running
_, err = h.engineRegistry.Get(giveWFID)
_, err = h.engineRegistry.Get(wfIDs)
require.Error(t, err)

// new engine is started
engine, err = h.engineRegistry.Get(updatedWFID)
engine, err = h.engineRegistry.Get(newWFIDs)
require.NoError(t, err)
err = engine.Ready()
require.NoError(t, err)
Expand Down
Loading

0 comments on commit c3c926f

Please sign in to comment.