Skip to content

Commit

Permalink
feat: implement GetWorkflowSpecByID
Browse files Browse the repository at this point in the history
  • Loading branch information
agparadiso committed Dec 13, 2024
1 parent 707c7d3 commit 8c88fc4
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 36 deletions.
54 changes: 18 additions & 36 deletions core/services/workflows/syncer/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,7 @@ func (h *eventHandler) workflowRegisteredEvent(
ctx context.Context,
payload WorkflowRegistryWorkflowRegisteredV1,
) error {
// Fetch the workflow artifacts from the database or download them from the specified URLs
decodedBinary, config, err := h.getWorkflowArtifacts(ctx, payload)
if err != nil {
return err
Expand Down Expand Up @@ -492,48 +493,29 @@ func (h *eventHandler) getWorkflowArtifacts(
ctx context.Context,
payload WorkflowRegistryWorkflowRegisteredV1,
) ([]byte, []byte, error) {
spec, err := h.orm.GetWorkflowSpec(ctx, hex.EncodeToString(payload.WorkflowOwner), payload.WorkflowName)
if err != nil {
return h.fetchAndDecodeWorkflowArtifacts(ctx, payload)
}

// there is an update in the BinaryURL or ConfigURL, lets fetch it
if spec.ConfigURL != payload.ConfigURL || spec.BinaryURL != payload.BinaryURL {
return h.fetchAndDecodeWorkflowArtifacts(ctx, payload)
}

// there is no update in the BinaryURL or ConfigURL, lets decode the stored artifacts
return h.decodeStoredWorkflowArtifacts(spec)
}

func (h *eventHandler) fetchAndDecodeWorkflowArtifacts(
ctx context.Context,
payload WorkflowRegistryWorkflowRegisteredV1,
) ([]byte, []byte, error) {
binary, err := h.fetcher(ctx, payload.BinaryURL)
spec, err := h.orm.GetWorkflowSpecByID(ctx, hex.EncodeToString(payload.WorkflowID[:]))
if err != nil {
return nil, nil, fmt.Errorf("failed to fetch binary from %s : %w", payload.BinaryURL, err)
}
binary, err2 := h.fetcher(ctx, payload.BinaryURL)
if err2 != nil {
return nil, nil, fmt.Errorf("failed to fetch binary from %s : %w", payload.BinaryURL, err)
}

decodedBinary, err := base64.StdEncoding.DecodeString(string(binary))
if err != nil {
return nil, nil, fmt.Errorf("failed to decode binary: %w", err)
}
decodedBinary, err2 := base64.StdEncoding.DecodeString(string(binary))
if err2 != nil {
return nil, nil, fmt.Errorf("failed to decode binary: %w", err)
}

var config []byte
if payload.ConfigURL != "" {
config, err = h.fetcher(ctx, payload.ConfigURL)
if err != nil {
return nil, nil, fmt.Errorf("failed to fetch config from %s : %w", payload.ConfigURL, err)
var config []byte
if payload.ConfigURL != "" {
config, err2 = h.fetcher(ctx, payload.ConfigURL)
if err2 != nil {
return nil, nil, fmt.Errorf("failed to fetch config from %s : %w", payload.ConfigURL, err)
}
}
return decodedBinary, config, nil
}

return decodedBinary, config, nil
}

func (h *eventHandler) decodeStoredWorkflowArtifacts(
spec *job.WorkflowSpec,
) ([]byte, []byte, error) {
// there is no update in the BinaryURL or ConfigURL, lets decode the stored artifacts
decodedBinary, err := hex.DecodeString(spec.Workflow)
if err != nil {
return nil, nil, fmt.Errorf("failed to decode stored workflow spec: %w", err)
Expand Down
59 changes: 59 additions & 0 deletions core/services/workflows/syncer/mocks/orm.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 19 additions & 0 deletions core/services/workflows/syncer/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ type WorkflowSpecsDS interface {

// DeleteWorkflowSpec deletes the workflow spec for the given owner and name.
DeleteWorkflowSpec(ctx context.Context, owner, name string) error

// GetWorkflowSpecByID returns the workflow spec for the given workflowID.
GetWorkflowSpecByID(ctx context.Context, id string) (*job.WorkflowSpec, error)
}

type ORM interface {
Expand Down Expand Up @@ -367,6 +370,22 @@ func (orm *orm) GetWorkflowSpec(ctx context.Context, owner, name string) (*job.W
return &spec, nil
}

func (orm *orm) GetWorkflowSpecByID(ctx context.Context, id string) (*job.WorkflowSpec, error) {
query := `
SELECT *
FROM workflow_specs
WHERE workflow_id = $1
`

var spec job.WorkflowSpec
err := orm.ds.GetContext(ctx, &spec, query, id)
if err != nil {
return nil, err
}

return &spec, nil
}

func (orm *orm) DeleteWorkflowSpec(ctx context.Context, owner, name string) error {
query := `
DELETE FROM workflow_specs
Expand Down
39 changes: 39 additions & 0 deletions core/services/workflows/syncer/orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,45 @@ func Test_GetWorkflowSpec(t *testing.T) {
})
}

func Test_GetWorkflowSpecByID(t *testing.T) {
db := pgtest.NewSqlxDB(t)
ctx := testutils.Context(t)
lggr := logger.TestLogger(t)
orm := &orm{ds: db, lggr: lggr}

t.Run("gets a workflow spec by ID", func(t *testing.T) {
spec := &job.WorkflowSpec{
Workflow: "test_workflow",
Config: "test_config",
WorkflowID: "cid-123",
WorkflowOwner: "owner-123",
WorkflowName: "Test Workflow",
Status: job.WorkflowSpecStatusActive,
BinaryURL: "http://example.com/binary",
ConfigURL: "http://example.com/config",
CreatedAt: time.Now(),
SpecType: job.WASMFile,
}

id, err := orm.UpsertWorkflowSpec(ctx, spec)
require.NoError(t, err)
require.NotZero(t, id)

dbSpec, err := orm.GetWorkflowSpecByID(ctx, spec.WorkflowID)
require.NoError(t, err)
require.Equal(t, spec.Workflow, dbSpec.Workflow)

err = orm.DeleteWorkflowSpec(ctx, spec.WorkflowOwner, spec.WorkflowName)
require.NoError(t, err)
})

t.Run("fails if no workflow spec exists", func(t *testing.T) {
dbSpec, err := orm.GetWorkflowSpecByID(ctx, "inexistent-workflow-id")
require.Error(t, err)
require.Nil(t, dbSpec)
})
}

func Test_GetContentsByWorkflowID(t *testing.T) {
db := pgtest.NewSqlxDB(t)
ctx := testutils.Context(t)
Expand Down

0 comments on commit 8c88fc4

Please sign in to comment.