fix(scheduler): Manual trigger envoy update (#5074)
* separate out logic from lock in modelsync

* add manual trigger

* fix null pointer exception

* add log message

* Increase hub messages + fix deadlock

* optimise logic

* remove extra log message

* do not reset server if we have model replicas

* update logic of reset server

* add note to describe setting time
sakoush authored Aug 16, 2023
1 parent 8d9547e commit 085d578
Showing 12 changed files with 62 additions and 20 deletions.
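
The diff below implements the behaviour sketched in the commit message: a model update can now flush the pending Envoy batch inline (a "manual" trigger) once batchWaitMillis has elapsed since the last flush, while the timer-based batch update stays in place as a fallback for quiet periods. The following is a minimal, self-contained sketch of that pattern; the type and function names (batcher, onUpdate, flush) and the durations are illustrative only and are not the scheduler's actual code.

package main

import (
	"fmt"
	"sync"
	"time"
)

// batcher coalesces updates and flushes them either inline (manual trigger)
// once batchWait has elapsed since the last inline flush, or via a fallback
// timer when no further updates arrive.
type batcher struct {
	mu         sync.Mutex
	batchWait  time.Duration
	lastManual *time.Time
	timer      *time.Timer
	pending    []string
}

func (b *batcher) onUpdate(item string) {
	b.mu.Lock()
	defer b.mu.Unlock()

	b.pending = append(b.pending, item)

	// Manual trigger: flush inline if enough time has passed since the last
	// inline flush, so a steady stream of updates cannot starve the sync.
	if b.lastManual == nil {
		now := time.Now()
		b.lastManual = &now
	}
	if time.Since(*b.lastManual) > b.batchWait {
		b.flush()
		now := time.Now()
		b.lastManual = &now
		return
	}

	// Fallback: arm the timer once, in case no later update arrives to
	// trigger the flush inline.
	if b.timer == nil {
		b.timer = time.AfterFunc(b.batchWait, b.flushWithLock)
	}
}

// flushWithLock is for callers that do not already hold b.mu (the timer).
func (b *batcher) flushWithLock() {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.flush()
}

// flush assumes b.mu is held by the caller.
func (b *batcher) flush() {
	fmt.Printf("syncing %d pending item(s): %v\n", len(b.pending), b.pending)
	b.pending = nil
	if b.timer != nil {
		b.timer.Stop() // harmless if the timer already fired
		b.timer = nil
	}
}

func main() {
	b := &batcher{batchWait: 50 * time.Millisecond}
	for i := 0; i < 5; i++ {
		b.onUpdate(fmt.Sprintf("model-%d", i))
		time.Sleep(20 * time.Millisecond)
	}
	time.Sleep(100 * time.Millisecond) // let the fallback timer fire for the tail
}

The inline path runs under the mutex the caller already holds, which is why the real change splits modelSync (assumes the lock is held) from modelSyncWithLock (takes it) in incremental.go below.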
2 changes: 1 addition & 1 deletion scheduler/pkg/agent/server.go
@@ -43,7 +43,7 @@ import (

const (
grpcMaxConcurrentStreams = 1_000_000
pendingSyncsQueueSize int = 10
pendingSyncsQueueSize int = 1000
modelEventHandlerName = "agent.server.models"
modelScalingCoolingDownSeconds = 60 // this is currently used in scale down events
serverDrainingExtraWaitMillis = 500
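
The same pendingSyncsQueueSize bump (to 1000) appears in several of the files below; the commit message ties it to "Increase hub messages + fix deadlock". Assuming the pending-sync queue behaves like a buffered Go channel (the event hub implementation itself is not part of this diff), the snippet below illustrates why a larger buffer makes it less likely that a publisher blocks while holding a lock its consumer needs; names and sizes are illustrative only.

package main

import "fmt"

func main() {
	// A buffered channel sized like the new pendingSyncsQueueSize: sends only
	// block once the buffer is full, so a publisher that happens to hold a
	// lock is far less likely to stall while the consumer is itself waiting
	// on that same lock (the deadlock shape the commit message refers to).
	const pendingSyncsQueueSize = 1000
	events := make(chan string, pendingSyncsQueueSize)

	events <- "model-updated" // returns immediately while buffer space remains
	fmt.Println(<-events)
}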
2 changes: 1 addition & 1 deletion scheduler/pkg/agent/server_test.go
@@ -38,7 +38,7 @@ type mockStore struct {

var _ store.ModelStore = (*mockStore)(nil)

func (m *mockStore) FailedScheduling(modelVersion *store.ModelVersion, reason string) {
func (m *mockStore) FailedScheduling(modelVersion *store.ModelVersion, reason string, reset bool) {
}

func (m *mockStore) UpdateModel(config *pbs.LoadModelRequest) error {
50 changes: 44 additions & 6 deletions scheduler/pkg/envoy/processor/incremental.go
@@ -41,7 +41,7 @@ import (
)

const (
pendingSyncsQueueSize int = 100
pendingSyncsQueueSize int = 1000
modelEventHandlerName = "incremental.processor.models"
experimentEventHandlerName = "incremental.processor.experiments"
pipelineEventHandlerName = "incremental.processor.pipelines"
@@ -63,6 +63,7 @@ type IncrementalProcessor struct {
batchWaitMillis time.Duration
pendingModelVersions []*pendingModelVersion
versionCleaner cleaner.ModelVersionCleaner
batchTriggerManual *time.Time
}

type pendingModelVersion struct {
@@ -94,6 +95,7 @@ func NewIncrementalProcessor(
batchTrigger: nil,
batchWaitMillis: util.EnvoyUpdateDefaultBatchWaitMillis,
versionCleaner: versionCleaner,
batchTriggerManual: nil,
}

err := ip.setListeners()
@@ -542,13 +544,15 @@ func (p *IncrementalProcessor) modelUpdate(modelName string) error {
p.mu.Lock()
defer p.mu.Unlock()
p.modelStore.LockModel(modelName)
defer p.modelStore.UnlockModel(modelName)

logger.Debugf("Calling model update for %s", modelName)

model, err := p.modelStore.GetModel(modelName)
if err != nil {
logger.WithError(err).Warnf("Failed to sync model %s", modelName)
if err := p.removeRouteForServerInEnvoyCache(modelName); err != nil {
logger.WithError(err).Errorf("Failed to remove model route from envoy %s", modelName)
p.modelStore.UnlockModel(modelName)
return err
}
}
@@ -557,6 +561,7 @@ func (p *IncrementalProcessor) modelUpdate(modelName string) error {
if err := p.removeRouteForServerInEnvoyCache(modelName); err != nil {
logger.WithError(err).Errorf("Failed to remove model route from envoy %s", modelName)
}
p.modelStore.UnlockModel(modelName)
return p.updateEnvoy() // in practice we should not be here
}

@@ -566,6 +571,7 @@ func (p *IncrementalProcessor) modelUpdate(modelName string) error {
if err := p.removeRouteForServerInEnvoyCache(modelName); err != nil {
logger.WithError(err).Errorf("Failed to remove model route from envoy %s", modelName)
}
p.modelStore.UnlockModel(modelName)
return p.updateEnvoy() // in practice we should not be here
}

@@ -577,6 +583,7 @@ func (p *IncrementalProcessor) modelUpdate(modelName string) error {
logger.Debugf("sync: Model can't receive traffic - removing for %s", modelName)
if err := p.removeRouteForServerInEnvoyCache(modelName); err != nil {
logger.WithError(err).Errorf("Failed to remove model route from envoy %s", modelName)
p.modelStore.UnlockModel(modelName)
return err
}
modelRemoved = true
@@ -598,6 +605,7 @@ func (p *IncrementalProcessor) modelUpdate(modelName string) error {
// Remove routes before we recreate
if err := p.removeRouteForServerInEnvoyCache(modelName); err != nil {
logger.Debugf("Failed to remove route before starting update for %s", modelName)
p.modelStore.UnlockModel(modelName)
return err
}

@@ -608,6 +616,7 @@ func (p *IncrementalProcessor) modelUpdate(modelName string) error {
logger.WithError(err).Errorf("Failed to add traffic for model %s", modelName)
if err := p.removeRouteForServerInEnvoyCache(modelName); err != nil {
logger.WithError(err).Errorf("Failed to remove model route from envoy %s", modelName)
p.modelStore.UnlockModel(modelName)
return err
}
}
@@ -621,9 +630,14 @@ func (p *IncrementalProcessor) modelUpdate(modelName string) error {
version: latestModel.GetVersion(),
},
)
p.modelStore.UnlockModel(modelName)
triggered := p.triggerModelSyncIfNeeded()

if p.batchTrigger == nil && p.runEnvoyBatchUpdates {
p.batchTrigger = time.AfterFunc(p.batchWaitMillis, p.modelSync)
if !triggered {
// we still need to enable the cron timer as there is no guarantee that the manual trigger will be called
if p.batchTrigger == nil && p.runEnvoyBatchUpdates {
p.batchTrigger = time.AfterFunc(p.batchWaitMillis, p.modelSyncWithLock)
}
}

return nil
@@ -640,10 +654,33 @@ func (p *IncrementalProcessor) callVersionCleanupIfNeeded(modelName string) {
}
}

func (p *IncrementalProcessor) modelSync() {
logger := p.logger.WithField("func", "modelSync")
func (p *IncrementalProcessor) triggerModelSyncIfNeeded() bool {
// the first time we trigger the batch update we need to set the time
if p.batchTriggerManual == nil {
p.batchTriggerManual = new(time.Time)
*p.batchTriggerManual = time.Now()
}
if time.Since(*p.batchTriggerManual) > p.batchWaitMillis {
// we have waited long enough so we can trigger the batch update
// we do this inline so that we do not have to release and reacquire the lock;
// under heavy load there is no guarantee of acquisition order, which could lead
// to starvation of the batch update
p.modelSync()
*p.batchTriggerManual = time.Now()
return true
}
return false
}

func (p *IncrementalProcessor) modelSyncWithLock() {
p.mu.Lock()
defer p.mu.Unlock()
p.modelSync()
}

func (p *IncrementalProcessor) modelSync() {
logger := p.logger.WithField("func", "modelSync")
logger.Debugf("Calling model sync")

envoyErr := p.updateEnvoy()
serverReplicaState := store.Available
@@ -728,4 +765,5 @@ func (p *IncrementalProcessor) modelSync() {
// Reset
p.batchTrigger = nil
p.pendingModelVersions = nil
logger.Debugf("Done modelSync")
}
2 changes: 1 addition & 1 deletion scheduler/pkg/envoy/processor/incremental_test.go
@@ -766,7 +766,7 @@ func TestModelSync(t *testing.T) {
for _, op := range test.ops {
op(inc, g)
}
inc.modelSync()
inc.modelSyncWithLock()
for modelName, modelReplicas := range test.expectedReplicaStats {
model, err := inc.modelStore.GetModel(modelName)
g.Expect(err).To(BeNil())
6 changes: 4 additions & 2 deletions scheduler/pkg/scheduler/scheduler.go
@@ -149,9 +149,10 @@ func (s *SimpleScheduler) scheduleToServer(modelName string) error {
filteredServers, debugTrail = s.filterServers(latestModel, servers, debugTrail)
s.sortServers(latestModel, filteredServers)
ok := false
logger.Debugf("Model %s candidate servers %v", modelName, filteredServers)
logger.Debugf("Model %s with desired replicas %d candidate servers %v", modelName, latestModel.DesiredReplicas(), filteredServers)
// For each server filter and sort replicas and attempt schedule if enough replicas
for _, candidateServer := range filteredServers {
logger.Debugf("Candidate server %s", candidateServer.Name)
var candidateReplicas *sorters.CandidateServer

// we need a lock here, we could have many goroutines at sorting
@@ -175,7 +176,8 @@ func (s *SimpleScheduler) scheduleToServer(modelName string) error {
}
if !ok {
failureErrMsg := fmt.Sprintf("failed to schedule model %s. %v", modelName, debugTrail)
s.store.FailedScheduling(latestModel, failureErrMsg)
// we do not want to reset the server if it has live replicas
s.store.FailedScheduling(latestModel, failureErrMsg, !latestModel.HasLiveReplicas())
return fmt.Errorf(failureErrMsg)
}
}
2 changes: 1 addition & 1 deletion scheduler/pkg/scheduler/scheduler_test.go
@@ -38,7 +38,7 @@ type mockStore struct {

var _ store.ModelStore = (*mockStore)(nil)

func (f mockStore) FailedScheduling(modelVersion *store.ModelVersion, reason string) {
func (f mockStore) FailedScheduling(modelVersion *store.ModelVersion, reason string, reset bool) {
}

func (f mockStore) UnloadVersionModels(modelKey string, version uint32) (bool, error) {
2 changes: 1 addition & 1 deletion scheduler/pkg/store/experiment/store.go
@@ -31,7 +31,7 @@ import (
)

const (
pendingSyncsQueueSize int = 100
pendingSyncsQueueSize int = 1000
experimentStartEventSource = "experiment.store.start"
experimentStopEventSource = "experiment.store.stop"
modelEventHandlerName = "experiment.store.models"
2 changes: 1 addition & 1 deletion scheduler/pkg/store/experiment/store_test.go
@@ -399,7 +399,7 @@ func (f fakeModelStore) DrainServerReplica(serverName string, replicaIdx int) ([
panic("implement me")
}

func (f fakeModelStore) FailedScheduling(modelVersion *store.ModelVersion, reason string) {
func (f fakeModelStore) FailedScheduling(modelVersion *store.ModelVersion, reason string, reset bool) {
panic("implement me")
}

8 changes: 5 additions & 3 deletions scheduler/pkg/store/memory_status.go
@@ -110,16 +110,18 @@ func updateModelState(isLatest bool, modelVersion *ModelVersion, prevModelVersio
}
}

func (m *MemoryStore) FailedScheduling(modelVersion *ModelVersion, reason string) {
func (m *MemoryStore) FailedScheduling(modelVersion *ModelVersion, reason string, reset bool) {
modelVersion.state = ModelStatus{
State: ScheduleFailed,
Reason: reason,
Timestamp: time.Now(),
AvailableReplicas: modelVersion.state.AvailableReplicas,
UnavailableReplicas: modelVersion.GetModel().GetDeploymentSpec().GetReplicas() - modelVersion.state.AvailableReplicas,
}
// make sure we reset server
modelVersion.server = ""
// make sure we reset server but only if there are no available replicas
if reset {
modelVersion.server = ""
}
m.eventHub.PublishModelEvent(
modelFailureEventSource,
coordinator.ModelEventMsg{
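
FailedScheduling now takes a reset flag, and the scheduler above passes !latestModel.HasLiveReplicas(), so a model that still has live replicas keeps its server assignment when a reschedule fails. A toy sketch of that effect, using simplified hypothetical types rather than the real store.ModelVersion:

package main

import "fmt"

// modelVersion is a simplified, hypothetical stand-in for store.ModelVersion.
type modelVersion struct {
	server string
	state  string
}

// failedScheduling mirrors the new behaviour: record the failure, but only
// forget the placement when the caller asks for a reset.
func failedScheduling(mv *modelVersion, reason string, reset bool) {
	mv.state = "ScheduleFailed: " + reason
	if reset {
		mv.server = ""
	}
}

func main() {
	withLive := &modelVersion{server: "server-a"}
	failedScheduling(withLive, "not enough replicas", false) // still has live replicas
	fmt.Printf("with live replicas, server kept: %q\n", withLive.server)

	noLive := &modelVersion{server: "server-b"}
	failedScheduling(noLive, "not enough replicas", true) // nothing live
	fmt.Printf("without live replicas, server cleared: %q\n", noLive.server)
}

Keeping the assignment means existing replicas can carry on serving traffic while the scheduler retries; only a model with nothing live is placed from scratch.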
2 changes: 1 addition & 1 deletion scheduler/pkg/store/pipeline/status_test.go
@@ -95,7 +95,7 @@ func (f fakeModelStore) RemoveServerReplica(serverName string, replicaIdx int) (
panic("implement me")
}

func (f fakeModelStore) FailedScheduling(modelVersion *store.ModelVersion, reason string) {
func (f fakeModelStore) FailedScheduling(modelVersion *store.ModelVersion, reason string, reset bool) {
panic("implement me")
}

2 changes: 1 addition & 1 deletion scheduler/pkg/store/pipeline/store.go
@@ -31,7 +31,7 @@ import (
)

const (
pendingSyncsQueueSize int = 100
pendingSyncsQueueSize int = 1000
addPipelineEventSource = "pipeline.store.addpipeline"
removePipelineEventSource = "pipeline.store.removepipeline"
setStatusPipelineEventSource = "pipeline.store.setstatus"
2 changes: 1 addition & 1 deletion scheduler/pkg/store/store.go
@@ -126,6 +126,6 @@ type ModelStore interface {
ServerNotify(request *pb.ServerNotifyRequest) error
RemoveServerReplica(serverName string, replicaIdx int) ([]string, error) // return previously loaded models
DrainServerReplica(serverName string, replicaIdx int) ([]string, error) // return previously loaded models
FailedScheduling(modelVersion *ModelVersion, reason string)
FailedScheduling(modelVersion *ModelVersion, reason string, reset bool)
GetAllModels() []string
}
