Skip to content

Commit

Permalink
feat(workflows): handler partial wip
Browse files Browse the repository at this point in the history
  • Loading branch information
MStreet3 committed Nov 21, 2024
1 parent 809788d commit fdcac2f
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ func Test_SecretsWorker(t *testing.T) {
contractReader,
fetcherFn,
wfRegistryAddr.Hex(),
nil,
nil,
syncer.WithTicker(giveTicker.C),
)

Expand Down
25 changes: 18 additions & 7 deletions core/services/workflows/syncer/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import (
"errors"
"fmt"

"github.com/smartcontractkit/chainlink-common/pkg/types/core"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/store"
)

var ErrNotImplemented = errors.New("not implemented")
Expand Down Expand Up @@ -88,21 +90,30 @@ type WorkflowRegistryWorkflowDeletedV1 struct {
// eventHandler is a handler for WorkflowRegistryEvent events. Each event type has a corresponding
// method that handles the event.
type eventHandler struct {
lggr logger.Logger
orm ORM
fetcher FetcherFunc
lggr logger.Logger
orm WorkflowSecretsDS
fetcher FetcherFunc
workflowStore store.Store
capRegistry core.CapabilitiesRegistry
engineRegistry *engineRegistry
}

// newEventHandler returns a new eventHandler instance.
func newEventHandler(
lggr logger.Logger,
orm ORM,
orm WorkflowSecretsDS,
gateway FetcherFunc,
workflowStore store.Store,
capRegistry core.CapabilitiesRegistry,
engineRegistry *engineRegistry,
) *eventHandler {
return &eventHandler{
lggr: lggr,
orm: orm,
fetcher: gateway,
lggr: lggr,
orm: orm,
fetcher: gateway,
workflowStore: workflowStore,
capRegistry: capRegistry,
engineRegistry: engineRegistry,
}
}

Expand Down
10 changes: 5 additions & 5 deletions core/services/workflows/syncer/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func Test_Handler(t *testing.T) {
}
mockORM.EXPECT().GetSecretsURLByHash(matches.AnyContext, giveHash).Return(giveURL, nil)
mockORM.EXPECT().Update(matches.AnyContext, giveHash, "contents").Return(int64(1), nil)
h := newEventHandler(lggr, mockORM, fetcher)
h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil)
err = h.Handle(ctx, giveEvent)
require.NoError(t, err)
})
Expand All @@ -52,7 +52,7 @@ func Test_Handler(t *testing.T) {
return []byte("contents"), nil
}

