Skip to content

Commit

Permalink
chore(workflows): stub out event handlers (#15313)
Browse files Browse the repository at this point in the history
* chore(workflows): stub out event handlers

* chore(workflows): stubs out engine registry

* refactor(syncer): alias to ORM with new spec DS
  • Loading branch information
MStreet3 authored Nov 21, 2024
1 parent 64aeef9 commit 86ab654
Show file tree
Hide file tree
Showing 8 changed files with 300 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ func Test_SecretsWorker(t *testing.T) {
contractReader,
fetcherFn,
wfRegistryAddr.Hex(),
nil,
nil,
syncer.WithTicker(giveTicker.C),
)

Expand Down
64 changes: 64 additions & 0 deletions core/services/workflows/syncer/engine_registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package syncer

import (
"errors"
"sync"

"github.com/smartcontractkit/chainlink/v2/core/services/workflows"
)

type engineRegistry struct {
engines map[string]*workflows.Engine
mu sync.RWMutex
}

func newEngineRegistry() *engineRegistry {
return &engineRegistry{
engines: make(map[string]*workflows.Engine),
}
}

// Add adds an engine to the registry.
func (r *engineRegistry) Add(id string, engine *workflows.Engine) {
r.mu.Lock()
defer r.mu.Unlock()
r.engines[id] = engine
}

// Get retrieves an engine from the registry.
func (r *engineRegistry) Get(id string) (*workflows.Engine, error) {
r.mu.RLock()
defer r.mu.RUnlock()
engine, found := r.engines[id]
if !found {
return nil, errors.New("engine not found")
}
return engine, nil
}

// Pop removes an engine from the registry and returns the engine if found.
func (r *engineRegistry) Pop(id string) (*workflows.Engine, error) {
r.mu.Lock()
defer r.mu.Unlock()
engine, ok := r.engines[id]
if !ok {
return nil, errors.New("remove failed: engine not found")
}
delete(r.engines, id)
return engine, nil
}

// Close closes all engines in the registry.
func (r *engineRegistry) Close() error {
r.mu.Lock()
defer r.mu.Unlock()
var err error
for id, engine := range r.engines {
closeErr := engine.Close()
if closeErr != nil {
err = errors.Join(err, closeErr)
}
delete(r.engines, id)
}
return err
}
141 changes: 135 additions & 6 deletions core/services/workflows/syncer/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,41 +3,170 @@ package syncer
import (
"context"
"encoding/hex"
"errors"
"fmt"

"github.com/smartcontractkit/chainlink-common/pkg/types/core"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/store"
)

var ErrNotImplemented = errors.New("not implemented")

// WorkflowRegistryrEventType is the type of event that is emitted by the WorkflowRegistry
type WorkflowRegistryEventType string

var (
// ForceUpdateSecretsEvent is emitted when a request to force update a workflows secrets is made
ForceUpdateSecretsEvent WorkflowRegistryEventType = "WorkflowForceUpdateSecretsRequestedV1"

// WorkflowRegisteredEvent is emitted when a workflow is registered
WorkflowRegisteredEvent WorkflowRegistryEventType = "WorkflowRegisteredV1"

// WorkflowUpdatedEvent is emitted when a workflow is updated
WorkflowUpdatedEvent WorkflowRegistryEventType = "WorkflowUpdatedV1"

// WorkflowPausedEvent is emitted when a workflow is paused
WorkflowPausedEvent WorkflowRegistryEventType = "WorkflowPausedV1"

// WorkflowActivatedEvent is emitted when a workflow is activated
WorkflowActivatedEvent WorkflowRegistryEventType = "WorkflowActivatedV1"

// WorkflowDeletedEvent is emitted when a workflow is deleted
WorkflowDeletedEvent WorkflowRegistryEventType = "WorkflowDeletedV1"
)

// WorkflowRegistryForceUpdateSecretsRequestedV1 is a chain agnostic definition of the WorkflowRegistry
// ForceUpdateSecretsRequested event.
type WorkflowRegistryForceUpdateSecretsRequestedV1 struct {
SecretsURLHash []byte
Owner []byte
WorkflowName string
}

type WorkflowRegistryWorkflowRegisteredV1 struct {
WorkflowID [32]byte
WorkflowOwner []byte
DonID uint32
Status uint8
WorkflowName string
BinaryURL string
ConfigURL string
SecretsURL string
}

type WorkflowRegistryWorkflowUpdatedV1 struct {
OldWorkflowID [32]byte
WorkflowOwner []byte
DonID uint32
NewWorkflowID [32]byte
WorkflowName string
BinaryURL string
ConfigURL string
SecretsURL string
}

type WorkflowRegistryWorkflowPausedV1 struct {
WorkflowID [32]byte
WorkflowOwner []byte
DonID uint32
WorkflowName string
}

type WorkflowRegistryWorkflowActivatedV1 struct {
WorkflowID [32]byte
WorkflowOwner []byte
DonID uint32
WorkflowName string
}

type WorkflowRegistryWorkflowDeletedV1 struct {
WorkflowID [32]byte
WorkflowOwner []byte
DonID uint32
WorkflowName string
}

// eventHandler is a handler for WorkflowRegistryEvent events. Each event type has a corresponding
// method that handles the event.
type eventHandler struct {
lggr logger.Logger
orm ORM
fetcher FetcherFunc
lggr logger.Logger
orm WorkflowSecretsDS
fetcher FetcherFunc
workflowStore store.Store
capRegistry core.CapabilitiesRegistry
engineRegistry *engineRegistry
}

// newEventHandler returns a new eventHandler instance.
func newEventHandler(
lggr logger.Logger,
orm ORM,
gateway FetcherFunc,
workflowStore store.Store,
capRegistry core.CapabilitiesRegistry,
engineRegistry *engineRegistry,
) *eventHandler {
return &eventHandler{
lggr: lggr,
orm: orm,
fetcher: gateway,
lggr: lggr,
orm: orm,
fetcher: gateway,
workflowStore: workflowStore,
capRegistry: capRegistry,
engineRegistry: engineRegistry,
}
}

func (h *eventHandler) Handle(ctx context.Context, event WorkflowRegistryEvent) error {
switch event.EventType {
case ForceUpdateSecretsEvent:
return h.forceUpdateSecretsEvent(ctx, event)
case WorkflowRegisteredEvent:
return h.workflowRegisteredEvent(ctx, event)
case WorkflowUpdatedEvent:
return h.workflowUpdatedEvent(ctx, event)
case WorkflowPausedEvent:
return h.workflowPausedEvent(ctx, event)
case WorkflowActivatedEvent:
return h.workflowActivatedEvent(ctx, event)
default:
return fmt.Errorf("event type unsupported: %v", event.EventType)
}
}

// workflowRegisteredEvent handles the WorkflowRegisteredEvent event type.
// TODO: Implement this method
func (h *eventHandler) workflowRegisteredEvent(
_ context.Context,
_ WorkflowRegistryEvent,
) error {
return ErrNotImplemented
}

// workflowUpdatedEvent handles the WorkflowUpdatedEvent event type.
func (h *eventHandler) workflowUpdatedEvent(
_ context.Context,
_ WorkflowRegistryEvent,
) error {
return ErrNotImplemented
}

// workflowPausedEvent handles the WorkflowPausedEvent event type.
func (h *eventHandler) workflowPausedEvent(
_ context.Context,
_ WorkflowRegistryEvent,
) error {
return ErrNotImplemented
}

// workflowActivatedEvent handles the WorkflowActivatedEvent event type.
func (h *eventHandler) workflowActivatedEvent(
_ context.Context,
_ WorkflowRegistryEvent,
) error {
return ErrNotImplemented
}

// forceUpdateSecretsEvent handles the ForceUpdateSecretsEvent event type.
func (h *eventHandler) forceUpdateSecretsEvent(
ctx context.Context,
Expand Down
10 changes: 5 additions & 5 deletions core/services/workflows/syncer/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func Test_Handler(t *testing.T) {
}
mockORM.EXPECT().GetSecretsURLByHash(matches.AnyContext, giveHash).Return(giveURL, nil)
mockORM.EXPECT().Update(matches.AnyContext, giveHash, "contents").Return(int64(1), nil)
h := newEventHandler(lggr, mockORM, fetcher)
h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil)
err = h.Handle(ctx, giveEvent)
require.NoError(t, err)
})
Expand All @@ -52,7 +52,7 @@ func Test_Handler(t *testing.T) {
return []byte("contents"), nil
}

