From de552ca82e8461bcebf27ac2c0c50ee8b9b795a7 Mon Sep 17 00:00:00 2001 From: Sherif Akoush Date: Fri, 4 Aug 2023 13:56:18 +0100 Subject: [PATCH 01/10] separate out logic from lock in modelsync --- scheduler/pkg/envoy/processor/incremental.go | 10 +++++++--- scheduler/pkg/envoy/processor/incremental_test.go | 2 +- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/scheduler/pkg/envoy/processor/incremental.go b/scheduler/pkg/envoy/processor/incremental.go index bf10ae93ab..3921197172 100644 --- a/scheduler/pkg/envoy/processor/incremental.go +++ b/scheduler/pkg/envoy/processor/incremental.go @@ -623,7 +623,7 @@ func (p *IncrementalProcessor) modelUpdate(modelName string) error { ) if p.batchTrigger == nil && p.runEnvoyBatchUpdates { - p.batchTrigger = time.AfterFunc(p.batchWaitMillis, p.modelSync) + p.batchTrigger = time.AfterFunc(p.batchWaitMillis, p.modelSyncWithLock) } return nil @@ -640,10 +640,14 @@ func (p *IncrementalProcessor) callVersionCleanupIfNeeded(modelName string) { } } -func (p *IncrementalProcessor) modelSync() { - logger := p.logger.WithField("func", "modelSync") +func (p *IncrementalProcessor) modelSyncWithLock() { p.mu.Lock() defer p.mu.Unlock() + p.modelSync() +} + +func (p *IncrementalProcessor) modelSync() { + logger := p.logger.WithField("func", "modelSync") envoyErr := p.updateEnvoy() serverReplicaState := store.Available diff --git a/scheduler/pkg/envoy/processor/incremental_test.go b/scheduler/pkg/envoy/processor/incremental_test.go index bc9fd8ca9b..9efbcc46d4 100644 --- a/scheduler/pkg/envoy/processor/incremental_test.go +++ b/scheduler/pkg/envoy/processor/incremental_test.go @@ -766,7 +766,7 @@ func TestModelSync(t *testing.T) { for _, op := range test.ops { op(inc, g) } - inc.modelSync() + inc.modelSyncWithLock() for modelName, modelReplicas := range test.expectedReplicaStats { model, err := inc.modelStore.GetModel(modelName) g.Expect(err).To(BeNil()) From d722fa2d76d8f96a8b5524b767f83a5c9708fe43 Mon Sep 17 00:00:00 2001 From: Sherif Akoush Date: Fri, 4 Aug 2023 15:09:14 +0100 Subject: [PATCH 02/10] add manual trigger --- scheduler/pkg/envoy/processor/incremental.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/scheduler/pkg/envoy/processor/incremental.go b/scheduler/pkg/envoy/processor/incremental.go index 3921197172..7e1cb49581 100644 --- a/scheduler/pkg/envoy/processor/incremental.go +++ b/scheduler/pkg/envoy/processor/incremental.go @@ -63,6 +63,7 @@ type IncrementalProcessor struct { batchWaitMillis time.Duration pendingModelVersions []*pendingModelVersion versionCleaner cleaner.ModelVersionCleaner + batchTriggerManual *time.Time } type pendingModelVersion struct { @@ -94,6 +95,7 @@ func NewIncrementalProcessor( batchTrigger: nil, batchWaitMillis: util.EnvoyUpdateDefaultBatchWaitMillis, versionCleaner: versionCleaner, + batchTriggerManual: nil, } err := ip.setListeners() @@ -622,6 +624,13 @@ func (p *IncrementalProcessor) modelUpdate(modelName string) error { }, ) + if p.batchTriggerManual == nil { + *p.batchTriggerManual = time.Now() + } else if time.Since(*p.batchTriggerManual) > p.batchWaitMillis { + p.modelSync() + p.batchTriggerManual = nil + } + // we still need to enable the cron timer as there is no guarantee that the manual trigger will be called if p.batchTrigger == nil && p.runEnvoyBatchUpdates { p.batchTrigger = time.AfterFunc(p.batchWaitMillis, p.modelSyncWithLock) } From 5c7c18069e64179fcee6bc722521f997b509e0bf Mon Sep 17 00:00:00 2001 From: Sherif Akoush Date: Fri, 4 Aug 2023 15:23:48 +0100 Subject: [PATCH 03/10] fix 
null pointer exception --- scheduler/pkg/envoy/processor/incremental.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/scheduler/pkg/envoy/processor/incremental.go b/scheduler/pkg/envoy/processor/incremental.go index 7e1cb49581..a7e82bfaa9 100644 --- a/scheduler/pkg/envoy/processor/incremental.go +++ b/scheduler/pkg/envoy/processor/incremental.go @@ -625,8 +625,13 @@ func (p *IncrementalProcessor) modelUpdate(modelName string) error { ) if p.batchTriggerManual == nil { + p.batchTriggerManual = new(time.Time) *p.batchTriggerManual = time.Now() } else if time.Since(*p.batchTriggerManual) > p.batchWaitMillis { + // we have waited long enough so we can trigger the batch update + // we do this inline so that we do not require to release and reacquire the lock + // which under heavy load there is no guarantee of order and therefore could lead + // to starvation of the batch update p.modelSync() p.batchTriggerManual = nil } From 1f8b3756829053ce0c17494c6433d3276d8857ac Mon Sep 17 00:00:00 2001 From: Sherif Akoush Date: Fri, 4 Aug 2023 15:25:03 +0100 Subject: [PATCH 04/10] add log message --- scheduler/pkg/envoy/processor/incremental.go | 1 + 1 file changed, 1 insertion(+) diff --git a/scheduler/pkg/envoy/processor/incremental.go b/scheduler/pkg/envoy/processor/incremental.go index a7e82bfaa9..d12e3b4ce4 100644 --- a/scheduler/pkg/envoy/processor/incremental.go +++ b/scheduler/pkg/envoy/processor/incremental.go @@ -662,6 +662,7 @@ func (p *IncrementalProcessor) modelSyncWithLock() { func (p *IncrementalProcessor) modelSync() { logger := p.logger.WithField("func", "modelSync") + logger.Debugf("Calling model sync") envoyErr := p.updateEnvoy() serverReplicaState := store.Available From cae199eb8fd2a70e975c3719914e759ed59bd547 Mon Sep 17 00:00:00 2001 From: Sherif Akoush Date: Wed, 9 Aug 2023 09:39:34 +0100 Subject: [PATCH 05/10] Increase hub messages + fix deadlock --- scheduler/pkg/agent/server.go | 2 +- scheduler/pkg/envoy/processor/incremental.go | 40 +++++++++++++------- scheduler/pkg/store/experiment/store.go | 2 +- scheduler/pkg/store/pipeline/store.go | 2 +- 4 files changed, 30 insertions(+), 16 deletions(-) diff --git a/scheduler/pkg/agent/server.go b/scheduler/pkg/agent/server.go index fc83f41f9b..a14d1a582c 100644 --- a/scheduler/pkg/agent/server.go +++ b/scheduler/pkg/agent/server.go @@ -43,7 +43,7 @@ import ( const ( grpcMaxConcurrentStreams = 1_000_000 - pendingSyncsQueueSize int = 10 + pendingSyncsQueueSize int = 1000 modelEventHandlerName = "agent.server.models" modelScalingCoolingDownSeconds = 60 // this is currently used in scale down events serverDrainingExtraWaitMillis = 500 diff --git a/scheduler/pkg/envoy/processor/incremental.go b/scheduler/pkg/envoy/processor/incremental.go index d12e3b4ce4..3528dd478e 100644 --- a/scheduler/pkg/envoy/processor/incremental.go +++ b/scheduler/pkg/envoy/processor/incremental.go @@ -41,7 +41,7 @@ import ( ) const ( - pendingSyncsQueueSize int = 100 + pendingSyncsQueueSize int = 1000 modelEventHandlerName = "incremental.processor.models" experimentEventHandlerName = "incremental.processor.experiments" pipelineEventHandlerName = "incremental.processor.pipelines" @@ -544,13 +544,15 @@ func (p *IncrementalProcessor) modelUpdate(modelName string) error { p.mu.Lock() defer p.mu.Unlock() p.modelStore.LockModel(modelName) - defer p.modelStore.UnlockModel(modelName) + + logger.Debugf("Calling model update for %s", modelName) model, err := p.modelStore.GetModel(modelName) if err != nil { logger.WithError(err).Warnf("Failed to sync model %s", 
modelName) if err := p.removeRouteForServerInEnvoyCache(modelName); err != nil { logger.WithError(err).Errorf("Failed to remove model route from envoy %s", modelName) + p.modelStore.UnlockModel(modelName) return err } } @@ -559,6 +561,7 @@ func (p *IncrementalProcessor) modelUpdate(modelName string) error { if err := p.removeRouteForServerInEnvoyCache(modelName); err != nil { logger.WithError(err).Errorf("Failed to remove model route from envoy %s", modelName) } + p.modelStore.UnlockModel(modelName) return p.updateEnvoy() // in practice we should not be here } @@ -568,6 +571,7 @@ func (p *IncrementalProcessor) modelUpdate(modelName string) error { if err := p.removeRouteForServerInEnvoyCache(modelName); err != nil { logger.WithError(err).Errorf("Failed to remove model route from envoy %s", modelName) } + p.modelStore.UnlockModel(modelName) return p.updateEnvoy() // in practice we should not be here } @@ -579,6 +583,7 @@ func (p *IncrementalProcessor) modelUpdate(modelName string) error { logger.Debugf("sync: Model can't receive traffic - removing for %s", modelName) if err := p.removeRouteForServerInEnvoyCache(modelName); err != nil { logger.WithError(err).Errorf("Failed to remove model route from envoy %s", modelName) + p.modelStore.UnlockModel(modelName) return err } modelRemoved = true @@ -600,6 +605,7 @@ func (p *IncrementalProcessor) modelUpdate(modelName string) error { // Remove routes before we recreate if err := p.removeRouteForServerInEnvoyCache(modelName); err != nil { logger.Debugf("Failed to remove route before starting update for %s", modelName) + p.modelStore.UnlockModel(modelName) return err } @@ -610,6 +616,7 @@ func (p *IncrementalProcessor) modelUpdate(modelName string) error { logger.WithError(err).Errorf("Failed to add traffic for model %s", modelName) if err := p.removeRouteForServerInEnvoyCache(modelName); err != nil { logger.WithError(err).Errorf("Failed to remove model route from envoy %s", modelName) + p.modelStore.UnlockModel(modelName) return err } } @@ -623,18 +630,9 @@ func (p *IncrementalProcessor) modelUpdate(modelName string) error { version: latestModel.GetVersion(), }, ) + p.modelStore.UnlockModel(modelName) + p.triggerModelSyncIfNeeded() - if p.batchTriggerManual == nil { - p.batchTriggerManual = new(time.Time) - *p.batchTriggerManual = time.Now() - } else if time.Since(*p.batchTriggerManual) > p.batchWaitMillis { - // we have waited long enough so we can trigger the batch update - // we do this inline so that we do not require to release and reacquire the lock - // which under heavy load there is no guarantee of order and therefore could lead - // to starvation of the batch update - p.modelSync() - p.batchTriggerManual = nil - } // we still need to enable the cron timer as there is no guarantee that the manual trigger will be called if p.batchTrigger == nil && p.runEnvoyBatchUpdates { p.batchTrigger = time.AfterFunc(p.batchWaitMillis, p.modelSyncWithLock) @@ -654,6 +652,20 @@ func (p *IncrementalProcessor) callVersionCleanupIfNeeded(modelName string) { } } +func (p *IncrementalProcessor) triggerModelSyncIfNeeded() { + if p.batchTriggerManual == nil { + p.batchTriggerManual = new(time.Time) + *p.batchTriggerManual = time.Now() + } else if time.Since(*p.batchTriggerManual) > p.batchWaitMillis { + // we have waited long enough so we can trigger the batch update + // we do this inline so that we do not require to release and reacquire the lock + // which under heavy load there is no guarantee of order and therefore could lead + // to starvation of the batch 
update + p.modelSync() + *p.batchTriggerManual = time.Now() + } +} + func (p *IncrementalProcessor) modelSyncWithLock() { p.mu.Lock() defer p.mu.Unlock() @@ -689,6 +701,7 @@ func (p *IncrementalProcessor) modelSync() { continue } + logger.Debugf("sherif: getting server model %s", mv.name) s, err := p.modelStore.GetServer(v.Server(), false, false) if err != nil { logger.Debugf("Failed to get server for model %s server %s", mv.name, v.Server()) @@ -747,4 +760,5 @@ func (p *IncrementalProcessor) modelSync() { // Reset p.batchTrigger = nil p.pendingModelVersions = nil + logger.Debugf("Done modelSync") } diff --git a/scheduler/pkg/store/experiment/store.go b/scheduler/pkg/store/experiment/store.go index d2ec00bfe8..9fb8b30743 100644 --- a/scheduler/pkg/store/experiment/store.go +++ b/scheduler/pkg/store/experiment/store.go @@ -31,7 +31,7 @@ import ( ) const ( - pendingSyncsQueueSize int = 100 + pendingSyncsQueueSize int = 1000 experimentStartEventSource = "experiment.store.start" experimentStopEventSource = "experiment.store.stop" modelEventHandlerName = "experiment.store.models" diff --git a/scheduler/pkg/store/pipeline/store.go b/scheduler/pkg/store/pipeline/store.go index bd7dcc9f05..301eb90dcb 100644 --- a/scheduler/pkg/store/pipeline/store.go +++ b/scheduler/pkg/store/pipeline/store.go @@ -31,7 +31,7 @@ import ( ) const ( - pendingSyncsQueueSize int = 100 + pendingSyncsQueueSize int = 1000 addPipelineEventSource = "pipeline.store.addpipeline" removePipelineEventSource = "pipeline.store.removepipeline" setStatusPipelineEventSource = "pipeline.store.setstatus" From b2c866484948e9633382419bf92ed1855cd5ad58 Mon Sep 17 00:00:00 2001 From: Sherif Akoush Date: Wed, 9 Aug 2023 10:47:01 +0100 Subject: [PATCH 06/10] optimise logic --- scheduler/pkg/envoy/processor/incremental.go | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/scheduler/pkg/envoy/processor/incremental.go b/scheduler/pkg/envoy/processor/incremental.go index 3528dd478e..f29a44f487 100644 --- a/scheduler/pkg/envoy/processor/incremental.go +++ b/scheduler/pkg/envoy/processor/incremental.go @@ -631,11 +631,13 @@ func (p *IncrementalProcessor) modelUpdate(modelName string) error { }, ) p.modelStore.UnlockModel(modelName) - p.triggerModelSyncIfNeeded() + triggered := p.triggerModelSyncIfNeeded() - // we still need to enable the cron timer as there is no guarantee that the manual trigger will be called - if p.batchTrigger == nil && p.runEnvoyBatchUpdates { - p.batchTrigger = time.AfterFunc(p.batchWaitMillis, p.modelSyncWithLock) + if !triggered { + // we still need to enable the cron timer as there is no guarantee that the manual trigger will be called + if p.batchTrigger == nil && p.runEnvoyBatchUpdates { + p.batchTrigger = time.AfterFunc(p.batchWaitMillis, p.modelSyncWithLock) + } } return nil @@ -652,18 +654,21 @@ func (p *IncrementalProcessor) callVersionCleanupIfNeeded(modelName string) { } } -func (p *IncrementalProcessor) triggerModelSyncIfNeeded() { +func (p *IncrementalProcessor) triggerModelSyncIfNeeded() bool { if p.batchTriggerManual == nil { p.batchTriggerManual = new(time.Time) *p.batchTriggerManual = time.Now() - } else if time.Since(*p.batchTriggerManual) > p.batchWaitMillis { + } + if time.Since(*p.batchTriggerManual) > p.batchWaitMillis { // we have waited long enough so we can trigger the batch update // we do this inline so that we do not require to release and reacquire the lock // which under heavy load there is no guarantee of order and therefore could lead // to starvation of the 
batch update p.modelSync() *p.batchTriggerManual = time.Now() + return true } + return false } func (p *IncrementalProcessor) modelSyncWithLock() { From b3de961004b5d5b2626298d31175414908f89c0f Mon Sep 17 00:00:00 2001 From: Sherif Akoush Date: Fri, 11 Aug 2023 10:09:40 +0100 Subject: [PATCH 07/10] remove extra log message --- scheduler/pkg/envoy/processor/incremental.go | 1 - 1 file changed, 1 deletion(-) diff --git a/scheduler/pkg/envoy/processor/incremental.go b/scheduler/pkg/envoy/processor/incremental.go index f29a44f487..b0404f6fdd 100644 --- a/scheduler/pkg/envoy/processor/incremental.go +++ b/scheduler/pkg/envoy/processor/incremental.go @@ -706,7 +706,6 @@ func (p *IncrementalProcessor) modelSync() { continue } - logger.Debugf("sherif: getting server model %s", mv.name) s, err := p.modelStore.GetServer(v.Server(), false, false) if err != nil { logger.Debugf("Failed to get server for model %s server %s", mv.name, v.Server()) From a4dff31acd32ca0be17187e458384c0780de1171 Mon Sep 17 00:00:00 2001 From: Sherif Akoush Date: Fri, 11 Aug 2023 13:16:42 +0100 Subject: [PATCH 08/10] do not reset server if we have model replicas --- scheduler/pkg/agent/server_test.go | 2 +- scheduler/pkg/scheduler/scheduler.go | 12 ++++++++++-- scheduler/pkg/scheduler/scheduler_test.go | 2 +- scheduler/pkg/store/experiment/store_test.go | 2 +- scheduler/pkg/store/memory_status.go | 8 +++++--- scheduler/pkg/store/pipeline/status_test.go | 2 +- scheduler/pkg/store/store.go | 2 +- 7 files changed, 20 insertions(+), 10 deletions(-) diff --git a/scheduler/pkg/agent/server_test.go b/scheduler/pkg/agent/server_test.go index 2e0166e41c..70bd8dfcf5 100644 --- a/scheduler/pkg/agent/server_test.go +++ b/scheduler/pkg/agent/server_test.go @@ -38,7 +38,7 @@ type mockStore struct { var _ store.ModelStore = (*mockStore)(nil) -func (m *mockStore) FailedScheduling(modelVersion *store.ModelVersion, reason string) { +func (m *mockStore) FailedScheduling(modelVersion *store.ModelVersion, reason string, reset bool) { } func (m *mockStore) UpdateModel(config *pbs.LoadModelRequest) error { diff --git a/scheduler/pkg/scheduler/scheduler.go b/scheduler/pkg/scheduler/scheduler.go index 431749b205..5fbb81ad37 100644 --- a/scheduler/pkg/scheduler/scheduler.go +++ b/scheduler/pkg/scheduler/scheduler.go @@ -149,9 +149,11 @@ func (s *SimpleScheduler) scheduleToServer(modelName string) error { filteredServers, debugTrail = s.filterServers(latestModel, servers, debugTrail) s.sortServers(latestModel, filteredServers) ok := false - logger.Debugf("Model %s candidate servers %v", modelName, filteredServers) + resetServerInCaseOfError := true + logger.Debugf("Model %s with desired replicas %d candidate servers %v", modelName, latestModel.DesiredReplicas(), filteredServers) // For each server filter and sort replicas and attempt schedule if enough replicas for _, candidateServer := range filteredServers { + logger.Debugf("Candidate server %s", candidateServer.Name) var candidateReplicas *sorters.CandidateServer // we need a lock here, we could have many goroutines at sorting @@ -159,6 +161,12 @@ func (s *SimpleScheduler) scheduleToServer(modelName string) error { s.muSortAndUpdate.Lock() candidateReplicas, debugTrail = s.filterReplicas(latestModel, candidateServer, debugTrail) if len(candidateReplicas.ChosenReplicas) < latestModel.DesiredReplicas() { + if len(candidateReplicas.ChosenReplicas) > 0 { + // in this case we have some replicas but not enough, typically in the case where we are scaling up + // beyond the number of the server replicas we 
have + // therefore we do not want to reset the server as we will lose the replicas we have + resetServerInCaseOfError = false + } s.muSortAndUpdate.Unlock() continue } @@ -175,7 +183,7 @@ func (s *SimpleScheduler) scheduleToServer(modelName string) error { } if !ok { failureErrMsg := fmt.Sprintf("failed to schedule model %s. %v", modelName, debugTrail) - s.store.FailedScheduling(latestModel, failureErrMsg) + s.store.FailedScheduling(latestModel, failureErrMsg, resetServerInCaseOfError) return fmt.Errorf(failureErrMsg) } } diff --git a/scheduler/pkg/scheduler/scheduler_test.go b/scheduler/pkg/scheduler/scheduler_test.go index 0096265319..1081f1f5af 100644 --- a/scheduler/pkg/scheduler/scheduler_test.go +++ b/scheduler/pkg/scheduler/scheduler_test.go @@ -38,7 +38,7 @@ type mockStore struct { var _ store.ModelStore = (*mockStore)(nil) -func (f mockStore) FailedScheduling(modelVersion *store.ModelVersion, reason string) { +func (f mockStore) FailedScheduling(modelVersion *store.ModelVersion, reason string, reset bool) { } func (f mockStore) UnloadVersionModels(modelKey string, version uint32) (bool, error) { diff --git a/scheduler/pkg/store/experiment/store_test.go b/scheduler/pkg/store/experiment/store_test.go index 528fdea6ac..809641a224 100644 --- a/scheduler/pkg/store/experiment/store_test.go +++ b/scheduler/pkg/store/experiment/store_test.go @@ -399,7 +399,7 @@ func (f fakeModelStore) DrainServerReplica(serverName string, replicaIdx int) ([ panic("implement me") } -func (f fakeModelStore) FailedScheduling(modelVersion *store.ModelVersion, reason string) { +func (f fakeModelStore) FailedScheduling(modelVersion *store.ModelVersion, reason string, reset bool) { panic("implement me") } diff --git a/scheduler/pkg/store/memory_status.go b/scheduler/pkg/store/memory_status.go index 89eceb4d2f..e262259a5a 100644 --- a/scheduler/pkg/store/memory_status.go +++ b/scheduler/pkg/store/memory_status.go @@ -110,7 +110,7 @@ func updateModelState(isLatest bool, modelVersion *ModelVersion, prevModelVersio } } -func (m *MemoryStore) FailedScheduling(modelVersion *ModelVersion, reason string) { +func (m *MemoryStore) FailedScheduling(modelVersion *ModelVersion, reason string, reset bool) { modelVersion.state = ModelStatus{ State: ScheduleFailed, Reason: reason, @@ -118,8 +118,10 @@ func (m *MemoryStore) FailedScheduling(modelVersion *ModelVersion, reason string AvailableReplicas: modelVersion.state.AvailableReplicas, UnavailableReplicas: modelVersion.GetModel().GetDeploymentSpec().GetReplicas() - modelVersion.state.AvailableReplicas, } - // make sure we reset server - modelVersion.server = "" + // make sure we reset server but only if there are no available replicas + if reset { + modelVersion.server = "" + } m.eventHub.PublishModelEvent( modelFailureEventSource, coordinator.ModelEventMsg{ diff --git a/scheduler/pkg/store/pipeline/status_test.go b/scheduler/pkg/store/pipeline/status_test.go index ea74c9e34f..f4dd15f06b 100644 --- a/scheduler/pkg/store/pipeline/status_test.go +++ b/scheduler/pkg/store/pipeline/status_test.go @@ -95,7 +95,7 @@ func (f fakeModelStore) RemoveServerReplica(serverName string, replicaIdx int) ( panic("implement me") } -func (f fakeModelStore) FailedScheduling(modelVersion *store.ModelVersion, reason string) { +func (f fakeModelStore) FailedScheduling(modelVersion *store.ModelVersion, reason string, reset bool) { panic("implement me") } diff --git a/scheduler/pkg/store/store.go b/scheduler/pkg/store/store.go index 2edfdcb7d4..a13dd8afa5 100644 --- a/scheduler/pkg/store/store.go +++ 
b/scheduler/pkg/store/store.go @@ -126,6 +126,6 @@ type ModelStore interface { ServerNotify(request *pb.ServerNotifyRequest) error RemoveServerReplica(serverName string, replicaIdx int) ([]string, error) // return previously loaded models DrainServerReplica(serverName string, replicaIdx int) ([]string, error) // return previously loaded models - FailedScheduling(modelVersion *ModelVersion, reason string) + FailedScheduling(modelVersion *ModelVersion, reason string, reset bool) GetAllModels() []string } From 56138183192465a9ee4e5e5b9655881951419ba2 Mon Sep 17 00:00:00 2001 From: Sherif Akoush Date: Fri, 11 Aug 2023 14:01:30 +0100 Subject: [PATCH 09/10] update logic of reset server --- scheduler/pkg/scheduler/scheduler.go | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/scheduler/pkg/scheduler/scheduler.go b/scheduler/pkg/scheduler/scheduler.go index 5fbb81ad37..124def8a09 100644 --- a/scheduler/pkg/scheduler/scheduler.go +++ b/scheduler/pkg/scheduler/scheduler.go @@ -149,7 +149,6 @@ func (s *SimpleScheduler) scheduleToServer(modelName string) error { filteredServers, debugTrail = s.filterServers(latestModel, servers, debugTrail) s.sortServers(latestModel, filteredServers) ok := false - resetServerInCaseOfError := true logger.Debugf("Model %s with desired replicas %d candidate servers %v", modelName, latestModel.DesiredReplicas(), filteredServers) // For each server filter and sort replicas and attempt schedule if enough replicas for _, candidateServer := range filteredServers { @@ -161,12 +160,6 @@ func (s *SimpleScheduler) scheduleToServer(modelName string) error { s.muSortAndUpdate.Lock() candidateReplicas, debugTrail = s.filterReplicas(latestModel, candidateServer, debugTrail) if len(candidateReplicas.ChosenReplicas) < latestModel.DesiredReplicas() { - if len(candidateReplicas.ChosenReplicas) > 0 { - // in this case we have some replicas but not enough, typically in the case where we are scaling up - // beyond the number of the server replicas we have - // therefore we do not want to reset the server as we will lose the replicas we have - resetServerInCaseOfError = false - } s.muSortAndUpdate.Unlock() continue } @@ -183,7 +176,8 @@ func (s *SimpleScheduler) scheduleToServer(modelName string) error { } if !ok { failureErrMsg := fmt.Sprintf("failed to schedule model %s. %v", modelName, debugTrail) - s.store.FailedScheduling(latestModel, failureErrMsg, resetServerInCaseOfError) + // we do not want to reset the server if it has live replicas + s.store.FailedScheduling(latestModel, failureErrMsg, !latestModel.HasLiveReplicas()) return fmt.Errorf(failureErrMsg) } } From 214c89fc38d8e622a4684b64d359aad01b56804a Mon Sep 17 00:00:00 2001 From: Sherif Akoush Date: Fri, 11 Aug 2023 14:03:30 +0100 Subject: [PATCH 10/10] add note to describe setting time --- scheduler/pkg/envoy/processor/incremental.go | 1 + 1 file changed, 1 insertion(+) diff --git a/scheduler/pkg/envoy/processor/incremental.go b/scheduler/pkg/envoy/processor/incremental.go index b0404f6fdd..1984c4be63 100644 --- a/scheduler/pkg/envoy/processor/incremental.go +++ b/scheduler/pkg/envoy/processor/incremental.go @@ -655,6 +655,7 @@ func (p *IncrementalProcessor) callVersionCleanupIfNeeded(modelName string) { } func (p *IncrementalProcessor) triggerModelSyncIfNeeded() bool { + // the first time we trigger the batch update we need to set the time if p.batchTriggerManual == nil { p.batchTriggerManual = new(time.Time) *p.batchTriggerManual = time.Now()
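The sketches below sit outside the patches and use invented identifiers throughout; they illustrate the patterns the series relies on and are not excerpts from the scheduler. This first one is a minimal Go version of the batching scheme the series converges on (PATCH 01-06): updates are queued under a mutex, a manual trigger flushes inline once the batch wait has elapsed since the last flush, and a time.AfterFunc timer stays armed as a fallback so a lone update is never stranded. The processor, flush and batchWait names are placeholders rather than the real IncrementalProcessor API.

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    type processor struct {
        mu         sync.Mutex
        pending    []string
        batchWait  time.Duration
        timer      *time.Timer // fallback trigger, armed with time.AfterFunc
        lastManual *time.Time  // nil until the first update is seen
    }

    // update queues work and decides whether to flush inline (manual trigger)
    // or to rely on the fallback timer, roughly as modelUpdate does after
    // these patches.
    func (p *processor) update(name string) {
        p.mu.Lock()
        defer p.mu.Unlock()

        p.pending = append(p.pending, name)

        if p.lastManual == nil {
            now := time.Now()
            p.lastManual = &now // first update just records the time; allocating here avoids the nil deref fixed in PATCH 03
        } else if time.Since(*p.lastManual) > p.batchWait {
            p.flush() // inline: we already hold the lock, no handover to another goroutine
            now := time.Now()
            p.lastManual = &now
            return
        }

        // No inline flush happened, so make sure the timer-based flush is armed.
        if p.timer == nil {
            p.timer = time.AfterFunc(p.batchWait, p.flushWithLock)
        }
    }

    // flushWithLock is the analogue of modelSyncWithLock: the timer callback
    // runs on its own goroutine, so it has to take the lock before doing work.
    func (p *processor) flushWithLock() {
        p.mu.Lock()
        defer p.mu.Unlock()
        p.flush()
    }

    // flush is the analogue of modelSync: callers must already hold p.mu.
    func (p *processor) flush() {
        fmt.Printf("flushing %d pending updates\n", len(p.pending))
        p.pending = nil
        p.timer = nil
    }

    func main() {
        p := &processor{batchWait: 100 * time.Millisecond}
        p.update("model-a")
        p.update("model-b")
        time.Sleep(300 * time.Millisecond) // fallback timer fires here
        p.update("model-c")                // batchWait has elapsed, flushes inline
        p.update("model-d")                // re-arms the fallback timer
        time.Sleep(300 * time.Millisecond)
    }

Splitting flush from flushWithLock mirrors the PATCH 01 split of modelSync and modelSyncWithLock: the inline caller already holds the mutex, while the timer callback runs on its own goroutine and must take it first.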
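Next, a sketch (again with made-up names) of the locking discipline PATCH 05 moves modelUpdate towards: rather than defer p.modelStore.UnlockModel(modelName) holding the per-model lock for the whole call, each early-return path unlocks explicitly and the lock is released before the follow-up batch sync is triggered, keeping the critical section small. This shows only the shape of that change, not the store's real locking API.

    package main

    import (
        "fmt"
        "sync"
    )

    type modelStore struct {
        mu    sync.Mutex
        locks map[string]*sync.Mutex
    }

    func newModelStore() *modelStore {
        return &modelStore{locks: map[string]*sync.Mutex{}}
    }

    // lockModel lazily creates and takes a per-model mutex.
    func (s *modelStore) lockModel(name string) {
        s.mu.Lock()
        l, ok := s.locks[name]
        if !ok {
            l = &sync.Mutex{}
            s.locks[name] = l
        }
        s.mu.Unlock()
        l.Lock()
    }

    // unlockModel releases a per-model mutex previously taken with lockModel.
    func (s *modelStore) unlockModel(name string) {
        s.mu.Lock()
        l := s.locks[name]
        s.mu.Unlock()
        l.Unlock()
    }

    // update keeps the per-model critical section small: each early return
    // unlocks explicitly, and the lock is dropped before the follow-up sync.
    func update(s *modelStore, name string, syncFn func()) error {
        s.lockModel(name)
        if name == "" {
            s.unlockModel(name) // explicit unlock on the error path
            return fmt.Errorf("empty model name")
        }
        fmt.Println("updating routes for", name)
        s.unlockModel(name) // release before triggering the potentially slow sync
        syncFn()
        return nil
    }

    func main() {
        s := newModelStore()
        _ = update(s, "iris", func() {
            fmt.Println("batch sync running without the model lock held")
        })
    }

The cost of dropping the defer is visible even in this toy version: every return path has to remember to unlock, which is exactly the churn the PATCH 05 diff shows across modelUpdate's early returns.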
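PATCH 05 also raises pendingSyncsQueueSize from 10 (agent server) and 100 (envoy processor, experiment and pipeline stores) to 1000. The toy below treats such a queue as a bounded buffer, modelled as a buffered channel, which is an assumption made purely for illustration and not the event hub's actual implementation, to show why a small bound turns a burst of model events into back-pressure on the publisher.

    package main

    import "fmt"

    func main() {
        const queueSize = 4 // compare 10/100 vs 1000 in the patches
        queue := make(chan string, queueSize)

        published := 0
        for i := 0; i < 10; i++ {
            select {
            case queue <- fmt.Sprintf("model-event-%d", i):
                published++
            default:
                // With no consumer draining the queue a plain send would block
                // here; the non-blocking send just counts how many events fit.
            }
        }
        fmt.Printf("queue capacity %d, accepted %d of 10 events without blocking\n", queueSize, published)
    }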
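Finally, a sketch of the scheduling-failure change in PATCH 08 and 09: FailedScheduling gains a reset flag and, after PATCH 09, the caller passes !latestModel.HasLiveReplicas(), so a model that still has live replicas (for example one that merely failed to scale up further) keeps its server assignment, while a model with nothing running has it cleared. The modelVersion struct here is a toy stand-in for the real store types.

    package main

    import "fmt"

    type modelVersion struct {
        name         string
        server       string
        liveReplicas int
        state        string
        reason       string
    }

    func (m *modelVersion) hasLiveReplicas() bool { return m.liveReplicas > 0 }

    // failedScheduling mirrors the shape of the updated store method: reset
    // controls whether the model's server assignment is cleared.
    func failedScheduling(m *modelVersion, reason string, reset bool) {
        m.state = "ScheduleFailed"
        m.reason = reason
        if reset {
            m.server = ""
        }
    }

    func main() {
        // Scale-up case: some replicas are live, so keep the server assignment.
        scalingUp := &modelVersion{name: "iris", server: "mlserver", liveReplicas: 2}
        failedScheduling(scalingUp, "not enough replicas", !scalingUp.hasLiveReplicas())
        fmt.Printf("%s: state=%s server=%q\n", scalingUp.name, scalingUp.state, scalingUp.server)

        // Nothing is running, so clearing the assignment is safe.
        unscheduled := &modelVersion{name: "income", server: "mlserver", liveReplicas: 0}
        failedScheduling(unscheduled, "no candidate servers", !unscheduled.hasLiveReplicas())
        fmt.Printf("%s: state=%s server=%q\n", unscheduled.name, unscheduled.state, unscheduled.server)
    }

The first model keeps server "mlserver" and the second is reset, which is the behaviour the PATCH 09 comment ("we do not want to reset the server if it has live replicas") describes.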