diff --git a/core/services/workflows/engine.go b/core/services/workflows/engine.go index 01d1326e072..8985f9d1599 100644 --- a/core/services/workflows/engine.go +++ b/core/services/workflows/engine.go @@ -2,6 +2,7 @@ package workflows import ( "context" + "errors" "fmt" "time" @@ -31,13 +32,17 @@ type Engine struct { consensusType string consensusConfig *values.Map consensus capabilities.ConsensusCapability - targetType string - targetConfig *values.Map - target capabilities.TargetCapability + targets []target callbackCh chan capabilities.CapabilityResponse cancel func() } +type target struct { + typeStr string + config *values.Map + capability capabilities.TargetCapability +} + func (e *Engine) Start(ctx context.Context) error { return e.StartOnce("Engine", func() error { // create a new context, since the one passed in via Start is short-lived. @@ -70,12 +75,18 @@ LOOP: e.logger.Errorf("failed to get consensus capability: %s, retrying in %d seconds", err, retrySec) break } - e.target, err = e.registry.GetTarget(ctx, e.targetType) - if err != nil { - e.logger.Errorf("failed to get target capability: %s, retrying in %d seconds", err, retrySec) - break + failed := false + for i := range e.targets { + e.targets[i].capability, err = e.registry.GetTarget(ctx, e.targets[i].typeStr) + if err != nil { + e.logger.Errorf("failed to get target capability: %s, retrying in %d seconds", err, retrySec) + failed = true + break + } + } + if !failed { + break LOOP } - break LOOP } } @@ -130,50 +141,25 @@ func (e *Engine) triggerHandlerLoop(ctx context.Context) { case <-ctx.Done(): return case resp := <-e.callbackCh: - err := e.handleExecution(ctx, resp) - if err != nil { - e.logger.Error("error executing event %+v: %w", resp, err) - } + go e.handleExecution(ctx, resp) } } } -func (e *Engine) handleExecution(ctx context.Context, event capabilities.CapabilityResponse) error { +func (e *Engine) handleExecution(ctx context.Context, event capabilities.CapabilityResponse) { e.logger.Debugw("executing on a trigger event", "event", event) - results, err := e.handleConsensus(ctx, event) + result, err := e.handleConsensus(ctx, event) if err != nil { - return err - } - if len(results.Underlying) == 0 { - return fmt.Errorf("consensus returned no reports") + e.logger.Errorf("error in handleConsensus %v", err) + return } - if len(results.Underlying) > 1 { - e.logger.Debugw("consensus returned more than one report") - } - - // we're expecting exactly one report - _, err = e.handleTarget(ctx, results.Underlying[0]) - return err -} - -func (e *Engine) handleTarget(ctx context.Context, resp values.Value) (*values.List, error) { - e.logger.Debugw("handle target") - inputs := map[string]values.Value{ - "report": resp, - } - - tr := capabilities.CapabilityRequest{ - Inputs: &values.Map{Underlying: inputs}, - Config: e.targetConfig, - Metadata: capabilities.RequestMetadata{ - WorkflowID: mockedWorkflowID, - WorkflowExecutionID: mockedExecutionID, - }, + err = e.handleTargets(ctx, result) + if err != nil { + e.logger.Error("error in handleTargets %v", err) } - return capabilities.ExecuteSync(ctx, e.target, tr) } -func (e *Engine) handleConsensus(ctx context.Context, event capabilities.CapabilityResponse) (*values.List, error) { +func (e *Engine) handleConsensus(ctx context.Context, event capabilities.CapabilityResponse) (values.Value, error) { e.logger.Debugw("running consensus", "event", event) cr := capabilities.CapabilityRequest{ Metadata: capabilities.RequestMetadata{ @@ -190,7 +176,45 @@ func (e *Engine) handleConsensus(ctx context.Context, event capabilities.Capabil }, Config: e.consensusConfig, } - return capabilities.ExecuteSync(ctx, e.consensus, cr) + chReports := make(chan capabilities.CapabilityResponse, 10) + newCtx, cancel := context.WithCancel(ctx) + defer cancel() + err := e.consensus.Execute(newCtx, chReports, cr) + if err != nil { + return nil, err + } + select { + case resp := <-chReports: + if resp.Err != nil { + return nil, resp.Err + } + return resp.Value, nil + case <-ctx.Done(): + return nil, ctx.Err() + } +} + +func (e *Engine) handleTargets(ctx context.Context, resp values.Value) error { + e.logger.Debugw("handle targets") + inputs := map[string]values.Value{ + "report": resp, + } + + var combinedErr error + for _, t := range e.targets { + e.logger.Debugw("sending to target", "target", t.typeStr, "inputs", inputs) + tr := capabilities.CapabilityRequest{ + Inputs: &values.Map{Underlying: inputs}, + Config: t.config, + Metadata: capabilities.RequestMetadata{ + WorkflowID: mockedWorkflowID, + WorkflowExecutionID: mockedExecutionID, + }, + } + _, err := capabilities.ExecuteSync(ctx, t.capability, tr) + combinedErr = errors.Join(combinedErr, err) + } + return combinedErr } func (e *Engine) Close() error { @@ -227,7 +251,9 @@ func NewEngine(lggr logger.Logger, registry types.CapabilitiesRegistry) (engine engine.triggerConfig, err = values.NewMap( map[string]any{ "feedlist": []any{ - 123, 456, 789, // ETHUSD, LINKUSD, USDBTC + "0x1111111111111111111100000000000000000000000000000000000000000000", // ETHUSD + "0x2222222222222222222200000000000000000000000000000000000000000000", // LINKUSD + "0x3333333333333333333300000000000000000000000000000000000000000000", // BTCUSD }, }, ) @@ -242,18 +268,18 @@ func NewEngine(lggr logger.Logger, registry types.CapabilitiesRegistry) (engine "aggregation_config": map[string]any{ // ETHUSD "0x1111111111111111111100000000000000000000000000000000000000000000": map[string]any{ - "deviation": decimal.NewFromFloat(0.003), - "heartbeat": 24, + "deviation": decimal.NewFromFloat(0.001), + "heartbeat": 1800, }, // LINKUSD "0x2222222222222222222200000000000000000000000000000000000000000000": map[string]any{ "deviation": decimal.NewFromFloat(0.001), - "heartbeat": 24, + "heartbeat": 1800, }, // BTCUSD "0x3333333333333333333300000000000000000000000000000000000000000000": map[string]any{ - "deviation": decimal.NewFromFloat(0.002), - "heartbeat": 6, + "deviation": decimal.NewFromFloat(0.001), + "heartbeat": 1800, }, }, "encoder": "EVM", @@ -265,10 +291,20 @@ func NewEngine(lggr logger.Logger, registry types.CapabilitiesRegistry) (engine return nil, err } - // Target - engine.targetType = "write_polygon-testnet-mumbai" - engine.targetConfig, err = values.NewMap(map[string]any{ - "address": "0xaabbcc", + // Targets + engine.targets = make([]target, 2) + engine.targets[0].typeStr = "write_polygon-testnet-mumbai" + engine.targets[0].config, err = values.NewMap(map[string]any{ + "address": "0x3F3554832c636721F1fD1822Ccca0354576741Ef", + "params": []any{"$(report)"}, + "abi": "receive(report bytes)", + }) + if err != nil { + return nil, err + } + engine.targets[1].typeStr = "write_ethereum-testnet-sepolia" + engine.targets[1].config, err = values.NewMap(map[string]any{ + "address": "0x54e220867af6683aE6DcBF535B4f952cB5116510", "params": []any{"$(report)"}, "abi": "receive(report bytes)", }) diff --git a/core/services/workflows/engine_test.go b/core/services/workflows/engine_test.go index aa84dc29cc9..e63264c789f 100644 --- a/core/services/workflows/engine_test.go +++ b/core/services/workflows/engine_test.go @@ -95,22 +95,37 @@ func TestEngineWithHardcodedWorkflow(t *testing.T) { ) require.NoError(t, reg.Add(ctx, consensus)) - target := newMockCapability( + target1 := newMockCapability( capabilities.MustNewCapabilityInfo( "write_polygon-testnet-mumbai", capabilities.CapabilityTypeTarget, - "a write capability targeting polygon mainnet", + "a write capability targeting polygon mumbai testnet", "v1.0.0", ), func(req capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) { + list := req.Inputs.Underlying["report"].(*values.List) + return capabilities.CapabilityResponse{ + Value: list.Underlying[0], + }, nil + }, + ) + require.NoError(t, reg.Add(ctx, target1)) + target2 := newMockCapability( + capabilities.MustNewCapabilityInfo( + "write_ethereum-testnet-sepolia", + capabilities.CapabilityTypeTarget, + "a write capability targeting ethereum sepolia testnet", + "v1.0.0", + ), + func(req capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) { list := req.Inputs.Underlying["report"].(*values.List) return capabilities.CapabilityResponse{ Value: list.Underlying[0], }, nil }, ) - require.NoError(t, reg.Add(ctx, target)) + require.NoError(t, reg.Add(ctx, target2)) lggr := logger.TestLogger(t) eng, err := NewEngine(lggr, reg) @@ -130,5 +145,6 @@ func TestEngineWithHardcodedWorkflow(t *testing.T) { err = eng.Start(ctx) require.NoError(t, err) defer eng.Close() - assert.Equal(t, cr, <-target.response) + assert.Equal(t, cr, <-target1.response) + assert.Equal(t, cr, <-target2.response) }