h := newEventHandler(lggr, mockORM, fetcher)
h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil)
err := h.Handle(ctx, giveEvent)
require.Error(t, err)
require.Contains(t, err.Error(), "event type unsupported")
Expand All @@ -61,7 +61,7 @@ func Test_Handler(t *testing.T) {
t.Run("fails to get secrets url", func(t *testing.T) {
mockORM := mocks.NewORM(t)
ctx := testutils.Context(t)
h := newEventHandler(lggr, mockORM, nil)
h := newEventHandler(lggr, mockORM, nil, nil, nil, nil)
giveURL := "https://original-url.com"
giveBytes, err := crypto.Keccak256([]byte(giveURL))
require.NoError(t, err)
Expand Down Expand Up @@ -101,7 +101,7 @@ func Test_Handler(t *testing.T) {
return nil, assert.AnError
}
mockORM.EXPECT().GetSecretsURLByHash(matches.AnyContext, giveHash).Return(giveURL, nil)
h := newEventHandler(lggr, mockORM, fetcher)
h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil)
err = h.Handle(ctx, giveEvent)
require.Error(t, err)
require.ErrorIs(t, err, assert.AnError)
Expand All @@ -128,7 +128,7 @@ func Test_Handler(t *testing.T) {
}
mockORM.EXPECT().GetSecretsURLByHash(matches.AnyContext, giveHash).Return(giveURL, nil)
mockORM.EXPECT().Update(matches.AnyContext, giveHash, "contents").Return(0, assert.AnError)
h := newEventHandler(lggr, mockORM, fetcher)
h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil)
err = h.Handle(ctx, giveEvent)
require.Error(t, err)
require.ErrorIs(t, err, assert.AnError)
Expand Down
16 changes: 13 additions & 3 deletions core/services/workflows/syncer/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ package syncer

import (
"context"
"errors"

"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/utils/crypto"
)

type ORM interface {
type WorkflowSecretsDS interface {
// GetSecretsURLByID returns the secrets URL for the given ID.
GetSecretsURLByID(ctx context.Context, id int64) (string, error)

Expand All @@ -30,14 +32,18 @@ type ORM interface {
Create(ctx context.Context, secretsURL, hash, contents string) (int64, error)
}

type WorkflowRegistryDS = ORM
type WorkflowSpecsDS interface {
CreateWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec) (int64, error)
}

type WorkflowRegistryDS = WorkflowSecretsDS

type orm struct {
ds sqlutil.DataSource
lggr logger.Logger
}

var _ ORM = (*orm)(nil)
var _ WorkflowSecretsDS = (*orm)(nil)

func NewWorkflowRegistryDS(ds sqlutil.DataSource, lggr logger.Logger) *orm {
return &orm{
Expand Down Expand Up @@ -137,3 +143,7 @@ func (orm *orm) Create(ctx context.Context, url, hash, contents string) (int64,
func (orm *orm) GetSecretsURLHash(owner, secretsURL []byte) ([]byte, error) {
return crypto.Keccak256(append(owner, secretsURL...))
}

func (orm *orm) CreateWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec) (int64, error) {
return 0, errors.New("not implemented")
}
23 changes: 18 additions & 5 deletions core/services/workflows/syncer/workflow_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/services"
types "github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink-common/pkg/types/core"
query "github.com/smartcontractkit/chainlink-common/pkg/types/query"
"github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives"
"github.com/smartcontractkit/chainlink-common/pkg/values"
"github.com/smartcontractkit/chainlink/v2/core/gethwrappers/workflow/generated/workflow_registry_wrapper"
"github.com/smartcontractkit/chainlink/v2/core/logger"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/store"
)

const name = "WorkflowRegistrySyncer"
Expand Down Expand Up @@ -115,6 +117,10 @@ type workflowRegistry struct {
// heap is a min heap that merges batches of events from the contract query goroutines. The
// default min heap is sorted by block height.
heap Heap

workflowStore store.Store
capRegistry core.CapabilitiesRegistry
engineRegistry *engineRegistry
}

// WithTicker allows external callers to provide a ticker to the workflowRegistry. This is useful
Expand All @@ -139,14 +145,19 @@ func NewWorkflowRegistry[T ContractReader](
reader T,
gateway FetcherFunc,
addr string,
workflowStore store.Store,
capRegistry core.CapabilitiesRegistry,
opts ...func(*workflowRegistry),
) *workflowRegistry {
ets := []WorkflowRegistryEventType{ForceUpdateSecretsEvent}
wr := &workflowRegistry{
lggr: lggr.Named(name),
orm: orm,
reader: reader,
gateway: gateway,
lggr: lggr.Named(name),
orm: orm,
reader: reader,
gateway: gateway,
workflowStore: workflowStore,
capRegistry: capRegistry,
engineRegistry: newEngineRegistry(),
cfg: ContractEventPollerConfig{
ContractName: ContractName,
ContractAddress: addr,
Expand All @@ -160,7 +171,9 @@ func NewWorkflowRegistry[T ContractReader](
eventsCh: make(chan WorkflowRegistryEventResponse),
batchCh: make(chan []WorkflowRegistryEventResponse, len(ets)),
}
wr.handler = newEventHandler(wr.lggr, wr.orm, wr.gateway)
wr.handler = newEventHandler(wr.lggr, wr.orm, wr.gateway, wr.workflowStore, wr.capRegistry,
wr.engineRegistry,
)
for _, opt := range opts {
opt(wr)
}
Expand Down
2 changes: 1 addition & 1 deletion core/services/workflows/syncer/workflow_registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func Test_Workflow_Registry_Syncer(t *testing.T) {
return []byte(wantContents), nil
}
ticker = make(chan time.Time)
worker = NewWorkflowRegistry(lggr, orm, reader, gateway, giveCfg.ContractAddress, WithTicker(ticker))
worker = NewWorkflowRegistry(lggr, orm, reader, gateway, giveCfg.ContractAddress, nil, nil, WithTicker(ticker))
)

// Cleanup the worker
Expand Down

0 comments on commit fdcac2f

Please sign in to comment.