h := newEventHandler(lggr, mockORM, fetcher)
h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil)
err := h.Handle(ctx, giveEvent)
require.Error(t, err)
require.Contains(t, err.Error(), "event type unsupported")
Expand All @@ -61,7 +61,7 @@ func Test_Handler(t *testing.T) {
t.Run("fails to get secrets url", func(t *testing.T) {
mockORM := mocks.NewORM(t)
ctx := testutils.Context(t)
h := newEventHandler(lggr, mockORM, nil)
h := newEventHandler(lggr, mockORM, nil, nil, nil, nil)
giveURL := "https://original-url.com"
giveBytes, err := crypto.Keccak256([]byte(giveURL))
require.NoError(t, err)
Expand Down Expand Up @@ -101,7 +101,7 @@ func Test_Handler(t *testing.T) {
return nil, assert.AnError
}
mockORM.EXPECT().GetSecretsURLByHash(matches.AnyContext, giveHash).Return(giveURL, nil)
h := newEventHandler(lggr, mockORM, fetcher)
h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil)
err = h.Handle(ctx, giveEvent)
require.Error(t, err)
require.ErrorIs(t, err, assert.AnError)
Expand All @@ -128,7 +128,7 @@ func Test_Handler(t *testing.T) {
}
mockORM.EXPECT().GetSecretsURLByHash(matches.AnyContext, giveHash).Return(giveURL, nil)
mockORM.EXPECT().Update(matches.AnyContext, giveHash, "contents").Return(0, assert.AnError)
h := newEventHandler(lggr, mockORM, fetcher)
h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil)
err = h.Handle(ctx, giveEvent)
require.Error(t, err)
require.ErrorIs(t, err, assert.AnError)
Expand Down
58 changes: 58 additions & 0 deletions core/services/workflows/syncer/mocks/orm.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 86ab654

Please sign in to comment.