From d59732afc89b59e5e00de752edc5faa697fca66e Mon Sep 17 00:00:00 2001 From: JamesMurkin Date: Fri, 23 Jun 2023 16:31:01 +0100 Subject: [PATCH] Add basic reconciliation between executor RunState and kubernetes (#2604) * Tidy pod_issue_handler This is largely a noop with a few minor changes: - Move logging to happen before action is taken - Add logging for when issues self-resolve - Don't break out of detectPodIssues, so we can detect more than one issue per round * Add basic reconciliation between executor RunState and kubernetes We can get into a state where the executor RunState thinks there is an active run, but there is no pod backing the run. The result is that the run can never finish (as there is no pod) and will stay in Pending/Running forever. This PR adds some basic reconciliation so that if there is no pod backing the RunState, an action will be taken. --- config/executor/config.yaml | 3 + internal/executor/application.go | 4 +- internal/executor/configuration/types.go | 12 + internal/executor/job/job_run_state_store.go | 12 +- .../executor/job/job_run_state_store_test.go | 15 +- .../executor/service/pod_issue_handler.go | 293 +++++++++++++----- .../service/pod_issue_handler_test.go | 114 ++++++- 7 files changed, 355 insertions(+), 98 deletions(-) diff --git a/config/executor/config.yaml b/config/executor/config.yaml index c81beb2773d..44cb869cc91 100644 --- a/config/executor/config.yaml +++ b/config/executor/config.yaml @@ -59,6 +59,9 @@ kubernetes: fatalPodSubmissionErrors: - "admission webhook" - "namespaces \".*\" not found" + stateChecks: + deadlineForSubmittedPodConsideredMissing: 15m + deadlineForActivePodConsideredMissing: 5m pendingPodChecks: deadlineForUpdates: 10m deadlineForNodeAssignment: 5m diff --git a/internal/executor/application.go b/internal/executor/application.go index 47c9f02dd33..d7ad549c47c 100644 --- a/internal/executor/application.go +++ b/internal/executor/application.go @@ -189,9 +189,11 @@ func setupExecutorApiComponents( jobRunState, submitter, etcdHealthMonitor) - podIssueService := service.NewPodIssueService( + podIssueService := service.NewIssueHandler( + jobRunState, clusterContext, eventReporter, + config.Kubernetes.StateChecks, pendingPodChecker, config.Kubernetes.StuckTerminatingPodExpiry) diff --git a/internal/executor/configuration/types.go b/internal/executor/configuration/types.go index 04f7ccfa482..4798f29710a 100644 --- a/internal/executor/configuration/types.go +++ b/internal/executor/configuration/types.go @@ -26,6 +26,17 @@ type PodDefaults struct { Ingress *IngressConfiguration } +type StateChecksConfiguration struct { + // Once a pod is submitted to kubernetes, this is how long we'll wait for it to appear in the kubernetes informer state + // If the pod hasn't appeared after this duration, it is considered missing + DeadlineForSubmittedPodConsideredMissing time.Duration + // Once the executor has seen a pod appear on the cluster, it considers that run Active + // If we get into a state where there is no longer a pod backing that Active run, this is how long we'll wait before we consider the pod missing + // The most likely cause of this is actually a bug in the executor's processing of the kubernetes state + // However, without it we can have runs get indefinitely stuck as Active with no backing pod + DeadlineForActivePodConsideredMissing time.Duration +} + type IngressConfiguration struct { HostnameSuffix string CertNameSuffix string @@ -54,6 +65,7 @@ type KubernetesConfiguration struct { MaxTerminatedPods int
MinimumJobSize armadaresource.ComputeResources PodDefaults *PodDefaults + StateChecks StateChecksConfiguration PendingPodChecks *podchecks.Checks FatalPodSubmissionErrors []string // Minimum amount of resources marked as allocated to non-Armada pods on each node. diff --git a/internal/executor/job/job_run_state_store.go b/internal/executor/job/job_run_state_store.go index 421d650e7d8..2752ac5bfb3 100644 --- a/internal/executor/job/job_run_state_store.go +++ b/internal/executor/job/job_run_state_store.go @@ -51,12 +51,14 @@ func NewJobRunStateStore(clusterContext context.ClusterContext) *JobRunStateStor return } - stateStore.reportRunActive(pod) + if !util.IsPodFinishedAndReported(pod) { + stateStore.reportRunActive(pod) + } }, }) // On start up, make sure our state matches current k8s state - err := stateStore.reconcileStateWithKubernetes() + err := stateStore.initialiseStateFromKubernetes() if err != nil { panic(err) } @@ -75,7 +77,7 @@ func NewJobRunStateStoreWithInitialState(initialJobRuns []*RunState) *JobRunStat return stateStore } -func (stateStore *JobRunStateStore) reconcileStateWithKubernetes() error { +func (stateStore *JobRunStateStore) initialiseStateFromKubernetes() error { pods, err := stateStore.clusterContext.GetAllPods() if err != nil { return err @@ -84,7 +86,9 @@ func (stateStore *JobRunStateStore) reconcileStateWithKubernetes() error { return !util.IsLegacyManagedPod(pod) }) for _, pod := range pods { - stateStore.reportRunActive(pod) + if !util.IsPodFinishedAndReported(pod) { + stateStore.reportRunActive(pod) + } } return nil diff --git a/internal/executor/job/job_run_state_store_test.go b/internal/executor/job/job_run_state_store_test.go index 9092ffa90d9..da29c9a4f7f 100644 --- a/internal/executor/job/job_run_state_store_test.go +++ b/internal/executor/job/job_run_state_store_test.go @@ -4,6 +4,7 @@ import ( "fmt" "sort" "testing" + "time" "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" @@ -23,7 +24,7 @@ var defaultRunInfoMeta = &RunMeta{ JobSet: "job-set-1", } -func TestOnStartUp_ReconcilesWithKubernetes(t *testing.T) { +func TestOnStartUp_ReconcilesWithKubernetes_ActivePod(t *testing.T) { existingPod := createPod() jobRunStateManager, _ := setup(t, []*v1.Pod{existingPod}) @@ -38,6 +39,18 @@ func TestOnStartUp_ReconcilesWithKubernetes(t *testing.T) { assert.Equal(t, allKnownJobRuns[0].Phase, Active) } +func TestOnStartUp_ReconcilesWithKubernetes_IgnoresDonePods(t *testing.T) { + donePod := createPod() + donePod.Status.Phase = v1.PodSucceeded + donePod.Annotations[domain.JobDoneAnnotation] = "true" + donePod.Annotations[string(donePod.Status.Phase)] = fmt.Sprintf("%s", time.Now()) + + jobRunStateManager, _ := setup(t, []*v1.Pod{donePod}) + allKnownJobRuns := jobRunStateManager.GetAll() + + assert.Len(t, allKnownJobRuns, 0) +} + func TestReportRunLeased(t *testing.T) { job := &SubmitJob{ Meta: SubmitJobMeta{ diff --git a/internal/executor/service/pod_issue_handler.go b/internal/executor/service/pod_issue_handler.go index 8d6b6d99200..7cf61f0007a 100644 --- a/internal/executor/service/pod_issue_handler.go +++ b/internal/executor/service/pod_issue_handler.go @@ -8,19 +8,22 @@ import ( log "github.com/sirupsen/logrus" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/clock" "k8s.io/client-go/tools/cache" + "github.com/armadaproject/armada/internal/executor/configuration" executorContext "github.com/armadaproject/armada/internal/executor/context" + "github.com/armadaproject/armada/internal/executor/job" 
"github.com/armadaproject/armada/internal/executor/podchecks" "github.com/armadaproject/armada/internal/executor/reporter" "github.com/armadaproject/armada/internal/executor/util" "github.com/armadaproject/armada/pkg/api" ) -type IssueType int +type podIssueType int const ( - UnableToSchedule IssueType = iota + UnableToSchedule podIssueType = iota StuckStartingUp StuckTerminating ExternallyDeleted @@ -29,46 +32,64 @@ const ( type podIssue struct { // A copy of the pod when an issue was detected OriginalPodState *v1.Pod - JobId string - RunId string Message string Retryable bool - Reported bool DeletionRequested bool - Type IssueType + Type podIssueType Cause api.Cause } +type reconciliationIssue struct { + InitialDetectionTime time.Time + OriginalRunState *job.RunState +} + type issue struct { CurrentPodState *v1.Pod - Issue *podIssue + RunIssue *runIssue +} + +type runIssue struct { + JobId string + RunId string + PodIssue *podIssue + ReconciliationIssue *reconciliationIssue + Reported bool } -type PodIssueService struct { +type IssueHandler struct { clusterContext executorContext.ClusterContext eventReporter reporter.EventReporter pendingPodChecker podchecks.PodChecker + stateChecksConfig configuration.StateChecksConfiguration stuckTerminatingPodExpiry time.Duration // JobRunId -> PodIssue - knownPodIssues map[string]*podIssue + knownPodIssues map[string]*runIssue podIssueMutex sync.Mutex + jobRunState job.RunStateStore + clock clock.Clock } -func NewPodIssueService( +func NewIssueHandler( + jobRunState job.RunStateStore, clusterContext executorContext.ClusterContext, eventReporter reporter.EventReporter, + stateChecksConfig configuration.StateChecksConfiguration, pendingPodChecker podchecks.PodChecker, stuckTerminatingPodExpiry time.Duration, -) *PodIssueService { - podIssueService := &PodIssueService{ +) *IssueHandler { + issueHandler := &IssueHandler{ + jobRunState: jobRunState, clusterContext: clusterContext, eventReporter: eventReporter, pendingPodChecker: pendingPodChecker, + stateChecksConfig: stateChecksConfig, stuckTerminatingPodExpiry: stuckTerminatingPodExpiry, - knownPodIssues: map[string]*podIssue{}, + knownPodIssues: map[string]*runIssue{}, podIssueMutex: sync.Mutex{}, + clock: clock.RealClock{}, } clusterContext.AddPodEventHandler(cache.ResourceEventHandlerFuncs{ @@ -78,20 +99,20 @@ func NewPodIssueService( log.Errorf("Failed to process pod event due to it being an unexpected type. 
Failed to process %+v", obj) return } - podIssueService.handleDeletedPod(pod) + issueHandler.handleDeletedPod(pod) }, }) - return podIssueService + return issueHandler } -func (p *PodIssueService) registerIssue(issue *podIssue) { +func (p *IssueHandler) registerIssue(issue *runIssue) { p.podIssueMutex.Lock() defer p.podIssueMutex.Unlock() runId := issue.RunId if runId == "" { - log.Warnf("Not registering an issue for job %s (%s) as run id was empty", issue.JobId, issue.OriginalPodState.Name) + log.Warnf("Not registering an issue for job %s as run id was empty", issue.JobId) return } _, exists := p.knownPodIssues[issue.RunId] @@ -102,18 +123,18 @@ func (p *PodIssueService) registerIssue(issue *podIssue) { } } -func (p *PodIssueService) markIssuesResolved(issue *podIssue) { +func (p *IssueHandler) markIssuesResolved(issue *runIssue) { p.podIssueMutex.Lock() defer p.podIssueMutex.Unlock() delete(p.knownPodIssues, issue.RunId) } -func (p *PodIssueService) markIssueReported(issue *podIssue) { +func (p *IssueHandler) markIssueReported(issue *runIssue) { issue.Reported = true } -func (p *PodIssueService) HandlePodIssues() { +func (p *IssueHandler) HandlePodIssues() { managedPods, err := p.clusterContext.GetBatchPods() if err != nil { log.WithError(err).Errorf("unable to handle pod issus as failed to load pods") @@ -122,26 +143,29 @@ func (p *PodIssueService) HandlePodIssues() { return !util.IsLegacyManagedPod(pod) }) p.detectPodIssues(managedPods) + p.detectReconciliationIssues(managedPods) ctx, cancel := context.WithTimeout(context.Background(), time.Minute*2) defer cancel() - p.handleKnownPodIssues(ctx, managedPods) + p.handleKnownIssues(ctx, managedPods) } -func (p *PodIssueService) detectPodIssues(allManagedPods []*v1.Pod) { +func (p *IssueHandler) detectPodIssues(allManagedPods []*v1.Pod) { for _, pod := range allManagedPods { - if pod.DeletionTimestamp != nil && pod.DeletionTimestamp.Add(p.stuckTerminatingPodExpiry).Before(time.Now()) { + if pod.DeletionTimestamp != nil && pod.DeletionTimestamp.Add(p.stuckTerminatingPodExpiry).Before(p.clock.Now()) { // pod is stuck in terminating phase, this sometimes happen on node failure // it is safer to produce failed event than retrying as the job might have run already issue := &podIssue{ OriginalPodState: pod.DeepCopy(), - JobId: util.ExtractJobId(pod), - RunId: util.ExtractJobRunId(pod), Message: "pod stuck in terminating phase, this might be due to platform problems", Retryable: false, Type: StuckTerminating, } - p.registerIssue(issue) + p.registerIssue(&runIssue{ + JobId: util.ExtractJobId(pod), + RunId: util.ExtractJobRunId(pod), + PodIssue: issue, + }) } else if pod.Status.Phase == v1.PodUnknown || pod.Status.Phase == v1.PodPending { podEvents, err := p.clusterContext.GetPodEvents(pod) @@ -155,7 +179,7 @@ func (p *PodIssueService) detectPodIssues(allManagedPods []*v1.Pod) { continue } - action, cause, podCheckMessage := p.pendingPodChecker.GetAction(pod, podEvents, time.Now().Sub(lastStateChange)) + action, cause, podCheckMessage := p.pendingPodChecker.GetAction(pod, podEvents, p.clock.Now().Sub(lastStateChange)) if action != podchecks.ActionWait { retryable := action == podchecks.ActionRetry @@ -169,25 +193,27 @@ func (p *PodIssueService) detectPodIssues(allManagedPods []*v1.Pod) { issue := &podIssue{ OriginalPodState: pod.DeepCopy(), - JobId: util.ExtractJobId(pod), - RunId: util.ExtractJobRunId(pod), Message: message, Retryable: retryable, Type: podIssueType, } - p.registerIssue(issue) + p.registerIssue(&runIssue{ + JobId: 
util.ExtractJobId(pod), + RunId: util.ExtractJobRunId(pod), + PodIssue: issue, + }) } } } } -func (p *PodIssueService) handleKnownPodIssues(ctx context.Context, allManagedPods []*v1.Pod) { +func (p *IssueHandler) handleKnownIssues(ctx context.Context, allManagedPods []*v1.Pod) { // Make issues from pods + issues issues := createIssues(allManagedPods, p.knownPodIssues) - util.ProcessItemsWithThreadPool(ctx, 20, issues, p.handlePodIssue) + util.ProcessItemsWithThreadPool(ctx, 20, issues, p.handleRunIssue) } -func createIssues(managedPods []*v1.Pod, podIssues map[string]*podIssue) []*issue { +func createIssues(managedPods []*v1.Pod, runIssues map[string]*runIssue) []*issue { podsByRunId := make(map[string]*v1.Pod, len(managedPods)) for _, pod := range managedPods { @@ -199,25 +225,40 @@ func createIssues(managedPods []*v1.Pod, podIssues map[string]*podIssue) []*issu } } - result := make([]*issue, 0, len(podIssues)) + result := make([]*issue, 0, len(runIssues)) - for _, podIssue := range podIssues { - relatedPod := podsByRunId[podIssue.RunId] - result = append(result, &issue{CurrentPodState: relatedPod, Issue: podIssue}) + for _, runIssue := range runIssues { + relatedPod := podsByRunId[runIssue.RunId] + result = append(result, &issue{CurrentPodState: relatedPod, RunIssue: runIssue}) } return result } -func (p *PodIssueService) handlePodIssue(issue *issue) { +func (p *IssueHandler) handleRunIssue(issue *issue) { + if issue == nil || issue.RunIssue == nil { + log.Warnf("issue found with missing issue details") + return + } + if issue.RunIssue.PodIssue != nil { + p.handlePodIssue(issue) + } else if issue.RunIssue.ReconciliationIssue != nil { + p.handleReconciliationIssue(issue) + } else { + log.Warnf("issue found with no issue details set for job %s run %s", issue.RunIssue.JobId, issue.RunIssue.RunId) + p.markIssuesResolved(issue.RunIssue) + } +} + +func (p *IssueHandler) handlePodIssue(issue *issue) { hasSelfResolved := hasPodIssueSelfResolved(issue) if hasSelfResolved { - log.Infof("Issue for job %s run %s has self resolved", issue.Issue.JobId, issue.Issue.RunId) - p.markIssuesResolved(issue.Issue) + log.Infof("Issue for job %s run %s has self resolved", issue.RunIssue.JobId, issue.RunIssue.RunId) + p.markIssuesResolved(issue.RunIssue) return } - if issue.Issue.Retryable { + if issue.RunIssue.PodIssue.Retryable { p.handleRetryableJobIssue(issue) } else { p.handleNonRetryableJobIssue(issue) @@ -229,32 +270,32 @@ func (p *PodIssueService) handlePodIssue(issue *issue) { // - Report JobFailedEvent // // Once that is done we are free to cleanup the pod -func (p *PodIssueService) handleNonRetryableJobIssue(issue *issue) { - if !issue.Issue.Reported { - log.Infof("Non-retryable issue detected for job %s run %s - %s", issue.Issue.JobId, issue.Issue.RunId, issue.Issue.Message) - message := issue.Issue.Message +func (p *IssueHandler) handleNonRetryableJobIssue(issue *issue) { + if !issue.RunIssue.Reported { + log.Infof("Non-retryable issue detected for job %s run %s - %s", issue.RunIssue.JobId, issue.RunIssue.RunId, issue.RunIssue.PodIssue.Message) + message := issue.RunIssue.PodIssue.Message events := make([]reporter.EventMessage, 0, 2) - if issue.Issue.Type == StuckStartingUp || issue.Issue.Type == UnableToSchedule { - unableToScheduleEvent := reporter.CreateJobUnableToScheduleEvent(issue.Issue.OriginalPodState, message, p.clusterContext.GetClusterId()) - events = append(events, reporter.EventMessage{Event: unableToScheduleEvent, JobRunId: issue.Issue.RunId}) + if issue.RunIssue.PodIssue.Type == 
StuckStartingUp || issue.RunIssue.PodIssue.Type == UnableToSchedule { + unableToScheduleEvent := reporter.CreateJobUnableToScheduleEvent(issue.RunIssue.PodIssue.OriginalPodState, message, p.clusterContext.GetClusterId()) + events = append(events, reporter.EventMessage{Event: unableToScheduleEvent, JobRunId: issue.RunIssue.RunId}) } - failedEvent := reporter.CreateSimpleJobFailedEvent(issue.Issue.OriginalPodState, message, p.clusterContext.GetClusterId(), issue.Issue.Cause) - events = append(events, reporter.EventMessage{Event: failedEvent, JobRunId: issue.Issue.RunId}) + failedEvent := reporter.CreateSimpleJobFailedEvent(issue.RunIssue.PodIssue.OriginalPodState, message, p.clusterContext.GetClusterId(), issue.RunIssue.PodIssue.Cause) + events = append(events, reporter.EventMessage{Event: failedEvent, JobRunId: issue.RunIssue.RunId}) err := p.eventReporter.Report(events) if err != nil { - log.Errorf("Failed to report failed event for job %s because %s", issue.Issue.JobId, err) + log.Errorf("Failed to report failed event for job %s because %s", issue.RunIssue.JobId, err) return } - p.markIssueReported(issue.Issue) + p.markIssueReported(issue.RunIssue) } if issue.CurrentPodState != nil { p.clusterContext.DeletePods([]*v1.Pod{issue.CurrentPodState}) - issue.Issue.DeletionRequested = true + issue.RunIssue.PodIssue.DeletionRequested = true } else { - p.markIssuesResolved(issue.Issue) + p.markIssuesResolved(issue.RunIssue) } } @@ -265,18 +306,18 @@ func (p *PodIssueService) handleNonRetryableJobIssue(issue *issue) { // Special consideration must be taken that most of these pods are somewhat "stuck" in pending. // So can transition to Running/Completed/Failed in the middle of this // We must not return the lease if the pod state changes - as likely it has become "unstuck" -func (p *PodIssueService) handleRetryableJobIssue(issue *issue) { - if !issue.Issue.Reported { - log.Infof("Retryable issue detected for job %s run %s - %s", issue.Issue.JobId, issue.Issue.RunId, issue.Issue.Message) - if issue.Issue.Type == StuckStartingUp || issue.Issue.Type == UnableToSchedule { - event := reporter.CreateJobUnableToScheduleEvent(issue.Issue.OriginalPodState, issue.Issue.Message, p.clusterContext.GetClusterId()) - err := p.eventReporter.Report([]reporter.EventMessage{{Event: event, JobRunId: issue.Issue.RunId}}) +func (p *IssueHandler) handleRetryableJobIssue(issue *issue) { + if !issue.RunIssue.Reported { + log.Infof("Retryable issue detected for job %s run %s - %s", issue.RunIssue.JobId, issue.RunIssue.RunId, issue.RunIssue.PodIssue.Message) + if issue.RunIssue.PodIssue.Type == StuckStartingUp || issue.RunIssue.PodIssue.Type == UnableToSchedule { + event := reporter.CreateJobUnableToScheduleEvent(issue.RunIssue.PodIssue.OriginalPodState, issue.RunIssue.PodIssue.Message, p.clusterContext.GetClusterId()) + err := p.eventReporter.Report([]reporter.EventMessage{{Event: event, JobRunId: issue.RunIssue.RunId}}) if err != nil { log.Errorf("Failure to report stuck pod event %+v because %s", event, err) return } } - p.markIssueReported(issue.Issue) + p.markIssueReported(issue.RunIssue) } if issue.CurrentPodState != nil { @@ -285,34 +326,34 @@ func (p *PodIssueService) handleRetryableJobIssue(issue *issue) { return pod.Status.Phase == v1.PodPending }, true) if err != nil { - log.Errorf("Failed to delete pod of running job %s because %s", issue.Issue.JobId, err) + log.Errorf("Failed to delete pod of running job %s because %s", issue.RunIssue.JobId, err) return } else { - issue.Issue.DeletionRequested = true + 
issue.RunIssue.PodIssue.DeletionRequested = true } } else { // TODO // When we have our own internal state - we don't need to wait for the pod deletion to complete // We can just mark is to delete in our state and return the lease - jobRunAttempted := issue.Issue.Type != UnableToSchedule - returnLeaseEvent := reporter.CreateReturnLeaseEvent(issue.Issue.OriginalPodState, issue.Issue.Message, p.clusterContext.GetClusterId(), jobRunAttempted) - err := p.eventReporter.Report([]reporter.EventMessage{{Event: returnLeaseEvent, JobRunId: issue.Issue.RunId}}) + jobRunAttempted := issue.RunIssue.PodIssue.Type != UnableToSchedule + returnLeaseEvent := reporter.CreateReturnLeaseEvent(issue.RunIssue.PodIssue.OriginalPodState, issue.RunIssue.PodIssue.Message, p.clusterContext.GetClusterId(), jobRunAttempted) + err := p.eventReporter.Report([]reporter.EventMessage{{Event: returnLeaseEvent, JobRunId: issue.RunIssue.RunId}}) if err != nil { - log.Errorf("Failed to return lease for job %s because %s", issue.Issue.JobId, err) + log.Errorf("Failed to return lease for job %s because %s", issue.RunIssue.JobId, err) return } - p.markIssuesResolved(issue.Issue) + p.markIssuesResolved(issue.RunIssue) } } func hasPodIssueSelfResolved(issue *issue) bool { - if issue == nil || issue.Issue == nil { + if issue == nil || issue.RunIssue == nil || issue.RunIssue.PodIssue == nil { return true } - isStuckStartingUpAndResolvable := issue.Issue.Type == StuckStartingUp && - (issue.Issue.Retryable || (!issue.Issue.Retryable && !issue.Issue.Reported)) - if issue.Issue.Type == UnableToSchedule || isStuckStartingUpAndResolvable { + isStuckStartingUpAndResolvable := issue.RunIssue.PodIssue.Type == StuckStartingUp && + (issue.RunIssue.PodIssue.Retryable || (!issue.RunIssue.PodIssue.Retryable && !issue.RunIssue.Reported)) + if issue.RunIssue.PodIssue.Type == UnableToSchedule || isStuckStartingUpAndResolvable { // If pod has disappeared - don't consider it resolved as we still need to report the issue if issue.CurrentPodState == nil { return false @@ -324,7 +365,7 @@ func hasPodIssueSelfResolved(issue *issue) bool { } // Pod has started running, and we haven't requested deletion - let it continue - if issue.CurrentPodState.Status.Phase == v1.PodRunning && !issue.Issue.DeletionRequested { + if issue.CurrentPodState.Status.Phase == v1.PodRunning && !issue.RunIssue.PodIssue.DeletionRequested { return true } // TODO There is an edge case here where the pod has started running but we have requested deletion @@ -344,19 +385,107 @@ func createStuckPodMessage(retryable bool, originalMessage string) string { return fmt.Sprintf("Unable to schedule pod with unrecoverable problem, Armada will not retry.\n%s", originalMessage) } -func (p *PodIssueService) handleDeletedPod(pod *v1.Pod) { +func (p *IssueHandler) handleDeletedPod(pod *v1.Pod) { jobId := util.ExtractJobId(pod) if jobId != "" { isUnexpectedDeletion := !util.IsMarkedForDeletion(pod) && !util.IsPodFinishedAndReported(pod) if isUnexpectedDeletion { - p.registerIssue(&podIssue{ - OriginalPodState: pod, - JobId: jobId, - RunId: util.ExtractJobRunId(pod), - Message: "Pod was unexpectedly deleted", - Retryable: false, - Reported: false, - Type: ExternallyDeleted, + p.registerIssue(&runIssue{ + JobId: jobId, + RunId: util.ExtractJobRunId(pod), + PodIssue: &podIssue{ + OriginalPodState: pod, + Message: "Pod was unexpectedly deleted", + Retryable: false, + Type: ExternallyDeleted, + }, + }) + } + } +} + +func (p *IssueHandler) handleReconciliationIssue(issue *issue) { + if 
issue.RunIssue.ReconciliationIssue == nil { + log.Warnf("unexpectedly asked to process an issue without reconciliation details for job %s run %s", issue.RunIssue.JobId, issue.RunIssue.RunId) + p.markIssuesResolved(issue.RunIssue) + return + } + + currentRunState := p.jobRunState.Get(issue.RunIssue.RunId) + if currentRunState == nil { + // No run for the run id - so there isn't a reconciliation issue + p.markIssuesResolved(issue.RunIssue) + return + } + + if issue.CurrentPodState != nil { + p.markIssuesResolved(issue.RunIssue) + return + } + + if issue.RunIssue.ReconciliationIssue.OriginalRunState.Phase != currentRunState.Phase || currentRunState.CancelRequested || currentRunState.PreemptionRequested { + // State of the run has changed - resolve + // If there is still an issue, it'll be re-detected + p.markIssuesResolved(issue.RunIssue) + return + } + + timeSinceInitialDetection := p.clock.Now().Sub(issue.RunIssue.ReconciliationIssue.InitialDetectionTime) + + // If there is an active run and the associated pod has been missing for more than a given time period, report the run as failed + if currentRunState.Phase == job.Active && timeSinceInitialDetection > p.stateChecksConfig.DeadlineForActivePodConsideredMissing { + log.Infof("Pod missing for active run detected for job %s run %s", issue.RunIssue.JobId, issue.RunIssue.RunId) + + event := &api.JobFailedEvent{ + JobId: currentRunState.Meta.JobId, + JobSetId: currentRunState.Meta.JobSet, + Queue: currentRunState.Meta.Queue, + Created: p.clock.Now(), + ClusterId: p.clusterContext.GetClusterId(), + Reason: "Pod is unexpectedly missing in Kubernetes", + Cause: api.Cause_Error, + } + + err := p.eventReporter.Report([]reporter.EventMessage{{Event: event, JobRunId: issue.RunIssue.RunId}}) + if err != nil { + log.Errorf("Failure to report failed event %+v because %s", event, err) + return + } + + p.markIssueReported(issue.RunIssue) + p.markIssuesResolved(issue.RunIssue) + } else if currentRunState.Phase == job.SuccessfulSubmission && timeSinceInitialDetection > p.stateChecksConfig.DeadlineForSubmittedPodConsideredMissing { + // If a pod hasn't shown up after a successful submission for a given time period, delete it from the run state + // This will cause it to be re-leased and submitted again + // If the issue is we are out of sync with kubernetes, the second submission will fail and kill the job + p.jobRunState.Delete(currentRunState.Meta.RunId) + p.markIssuesResolved(issue.RunIssue) + } +} + +func (p *IssueHandler) detectReconciliationIssues(pods []*v1.Pod) { + runs := p.jobRunState.GetAllWithFilter(func(state *job.RunState) bool { + return (state.Phase == job.Active || state.Phase == job.SuccessfulSubmission) && !state.CancelRequested && !state.PreemptionRequested + }) + + runIdsToPod := make(map[string]*v1.Pod, len(pods)) + for _, pod := range pods { + runId := util.ExtractJobRunId(pod) + if runId != "" { + runIdsToPod[runId] = pod + } + } + + for _, run := range runs { + _, present := runIdsToPod[run.Meta.RunId] + if !present { + p.registerIssue(&runIssue{ + JobId: run.Meta.JobId, + RunId: run.Meta.RunId, + ReconciliationIssue: &reconciliationIssue{ + InitialDetectionTime: p.clock.Now(), + OriginalRunState: run.DeepCopy(), + }, }) } } diff --git a/internal/executor/service/pod_issue_handler_test.go b/internal/executor/service/pod_issue_handler_test.go index bab9ea8bb2c..45a9168cdd5 100644 --- a/internal/executor/service/pod_issue_handler_test.go +++ b/internal/executor/service/pod_issue_handler_test.go @@ -6,8 +6,11 @@ import (
"github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/clock" + "github.com/armadaproject/armada/internal/executor/configuration" fakecontext "github.com/armadaproject/armada/internal/executor/context/fake" + "github.com/armadaproject/armada/internal/executor/job" "github.com/armadaproject/armada/internal/executor/reporter" "github.com/armadaproject/armada/internal/executor/reporter/mocks" "github.com/armadaproject/armada/internal/executor/util" @@ -15,7 +18,7 @@ import ( ) func TestPodIssueService_DoesNothingIfNoPodsAreFound(t *testing.T) { - podIssueService, _, eventsReporter := setupTestComponents() + podIssueService, _, _, eventsReporter := setupTestComponents([]*job.RunState{}) podIssueService.HandlePodIssues() @@ -23,7 +26,7 @@ func TestPodIssueService_DoesNothingIfNoPodsAreFound(t *testing.T) { } func TestPodIssueService_DoesNothingIfNoStuckPodsAreFound(t *testing.T) { - podIssueService, fakeClusterContext, eventsReporter := setupTestComponents() + podIssueService, _, fakeClusterContext, eventsReporter := setupTestComponents([]*job.RunState{}) runningPod := makeRunningPod(false) addPod(t, fakeClusterContext, runningPod) @@ -35,7 +38,7 @@ func TestPodIssueService_DoesNothingIfNoStuckPodsAreFound(t *testing.T) { } func TestPodIssueService_DeletesPodAndReportsFailed_IfStuckAndUnretryable(t *testing.T) { - podIssueService, fakeClusterContext, eventsReporter := setupTestComponents() + podIssueService, _, fakeClusterContext, eventsReporter := setupTestComponents([]*job.RunState{}) unretryableStuckPod := makeUnretryableStuckPod(false) addPod(t, fakeClusterContext, unretryableStuckPod) @@ -54,7 +57,7 @@ func TestPodIssueService_DeletesPodAndReportsFailed_IfStuckAndUnretryable(t *tes } func TestPodIssueService_DeletesPodAndReportsFailed_IfStuckTerminating(t *testing.T) { - podIssueService, fakeClusterContext, eventsReporter := setupTestComponents() + podIssueService, _, fakeClusterContext, eventsReporter := setupTestComponents([]*job.RunState{}) terminatingPod := makeTerminatingPod(false) addPod(t, fakeClusterContext, terminatingPod) @@ -70,7 +73,7 @@ func TestPodIssueService_DeletesPodAndReportsFailed_IfStuckTerminating(t *testin } func TestPodIssueService_DeletesPodAndReportsLeaseReturned_IfRetryableStuckPod(t *testing.T) { - podIssueService, fakeClusterContext, eventsReporter := setupTestComponents() + podIssueService, _, fakeClusterContext, eventsReporter := setupTestComponents([]*job.RunState{}) retryableStuckPod := makeRetryableStuckPod(false) addPod(t, fakeClusterContext, retryableStuckPod) @@ -95,7 +98,7 @@ func TestPodIssueService_DeletesPodAndReportsLeaseReturned_IfRetryableStuckPod(t } func TestPodIssueService_ReportsFailed_IfDeletedExternally(t *testing.T) { - podIssueService, fakeClusterContext, eventsReporter := setupTestComponents() + podIssueService, _, fakeClusterContext, eventsReporter := setupTestComponents([]*job.RunState{}) runningPod := makeRunningPod(false) fakeClusterContext.SimulateDeletionEvent(runningPod) @@ -108,17 +111,108 @@ func TestPodIssueService_ReportsFailed_IfDeletedExternally(t *testing.T) { assert.Equal(t, failedEvent.JobId, util.ExtractJobId(runningPod)) } -func setupTestComponents() (*PodIssueService, *fakecontext.SyncFakeClusterContext, *mocks.FakeEventReporter) { +func TestPodIssueService_ReportsFailed_IfPodOfActiveRunGoesMissing(t *testing.T) { + baseTime := time.Now() + fakeClock := clock.NewFakeClock(baseTime) + podIssueService, _, _, eventsReporter := 
setupTestComponents([]*job.RunState{createRunState("job-1", "run-1", job.Active)}) + podIssueService.clock = fakeClock + + podIssueService.HandlePodIssues() + // Nothing should happen, until the issue has been seen for a configured amount of time + assert.Len(t, eventsReporter.ReceivedEvents, 0) + + fakeClock.SetTime(baseTime.Add(10 * time.Minute)) + podIssueService.HandlePodIssues() + // Reports Failed + assert.Len(t, eventsReporter.ReceivedEvents, 1) + failedEvent, ok := eventsReporter.ReceivedEvents[0].Event.(*api.JobFailedEvent) + assert.True(t, ok) + assert.Equal(t, failedEvent.JobId, "job-1") +} + +func TestPodIssueService_DoesNothing_IfMissingPodOfActiveRunReturns(t *testing.T) { + baseTime := time.Now() + fakeClock := clock.NewFakeClock(baseTime) + runningPod := makeRunningPod(false) + runState := createRunState(util.ExtractJobId(runningPod), util.ExtractJobRunId(runningPod), job.Active) + podIssueService, _, fakeClusterContext, eventsReporter := setupTestComponents([]*job.RunState{runState}) + podIssueService.clock = fakeClock + + podIssueService.HandlePodIssues() + // Nothing should happen, until the issue has been seen for a configured amount of time + assert.Len(t, eventsReporter.ReceivedEvents, 0) + + addPod(t, fakeClusterContext, runningPod) + fakeClock.SetTime(baseTime.Add(10 * time.Minute)) + podIssueService.HandlePodIssues() + assert.Len(t, eventsReporter.ReceivedEvents, 0) +} + +func TestPodIssueService_DeleteRunFromRunState_IfSubmittedPodNeverAppears(t *testing.T) { + baseTime := time.Now() + fakeClock := clock.NewFakeClock(baseTime) + podIssueService, runStateStore, _, eventsReporter := setupTestComponents([]*job.RunState{createRunState("job-1", "run-1", job.SuccessfulSubmission)}) + podIssueService.clock = fakeClock + + podIssueService.HandlePodIssues() + // Nothing should happen, until the issue has been seen for a configured amount of time + assert.Len(t, eventsReporter.ReceivedEvents, 0) + assert.Len(t, runStateStore.GetAll(), 1) + + fakeClock.SetTime(baseTime.Add(20 * time.Minute)) + podIssueService.HandlePodIssues() + assert.Len(t, eventsReporter.ReceivedEvents, 0) + // Pod has been missing for greater than configured period, run should get deleted + assert.Len(t, runStateStore.GetAll(), 0) +} + +func TestPodIssueService_DoesNothing_IfSubmittedPodAppears(t *testing.T) { + baseTime := time.Now() + fakeClock := clock.NewFakeClock(baseTime) + runningPod := makeRunningPod(false) + runState := createRunState(util.ExtractJobId(runningPod), util.ExtractJobRunId(runningPod), job.SuccessfulSubmission) + podIssueService, runStateStore, fakeClusterContext, eventsReporter := setupTestComponents([]*job.RunState{runState}) + podIssueService.clock = fakeClock + + podIssueService.HandlePodIssues() + // Nothing should happen, until the issue has been seen for a configured amount of time + assert.Len(t, eventsReporter.ReceivedEvents, 0) + assert.Len(t, runStateStore.GetAll(), 1) + + addPod(t, fakeClusterContext, runningPod) + fakeClock.SetTime(baseTime.Add(20 * time.Minute)) + podIssueService.HandlePodIssues() + assert.Len(t, runStateStore.GetAll(), 1) +} + +func setupTestComponents(initialRunState []*job.RunState) (*IssueHandler, *job.JobRunStateStore, *fakecontext.SyncFakeClusterContext, *mocks.FakeEventReporter) { fakeClusterContext := fakecontext.NewSyncFakeClusterContext() eventReporter := mocks.NewFakeEventReporter() pendingPodChecker := makePodChecker() - - podIssueHandler := NewPodIssueService( + runStateStore := job.NewJobRunStateStoreWithInitialState(initialRunState) + 
stateChecksConfig := configuration.StateChecksConfiguration{ + DeadlineForSubmittedPodConsideredMissing: time.Minute * 15, + DeadlineForActivePodConsideredMissing: time.Minute * 5, + } + + podIssueHandler := NewIssueHandler( + runStateStore, fakeClusterContext, eventReporter, + stateChecksConfig, pendingPodChecker, time.Minute*3, ) - return podIssueHandler, fakeClusterContext, eventReporter + return podIssueHandler, runStateStore, fakeClusterContext, eventReporter +} + +func createRunState(jobId string, runId string, phase job.RunPhase) *job.RunState { + return &job.RunState{ + Phase: phase, + Meta: &job.RunMeta{ + JobId: jobId, + RunId: runId, + }, + } }
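
For reference, a minimal sketch of the decision this patch adds in handleReconciliationIssue, using illustrative stand-in names (runPhase, reconcile, reconcileAction are not the executor's actual types): an Active run whose backing pod has been missing longer than deadlineForActivePodConsideredMissing is reported as failed, while a run still in SuccessfulSubmission whose pod never appeared within deadlineForSubmittedPodConsideredMissing is dropped from the run state so it can be re-leased and submitted again.

package main

import (
	"fmt"
	"time"
)

type runPhase string

const (
	successfulSubmission runPhase = "SuccessfulSubmission"
	active               runPhase = "Active"
)

type reconcileAction string

const (
	wait            reconcileAction = "wait"
	reportRunFailed reconcileAction = "report run failed"
	deleteRunState  reconcileAction = "delete run state"
)

// reconcile mirrors the shape of the new check: it only acts once a run's pod
// has been missing for longer than the relevant configured deadline.
func reconcile(phase runPhase, podExists bool, missingFor, submittedDeadline, activeDeadline time.Duration) reconcileAction {
	if podExists {
		// A pod backs the run, so there is nothing to reconcile.
		return wait
	}
	switch {
	case phase == active && missingFor > activeDeadline:
		// The run can never finish without a pod, so report it as failed.
		return reportRunFailed
	case phase == successfulSubmission && missingFor > submittedDeadline:
		// The submitted pod never appeared; drop the run so it gets re-leased.
		return deleteRunState
	default:
		// Within the grace period: give the informer state time to catch up.
		return wait
	}
}

func main() {
	// Using the defaults added to config/executor/config.yaml (15m submitted, 5m active).
	fmt.Println(reconcile(active, false, 6*time.Minute, 15*time.Minute, 5*time.Minute))                // report run failed
	fmt.Println(reconcile(successfulSubmission, false, 20*time.Minute, 15*time.Minute, 5*time.Minute)) // delete run state
	fmt.Println(reconcile(active, true, time.Hour, 15*time.Minute, 5*time.Minute))                     // wait
}

Acting only after a configured grace period gives the informer cache time to catch up, so a briefly lagging watch does not fail healthy runs.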