Skip to content

Commit

Permalink
f
Browse files Browse the repository at this point in the history
  • Loading branch information
MStreet3 committed Nov 21, 2024
1 parent fbc7907 commit 6ab3bb5
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 29 deletions.
57 changes: 57 additions & 0 deletions core/services/workflows/syncer/mocks/orm.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 17 additions & 3 deletions core/services/workflows/syncer/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type WorkflowSecretsDS interface {
}

type WorkflowSpecsDS interface {
CreateWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec) (int64, error)
UpsertWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec) (int64, error)
GetWorkflowSpec(ctx context.Context, owner, name string) (*job.WorkflowSpec, error)
DeleteWorkflowSpec(ctx context.Context, owner, name string) error
}
Expand Down Expand Up @@ -153,7 +153,7 @@ func (orm *orm) GetSecretsURLHash(owner, secretsURL []byte) ([]byte, error) {
return crypto.Keccak256(append(owner, secretsURL...))
}

func (orm *orm) CreateWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec) (int64, error) {
func (orm *orm) UpsertWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec) (int64, error) {
var id int64

query := `
Expand Down Expand Up @@ -183,7 +183,21 @@ func (orm *orm) CreateWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec)
:created_at,
:updated_at,
:spec_type
) RETURNING id
) ON CONFLICT (workflow_owner, workflow_name) DO UPDATE
SET
workflow = EXCLUDED.workflow,
config = EXCLUDED.config,
workflow_id = EXCLUDED.workflow_id,
workflow_owner = EXCLUDED.workflow_owner,
workflow_name = EXCLUDED.workflow_name,
status = EXCLUDED.status,
binary_url = EXCLUDED.binary_url,
config_url = EXCLUDED.config_url,
secrets_id = EXCLUDED.secrets_id,
created_at = EXCLUDED.created_at,
updated_at = EXCLUDED.updated_at,
spec_type = EXCLUDED.spec_type
RETURNING id
`

stmt, err := orm.ds.PrepareNamedContext(ctx, query)
Expand Down
46 changes: 20 additions & 26 deletions core/services/workflows/syncer/orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,13 @@ func TestWorkflowArtifactsORM_GetAndUpdate(t *testing.T) {
assert.Equal(t, "new contents", contents)
}

func Test_CreateWorkflowSpec(t *testing.T) {
func Test_UpsertWorkflowSpec(t *testing.T) {
db := pgtest.NewSqlxDB(t)
ctx := testutils.Context(t)
lggr := logger.TestLogger(t)
orm := &orm{ds: db, lggr: lggr}

t.Run("creates a workflow spec without workflow secrets", func(t *testing.T) {
t.Run("inserts new spec", func(t *testing.T) {
spec := &job.WorkflowSpec{
Workflow: "test_workflow",
Config: "test_config",
Expand All @@ -75,51 +75,45 @@ func Test_CreateWorkflowSpec(t *testing.T) {
SpecType: job.WASMFile,
}

id, err := orm.CreateWorkflowSpec(ctx, spec)
_, err := orm.UpsertWorkflowSpec(ctx, spec)
require.NoError(t, err)
require.NotZero(t, id)

// Verify the record exists in the database
var dbSpec job.WorkflowSpec
err = db.Get(&dbSpec, `SELECT * FROM workflow_specs WHERE id = $1`, id)
err = db.Get(&dbSpec, `SELECT * FROM workflow_specs WHERE workflow_owner = $1 AND workflow_name = $2`, spec.WorkflowOwner, spec.WorkflowName)
require.NoError(t, err)
require.Equal(t, spec.Workflow, dbSpec.Workflow)
})

t.Run("creates a workflow spec with workflow secrets", func(t *testing.T) {
giveURL := "https://example.com"
giveBytes, err := crypto.Keccak256([]byte(giveURL))
require.NoError(t, err)
giveHash := hex.EncodeToString(giveBytes)
giveContent := "some contents"

gotID, err := orm.Create(ctx, giveURL, giveHash, giveContent)
require.NoError(t, err)

t.Run("updates existing spec", func(t *testing.T) {
spec := &job.WorkflowSpec{
Workflow: "test_workflow",
Config: "test_config",
WorkflowID: "cid-456",
WorkflowID: "cid-123",
WorkflowOwner: "owner-123",
WorkflowName: "Test Workflow with Secrets",
SecretsID: sql.NullInt64{Int64: gotID, Valid: true},
WorkflowName: "Test Workflow",
Status: job.WorkflowSpecStatusActive,
BinaryURL: "http://example.com/binary",
ConfigURL: "http://example.com/config",
CreatedAt: time.Now(),
SpecType: job.WASMFile,
}

id, err := orm.CreateWorkflowSpec(ctx, spec)
_, err := orm.UpsertWorkflowSpec(ctx, spec)
require.NoError(t, err)
require.NotZero(t, id)

// Verify the record exists in the database
// Update the status
spec.Status = job.WorkflowSpecStatusPaused

_, err = orm.UpsertWorkflowSpec(ctx, spec)
require.NoError(t, err)

// Verify the record is updated in the database
var dbSpec job.WorkflowSpec
err = db.Get(&dbSpec, `SELECT * FROM workflow_specs WHERE id = $1`, id)
err = db.Get(&dbSpec, `SELECT * FROM workflow_specs WHERE workflow_owner = $1 AND workflow_name = $2`, spec.WorkflowOwner, spec.WorkflowName)
require.NoError(t, err)
require.Equal(t, spec.Workflow, dbSpec.Workflow)
require.Equal(t, gotID, dbSpec.SecretsID.Int64)
require.Equal(t, spec.Config, dbSpec.Config)
require.Equal(t, spec.Status, dbSpec.Status)
})
}

Expand All @@ -143,7 +137,7 @@ func Test_DeleteWorkflowSpec(t *testing.T) {
SpecType: job.WASMFile,
}

id, err := orm.CreateWorkflowSpec(ctx, spec)
id, err := orm.UpsertWorkflowSpec(ctx, spec)
require.NoError(t, err)
require.NotZero(t, id)

Expand Down Expand Up @@ -184,7 +178,7 @@ func Test_GetWorkflowSpec(t *testing.T) {
SpecType: job.WASMFile,
}

id, err := orm.CreateWorkflowSpec(ctx, spec)
id, err := orm.UpsertWorkflowSpec(ctx, spec)
require.NoError(t, err)
require.NotZero(t, id)

Expand Down

0 comments on commit 6ab3bb5

Please sign in to comment.