Skip to content

Commit

Permalink
[KS-55] Extend demo Engine to two targets (#12151)
Browse files Browse the repository at this point in the history
Additionally handle requests in separate goroutines, as they can be relatively long-lived.
  • Loading branch information
bolekk authored Feb 28, 2024
1 parent aa22ad5 commit 24c3718
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 57 deletions.
142 changes: 89 additions & 53 deletions core/services/workflows/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package workflows

import (
"context"
"errors"
"fmt"
"time"

Expand Down Expand Up @@ -31,13 +32,17 @@ type Engine struct {
consensusType string
consensusConfig *values.Map
consensus capabilities.ConsensusCapability
targetType string
targetConfig *values.Map
target capabilities.TargetCapability
targets []target
callbackCh chan capabilities.CapabilityResponse
cancel func()
}

type target struct {
typeStr string
config *values.Map
capability capabilities.TargetCapability
}

func (e *Engine) Start(ctx context.Context) error {
return e.StartOnce("Engine", func() error {
// create a new context, since the one passed in via Start is short-lived.
Expand Down Expand Up @@ -70,12 +75,18 @@ LOOP:
e.logger.Errorf("failed to get consensus capability: %s, retrying in %d seconds", err, retrySec)
break
}
e.target, err = e.registry.GetTarget(ctx, e.targetType)
if err != nil {
e.logger.Errorf("failed to get target capability: %s, retrying in %d seconds", err, retrySec)
break
failed := false
for i := range e.targets {
e.targets[i].capability, err = e.registry.GetTarget(ctx, e.targets[i].typeStr)
if err != nil {
e.logger.Errorf("failed to get target capability: %s, retrying in %d seconds", err, retrySec)
failed = true
break
}
}
if !failed {
break LOOP
}
break LOOP
}
}

Expand Down Expand Up @@ -130,50 +141,25 @@ func (e *Engine) triggerHandlerLoop(ctx context.Context) {
case <-ctx.Done():
return
case resp := <-e.callbackCh:
err := e.handleExecution(ctx, resp)
if err != nil {
e.logger.Error("error executing event %+v: %w", resp, err)
}
go e.handleExecution(ctx, resp)
}
}
}

func (e *Engine) handleExecution(ctx context.Context, event capabilities.CapabilityResponse) error {
func (e *Engine) handleExecution(ctx context.Context, event capabilities.CapabilityResponse) {
e.logger.Debugw("executing on a trigger event", "event", event)
results, err := e.handleConsensus(ctx, event)
result, err := e.handleConsensus(ctx, event)
if err != nil {
return err
}
if len(results.Underlying) == 0 {
return fmt.Errorf("consensus returned no reports")
e.logger.Errorf("error in handleConsensus %v", err)
return
}
if len(results.Underlying) > 1 {
e.logger.Debugw("consensus returned more than one report")
}

// we're expecting exactly one report
_, err = e.handleTarget(ctx, results.Underlying[0])
return err
}

func (e *Engine) handleTarget(ctx context.Context, resp values.Value) (*values.List, error) {
e.logger.Debugw("handle target")
inputs := map[string]values.Value{
"report": resp,
}

tr := capabilities.CapabilityRequest{
Inputs: &values.Map{Underlying: inputs},
Config: e.targetConfig,
Metadata: capabilities.RequestMetadata{
WorkflowID: mockedWorkflowID,
WorkflowExecutionID: mockedExecutionID,
},
err = e.handleTargets(ctx, result)
if err != nil {
e.logger.Error("error in handleTargets %v", err)
}
return capabilities.ExecuteSync(ctx, e.target, tr)
}

func (e *Engine) handleConsensus(ctx context.Context, event capabilities.CapabilityResponse) (*values.List, error) {
func (e *Engine) handleConsensus(ctx context.Context, event capabilities.CapabilityResponse) (values.Value, error) {
e.logger.Debugw("running consensus", "event", event)
cr := capabilities.CapabilityRequest{
Metadata: capabilities.RequestMetadata{
Expand All @@ -190,7 +176,45 @@ func (e *Engine) handleConsensus(ctx context.Context, event capabilities.Capabil
},
Config: e.consensusConfig,
}
return capabilities.ExecuteSync(ctx, e.consensus, cr)
chReports := make(chan capabilities.CapabilityResponse, 10)
newCtx, cancel := context.WithCancel(ctx)
defer cancel()
err := e.consensus.Execute(newCtx, chReports, cr)
if err != nil {
return nil, err
}
select {
case resp := <-chReports:
if resp.Err != nil {
return nil, resp.Err
}
return resp.Value, nil
case <-ctx.Done():
return nil, ctx.Err()
}
}

func (e *Engine) handleTargets(ctx context.Context, resp values.Value) error {
e.logger.Debugw("handle targets")
inputs := map[string]values.Value{
"report": resp,
}

var combinedErr error
for _, t := range e.targets {
e.logger.Debugw("sending to target", "target", t.typeStr, "inputs", inputs)
tr := capabilities.CapabilityRequest{
Inputs: &values.Map{Underlying: inputs},
Config: t.config,
Metadata: capabilities.RequestMetadata{
WorkflowID: mockedWorkflowID,
WorkflowExecutionID: mockedExecutionID,
},
}
_, err := capabilities.ExecuteSync(ctx, t.capability, tr)
combinedErr = errors.Join(combinedErr, err)
}
return combinedErr
}

func (e *Engine) Close() error {
Expand Down Expand Up @@ -227,7 +251,9 @@ func NewEngine(lggr logger.Logger, registry types.CapabilitiesRegistry) (engine
engine.triggerConfig, err = values.NewMap(
map[string]any{
"feedlist": []any{
123, 456, 789, // ETHUSD, LINKUSD, USDBTC
"0x1111111111111111111100000000000000000000000000000000000000000000", // ETHUSD
"0x2222222222222222222200000000000000000000000000000000000000000000", // LINKUSD
"0x3333333333333333333300000000000000000000000000000000000000000000", // BTCUSD
},
},
)
Expand All @@ -242,18 +268,18 @@ func NewEngine(lggr logger.Logger, registry types.CapabilitiesRegistry) (engine
"aggregation_config": map[string]any{
// ETHUSD
"0x1111111111111111111100000000000000000000000000000000000000000000": map[string]any{
"deviation": decimal.NewFromFloat(0.003),
"heartbeat": 24,
"deviation": decimal.NewFromFloat(0.001),
"heartbeat": 1800,
},
// LINKUSD
"0x2222222222222222222200000000000000000000000000000000000000000000": map[string]any{
"deviation": decimal.NewFromFloat(0.001),
"heartbeat": 24,
"heartbeat": 1800,
},
// BTCUSD
"0x3333333333333333333300000000000000000000000000000000000000000000": map[string]any{
"deviation": decimal.NewFromFloat(0.002),
"heartbeat": 6,
"deviation": decimal.NewFromFloat(0.001),
"heartbeat": 1800,
},
},
"encoder": "EVM",
Expand All @@ -265,10 +291,20 @@ func NewEngine(lggr logger.Logger, registry types.CapabilitiesRegistry) (engine
return nil, err
}

// Target
engine.targetType = "write_polygon-testnet-mumbai"
engine.targetConfig, err = values.NewMap(map[string]any{
"address": "0xaabbcc",
// Targets
engine.targets = make([]target, 2)
engine.targets[0].typeStr = "write_polygon-testnet-mumbai"
engine.targets[0].config, err = values.NewMap(map[string]any{
"address": "0x3F3554832c636721F1fD1822Ccca0354576741Ef",
"params": []any{"$(report)"},
"abi": "receive(report bytes)",
})
if err != nil {
return nil, err
}
engine.targets[1].typeStr = "write_ethereum-testnet-sepolia"
engine.targets[1].config, err = values.NewMap(map[string]any{
"address": "0x54e220867af6683aE6DcBF535B4f952cB5116510",
"params": []any{"$(report)"},
"abi": "receive(report bytes)",
})
Expand Down
24 changes: 20 additions & 4 deletions core/services/workflows/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,22 +95,37 @@ func TestEngineWithHardcodedWorkflow(t *testing.T) {
)
require.NoError(t, reg.Add(ctx, consensus))

target := newMockCapability(
target1 := newMockCapability(
capabilities.MustNewCapabilityInfo(
"write_polygon-testnet-mumbai",
capabilities.CapabilityTypeTarget,
"a write capability targeting polygon mainnet",
"a write capability targeting polygon mumbai testnet",
"v1.0.0",
),
func(req capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) {
list := req.Inputs.Underlying["report"].(*values.List)
return capabilities.CapabilityResponse{
Value: list.Underlying[0],
}, nil
},
)
require.NoError(t, reg.Add(ctx, target1))

target2 := newMockCapability(
capabilities.MustNewCapabilityInfo(
"write_ethereum-testnet-sepolia",
capabilities.CapabilityTypeTarget,
"a write capability targeting ethereum sepolia testnet",
"v1.0.0",
),
func(req capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) {
list := req.Inputs.Underlying["report"].(*values.List)
return capabilities.CapabilityResponse{
Value: list.Underlying[0],
}, nil
},
)
require.NoError(t, reg.Add(ctx, target))
require.NoError(t, reg.Add(ctx, target2))

lggr := logger.TestLogger(t)
eng, err := NewEngine(lggr, reg)
Expand All @@ -130,5 +145,6 @@ func TestEngineWithHardcodedWorkflow(t *testing.T) {
err = eng.Start(ctx)
require.NoError(t, err)
defer eng.Close()
assert.Equal(t, cr, <-target.response)
assert.Equal(t, cr, <-target1.response)
assert.Equal(t, cr, <-target2.response)
}

0 comments on commit 24c3718

Please sign in to comment.