Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(workflows): stub out event handlers #15313

Merged
merged 3 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ func Test_SecretsWorker(t *testing.T) {
contractReader,
fetcherFn,
wfRegistryAddr.Hex(),
nil,
nil,
syncer.WithTicker(giveTicker.C),
)

Expand Down
64 changes: 64 additions & 0 deletions core/services/workflows/syncer/engine_registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package syncer

import (
"errors"
"sync"

"github.com/smartcontractkit/chainlink/v2/core/services/workflows"
)

type engineRegistry struct {
engines map[string]*workflows.Engine
mu sync.RWMutex
}

func newEngineRegistry() *engineRegistry {
return &engineRegistry{
engines: make(map[string]*workflows.Engine),
}
}

// Add adds an engine to the registry.
func (r *engineRegistry) Add(id string, engine *workflows.Engine) {
r.mu.Lock()
defer r.mu.Unlock()
r.engines[id] = engine
}

// Get retrieves an engine from the registry.
func (r *engineRegistry) Get(id string) (*workflows.Engine, error) {
r.mu.RLock()
defer r.mu.RUnlock()
engine, found := r.engines[id]
if !found {
return nil, errors.New("engine not found")
}
return engine, nil
}

// Pop removes an engine from the registry and returns the engine if found.
func (r *engineRegistry) Pop(id string) (*workflows.Engine, error) {
r.mu.Lock()
defer r.mu.Unlock()
engine, ok := r.engines[id]
if !ok {
return nil, errors.New("remove failed: engine not found")
}
delete(r.engines, id)
return engine, nil
}

// Close closes all engines in the registry.
func (r *engineRegistry) Close() error {
r.mu.Lock()
defer r.mu.Unlock()
var err error
for id, engine := range r.engines {
closeErr := engine.Close()
if closeErr != nil {
err = errors.Join(err, closeErr)
}
delete(r.engines, id)
}
return err
}
141 changes: 135 additions & 6 deletions core/services/workflows/syncer/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,41 +3,170 @@ package syncer
import (
"context"
"encoding/hex"
"errors"
"fmt"

"github.com/smartcontractkit/chainlink-common/pkg/types/core"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/store"
)

var ErrNotImplemented = errors.New("not implemented")

// WorkflowRegistryrEventType is the type of event that is emitted by the WorkflowRegistry
type WorkflowRegistryEventType string

var (
// ForceUpdateSecretsEvent is emitted when a request to force update a workflows secrets is made
ForceUpdateSecretsEvent WorkflowRegistryEventType = "WorkflowForceUpdateSecretsRequestedV1"

// WorkflowRegisteredEvent is emitted when a workflow is registered
WorkflowRegisteredEvent WorkflowRegistryEventType = "WorkflowRegisteredV1"

// WorkflowUpdatedEvent is emitted when a workflow is updated
WorkflowUpdatedEvent WorkflowRegistryEventType = "WorkflowUpdatedV1"

// WorkflowPausedEvent is emitted when a workflow is paused
WorkflowPausedEvent WorkflowRegistryEventType = "WorkflowPausedV1"

// WorkflowActivatedEvent is emitted when a workflow is activated
WorkflowActivatedEvent WorkflowRegistryEventType = "WorkflowActivatedV1"

// WorkflowDeletedEvent is emitted when a workflow is deleted
WorkflowDeletedEvent WorkflowRegistryEventType = "WorkflowDeletedV1"
)

// WorkflowRegistryForceUpdateSecretsRequestedV1 is a chain agnostic definition of the WorkflowRegistry
// ForceUpdateSecretsRequested event.
type WorkflowRegistryForceUpdateSecretsRequestedV1 struct {
SecretsURLHash []byte
Owner []byte
WorkflowName string
}

type WorkflowRegistryWorkflowRegisteredV1 struct {
WorkflowID [32]byte
WorkflowOwner []byte
DonID uint32
Status uint8
WorkflowName string
BinaryURL string
ConfigURL string
SecretsURL string
}

type WorkflowRegistryWorkflowUpdatedV1 struct {
OldWorkflowID [32]byte
WorkflowOwner []byte
DonID uint32
NewWorkflowID [32]byte
WorkflowName string
BinaryURL string
ConfigURL string
SecretsURL string
}

type WorkflowRegistryWorkflowPausedV1 struct {
WorkflowID [32]byte
WorkflowOwner []byte
DonID uint32
WorkflowName string
}

type WorkflowRegistryWorkflowActivatedV1 struct {
WorkflowID [32]byte
WorkflowOwner []byte
DonID uint32
WorkflowName string
}

type WorkflowRegistryWorkflowDeletedV1 struct {
WorkflowID [32]byte
WorkflowOwner []byte
DonID uint32
WorkflowName string
}

// eventHandler is a handler for WorkflowRegistryEvent events. Each event type has a corresponding
// method that handles the event.
type eventHandler struct {
lggr logger.Logger
orm ORM
fetcher FetcherFunc
lggr logger.Logger
orm WorkflowSecretsDS
fetcher FetcherFunc
workflowStore store.Store
capRegistry core.CapabilitiesRegistry
engineRegistry *engineRegistry
}

