Skip to content

Commit

Permalink
Add basic reconciliation between executor RunState and kubernetes (#2604
Browse files Browse the repository at this point in the history
)

* Tidy pod_issue_handler

This is largely a noop with a few minor changes

 - Move logging to happen before action is taken
 - Add logging for when issues self resolve
 - Don't break out of detectPodIssues - so we can detect more than 1 issue per round
 -

* Add basic reconciliation between executor RunState and kubernetes

We can get into the state where the executor RunState thinks there is an active run - but there is no pod backing the run

The result of this is that the run can never finish (as there is no pod) and will stay in Pending/Running forever

This PR just adds some basic reconciliation so if there is no pod backing the RunState then an action will be taken
  • Loading branch information
JamesMurkin authored Jun 23, 2023
1 parent c6466da commit d59732a
Show file tree
Hide file tree
Showing 7 changed files with 355 additions and 98 deletions.
3 changes: 3 additions & 0 deletions config/executor/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ kubernetes:
fatalPodSubmissionErrors:
- "admission webhook"
- "namespaces \".*\" not found"
stateChecks:
deadlineForSubmittedPodConsideredMissing: 15m
deadlineForActivePodConsideredMissing: 5m
pendingPodChecks:
deadlineForUpdates: 10m
deadlineForNodeAssignment: 5m
Expand Down
4 changes: 3 additions & 1 deletion internal/executor/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,11 @@ func setupExecutorApiComponents(
jobRunState,
submitter,
etcdHealthMonitor)
podIssueService := service.NewPodIssueService(
podIssueService := service.NewIssueHandler(
jobRunState,
clusterContext,
eventReporter,
config.Kubernetes.StateChecks,
pendingPodChecker,
config.Kubernetes.StuckTerminatingPodExpiry)

Expand Down
12 changes: 12 additions & 0 deletions internal/executor/configuration/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,17 @@ type PodDefaults struct {
Ingress *IngressConfiguration
}

type StateChecksConfiguration struct {
// Once a pod is submitted to kubernetes, this is how long we'll wait for it to appear in the kubernetes informer state
// If the pod hasn't appeared after this duration, it is considered missing
DeadlineForSubmittedPodConsideredMissing time.Duration
// Once the executor has seen a pod appear on the cluster, it considers that run Active
// If we get into a state where there is no longer a pod backing that Active run, this is how long we'll wait before we consider the pod missing
// The most likely cause of this is actually a bug in the executors processing of the kubernetes state
// However without it - we can have runs get indefinitely stuck as Active with no backing pod
DeadlineForActivePodConsideredMissing time.Duration
}

type IngressConfiguration struct {
HostnameSuffix string
CertNameSuffix string
Expand Down Expand Up @@ -54,6 +65,7 @@ type KubernetesConfiguration struct {
MaxTerminatedPods int
MinimumJobSize armadaresource.ComputeResources
PodDefaults *PodDefaults
StateChecks StateChecksConfiguration
PendingPodChecks *podchecks.Checks
FatalPodSubmissionErrors []string
// Minimum amount of resources marked as allocated to non-Armada pods on each node.
Expand Down
12 changes: 8 additions & 4 deletions internal/executor/job/job_run_state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,14 @@ func NewJobRunStateStore(clusterContext context.ClusterContext) *JobRunStateStor
return
}

stateStore.reportRunActive(pod)
if !util.IsPodFinishedAndReported(pod) {
stateStore.reportRunActive(pod)
}
},
})

// On start up, make sure our state matches current k8s state
err := stateStore.reconcileStateWithKubernetes()
err := stateStore.initialiseStateFromKubernetes()
if err != nil {
panic(err)
}
Expand All @@ -75,7 +77,7 @@ func NewJobRunStateStoreWithInitialState(initialJobRuns []*RunState) *JobRunStat
return stateStore
}

func (stateStore *JobRunStateStore) reconcileStateWithKubernetes() error {
func (stateStore *JobRunStateStore) initialiseStateFromKubernetes() error {
pods, err := stateStore.clusterContext.GetAllPods()
if err != nil {
return err
Expand All @@ -84,7 +86,9 @@ func (stateStore *JobRunStateStore) reconcileStateWithKubernetes() error {
return !util.IsLegacyManagedPod(pod)
})
for _, pod := range pods {
stateStore.reportRunActive(pod)
if !util.IsPodFinishedAndReported(pod) {
stateStore.reportRunActive(pod)
}
}

return nil
Expand Down
15 changes: 14 additions & 1 deletion internal/executor/job/job_run_state_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"sort"
"testing"
"time"

"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
Expand All @@ -23,7 +24,7 @@ var defaultRunInfoMeta = &RunMeta{
JobSet: "job-set-1",
}

func TestOnStartUp_ReconcilesWithKubernetes(t *testing.T) {
func TestOnStartUp_ReconcilesWithKubernetes_ActivePod(t *testing.T) {
existingPod := createPod()

jobRunStateManager, _ := setup(t, []*v1.Pod{existingPod})
Expand All @@ -38,6 +39,18 @@ func TestOnStartUp_ReconcilesWithKubernetes(t *testing.T) {
assert.Equal(t, allKnownJobRuns[0].Phase, Active)
}

func TestOnStartUp_ReconcilesWithKubernetes_IgnoresDonePods(t *testing.T) {
donePod := createPod()
donePod.Status.Phase = v1.PodSucceeded
donePod.Annotations[domain.JobDoneAnnotation] = "true"
donePod.Annotations[string(donePod.Status.Phase)] = fmt.Sprintf("%s", time.Now())

jobRunStateManager, _ := setup(t, []*v1.Pod{donePod})
allKnownJobRuns := jobRunStateManager.GetAll()

assert.Len(t, allKnownJobRuns, 0)
}

func TestReportRunLeased(t *testing.T) {
job := &SubmitJob{
Meta: SubmitJobMeta{
Expand Down
Loading

0 comments on commit d59732a

Please sign in to comment.