From 8c88fc487d3eafaaa7dddeb78b4d37b1f61fee5a Mon Sep 17 00:00:00 2001 From: Gabriel Paradiso Date: Fri, 13 Dec 2024 11:51:55 +0100 Subject: [PATCH] feat: implement GetWorkflowSpecByID --- core/services/workflows/syncer/handler.go | 54 +++++++------------ core/services/workflows/syncer/mocks/orm.go | 59 +++++++++++++++++++++ core/services/workflows/syncer/orm.go | 19 +++++++ core/services/workflows/syncer/orm_test.go | 39 ++++++++++++++ 4 files changed, 135 insertions(+), 36 deletions(-) diff --git a/core/services/workflows/syncer/handler.go b/core/services/workflows/syncer/handler.go index a5b003d2057..665e71a1bcf 100644 --- a/core/services/workflows/syncer/handler.go +++ b/core/services/workflows/syncer/handler.go @@ -400,6 +400,7 @@ func (h *eventHandler) workflowRegisteredEvent( ctx context.Context, payload WorkflowRegistryWorkflowRegisteredV1, ) error { + // Fetch the workflow artifacts from the database or download them from the specified URLs decodedBinary, config, err := h.getWorkflowArtifacts(ctx, payload) if err != nil { return err @@ -492,48 +493,29 @@ func (h *eventHandler) getWorkflowArtifacts( ctx context.Context, payload WorkflowRegistryWorkflowRegisteredV1, ) ([]byte, []byte, error) { - spec, err := h.orm.GetWorkflowSpec(ctx, hex.EncodeToString(payload.WorkflowOwner), payload.WorkflowName) - if err != nil { - return h.fetchAndDecodeWorkflowArtifacts(ctx, payload) - } - - // there is an update in the BinaryURL or ConfigURL, lets fetch it - if spec.ConfigURL != payload.ConfigURL || spec.BinaryURL != payload.BinaryURL { - return h.fetchAndDecodeWorkflowArtifacts(ctx, payload) - } - - // there is no update in the BinaryURL or ConfigURL, lets decode the stored artifacts - return h.decodeStoredWorkflowArtifacts(spec) -} - -func (h *eventHandler) fetchAndDecodeWorkflowArtifacts( - ctx context.Context, - payload WorkflowRegistryWorkflowRegisteredV1, -) ([]byte, []byte, error) { - binary, err := h.fetcher(ctx, payload.BinaryURL) + spec, err := h.orm.GetWorkflowSpecByID(ctx, hex.EncodeToString(payload.WorkflowID[:])) if err != nil { - return nil, nil, fmt.Errorf("failed to fetch binary from %s : %w", payload.BinaryURL, err) - } + binary, err2 := h.fetcher(ctx, payload.BinaryURL) + if err2 != nil { + return nil, nil, fmt.Errorf("failed to fetch binary from %s : %w", payload.BinaryURL, err) + } - decodedBinary, err := base64.StdEncoding.DecodeString(string(binary)) - if err != nil { - return nil, nil, fmt.Errorf("failed to decode binary: %w", err) - } + decodedBinary, err2 := base64.StdEncoding.DecodeString(string(binary)) + if err2 != nil { + return nil, nil, fmt.Errorf("failed to decode binary: %w", err) + } - var config []byte - if payload.ConfigURL != "" { - config, err = h.fetcher(ctx, payload.ConfigURL) - if err != nil { - return nil, nil, fmt.Errorf("failed to fetch config from %s : %w", payload.ConfigURL, err) + var config []byte + if payload.ConfigURL != "" { + config, err2 = h.fetcher(ctx, payload.ConfigURL) + if err2 != nil { + return nil, nil, fmt.Errorf("failed to fetch config from %s : %w", payload.ConfigURL, err) + } } + return decodedBinary, config, nil } - return decodedBinary, config, nil -} - -func (h *eventHandler) decodeStoredWorkflowArtifacts( - spec *job.WorkflowSpec, -) ([]byte, []byte, error) { + // there is no update in the BinaryURL or ConfigURL, lets decode the stored artifacts decodedBinary, err := hex.DecodeString(spec.Workflow) if err != nil { return nil, nil, fmt.Errorf("failed to decode stored workflow spec: %w", err) diff --git a/core/services/workflows/syncer/mocks/orm.go b/core/services/workflows/syncer/mocks/orm.go index da96f422361..09a543d65e3 100644 --- a/core/services/workflows/syncer/mocks/orm.go +++ b/core/services/workflows/syncer/mocks/orm.go @@ -540,6 +540,65 @@ func (_c *ORM_GetWorkflowSpec_Call) RunAndReturn(run func(context.Context, strin return _c } +// GetWorkflowSpecByID provides a mock function with given fields: ctx, id +func (_m *ORM) GetWorkflowSpecByID(ctx context.Context, id string) (*job.WorkflowSpec, error) { + ret := _m.Called(ctx, id) + + if len(ret) == 0 { + panic("no return value specified for GetWorkflowSpecByID") + } + + var r0 *job.WorkflowSpec + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string) (*job.WorkflowSpec, error)); ok { + return rf(ctx, id) + } + if rf, ok := ret.Get(0).(func(context.Context, string) *job.WorkflowSpec); ok { + r0 = rf(ctx, id) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*job.WorkflowSpec) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { + r1 = rf(ctx, id) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// ORM_GetWorkflowSpecByID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetWorkflowSpecByID' +type ORM_GetWorkflowSpecByID_Call struct { + *mock.Call +} + +// GetWorkflowSpecByID is a helper method to define mock.On call +// - ctx context.Context +// - id string +func (_e *ORM_Expecter) GetWorkflowSpecByID(ctx interface{}, id interface{}) *ORM_GetWorkflowSpecByID_Call { + return &ORM_GetWorkflowSpecByID_Call{Call: _e.mock.On("GetWorkflowSpecByID", ctx, id)} +} + +func (_c *ORM_GetWorkflowSpecByID_Call) Run(run func(ctx context.Context, id string)) *ORM_GetWorkflowSpecByID_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string)) + }) + return _c +} + +func (_c *ORM_GetWorkflowSpecByID_Call) Return(_a0 *job.WorkflowSpec, _a1 error) *ORM_GetWorkflowSpecByID_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *ORM_GetWorkflowSpecByID_Call) RunAndReturn(run func(context.Context, string) (*job.WorkflowSpec, error)) *ORM_GetWorkflowSpecByID_Call { + _c.Call.Return(run) + return _c +} + // Update provides a mock function with given fields: ctx, secretsURL, contents func (_m *ORM) Update(ctx context.Context, secretsURL string, contents string) (int64, error) { ret := _m.Called(ctx, secretsURL, contents) diff --git a/core/services/workflows/syncer/orm.go b/core/services/workflows/syncer/orm.go index 9980d8e7b78..78193c5eb7a 100644 --- a/core/services/workflows/syncer/orm.go +++ b/core/services/workflows/syncer/orm.go @@ -52,6 +52,9 @@ type WorkflowSpecsDS interface { // DeleteWorkflowSpec deletes the workflow spec for the given owner and name. DeleteWorkflowSpec(ctx context.Context, owner, name string) error + + // GetWorkflowSpecByID returns the workflow spec for the given workflowID. + GetWorkflowSpecByID(ctx context.Context, id string) (*job.WorkflowSpec, error) } type ORM interface { @@ -367,6 +370,22 @@ func (orm *orm) GetWorkflowSpec(ctx context.Context, owner, name string) (*job.W return &spec, nil } +func (orm *orm) GetWorkflowSpecByID(ctx context.Context, id string) (*job.WorkflowSpec, error) { + query := ` + SELECT * + FROM workflow_specs + WHERE workflow_id = $1 + ` + + var spec job.WorkflowSpec + err := orm.ds.GetContext(ctx, &spec, query, id) + if err != nil { + return nil, err + } + + return &spec, nil +} + func (orm *orm) DeleteWorkflowSpec(ctx context.Context, owner, name string) error { query := ` DELETE FROM workflow_specs diff --git a/core/services/workflows/syncer/orm_test.go b/core/services/workflows/syncer/orm_test.go index addca5c18e2..0ae09bbb1bb 100644 --- a/core/services/workflows/syncer/orm_test.go +++ b/core/services/workflows/syncer/orm_test.go @@ -197,6 +197,45 @@ func Test_GetWorkflowSpec(t *testing.T) { }) } +func Test_GetWorkflowSpecByID(t *testing.T) { + db := pgtest.NewSqlxDB(t) + ctx := testutils.Context(t) + lggr := logger.TestLogger(t) + orm := &orm{ds: db, lggr: lggr} + + t.Run("gets a workflow spec by ID", func(t *testing.T) { + spec := &job.WorkflowSpec{ + Workflow: "test_workflow", + Config: "test_config", + WorkflowID: "cid-123", + WorkflowOwner: "owner-123", + WorkflowName: "Test Workflow", + Status: job.WorkflowSpecStatusActive, + BinaryURL: "http://example.com/binary", + ConfigURL: "http://example.com/config", + CreatedAt: time.Now(), + SpecType: job.WASMFile, + } + + id, err := orm.UpsertWorkflowSpec(ctx, spec) + require.NoError(t, err) + require.NotZero(t, id) + + dbSpec, err := orm.GetWorkflowSpecByID(ctx, spec.WorkflowID) + require.NoError(t, err) + require.Equal(t, spec.Workflow, dbSpec.Workflow) + + err = orm.DeleteWorkflowSpec(ctx, spec.WorkflowOwner, spec.WorkflowName) + require.NoError(t, err) + }) + + t.Run("fails if no workflow spec exists", func(t *testing.T) { + dbSpec, err := orm.GetWorkflowSpecByID(ctx, "inexistent-workflow-id") + require.Error(t, err) + require.Nil(t, dbSpec) + }) +} + func Test_GetContentsByWorkflowID(t *testing.T) { db := pgtest.NewSqlxDB(t) ctx := testutils.Context(t)