
Commit a062acb

Merge branch 'feat/test-workflow' of github.com:dejanzele/armada into feat/test-workflow

dejanzele committed Jun 28, 2023
2 parents 8984404 + 325580b

Showing 51 changed files with 1,710 additions and 736 deletions.
1 change: 1 addition & 0 deletions config/armada/config.yaml
@@ -34,6 +34,7 @@ scheduling:
preemption:
nodeEvictionProbability: 1.0
nodeOversubscriptionEvictionProbability: 1.0
protectedFractionOfFairShare: 1.0
setNodeIdSelector: true
nodeIdLabel: kubernetes.io/hostname
setNodeName: false
3 changes: 3 additions & 0 deletions config/executor/config.yaml
@@ -59,6 +59,9 @@ kubernetes:
fatalPodSubmissionErrors:
- "admission webhook"
- "namespaces \".*\" not found"
stateChecks:
deadlineForSubmittedPodConsideredMissing: 15m
deadlineForActivePodConsideredMissing: 5m
pendingPodChecks:
deadlineForUpdates: 10m
deadlineForNodeAssignment: 5m
2 changes: 2 additions & 0 deletions internal/armada/configuration/types.go
@@ -209,6 +209,8 @@ type PreemptionConfig struct {
// the probability of evicting jobs on oversubscribed nodes, i.e.,
// nodes on which the total resource requests are greater than the available resources.
NodeOversubscriptionEvictionProbability float64
// Only queues allocated more than this fraction of their fair share are considered for preemption.
ProtectedFractionOfFairShare float64
// If true, the Armada scheduler will add to scheduled pods a node selector
// NodeIdLabel: <value of label on node selected by scheduler>.
// If true, NodeIdLabel must be non-empty.
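
The doc comment above pins down the semantics of the new field: a queue whose current allocation is at or below this fraction of its fair share keeps its jobs, and only queues above it are candidates for preemption. A minimal sketch of that rule, assuming per-queue actualShare and fairShare values are computed elsewhere by the scheduler (the helper name is illustrative, not code from this commit):

```go
// isProtectedFromPreemption reports whether a queue is still within the
// protected fraction of its fair share, in which case its jobs should not
// be considered for preemption.
func isProtectedFromPreemption(actualShare, fairShare, protectedFractionOfFairShare float64) bool {
	return actualShare <= protectedFractionOfFairShare*fairShare
}
```

Under this sketch, with the default of 1.0 added to config/armada/config.yaml, a queue stays protected as long as it has not exceeded its full fair share.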
9 changes: 7 additions & 2 deletions internal/armada/server/lease.go
@@ -353,7 +353,7 @@ func (q *AggregatedQueueServer) getJobs(ctx context.Context, req *api.StreamingL

// Group gangs.
for _, job := range jobs {
gangId, _, isGangJob, err := scheduler.GangIdAndCardinalityFromLegacySchedulerJob(job, q.schedulingConfig.Preemption.PriorityClasses)
gangId, _, isGangJob, err := scheduler.GangIdAndCardinalityFromLegacySchedulerJob(job)
if err != nil {
return nil, err
}
@@ -469,7 +469,11 @@ func (q *AggregatedQueueServer) getJobs(ctx context.Context, req *api.StreamingL
schedulerobjects.ResourceList{Resources: totalCapacity},
)
for queue, priorityFactor := range priorityFactorByQueue {
if err := sctx.AddQueueSchedulingContext(queue, priorityFactor, allocatedByQueueAndPriorityClassForPool[queue]); err != nil {
var weight float64 = 1
if priorityFactor > 0 {
weight = 1 / priorityFactor
}
if err := sctx.AddQueueSchedulingContext(queue, weight, allocatedByQueueAndPriorityClassForPool[queue]); err != nil {
return nil, err
}
}
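
The hunk above replaces the raw priority factor with its reciprocal when registering each queue's scheduling context, so a lower priority factor now translates into a higher fair-share weight, with a fallback of 1 when the factor is zero or negative to avoid dividing by zero. The same conversion as a standalone helper (illustrative naming, not a function introduced by this commit):

```go
// priorityFactorToWeight inverts a queue priority factor into a fair-share
// weight; non-positive factors fall back to a weight of 1.
func priorityFactorToWeight(priorityFactor float64) float64 {
	if priorityFactor > 0 {
		return 1 / priorityFactor
	}
	return 1
}
```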
@@ -484,6 +488,7 @@ func (q *AggregatedQueueServer) getJobs(ctx context.Context, req *api.StreamingL
constraints,
q.schedulingConfig.Preemption.NodeEvictionProbability,
q.schedulingConfig.Preemption.NodeOversubscriptionEvictionProbability,
q.schedulingConfig.Preemption.ProtectedFractionOfFairShare,
&SchedulerJobRepositoryAdapter{
r: q.jobRepository,
},
12 changes: 12 additions & 0 deletions internal/common/database/lookout/jobstates.go
@@ -51,6 +51,18 @@
)

var (
// JobStates is an ordered list of states
JobStates = []JobState{
JobQueued,
JobLeased,
JobPending,
JobRunning,
JobSucceeded,
JobFailed,
JobCancelled,
JobPreempted,
}

JobStateMap = map[int]JobState{
JobLeasedOrdinal: JobLeased,
JobQueuedOrdinal: JobQueued,
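
Because Go randomizes map iteration order, code that needs states in a stable, defined sequence (for example when building per-state counts or UI filters) can now range over the JobStates slice instead of JobStateMap. A minimal usage sketch, assuming the caller imports this lookout package and that JobState values print as their string form:

```go
// Ranging over the slice preserves the declared order; ranging over a map would not.
for _, state := range lookout.JobStates {
	fmt.Println(state)
}
```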
4 changes: 3 additions & 1 deletion internal/executor/application.go
@@ -189,9 +189,11 @@ func setupExecutorApiComponents(
jobRunState,
submitter,
etcdHealthMonitor)
podIssueService := service.NewPodIssueService(
podIssueService := service.NewIssueHandler(
jobRunState,
clusterContext,
eventReporter,
config.Kubernetes.StateChecks,
pendingPodChecker,
config.Kubernetes.StuckTerminatingPodExpiry)

12 changes: 12 additions & 0 deletions internal/executor/configuration/types.go
@@ -26,6 +26,17 @@ type PodDefaults struct {
Ingress *IngressConfiguration
}

type StateChecksConfiguration struct {
// Once a pod is submitted to kubernetes, this is how long we'll wait for it to appear in the kubernetes informer state
// If the pod hasn't appeared after this duration, it is considered missing
DeadlineForSubmittedPodConsideredMissing time.Duration
// Once the executor has seen a pod appear on the cluster, it considers that run Active
// If we get into a state where there is no longer a pod backing that Active run, this is how long we'll wait before we consider the pod missing
// The most likely cause of this is actually a bug in the executors processing of the kubernetes state
// However without it - we can have runs get indefinitely stuck as Active with no backing pod
DeadlineForActivePodConsideredMissing time.Duration
}

type IngressConfiguration struct {
HostnameSuffix string
CertNameSuffix string
@@ -54,6 +65,7 @@ type KubernetesConfiguration struct {
MaxTerminatedPods int
MinimumJobSize armadaresource.ComputeResources
PodDefaults *PodDefaults
StateChecks StateChecksConfiguration
PendingPodChecks *podchecks.Checks
FatalPodSubmissionErrors []string
// Minimum amount of resources marked as allocated to non-Armada pods on each node.
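
The two deadlines above drive the new state checks: one covers the window between submitting a pod and first seeing it in the informer cache, the other covers an Active run whose backing pod has disappeared. A self-contained sketch of how such deadlines might be applied, using illustrative phase and type names rather than the executor's real types:

```go
package main

import (
	"fmt"
	"time"
)

// stateChecksConfig mirrors the two fields of StateChecksConfiguration for this sketch.
type stateChecksConfig struct {
	DeadlineForSubmittedPodConsideredMissing time.Duration
	DeadlineForActivePodConsideredMissing    time.Duration
}

// runPhase is an illustrative stand-in for the executor's run phases.
type runPhase int

const (
	runSubmitted runPhase = iota // pod sent to Kubernetes but not yet seen by the informer
	runActive                    // pod has been observed on the cluster at least once
)

// podMissing reports whether a run with no backing pod has exceeded the
// deadline that applies to its phase and should be treated as missing.
func podMissing(phase runPhase, sinceLastSeen time.Duration, cfg stateChecksConfig) bool {
	switch phase {
	case runSubmitted:
		return sinceLastSeen > cfg.DeadlineForSubmittedPodConsideredMissing
	case runActive:
		return sinceLastSeen > cfg.DeadlineForActivePodConsideredMissing
	default:
		return false
	}
}

func main() {
	cfg := stateChecksConfig{
		DeadlineForSubmittedPodConsideredMissing: 15 * time.Minute,
		DeadlineForActivePodConsideredMissing:    5 * time.Minute,
	}
	fmt.Println(podMissing(runSubmitted, 10*time.Minute, cfg)) // false: still within the 15m deadline
	fmt.Println(podMissing(runActive, 6*time.Minute, cfg))     // true: past the 5m deadline
}
```

The 15m and 5m values match the defaults added to config/executor/config.yaml above.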
12 changes: 8 additions & 4 deletions internal/executor/job/job_run_state_store.go
@@ -51,12 +51,14 @@ func NewJobRunStateStore(clusterContext context.ClusterContext) *JobRunStateStor
return
}

stateStore.reportRunActive(pod)
if !util.IsPodFinishedAndReported(pod) {
stateStore.reportRunActive(pod)
}
},
})

// On start up, make sure our state matches current k8s state
err := stateStore.reconcileStateWithKubernetes()
err := stateStore.initialiseStateFromKubernetes()
if err != nil {
panic(err)
}
@@ -75,7 +77,7 @@ func NewJobRunStateStoreWithInitialState(initialJobRuns []*RunState) *JobRunStat
return stateStore
}

func (stateStore *JobRunStateStore) reconcileStateWithKubernetes() error {
func (stateStore *JobRunStateStore) initialiseStateFromKubernetes() error {
pods, err := stateStore.clusterContext.GetAllPods()
if err != nil {
return err
@@ -84,7 +86,9 @@ func (stateStore *JobRunStateStore) reconcileStateWithKubernetes() error {
return !util.IsLegacyManagedPod(pod)
})
for _, pod := range pods {
stateStore.reportRunActive(pod)
if !util.IsPodFinishedAndReported(pod) {
stateStore.reportRunActive(pod)
}
}

return nil
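
Both call sites now skip reportRunActive for pods that are already finished and reported, so restarting the executor no longer resurrects completed runs as Active. A loose approximation of what the util.IsPodFinishedAndReported predicate checks, inferred from the new test below; the real implementation may look at additional annotations:

```go
// Approximation only: a pod counts as finished-and-reported when it is in a
// terminal phase and carries the job-done annotation, mirroring the setup in
// TestOnStartUp_ReconcilesWithKubernetes_IgnoresDonePods.
func isPodFinishedAndReported(pod *v1.Pod) bool {
	terminal := pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed
	return terminal && pod.Annotations[domain.JobDoneAnnotation] == "true"
}
```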
15 changes: 14 additions & 1 deletion internal/executor/job/job_run_state_store_test.go
@@ -4,6 +4,7 @@ import (
"fmt"
"sort"
"testing"
"time"

"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
@@ -23,7 +24,7 @@ var defaultRunInfoMeta = &RunMeta{
JobSet: "job-set-1",
}

func TestOnStartUp_ReconcilesWithKubernetes(t *testing.T) {
func TestOnStartUp_ReconcilesWithKubernetes_ActivePod(t *testing.T) {
existingPod := createPod()

jobRunStateManager, _ := setup(t, []*v1.Pod{existingPod})
@@ -38,6 +39,18 @@ func TestOnStartUp_ReconcilesWithKubernetes(t *testing.T) {
assert.Equal(t, allKnownJobRuns[0].Phase, Active)
}

func TestOnStartUp_ReconcilesWithKubernetes_IgnoresDonePods(t *testing.T) {
donePod := createPod()
donePod.Status.Phase = v1.PodSucceeded
donePod.Annotations[domain.JobDoneAnnotation] = "true"
donePod.Annotations[string(donePod.Status.Phase)] = fmt.Sprintf("%s", time.Now())

jobRunStateManager, _ := setup(t, []*v1.Pod{donePod})
allKnownJobRuns := jobRunStateManager.GetAll()

assert.Len(t, allKnownJobRuns, 0)
}

func TestReportRunLeased(t *testing.T) {
job := &SubmitJob{
Meta: SubmitJobMeta{
1 change: 1 addition & 0 deletions internal/executor/reporter/job_event_reporter.go
@@ -169,6 +169,7 @@ func (eventReporter *JobEventReporter) reportStatusUpdate(old *v1.Pod, new *v1.P
// Don't report status change for pods Armada is deleting
// This prevents reporting JobFailed when we delete a pod - for example due to cancellation
if util.IsMarkedForDeletion(new) {
log.Infof("not sending event to report pod %s moving into phase %s as pod is marked for deletion", new.Name, new.Status.Phase)
return
}
eventReporter.reportCurrentStatus(new)