diff --git a/config/scheduler/config.yaml b/config/scheduler/config.yaml
index f87ac102b6b..7f29baa6db9 100644
--- a/config/scheduler/config.yaml
+++ b/config/scheduler/config.yaml
@@ -139,4 +139,11 @@ scheduling:
       resolution: "1Mi"
   gangIdAnnotation: armadaproject.io/gangId
   gangCardinalityAnnotation: armadaproject.io/gangCardinality
+  failureEstimatorConfig:
+    nodeSuccessProbabilityCordonThreshold: 0.1
+    queueSuccessProbabilityCordonThreshold: 0.05
+    nodeCordonTimeout: "10m"
+    queueCordonTimeout: "1m"
+    nodeEquilibriumFailureRate: 0.0167 # 1 per minute.
+    queueEquilibriumFailureRate: 0.0167 # 1 per minute.
diff --git a/internal/armada/configuration/types.go b/internal/armada/configuration/types.go
index 70d8c550a9a..a5ea6272f45 100644
--- a/internal/armada/configuration/types.go
+++ b/internal/armada/configuration/types.go
@@ -219,6 +219,8 @@ type SchedulingConfig struct {
 	ExecutorUpdateFrequency time.Duration
 	// Enable new preemption strategy.
 	EnableNewPreemptionStrategy bool
+	// Controls node and queue success probability estimation.
+	FailureEstimatorConfig FailureEstimatorConfig
 }
 
 const (
@@ -315,6 +317,18 @@ type PreemptionConfig struct {
 	PriorityClassNameOverride *string
 }
 
+// FailureEstimatorConfig contains config controlling node and queue success probability estimation.
+// See the internal/scheduler/failureestimator package for details.
+type FailureEstimatorConfig struct {
+	Disabled bool
+	NodeSuccessProbabilityCordonThreshold float64
+	QueueSuccessProbabilityCordonThreshold float64
+	NodeCordonTimeout time.Duration
+	QueueCordonTimeout time.Duration
+	NodeEquilibriumFailureRate float64
+	QueueEquilibriumFailureRate float64
+}
+
 // TODO: we can probably just typedef this to map[string]string
 type PostgresConfig struct {
 	Connection map[string]string
diff --git a/internal/scheduler/failureestimator/failureestimator.go b/internal/scheduler/failureestimator/failureestimator.go
new file mode 100644
index 00000000000..9e35f5158e0
--- /dev/null
+++ b/internal/scheduler/failureestimator/failureestimator.go
@@ -0,0 +1,307 @@
+package failureestimator
+
+import (
+	"fmt"
+	"math"
+	"sync"
+	"time"
+
+	"github.com/pkg/errors"
+	"github.com/prometheus/client_golang/prometheus"
+
+	"github.com/armadaproject/armada/internal/common/armadaerrors"
+)
+
+const (
+	namespace = "armada"
+	subsystem = "scheduler"
+
+	// Floating point tolerance. Also used when applying limits to avoid divide-by-zero issues.
+	eps = 1e-15
+	// Assumed success probability of "good" nodes (queues) used when calculating step size.
+	healthySuccessProbability = 0.95
+)
+
+// FailureEstimator is a system for answering the following question:
+// "What's the probability of a job from queue Q completing successfully when scheduled on node N?"
+// We assume the job may fail either because the job or node is faulty, and we assume these failures are independent.
+// Denote by
+// - P_q the probability of a job from queue q succeeding when running on a perfect node and
+// - P_n the probability of a perfect job succeeding on node n.
+// The success probability of a job from queue q on node n is then Pr(p_q*p_n=1),
+// where p_q and p_n are drawn from Bernoulli distributions with parameters P_q and P_n, respectively.
+//
+// Now, the goal is to jointly estimate P_q and P_n for each queue and node using observed successes and failures.
+// The method used is statistical and only relies on knowing which queue a job belongs to and on which node it ran.
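+// Concretely, for a single job from queue q scheduled on node n, the likelihood of the observed outcome is
+// P_q*P_n if the job succeeded and 1 - P_q*P_n if it failed, i.e., each observation contributes
+// -log(P_q*P_n) or -log(1 - P_q*P_n) to the negative log-likelihood minimised below.
+//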
+// The intuition of the method is that:
+// - A job from a queue with many failures doesn't say much about the node; likely it's the job that's the problem.
+// - A job failing on a node with many failures doesn't say much about the job; likely it's the node that's the problem.
+// And vice versa.
+//
+// Specifically, we maximise the log-likelihood function of P_q and P_n using observed successes and failures.
+// This maximisation is performed using online gradient descent, where for each success or failure,
+// we update the corresponding P_q and P_n by taking a gradient step.
+// See New(...) for more details regarding step size.
+//
+// Finally, we exponentially decay P_q and P_n towards 1 over time,
+// such that nodes and queues for which we observe no failures appear to become healthier over time.
+// See the New(...) function for details regarding decay.
+//
+// This module internally only maintains success probability estimates, as this makes the maths cleaner.
+// When exporting these via API calls we convert to failure probabilities, as these are more intuitive to reason about.
+type FailureEstimator struct {
+	// Map from node (queue) name to the estimated success probability of that node (queue). For example:
+	// - successProbabilityByNode["myNode"] = 0.85: estimated failure probability of a perfect job run on "myNode" is 15%.
+	// - successProbabilityByQueue["myQueue"] = 0.60: estimated failure probability of a job from "myQueue" run on a perfect node is 40%.
+	successProbabilityByNode map[string]float64
+	successProbabilityByQueue map[string]float64
+
+	// Success probability below which to consider nodes (queues) unhealthy.
+	nodeSuccessProbabilityCordonThreshold float64
+	queueSuccessProbabilityCordonThreshold float64
+
+	// Exponential decay factor controlling how quickly estimated node (queue) success probability decays towards 1.
+	// Computed from:
+	// - {node, queue}SuccessProbabilityCordonThreshold
+	// - {node, queue}CordonTimeout
+	nodeFailureProbabilityDecayRate float64
+	queueFailureProbabilityDecayRate float64
+	timeOfLastDecay time.Time
+
+	// Gradient descent step size. Controls the extent to which new data affects successProbabilityBy{Node, Queue}.
+	// Computed from:
+	// - {node, queue}SuccessProbabilityCordonThreshold
+	// - {node, queue}FailureProbabilityDecayRate
+	// - {node, queue}EquilibriumFailureRate
+	nodeStepSize float64
+	queueStepSize float64
+
+	// Prometheus metrics.
+	failureProbabilityByNodeDesc *prometheus.Desc
+	failureProbabilityByQueueDesc *prometheus.Desc
+
+	// If true, this module is disabled.
+	disabled bool
+
+	// Mutex protecting the above fields.
+	// Prevents concurrent map modification issues when scraping metrics.
+	mu sync.Mutex
+}
+
+// New returns a new FailureEstimator. Parameters have the following meaning:
+// - {node, queue}SuccessProbabilityCordonThreshold: Success probability below which nodes (queues) are considered unhealthy.
+// - {node, queue}CordonTimeout: Amount of time for which nodes (queues) remain unhealthy in the absence of any job successes or failures for that node (queue).
+// - {node, queue}EquilibriumFailureRate: Job failures per second necessary for a node (queue) to remain unhealthy in the absence of any successes for that node (queue).
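+//
+// For example, with nodeSuccessProbabilityCordonThreshold=0.1 and nodeCordonTimeout=10m (the defaults in config.yaml above),
+// a node whose estimated success probability has dropped to 0 decays back up to 0.1 over 10 minutes with no observed
+// successes or failures, and a sustained nodeEquilibriumFailureRate of 0.0167 failures per second (about one per minute)
+// is what it takes to hold the estimate at that threshold.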
+func New(
+	nodeSuccessProbabilityCordonThreshold float64,
+	queueSuccessProbabilityCordonThreshold float64,
+	nodeCordonTimeout time.Duration,
+	queueCordonTimeout time.Duration,
+	nodeEquilibriumFailureRate float64,
+	queueEquilibriumFailureRate float64,
+) (*FailureEstimator, error) {
+	if nodeSuccessProbabilityCordonThreshold < 0 || nodeSuccessProbabilityCordonThreshold > 1 {
+		return nil, errors.WithStack(&armadaerrors.ErrInvalidArgument{
+			Name:    "nodeSuccessProbabilityCordonThreshold",
+			Value:   nodeSuccessProbabilityCordonThreshold,
+			Message: "outside allowed range [0, 1]",
+		})
+	}
+	if queueSuccessProbabilityCordonThreshold < 0 || queueSuccessProbabilityCordonThreshold > 1 {
+		return nil, errors.WithStack(&armadaerrors.ErrInvalidArgument{
+			Name:    "queueSuccessProbabilityCordonThreshold",
+			Value:   queueSuccessProbabilityCordonThreshold,
+			Message: "outside allowed range [0, 1]",
+		})
+	}
+	if nodeCordonTimeout < 0 {
+		return nil, errors.WithStack(&armadaerrors.ErrInvalidArgument{
+			Name:    "nodeCordonTimeout",
+			Value:   nodeCordonTimeout,
+			Message: "outside allowed range [0, Inf)",
+		})
+	}
+	if queueCordonTimeout < 0 {
+		return nil, errors.WithStack(&armadaerrors.ErrInvalidArgument{
+			Name:    "queueCordonTimeout",
+			Value:   queueCordonTimeout,
+			Message: "outside allowed range [0, Inf)",
+		})
+	}
+	if nodeEquilibriumFailureRate <= 0 {
+		return nil, errors.WithStack(&armadaerrors.ErrInvalidArgument{
+			Name:    "nodeEquilibriumFailureRate",
+			Value:   nodeEquilibriumFailureRate,
+			Message: "outside allowed range (0, Inf)",
+		})
+	}
+	if queueEquilibriumFailureRate <= 0 {
+		return nil, errors.WithStack(&armadaerrors.ErrInvalidArgument{
+			Name:    "queueEquilibriumFailureRate",
+			Value:   queueEquilibriumFailureRate,
+			Message: "outside allowed range (0, Inf)",
+		})
+	}
+
+	// Compute decay rates such that a node (queue) with success probability 0 will over {node, queue}CordonTimeout time
+	// decay to a success probability of {node, queue}SuccessProbabilityCordonThreshold.
+	nodeFailureProbabilityDecayRate := math.Exp(math.Log(1-nodeSuccessProbabilityCordonThreshold) / nodeCordonTimeout.Seconds())
+	queueFailureProbabilityDecayRate := math.Exp(math.Log(1-queueSuccessProbabilityCordonThreshold) / queueCordonTimeout.Seconds())
+
+	// Compute step size such that a node (queue) with success probability {node, queue}SuccessProbabilityCordonThreshold
+	// for which we observe 0 successes and {node, queue}EquilibriumFailureRate failures per second from "good" nodes (queues)
+	// will remain at exactly {node, queue}SuccessProbabilityCordonThreshold success probability.
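+	// At a success probability s equal to the cordon threshold, decay raises s by (1-s)*(1-decayRate) per second,
+	// while each failure observed alongside a healthy counterpart (success probability healthySuccessProbability)
+	// lowers s by stepSize*d, where d = healthySuccessProbability / (1 - s*healthySuccessProbability) is the
+	// negative log-likelihood gradient. Equating the equilibrium failure rate times stepSize*d with the per-second
+	// decay term and solving for the step size gives the expressions below.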
+	dNodeSuccessProbability := healthySuccessProbability / (1 - nodeSuccessProbabilityCordonThreshold*healthySuccessProbability)
+	dQueueSuccessProbability := healthySuccessProbability / (1 - queueSuccessProbabilityCordonThreshold*healthySuccessProbability)
+	nodeStepSize := (1 - nodeSuccessProbabilityCordonThreshold - (1-nodeSuccessProbabilityCordonThreshold)*nodeFailureProbabilityDecayRate) / dNodeSuccessProbability / nodeEquilibriumFailureRate
+	queueStepSize := (1 - queueSuccessProbabilityCordonThreshold - (1-queueSuccessProbabilityCordonThreshold)*queueFailureProbabilityDecayRate) / dQueueSuccessProbability / queueEquilibriumFailureRate
+
+	return &FailureEstimator{
+		successProbabilityByNode: make(map[string]float64, 1024),
+		successProbabilityByQueue: make(map[string]float64, 128),
+		nodeSuccessProbabilityCordonThreshold: nodeSuccessProbabilityCordonThreshold,
+		queueSuccessProbabilityCordonThreshold: queueSuccessProbabilityCordonThreshold,
+		nodeFailureProbabilityDecayRate: nodeFailureProbabilityDecayRate,
+		queueFailureProbabilityDecayRate: queueFailureProbabilityDecayRate,
+		timeOfLastDecay: time.Now(),
+		nodeStepSize: nodeStepSize,
+		queueStepSize: queueStepSize,
+		failureProbabilityByNodeDesc: prometheus.NewDesc(
+			fmt.Sprintf("%s_%s_node_failure_probability", namespace, subsystem),
+			"Estimated per-node failure probability.",
+			[]string{"node"},
+			nil,
+		),
+		failureProbabilityByQueueDesc: prometheus.NewDesc(
+			fmt.Sprintf("%s_%s_queue_failure_probability", namespace, subsystem),
+			"Estimated per-queue failure probability.",
+			[]string{"queue"},
+			nil,
+		),
+	}, nil
+}
+
+func (fe *FailureEstimator) Disable(v bool) {
+	if fe == nil {
+		return
+	}
+	fe.disabled = v
+}
+
+func (fe *FailureEstimator) IsDisabled() bool {
+	if fe == nil {
+		return true
+	}
+	return fe.disabled
+}
+
+// Decay moves the success probabilities of nodes (queues) closer to 1, depending on the configured cordon timeout.
+// Periodically calling Decay() ensures nodes (queues) considered unhealthy are eventually considered healthy again,
+// even if we observe no successes for those nodes (queues).
+func (fe *FailureEstimator) Decay() {
+	fe.mu.Lock()
+	defer fe.mu.Unlock()
+	t := time.Now()
+	fe.decay(t.Sub(fe.timeOfLastDecay).Seconds())
+	fe.timeOfLastDecay = t
+	return
+}
+
+func (fe *FailureEstimator) decay(secondsSinceLastDecay float64) {
+	nodeFailureProbabilityDecay := math.Pow(fe.nodeFailureProbabilityDecayRate, secondsSinceLastDecay)
+	for k, v := range fe.successProbabilityByNode {
+		failureProbability := 1 - v
+		failureProbability *= nodeFailureProbabilityDecay
+		successProbability := 1 - failureProbability
+		fe.successProbabilityByNode[k] = applyBounds(successProbability)
+	}
+
+	queueFailureProbabilityDecay := math.Pow(fe.queueFailureProbabilityDecayRate, secondsSinceLastDecay)
+	for k, v := range fe.successProbabilityByQueue {
+		failureProbability := 1 - v
+		failureProbability *= queueFailureProbabilityDecay
+		successProbability := 1 - failureProbability
+		fe.successProbabilityByQueue[k] = applyBounds(successProbability)
+	}
+	return
+}
+
+// Update with success=false decreases the estimated success probability of the provided node and queue.
+// Calling with success=true increases the estimated success probability of the provided node and queue.
+// This update is performed by taking one gradient descent step.
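+// Concretely, P_n <- P_n - nodeStepSize*dP_n and P_q <- P_q - queueStepSize*dP_q, where (dP_n, dP_q) is the gradient
+// returned by negLogLikelihoodGradient, after which both estimates are clamped to [eps, 1-eps] by applyBounds.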
+func (fe *FailureEstimator) Update(node, queue string, success bool) {
+	fe.mu.Lock()
+	defer fe.mu.Unlock()
+
+	// Assume that nodes (queues) we haven't seen before have a 50% success probability.
+	// Avoiding extreme values for new nodes (queues) helps avoid drastic changes to existing estimates.
+	nodeSuccessProbability, ok := fe.successProbabilityByNode[node]
+	if !ok {
+		nodeSuccessProbability = 0.5
+	}
+	queueSuccessProbability, ok := fe.successProbabilityByQueue[queue]
+	if !ok {
+		queueSuccessProbability = 0.5
+	}
+
+	dNodeSuccessProbability, dQueueSuccessProbability := fe.negLogLikelihoodGradient(nodeSuccessProbability, queueSuccessProbability, success)
+	nodeSuccessProbability -= fe.nodeStepSize * dNodeSuccessProbability
+	queueSuccessProbability -= fe.queueStepSize * dQueueSuccessProbability
+
+	fe.successProbabilityByNode[node] = applyBounds(nodeSuccessProbability)
+	fe.successProbabilityByQueue[queue] = applyBounds(queueSuccessProbability)
+}
+
+// applyBounds ensures values stay in the range [eps, 1-eps].
+// This is to avoid divide-by-zero issues.
+func applyBounds(v float64) float64 {
+	if v < eps {
+		return eps
+	} else if v > 1.0-eps {
+		return 1.0 - eps
+	} else {
+		return v
+	}
+}
+
+// negLogLikelihoodGradient returns the gradient of the negated log-likelihood function with respect to P_n and P_q, in that order.
+func (fe *FailureEstimator) negLogLikelihoodGradient(nodeSuccessProbability, queueSuccessProbability float64, success bool) (float64, float64) {
+	if success {
+		dNodeSuccessProbability := -1 / nodeSuccessProbability
+		dQueueSuccessProbability := -1 / queueSuccessProbability
+		return dNodeSuccessProbability, dQueueSuccessProbability
+	} else {
+		dNodeSuccessProbability := queueSuccessProbability / (1 - nodeSuccessProbability*queueSuccessProbability)
+		dQueueSuccessProbability := nodeSuccessProbability / (1 - nodeSuccessProbability*queueSuccessProbability)
+		return dNodeSuccessProbability, dQueueSuccessProbability
+	}
+}
+
+func (fe *FailureEstimator) Describe(ch chan<- *prometheus.Desc) {
+	if fe.IsDisabled() {
+		return
+	}
+	ch <- fe.failureProbabilityByNodeDesc
+	ch <- fe.failureProbabilityByQueueDesc
+}
+
+func (fe *FailureEstimator) Collect(ch chan<- prometheus.Metric) {
+	if fe.IsDisabled() {
+		return
+	}
+	fe.mu.Lock()
+	defer fe.mu.Unlock()
+
+	// Report failure probability rounded to the nearest multiple of 0.01.
+	// (It's unlikely the estimate is accurate to better than this.)
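+	// For example, an estimated failure probability of 0.123 is reported as 0.12.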
+	for k, v := range fe.successProbabilityByNode {
+		failureProbability := 1 - v
+		failureProbability = math.Round(failureProbability*100) / 100
+		ch <- prometheus.MustNewConstMetric(fe.failureProbabilityByNodeDesc, prometheus.GaugeValue, failureProbability, k)
+	}
+	for k, v := range fe.successProbabilityByQueue {
+		failureProbability := 1 - v
+		failureProbability = math.Round(failureProbability*100) / 100
+		ch <- prometheus.MustNewConstMetric(fe.failureProbabilityByQueueDesc, prometheus.GaugeValue, failureProbability, k)
+	}
+}
diff --git a/internal/scheduler/failureestimator/failureestimator_test.go b/internal/scheduler/failureestimator/failureestimator_test.go
new file mode 100644
index 00000000000..3b02f2a096a
--- /dev/null
+++ b/internal/scheduler/failureestimator/failureestimator_test.go
@@ -0,0 +1,190 @@
+package failureestimator
+
+import (
+	"math"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestNew(t *testing.T) {
+	successProbabilityCordonThreshold := 0.05
+	cordonTimeout := 10 * time.Minute
+	equilibriumFailureRate := 0.1
+
+	// Node decay rate and step size tests.
+	fe, err := New(
+		successProbabilityCordonThreshold,
+		0,
+		cordonTimeout,
+		0,
+		equilibriumFailureRate,
+		eps,
+	)
+	require.NoError(t, err)
+	assert.LessOrEqual(t, fe.nodeFailureProbabilityDecayRate, 0.9999145148300861+eps)
+	assert.GreaterOrEqual(t, fe.nodeFailureProbabilityDecayRate, 0.9999145148300861-eps)
+	assert.LessOrEqual(t, fe.nodeStepSize, 0.0008142462434300169+eps)
+	assert.GreaterOrEqual(t, fe.nodeStepSize, 0.0008142462434300169-eps)
+
+	// Queue decay rate and step size tests.
+	fe, err = New(
+		0,
+		successProbabilityCordonThreshold,
+		0,
+		cordonTimeout,
+		eps,
+		equilibriumFailureRate,
+	)
+	require.NoError(t, err)
+	assert.LessOrEqual(t, fe.queueFailureProbabilityDecayRate, 0.9999145148300861+eps)
+	assert.GreaterOrEqual(t, fe.queueFailureProbabilityDecayRate, 0.9999145148300861-eps)
+	assert.LessOrEqual(t, fe.queueStepSize, 0.0008142462434300169+eps)
+	assert.GreaterOrEqual(t, fe.queueStepSize, 0.0008142462434300169-eps)
+}
+
+func TestDecay(t *testing.T) {
+	successProbabilityCordonThreshold := 0.05
+	cordonTimeout := 10 * time.Minute
+	equilibriumFailureRate := 0.1
+
+	// Node decay tests.
+	fe, err := New(
+		successProbabilityCordonThreshold,
+		0,
+		cordonTimeout,
+		0,
+		equilibriumFailureRate,
+		eps,
+	)
+	require.NoError(t, err)
+
+	fe.successProbabilityByNode["foo"] = 0.0
+	fe.successProbabilityByNode["bar"] = 0.4
+	fe.successProbabilityByNode["baz"] = 1.0
+
+	fe.decay(0)
+	assert.Equal(
+		t,
+		fe.successProbabilityByNode,
+		map[string]float64{
+			"foo": eps,
+			"bar": 0.4,
+			"baz": 1.0 - eps,
+		},
+	)
+
+	fe.decay(1)
+	assert.Equal(
+		t,
+		fe.successProbabilityByNode,
+		map[string]float64{
+			"foo": 1 - (1-eps)*math.Pow(fe.nodeFailureProbabilityDecayRate, 1),
+			"bar": 1 - (1-0.4)*math.Pow(fe.nodeFailureProbabilityDecayRate, 1),
+			"baz": 1.0 - eps,
+		},
+	)
+
+	fe.decay(1e6)
+	assert.Equal(
+		t,
+		fe.successProbabilityByNode,
+		map[string]float64{
+			"foo": 1.0 - eps,
+			"bar": 1.0 - eps,
+			"baz": 1.0 - eps,
+		},
+	)
+
+	// Queue decay tests.
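+	// These mirror the node decay tests above, with the node and queue arguments to New swapped
+	// so that the queue-side decay rate is exercised.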
+	fe, err = New(
+		0,
+		successProbabilityCordonThreshold,
+		0,
+		cordonTimeout,
+		eps,
+		equilibriumFailureRate,
+	)
+	require.NoError(t, err)
+
+	fe.successProbabilityByQueue["foo"] = 0.0
+	fe.successProbabilityByQueue["bar"] = 0.4
+	fe.successProbabilityByQueue["baz"] = 1.0
+
+	fe.decay(0)
+	assert.Equal(
+		t,
+		fe.successProbabilityByQueue,
+		map[string]float64{
+			"foo": eps,
+			"bar": 0.4,
+			"baz": 1.0 - eps,
+		},
+	)
+
+	fe.decay(1)
+	assert.Equal(
+		t,
+		fe.successProbabilityByQueue,
+		map[string]float64{
+			"foo": 1 - (1-eps)*math.Pow(fe.queueFailureProbabilityDecayRate, 1),
+			"bar": 1 - (1-0.4)*math.Pow(fe.queueFailureProbabilityDecayRate, 1),
+			"baz": 1.0 - eps,
+		},
+	)
+
+	fe.decay(1e6)
+	assert.Equal(
+		t,
+		fe.successProbabilityByQueue,
+		map[string]float64{
+			"foo": 1.0 - eps,
+			"bar": 1.0 - eps,
+			"baz": 1.0 - eps,
+		},
+	)
+}
+
+func TestUpdate(t *testing.T) {
+	successProbabilityCordonThreshold := 0.05
+	cordonTimeout := 10 * time.Minute
+	equilibriumFailureRate := 0.1
+
+	fe, err := New(
+		successProbabilityCordonThreshold,
+		successProbabilityCordonThreshold,
+		cordonTimeout,
+		cordonTimeout,
+		equilibriumFailureRate,
+		equilibriumFailureRate,
+	)
+	require.NoError(t, err)
+
+	fe.Update("node", "queue", false)
+	nodeSuccessProbability, ok := fe.successProbabilityByNode["node"]
+	require.True(t, ok)
+	queueSuccessProbability, ok := fe.successProbabilityByQueue["queue"]
+	require.True(t, ok)
+	assert.Greater(t, nodeSuccessProbability, eps)
+	assert.Greater(t, queueSuccessProbability, eps)
+	assert.Less(t, nodeSuccessProbability, healthySuccessProbability-eps)
+	assert.Less(t, queueSuccessProbability, healthySuccessProbability-eps)
+
+	fe.Update("node", "queue", true)
+	assert.Greater(t, fe.successProbabilityByNode["node"], nodeSuccessProbability)
+	assert.Greater(t, fe.successProbabilityByQueue["queue"], queueSuccessProbability)
+
+	for i := 0; i < 100000; i++ {
+		fe.Update("node", "queue", false)
+	}
+	assert.Equal(t, fe.successProbabilityByNode["node"], eps)
+	assert.Equal(t, fe.successProbabilityByQueue["queue"], eps)
+
+	for i := 0; i < 100000; i++ {
+		fe.Update("node", "queue", true)
+	}
+	assert.Equal(t, fe.successProbabilityByNode["node"], 1-eps)
+	assert.Equal(t, fe.successProbabilityByQueue["queue"], 1-eps)
+}
diff --git a/internal/scheduler/metrics/metrics_test.go b/internal/scheduler/metrics/metrics_test.go
new file mode 100644
index 00000000000..65bd502635e
--- /dev/null
+++ b/internal/scheduler/metrics/metrics_test.go
@@ -0,0 +1,21 @@
+package metrics
+
+import (
+	"regexp"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestFoo(t *testing.T) {
+	r, err := regexp.Compile("foo.*bar")
+	require.NoError(t, err)
+	assert.True(t, r.MatchString("foobar"))
+	assert.True(t, r.MatchString("foo bar"))
+	assert.True(t, r.MatchString("foo and bar"))
+	assert.True(t, r.MatchString("this is foo and bar so"))
+	assert.False(t, r.MatchString("barfoo"))
+	assert.False(t, r.MatchString("foo"))
+	assert.False(t, r.MatchString("bar"))
+}
diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go
index beffde1cc53..cacc6e263a4 100644
--- a/internal/scheduler/scheduler.go
+++ b/internal/scheduler/scheduler.go
@@ -16,6 +16,7 @@ import (
 	"github.com/armadaproject/armada/internal/common/logging"
 	schedulercontext "github.com/armadaproject/armada/internal/scheduler/context"
 	"github.com/armadaproject/armada/internal/scheduler/database"
+	"github.com/armadaproject/armada/internal/scheduler/failureestimator"
"github.com/armadaproject/armada/internal/scheduler/jobdb" "github.com/armadaproject/armada/internal/scheduler/kubernetesobjects/affinity" "github.com/armadaproject/armada/internal/scheduler/metrics" @@ -74,6 +75,8 @@ type Scheduler struct { metrics *SchedulerMetrics // New scheduler metrics due to replace the above. schedulerMetrics *metrics.Metrics + // Used to estimate the probability of a job from a particular queue succeeding on a particular node. + failureEstimator *failureestimator.FailureEstimator // If true, enable scheduler assertions. // In particular, assert that the jobDb is in a valid state at the end of each cycle. enableAssertions bool @@ -94,6 +97,7 @@ func NewScheduler( nodeIdLabel string, metrics *SchedulerMetrics, schedulerMetrics *metrics.Metrics, + failureEstimator *failureestimator.FailureEstimator, ) (*Scheduler, error) { return &Scheduler{ jobRepository: jobRepository, @@ -114,6 +118,7 @@ func NewScheduler( runsSerial: -1, metrics: metrics, schedulerMetrics: schedulerMetrics, + failureEstimator: failureEstimator, }, nil } @@ -273,6 +278,26 @@ func (s *Scheduler) cycle(ctx *armadacontext.Context, updateAll bool, leaderToke } } + // Update success probability estimates. + if !s.failureEstimator.IsDisabled() { + s.failureEstimator.Decay() + for _, jst := range jsts { + if jst.Job == nil { + continue + } + run := jst.Job.LatestRun() + if run == nil { + continue + } + if jst.Failed { + s.failureEstimator.Update(run.NodeName(), jst.Job.GetQueue(), false) + } + if jst.Succeeded { + s.failureEstimator.Update(run.NodeName(), jst.Job.GetQueue(), true) + } + } + } + // Generate any eventSequences that came out of synchronising the db state. events, err := s.generateUpdateMessages(ctx, txn, updatedJobs, jobRepoRunErrorsByRunId) if err != nil { diff --git a/internal/scheduler/scheduler_test.go b/internal/scheduler/scheduler_test.go index ef256b969b4..6eef207aeed 100644 --- a/internal/scheduler/scheduler_test.go +++ b/internal/scheduler/scheduler_test.go @@ -779,6 +779,7 @@ func TestScheduler_TestCycle(t *testing.T) { nodeIdLabel, schedulerMetrics, nil, + nil, ) require.NoError(t, err) sched.EnableAssertions() @@ -940,6 +941,7 @@ func TestRun(t *testing.T) { nodeIdLabel, schedulerMetrics, nil, + nil, ) require.NoError(t, err) sched.EnableAssertions() @@ -1164,6 +1166,7 @@ func TestScheduler_TestSyncState(t *testing.T) { nodeIdLabel, schedulerMetrics, nil, + nil, ) require.NoError(t, err) sched.EnableAssertions() @@ -2313,6 +2316,7 @@ func TestCycleConsistency(t *testing.T) { nodeIdLabel, schedulerMetrics, nil, + nil, ) require.NoError(t, err) scheduler.clock = testClock diff --git a/internal/scheduler/schedulerapp.go b/internal/scheduler/schedulerapp.go index af8d5f59df5..3eda93540e1 100644 --- a/internal/scheduler/schedulerapp.go +++ b/internal/scheduler/schedulerapp.go @@ -31,6 +31,7 @@ import ( "github.com/armadaproject/armada/internal/common/types" schedulerconfig "github.com/armadaproject/armada/internal/scheduler/configuration" "github.com/armadaproject/armada/internal/scheduler/database" + "github.com/armadaproject/armada/internal/scheduler/failureestimator" "github.com/armadaproject/armada/internal/scheduler/jobdb" "github.com/armadaproject/armada/internal/scheduler/metrics" "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" @@ -215,6 +216,23 @@ func Run(config schedulerconfig.Configuration) error { if err := prometheus.Register(schedulerMetrics); err != nil { return errors.WithStack(err) } + + failureEstimator, err := failureestimator.New( + 
config.Scheduling.FailureEstimatorConfig.NodeSuccessProbabilityCordonThreshold, + config.Scheduling.FailureEstimatorConfig.QueueSuccessProbabilityCordonThreshold, + config.Scheduling.FailureEstimatorConfig.NodeCordonTimeout, + config.Scheduling.FailureEstimatorConfig.QueueCordonTimeout, + config.Scheduling.FailureEstimatorConfig.NodeEquilibriumFailureRate, + config.Scheduling.FailureEstimatorConfig.QueueEquilibriumFailureRate, + ) + if err != nil { + return err + } + failureEstimator.Disable(config.Scheduling.FailureEstimatorConfig.Disabled) + if err := prometheus.Register(failureEstimator); err != nil { + return errors.WithStack(err) + } + scheduler, err := NewScheduler( jobDb, jobRepository, @@ -230,6 +248,7 @@ func Run(config schedulerconfig.Configuration) error { config.Scheduling.Preemption.NodeIdLabel, schedulingRoundMetrics, schedulerMetrics, + failureEstimator, ) if err != nil { return errors.WithMessage(err, "error creating scheduler")