Skip to content

Commit

Permalink
fix(workflows/syncer): skips fetching data for missing urls (#15519)
Browse files Browse the repository at this point in the history
  • Loading branch information
MStreet3 authored Dec 9, 2024
1 parent a707433 commit 8420829
Show file tree
Hide file tree
Showing 2 changed files with 168 additions and 85 deletions.
18 changes: 12 additions & 6 deletions core/services/workflows/syncer/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,14 +383,20 @@ func (h *eventHandler) workflowRegisteredEvent(
return fmt.Errorf("failed to fetch binary from %s : %w", payload.BinaryURL, err)
}

config, err := h.fetcher(ctx, payload.ConfigURL)
if err != nil {
return fmt.Errorf("failed to fetch config from %s : %w", payload.ConfigURL, err)
var config []byte
if payload.ConfigURL != "" {
config, err = h.fetcher(ctx, payload.ConfigURL)
if err != nil {
return fmt.Errorf("failed to fetch config from %s : %w", payload.ConfigURL, err)
}
}

secrets, err := h.fetcher(ctx, payload.SecretsURL)
if err != nil {
return fmt.Errorf("failed to fetch secrets from %s : %w", payload.SecretsURL, err)
var secrets []byte
if payload.SecretsURL != "" {
secrets, err = h.fetcher(ctx, payload.SecretsURL)
if err != nil {
return fmt.Errorf("failed to fetch secrets from %s : %w", payload.SecretsURL, err)
}
}

// Calculate the hash of the binary and config files
Expand Down
235 changes: 156 additions & 79 deletions core/services/workflows/syncer/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,94 +173,182 @@ const (
)

func Test_workflowRegisteredHandler(t *testing.T) {
t.Run("success with paused workflow registered", func(t *testing.T) {
var (
ctx = testutils.Context(t)
lggr = logger.TestLogger(t)
db = pgtest.NewSqlxDB(t)
orm = NewWorkflowRegistryDS(db, lggr)
emitter = custmsg.NewLabeler()
var binaryURL = "http://example.com/binary"
var secretsURL = "http://example.com/secrets"
var configURL = "http://example.com/config"
var config = []byte("")
var wfOwner = []byte("0xOwner")
var binary = wasmtest.CreateTestBinary(binaryCmd, binaryLocation, true, t)
defaultValidationFn := func(t *testing.T, ctx context.Context, h *eventHandler, wfOwner []byte, wfName string, wfID string) {
// Verify the record is updated in the database
dbSpec, err := h.orm.GetWorkflowSpec(ctx, hex.EncodeToString(wfOwner), "workflow-name")
require.NoError(t, err)
require.Equal(t, hex.EncodeToString(wfOwner), dbSpec.WorkflowOwner)
require.Equal(t, "workflow-name", dbSpec.WorkflowName)
require.Equal(t, job.WorkflowSpecStatusActive, dbSpec.Status)

binary = wasmtest.CreateTestBinary(binaryCmd, binaryLocation, true, t)
config = []byte("")
secretsURL = "http://example.com"
binaryURL = "http://example.com/binary"
configURL = "http://example.com/config"
wfOwner = []byte("0xOwner")
// Verify the engine is started
engine, err := h.engineRegistry.Get(wfID)
require.NoError(t, err)
err = engine.Ready()
require.NoError(t, err)
}

fetcher = newMockFetcher(map[string]mockFetchResp{
var tt = []testCase{
{
Name: "success with active workflow registered",
fetcher: newMockFetcher(map[string]mockFetchResp{
binaryURL: {Body: binary, Err: nil},
configURL: {Body: config, Err: nil},
secretsURL: {Body: []byte("secrets"), Err: nil},
})
)

giveWFID, err := pkgworkflows.GenerateWorkflowID(wfOwner, binary, config, secretsURL)
require.NoError(t, err)

paused := WorkflowRegistryWorkflowRegisteredV1{
Status: uint8(1),
WorkflowID: giveWFID,
Owner: wfOwner,
WorkflowName: "workflow-name",
BinaryURL: binaryURL,
ConfigURL: configURL,
SecretsURL: secretsURL,
}
}),
GiveConfig: config,
ConfigURL: configURL,
SecretsURL: secretsURL,
BinaryURL: binaryURL,
GiveBinary: binary,
WFOwner: wfOwner,
Event: func(wfID []byte) WorkflowRegistryWorkflowRegisteredV1 {
return WorkflowRegistryWorkflowRegisteredV1{
Status: uint8(0),
WorkflowID: [32]byte(wfID),
Owner: wfOwner,
WorkflowName: "workflow-name",
BinaryURL: binaryURL,
ConfigURL: configURL,
SecretsURL: secretsURL,
}
},
validationFn: defaultValidationFn,
},
{
Name: "success with paused workflow registered",
fetcher: newMockFetcher(map[string]mockFetchResp{
binaryURL: {Body: binary, Err: nil},
configURL: {Body: config, Err: nil},
secretsURL: {Body: []byte("secrets"), Err: nil},
}),
GiveConfig: config,
ConfigURL: configURL,
SecretsURL: secretsURL,
BinaryURL: binaryURL,
GiveBinary: binary,
WFOwner: wfOwner,
Event: func(wfID []byte) WorkflowRegistryWorkflowRegisteredV1 {
return WorkflowRegistryWorkflowRegisteredV1{
Status: uint8(1),
WorkflowID: [32]byte(wfID),
Owner: wfOwner,
WorkflowName: "workflow-name",
BinaryURL: binaryURL,
ConfigURL: configURL,
SecretsURL: secretsURL,
}
},
validationFn: func(t *testing.T, ctx context.Context, h *eventHandler, wfOwner []byte, wfName string, wfID string) {
// Verify the record is updated in the database
dbSpec, err := h.orm.GetWorkflowSpec(ctx, hex.EncodeToString(wfOwner), "workflow-name")
require.NoError(t, err)
require.Equal(t, hex.EncodeToString(wfOwner), dbSpec.WorkflowOwner)
require.Equal(t, "workflow-name", dbSpec.WorkflowName)
require.Equal(t, job.WorkflowSpecStatusPaused, dbSpec.Status)

// Verify there is no running engine
_, err = h.engineRegistry.Get(wfID)
require.Error(t, err)
},
},
{
Name: "skips fetch if config url is missing",
GiveConfig: make([]byte, 0),
ConfigURL: "",
SecretsURL: secretsURL,
BinaryURL: binaryURL,
GiveBinary: binary,
WFOwner: wfOwner,
fetcher: newMockFetcher(map[string]mockFetchResp{
binaryURL: {Body: binary, Err: nil},
secretsURL: {Body: []byte("secrets"), Err: nil},
}),
validationFn: defaultValidationFn,
Event: func(wfID []byte) WorkflowRegistryWorkflowRegisteredV1 {
return WorkflowRegistryWorkflowRegisteredV1{
Status: uint8(0),
WorkflowID: [32]byte(wfID),
Owner: wfOwner,
WorkflowName: "workflow-name",
BinaryURL: binaryURL,
SecretsURL: secretsURL,
}
},
},
{
Name: "skips fetch if secrets url is missing",
GiveConfig: config,
ConfigURL: configURL,
BinaryURL: binaryURL,
GiveBinary: binary,
WFOwner: wfOwner,
fetcher: newMockFetcher(map[string]mockFetchResp{
binaryURL: {Body: binary, Err: nil},
configURL: {Body: config, Err: nil},
}),
validationFn: defaultValidationFn,
Event: func(wfID []byte) WorkflowRegistryWorkflowRegisteredV1 {
return WorkflowRegistryWorkflowRegisteredV1{
Status: uint8(0),
WorkflowID: [32]byte(wfID),
Owner: wfOwner,
WorkflowName: "workflow-name",
BinaryURL: binaryURL,
ConfigURL: configURL,
}
},
},
}

h := &eventHandler{
lggr: lggr,
orm: orm,
fetcher: fetcher,
emitter: emitter,
}
err = h.workflowRegisteredEvent(ctx, paused)
require.NoError(t, err)
for _, tc := range tt {
testRunningWorkflow(t, tc)
}
}

// Verify the record is updated in the database
dbSpec, err := orm.GetWorkflowSpec(ctx, hex.EncodeToString(wfOwner), "workflow-name")
require.NoError(t, err)
require.Equal(t, hex.EncodeToString(wfOwner), dbSpec.WorkflowOwner)
require.Equal(t, "workflow-name", dbSpec.WorkflowName)
require.Equal(t, job.WorkflowSpecStatusPaused, dbSpec.Status)
})
type testCase struct {
Name string
SecretsURL string
BinaryURL string
GiveBinary []byte
GiveConfig []byte
ConfigURL string
WFOwner []byte
fetcher FetcherFunc
Event func([]byte) WorkflowRegistryWorkflowRegisteredV1
validationFn func(t *testing.T, ctx context.Context, h *eventHandler, wfOwner []byte, wfName string, wfID string)
}

t.Run("success with active workflow registered", func(t *testing.T) {
func testRunningWorkflow(t *testing.T, cmd testCase) {
t.Helper()
t.Run(cmd.Name, func(t *testing.T) {
var (
ctx = testutils.Context(t)
lggr = logger.TestLogger(t)
db = pgtest.NewSqlxDB(t)
orm = NewWorkflowRegistryDS(db, lggr)
emitter = custmsg.NewLabeler()

binary = wasmtest.CreateTestBinary(binaryCmd, binaryLocation, true, t)
config = []byte("")
secretsURL = "http://example.com"
binaryURL = "http://example.com/binary"
configURL = "http://example.com/config"
wfOwner = []byte("0xOwner")
binary = cmd.GiveBinary
config = cmd.GiveConfig
secretsURL = cmd.SecretsURL
wfOwner = cmd.WFOwner

fetcher = newMockFetcher(map[string]mockFetchResp{
binaryURL: {Body: binary, Err: nil},
configURL: {Body: config, Err: nil},
secretsURL: {Body: []byte("secrets"), Err: nil},
})
fetcher = cmd.fetcher
)

giveWFID, err := pkgworkflows.GenerateWorkflowID(wfOwner, binary, config, secretsURL)
require.NoError(t, err)

require.NoError(t, err)
wfID := hex.EncodeToString(giveWFID[:])

active := WorkflowRegistryWorkflowRegisteredV1{
Status: uint8(0),
WorkflowID: giveWFID,
Owner: wfOwner,
WorkflowName: "workflow-name",
BinaryURL: binaryURL,
ConfigURL: configURL,
SecretsURL: secretsURL,
}
event := cmd.Event(giveWFID[:])

er := newEngineRegistry()
store := wfstore.NewDBStore(db, lggr, clockwork.NewFakeClock())
Expand All @@ -275,21 +363,10 @@ func Test_workflowRegisteredHandler(t *testing.T) {
capRegistry: registry,
workflowStore: store,
}
err = h.workflowRegisteredEvent(ctx, active)
require.NoError(t, err)

// Verify the record is updated in the database
dbSpec, err := orm.GetWorkflowSpec(ctx, hex.EncodeToString(wfOwner), "workflow-name")
err = h.workflowRegisteredEvent(ctx, event)
require.NoError(t, err)
require.Equal(t, hex.EncodeToString(wfOwner), dbSpec.WorkflowOwner)
require.Equal(t, "workflow-name", dbSpec.WorkflowName)
require.Equal(t, job.WorkflowSpecStatusActive, dbSpec.Status)

// Verify the engine is started
engine, err := h.engineRegistry.Get(hex.EncodeToString(giveWFID[:]))
require.NoError(t, err)
err = engine.Ready()
require.NoError(t, err)
cmd.validationFn(t, ctx, h, wfOwner, "workflow-name", wfID)
})
}

Expand Down

0 comments on commit 8420829

Please sign in to comment.