Skip to content

Commit

Permalink
Increase hub messages + fix deadlock
Browse files Browse the repository at this point in the history
  • Loading branch information
sakoush committed Aug 9, 2023
1 parent 1f8b375 commit cae199e
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 16 deletions.
2 changes: 1 addition & 1 deletion scheduler/pkg/agent/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import (

const (
grpcMaxConcurrentStreams = 1_000_000
pendingSyncsQueueSize int = 10
pendingSyncsQueueSize int = 1000
modelEventHandlerName = "agent.server.models"
modelScalingCoolingDownSeconds = 60 // this is currently used in scale down events
serverDrainingExtraWaitMillis = 500
Expand Down
40 changes: 27 additions & 13 deletions scheduler/pkg/envoy/processor/incremental.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import (
)

const (
pendingSyncsQueueSize int = 100
pendingSyncsQueueSize int = 1000
modelEventHandlerName = "incremental.processor.models"
experimentEventHandlerName = "incremental.processor.experiments"
pipelineEventHandlerName = "incremental.processor.pipelines"
Expand Down Expand Up @@ -544,13 +544,15 @@ func (p *IncrementalProcessor) modelUpdate(modelName string) error {
p.mu.Lock()
defer p.mu.Unlock()
p.modelStore.LockModel(modelName)
defer p.modelStore.UnlockModel(modelName)

logger.Debugf("Calling model update for %s", modelName)

model, err := p.modelStore.GetModel(modelName)
if err != nil {
logger.WithError(err).Warnf("Failed to sync model %s", modelName)
if err := p.removeRouteForServerInEnvoyCache(modelName); err != nil {
logger.WithError(err).Errorf("Failed to remove model route from envoy %s", modelName)
p.modelStore.UnlockModel(modelName)
return err
}
}
Expand All @@ -559,6 +561,7 @@ func (p *IncrementalProcessor) modelUpdate(modelName string) error {
if err := p.removeRouteForServerInEnvoyCache(modelName); err != nil {
logger.WithError(err).Errorf("Failed to remove model route from envoy %s", modelName)
}
p.modelStore.UnlockModel(modelName)
return p.updateEnvoy() // in practice we should not be here
}

Expand All @@ -568,6 +571,7 @@ func (p *IncrementalProcessor) modelUpdate(modelName string) error {
if err := p.removeRouteForServerInEnvoyCache(modelName); err != nil {
logger.WithError(err).Errorf("Failed to remove model route from envoy %s", modelName)
}
p.modelStore.UnlockModel(modelName)
return p.updateEnvoy() // in practice we should not be here
}

Expand All @@ -579,6 +583,7 @@ func (p *IncrementalProcessor) modelUpdate(modelName string) error {
logger.Debugf("sync: Model can't receive traffic - removing for %s", modelName)
if err := p.removeRouteForServerInEnvoyCache(modelName); err != nil {
logger.WithError(err).Errorf("Failed to remove model route from envoy %s", modelName)
p.modelStore.UnlockModel(modelName)
return err
}
modelRemoved = true
Expand All @@ -600,6 +605,7 @@ func (p *IncrementalProcessor) modelUpdate(modelName string) error {
// Remove routes before we recreate
if err := p.removeRouteForServerInEnvoyCache(modelName); err != nil {
logger.Debugf("Failed to remove route before starting update for %s", modelName)
p.modelStore.UnlockModel(modelName)
return err
}

Expand All @@ -610,6 +616,7 @@ func (p *IncrementalProcessor) modelUpdate(modelName string) error {
logger.WithError(err).Errorf("Failed to add traffic for model %s", modelName)
if err := p.removeRouteForServerInEnvoyCache(modelName); err != nil {
logger.WithError(err).Errorf("Failed to remove model route from envoy %s", modelName)
p.modelStore.UnlockModel(modelName)
return err
}
}
Expand All @@ -623,18 +630,9 @@ func (p *IncrementalProcessor) modelUpdate(modelName string) error {
version: latestModel.GetVersion(),
},
)
p.modelStore.UnlockModel(modelName)
p.triggerModelSyncIfNeeded()

if p.batchTriggerManual == nil {
p.batchTriggerManual = new(time.Time)
*p.batchTriggerManual = time.Now()
} else if time.Since(*p.batchTriggerManual) > p.batchWaitMillis {
// we have waited long enough so we can trigger the batch update
// we do this inline so that we do not require to release and reacquire the lock
// which under heavy load there is no guarantee of order and therefore could lead
// to starvation of the batch update
p.modelSync()
p.batchTriggerManual = nil
}
// we still need to enable the cron timer as there is no guarantee that the manual trigger will be called
if p.batchTrigger == nil && p.runEnvoyBatchUpdates {
p.batchTrigger = time.AfterFunc(p.batchWaitMillis, p.modelSyncWithLock)
Expand All @@ -654,6 +652,20 @@ func (p *IncrementalProcessor) callVersionCleanupIfNeeded(modelName string) {
}
}

// triggerModelSyncIfNeeded runs the batched model sync inline once enough time
// has elapsed since the last manual trigger timestamp.
//
// The first call only records the current time in p.batchTriggerManual. On
// later calls, if more than p.batchWaitMillis has passed since that timestamp,
// p.modelSync is invoked and the timestamp is refreshed.
//
// This method takes no lock itself; modelUpdate calls it while holding p.mu.
func (p *IncrementalProcessor) triggerModelSyncIfNeeded() {
	// No timestamp yet: start the clock and wait for a subsequent call.
	if p.batchTriggerManual == nil {
		startedAt := time.Now()
		p.batchTriggerManual = &startedAt
		return
	}

	// Still inside the batching window, nothing to do.
	if time.Since(*p.batchTriggerManual) <= p.batchWaitMillis {
		return
	}

	// The window has elapsed, so run the batch update inline rather than
	// releasing and reacquiring the lock: under heavy load lock acquisition
	// order is not guaranteed, which could starve the batch update.
	p.modelSync()
	*p.batchTriggerManual = time.Now()
}

func (p *IncrementalProcessor) modelSyncWithLock() {
p.mu.Lock()
defer p.mu.Unlock()
Expand Down Expand Up @@ -689,6 +701,7 @@ func (p *IncrementalProcessor) modelSync() {
continue
}

logger.Debugf("sherif: getting server model %s", mv.name)
s, err := p.modelStore.GetServer(v.Server(), false, false)
if err != nil {
logger.Debugf("Failed to get server for model %s server %s", mv.name, v.Server())
Expand Down Expand Up @@ -747,4 +760,5 @@ func (p *IncrementalProcessor) modelSync() {
// Reset
p.batchTrigger = nil
p.pendingModelVersions = nil
logger.Debugf("Done modelSync")
}
2 changes: 1 addition & 1 deletion scheduler/pkg/store/experiment/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
)

const (
pendingSyncsQueueSize int = 100
pendingSyncsQueueSize int = 1000
experimentStartEventSource = "experiment.store.start"
experimentStopEventSource = "experiment.store.stop"
modelEventHandlerName = "experiment.store.models"
Expand Down
2 changes: 1 addition & 1 deletion scheduler/pkg/store/pipeline/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
)

const (
pendingSyncsQueueSize int = 100
pendingSyncsQueueSize int = 1000
addPipelineEventSource = "pipeline.store.addpipeline"
removePipelineEventSource = "pipeline.store.removepipeline"
setStatusPipelineEventSource = "pipeline.store.setstatus"
Expand Down

0 comments on commit cae199e

Please sign in to comment.