Skip to content

Commit

Permalink
feat(workflows): adds orm methods for managing specs
Browse files Browse the repository at this point in the history
  • Loading branch information
MStreet3 committed Nov 21, 2024
1 parent 176e363 commit 9d2a67d
Show file tree
Hide file tree
Showing 6 changed files with 395 additions and 11 deletions.
26 changes: 17 additions & 9 deletions core/services/job/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -869,21 +869,29 @@ const (
DefaultSpecType = ""
)

type WorkflowSpecStatus uint8

const (
WorkflowSpecStatusActive WorkflowSpecStatus = iota
WorkflowSpecStatusPaused
)

type WorkflowSpec struct {
ID int32 `toml:"-"`
Workflow string `toml:"workflow"` // the raw representation of the workflow
Config string `toml:"config" db:"config"` // the raw representation of the config
// fields derived from the yaml spec, used for indexing the database
// note: i tried to make these private, but translating them to the database seems to require them to be public
WorkflowID string `toml:"-" db:"workflow_id"` // Derived. Do not modify. the CID of the workflow.
WorkflowOwner string `toml:"-" db:"workflow_owner"` // Derived. Do not modify. the owner of the workflow.
WorkflowName string `toml:"-" db:"workflow_name"` // Derived. Do not modify. the name of the workflow.
BinaryURL string `db:"binary_url"`
ConfigURL string `db:"config_url"`
SecretsID sql.NullInt64 `db:"secrets_id"`
CreatedAt time.Time `toml:"-"`
UpdatedAt time.Time `toml:"-"`
SpecType WorkflowSpecType `toml:"spec_type" db:"spec_type"`
WorkflowID string `toml:"-" db:"workflow_id"` // Derived. Do not modify. the CID of the workflow.
WorkflowOwner string `toml:"-" db:"workflow_owner"` // Derived. Do not modify. the owner of the workflow.
WorkflowName string `toml:"-" db:"workflow_name"` // Derived. Do not modify. the name of the workflow.
Status WorkflowSpecStatus `db:"status"`
BinaryURL string `db:"binary_url"`
ConfigURL string `db:"config_url"`
SecretsID sql.NullInt64 `db:"secrets_id"`
CreatedAt time.Time `toml:"-"`
UpdatedAt time.Time `toml:"-"`
SpecType WorkflowSpecType `toml:"spec_type" db:"spec_type"`
sdkWorkflow *sdk.WorkflowSpec
rawSpec []byte
config []byte
Expand Down
19 changes: 18 additions & 1 deletion core/services/workflows/syncer/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,25 @@ func (h *eventHandler) Handle(ctx context.Context, event WorkflowRegistryEvent)
// TODO: Implement this method
func (h *eventHandler) workflowRegisteredEvent(
_ context.Context,
_ WorkflowRegistryEvent,
event WorkflowRegistryEvent,
) error {
payload, ok := event.Data.(WorkflowRegistryWorkflowRegisteredV1)
if !ok {
return fmt.Errorf("invalid data type %T for event", event.Data)
}

_ = payload

// Pre-check: verify that the workflowID matches; if it doesn’t abort and log an error via Beholder.

// Download the contents of binaryURL, configURL and secretsURL and cache them locally.

// Create a new entry in the workflow_spec table corresponding for the new workflow, with the contents of the binaryURL + configURL in the table

// If status == active, start a new WorkflowEngine instance, and add it to local map of instances.

// Spin off task to refresh the secretsURL periodically.

return ErrNotImplemented
}

Expand Down
108 changes: 108 additions & 0 deletions core/services/workflows/syncer/mocks/orm.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

93 changes: 92 additions & 1 deletion core/services/workflows/syncer/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package syncer

import (
"context"
"database/sql"
"errors"
"time"

"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
"github.com/smartcontractkit/chainlink/v2/core/logger"
Expand Down Expand Up @@ -34,6 +36,8 @@ type WorkflowSecretsDS interface {

type WorkflowSpecsDS interface {
CreateWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec) (int64, error)
GetWorkflowSpec(ctx context.Context, owner, name string) (*job.WorkflowSpec, error)
DeleteWorkflowSpec(ctx context.Context, owner, name string) error
}

type ORM interface {
Expand Down Expand Up @@ -150,5 +154,92 @@ func (orm *orm) GetSecretsURLHash(owner, secretsURL []byte) ([]byte, error) {
}

func (orm *orm) CreateWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec) (int64, error) {
return 0, errors.New("not implemented")
var id int64

query := `
INSERT INTO workflow_specs (
workflow,
config,
workflow_id,
workflow_owner,
workflow_name,
status,
binary_url,
config_url,
secrets_id,
created_at,
updated_at,
spec_type
) VALUES (
:workflow,
:config,
:workflow_id,
:workflow_owner,
:workflow_name,
:status,
:binary_url,
:config_url,
:secrets_id,
:created_at,
:updated_at,
:spec_type
) RETURNING id
`

stmt, err := orm.ds.PrepareNamedContext(ctx, query)
if err != nil {
return 0, err
}
defer stmt.Close()

spec.UpdatedAt = time.Now()
err = stmt.QueryRowxContext(ctx, spec).Scan(&id)

if err != nil {
return 0, err
}

return id, nil
}

func (orm *orm) GetWorkflowSpec(ctx context.Context, owner, name string) (*job.WorkflowSpec, error) {
query := `
SELECT *
FROM workflow_specs
WHERE workflow_owner = $1 AND workflow_name = $2
`

var spec job.WorkflowSpec
err := orm.ds.GetContext(ctx, &spec, query, owner, name)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, nil // No spec found
}
return nil, err
}

return &spec, nil
}

func (orm *orm) DeleteWorkflowSpec(ctx context.Context, owner, name string) error {
query := `
DELETE FROM workflow_specs
WHERE workflow_owner = $1 AND workflow_name = $2
`

result, err := orm.ds.ExecContext(ctx, query, owner, name)
if err != nil {
return err
}

rowsAffected, err := result.RowsAffected()
if err != nil {
return err
}

if rowsAffected == 0 {
return sql.ErrNoRows // No spec deleted
}

return nil
}
Loading

0 comments on commit 9d2a67d

Please sign in to comment.