Skip to content

Commit

Permalink
[KS-490] Support per-step timeout overrides in the Engine (#15367)
Browse files Browse the repository at this point in the history
  • Loading branch information
bolekk authored Nov 21, 2024
1 parent 1231f14 commit cb20337
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 7 deletions.
32 changes: 25 additions & 7 deletions core/services/workflows/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/store"
)

const fifteenMinutesMs = 15 * 60 * 1000
const (
fifteenMinutesMs = 15 * 60 * 1000
reservedFieldNameStepTimeout = "cre_step_timeout"
maxStepTimeoutOverrideSec = 10 * 60 // 10 minutes
)

type stepRequest struct {
stepRef string
Expand Down Expand Up @@ -769,10 +773,7 @@ func (e *Engine) workerForStepRequest(ctx context.Context, msg stepRequest) {
// TODO ks-462 inputs
logCustMsg(ctx, cma, "executing step", l)

stepCtx, cancel := context.WithTimeout(ctx, e.stepTimeoutDuration)
defer cancel()

inputs, outputs, err := e.executeStep(stepCtx, l, msg)
inputs, outputs, err := e.executeStep(ctx, l, msg)
var stepStatus string
switch {
case errors.Is(capabilities.ErrStopExecution, err):
Expand Down Expand Up @@ -919,6 +920,20 @@ func (e *Engine) executeStep(ctx context.Context, lggr logger.Logger, msg stepRe
if err != nil {
return nil, nil, err
}
stepTimeoutDuration := e.stepTimeoutDuration
if timeoutOverride, ok := config.Underlying[reservedFieldNameStepTimeout]; ok {
var desiredTimeout int64
err2 := timeoutOverride.UnwrapTo(&desiredTimeout)
if err2 != nil {
e.logger.Warnw("couldn't decode step timeout override, using default", "error", err2, "default", stepTimeoutDuration)
} else {
if desiredTimeout > maxStepTimeoutOverrideSec {
e.logger.Warnw("desired step timeout is too large, limiting to max value", "maxValue", maxStepTimeoutOverrideSec)
desiredTimeout = maxStepTimeoutOverrideSec
}
stepTimeoutDuration = time.Duration(desiredTimeout) * time.Second
}
}

tr := capabilities.CapabilityRequest{
Inputs: inputsMap,
Expand All @@ -934,8 +949,11 @@ func (e *Engine) executeStep(ctx context.Context, lggr logger.Logger, msg stepRe
},
}

e.metrics.incrementCapabilityInvocationCounter(ctx)
output, err := step.capability.Execute(ctx, tr)
stepCtx, cancel := context.WithTimeout(ctx, stepTimeoutDuration)
defer cancel()

e.metrics.incrementCapabilityInvocationCounter(stepCtx)
output, err := step.capability.Execute(stepCtx, tr)
if err != nil {
return inputsMap, nil, err
}
Expand Down
1 change: 1 addition & 0 deletions core/services/workflows/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ targets:
address: "0x54e220867af6683aE6DcBF535B4f952cB5116510"
params: ["$(report)"]
abi: "receive(report bytes)"
cre_step_timeout: 610
`

type testHooks struct {
Expand Down

0 comments on commit cb20337

Please sign in to comment.