// newEventHandler returns a new eventHandler instance.
func newEventHandler(
lggr logger.Logger,
orm ORM,
gateway FetcherFunc,
workflowStore store.Store,
capRegistry core.CapabilitiesRegistry,
engineRegistry *engineRegistry,
) *eventHandler {
return &eventHandler{
lggr: lggr,
orm: orm,
fetcher: gateway,
lggr: lggr,
orm: orm,
fetcher: gateway,
workflowStore: workflowStore,
capRegistry: capRegistry,
engineRegistry: engineRegistry,
}
}

func (h *eventHandler) Handle(ctx context.Context, event WorkflowRegistryEvent) error {
switch event.EventType {
case ForceUpdateSecretsEvent:
return h.forceUpdateSecretsEvent(ctx, event)
case WorkflowRegisteredEvent:
return h.workflowRegisteredEvent(ctx, event)
case WorkflowUpdatedEvent:
return h.workflowUpdatedEvent(ctx, event)
case WorkflowPausedEvent:
return h.workflowPausedEvent(ctx, event)
case WorkflowActivatedEvent:
return h.workflowActivatedEvent(ctx, event)
default:
return fmt.Errorf("event type unsupported: %v", event.EventType)
}
}

// workflowRegisteredEvent handles the WorkflowRegisteredEvent event type.
// TODO: Implement this method
func (h *eventHandler) workflowRegisteredEvent(
_ context.Context,
_ WorkflowRegistryEvent,
) error {
return ErrNotImplemented
}

// workflowUpdatedEvent handles the WorkflowUpdatedEvent event type.
func (h *eventHandler) workflowUpdatedEvent(
_ context.Context,
_ WorkflowRegistryEvent,
) error {
return ErrNotImplemented
}

// workflowPausedEvent handles the WorkflowPausedEvent event type.
func (h *eventHandler) workflowPausedEvent(
_ context.Context,
_ WorkflowRegistryEvent,
) error {
return ErrNotImplemented
}

// workflowActivatedEvent handles the WorkflowActivatedEvent event type.
func (h *eventHandler) workflowActivatedEvent(
_ context.Context,
_ WorkflowRegistryEvent,
) error {
return ErrNotImplemented
}

// forceUpdateSecretsEvent handles the ForceUpdateSecretsEvent event type.
func (h *eventHandler) forceUpdateSecretsEvent(
ctx context.Context,
Expand Down
10 changes: 5 additions & 5 deletions core/services/workflows/syncer/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func Test_Handler(t *testing.T) {
}
mockORM.EXPECT().GetSecretsURLByHash(matches.AnyContext, giveHash).Return(giveURL, nil)
mockORM.EXPECT().Update(matches.AnyContext, giveHash, "contents").Return(int64(1), nil)
h := newEventHandler(lggr, mockORM, fetcher)
h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil)
err = h.Handle(ctx, giveEvent)
require.NoError(t, err)
})
Expand All @@ -52,7 +52,7 @@ func Test_Handler(t *testing.T) {
return []byte("contents"), nil
}

h := newEventHandler(lggr, mockORM, fetcher)
h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil)
err := h.Handle(ctx, giveEvent)
require.Error(t, err)
require.Contains(t, err.Error(), "event type unsupported")
Expand All @@ -61,7 +61,7 @@ func Test_Handler(t *testing.T) {
t.Run("fails to get secrets url", func(t *testing.T) {
mockORM := mocks.NewORM(t)
ctx := testutils.Context(t)
h := newEventHandler(lggr, mockORM, nil)
h := newEventHandler(lggr, mockORM, nil, nil, nil, nil)
giveURL := "https://original-url.com"
giveBytes, err := crypto.Keccak256([]byte(giveURL))
require.NoError(t, err)
Expand Down Expand Up @@ -101,7 +101,7 @@ func Test_Handler(t *testing.T) {
return nil, assert.AnError
}
mockORM.EXPECT().GetSecretsURLByHash(matches.AnyContext, giveHash).Return(giveURL, nil)
h := newEventHandler(lggr, mockORM, fetcher)
h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil)
err = h.Handle(ctx, giveEvent)
require.Error(t, err)
require.ErrorIs(t, err, assert.AnError)
Expand All @@ -128,7 +128,7 @@ func Test_Handler(t *testing.T) {
}
mockORM.EXPECT().GetSecretsURLByHash(matches.AnyContext, giveHash).Return(giveURL, nil)
mockORM.EXPECT().Update(matches.AnyContext, giveHash, "contents").Return(0, assert.AnError)
h := newEventHandler(lggr, mockORM, fetcher)
h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil)
err = h.Handle(ctx, giveEvent)
require.Error(t, err)
require.ErrorIs(t, err, assert.AnError)
Expand Down
58 changes: 58 additions & 0 deletions core/services/workflows/syncer/mocks/orm.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading