Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(scheduler): Manual trigger envoy update #5074

Merged
merged 10 commits into from
Aug 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion scheduler/pkg/agent/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import (

const (
grpcMaxConcurrentStreams = 1_000_000
pendingSyncsQueueSize int = 10
pendingSyncsQueueSize int = 1000
modelEventHandlerName = "agent.server.models"
modelScalingCoolingDownSeconds = 60 // this is currently used in scale down events
serverDrainingExtraWaitMillis = 500
Expand Down
2 changes: 1 addition & 1 deletion scheduler/pkg/agent/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type mockStore struct {

var _ store.ModelStore = (*mockStore)(nil)

func (m *mockStore) FailedScheduling(modelVersion *store.ModelVersion, reason string) {
// FailedScheduling is a no-op stub satisfying the store.ModelStore interface
// for these tests; the failure reason and the reset flag are ignored.
func (m *mockStore) FailedScheduling(modelVersion *store.ModelVersion, reason string, reset bool) {
}

func (m *mockStore) UpdateModel(config *pbs.LoadModelRequest) error {
Expand Down
50 changes: 44 additions & 6 deletions scheduler/pkg/envoy/processor/incremental.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import (
)

const (
pendingSyncsQueueSize int = 100
pendingSyncsQueueSize int = 1000
modelEventHandlerName = "incremental.processor.models"
experimentEventHandlerName = "incremental.processor.experiments"
pipelineEventHandlerName = "incremental.processor.pipelines"
Expand All @@ -63,6 +63,7 @@ type IncrementalProcessor struct {
batchWaitMillis time.Duration
pendingModelVersions []*pendingModelVersion
versionCleaner cleaner.ModelVersionCleaner
batchTriggerManual *time.Time
}

type pendingModelVersion struct {
Expand Down Expand Up @@ -94,6 +95,7 @@ func NewIncrementalProcessor(
batchTrigger: nil,
batchWaitMillis: util.EnvoyUpdateDefaultBatchWaitMillis,
versionCleaner: versionCleaner,
batchTriggerManual: nil,
}

err := ip.setListeners()
Expand Down Expand Up @@ -542,13 +544,15 @@ func (p *IncrementalProcessor) modelUpdate(modelName string) error {
p.mu.Lock()
defer p.mu.Unlock()
p.modelStore.LockModel(modelName)
defer p.modelStore.UnlockModel(modelName)

logger.Debugf("Calling model update for %s", modelName)

model, err := p.modelStore.GetModel(modelName)
if err != nil {
logger.WithError(err).Warnf("Failed to sync model %s", modelName)
if err := p.removeRouteForServerInEnvoyCache(modelName); err != nil {
logger.WithError(err).Errorf("Failed to remove model route from envoy %s", modelName)
p.modelStore.UnlockModel(modelName)
return err
}
}
Expand All @@ -557,6 +561,7 @@ func (p *IncrementalProcessor) modelUpdate(modelName string) error {
if err := p.removeRouteForServerInEnvoyCache(modelName); err != nil {
logger.WithError(err).Errorf("Failed to remove model route from envoy %s", modelName)
}
p.modelStore.UnlockModel(modelName)
return p.updateEnvoy() // in practice we should not be here
}

Expand All @@ -566,6 +571,7 @@ func (p *IncrementalProcessor) modelUpdate(modelName string) error {
if err := p.removeRouteForServerInEnvoyCache(modelName); err != nil {
logger.WithError(err).Errorf("Failed to remove model route from envoy %s", modelName)
}
p.modelStore.UnlockModel(modelName)
return p.updateEnvoy() // in practice we should not be here
}

Expand All @@ -577,6 +583,7 @@ func (p *IncrementalProcessor) modelUpdate(modelName string) error {
logger.Debugf("sync: Model can't receive traffic - removing for %s", modelName)
if err := p.removeRouteForServerInEnvoyCache(modelName); err != nil {
logger.WithError(err).Errorf("Failed to remove model route from envoy %s", modelName)
p.modelStore.UnlockModel(modelName)
return err
}
modelRemoved = true
Expand All @@ -598,6 +605,7 @@ func (p *IncrementalProcessor) modelUpdate(modelName string) error {
// Remove routes before we recreate
if err := p.removeRouteForServerInEnvoyCache(modelName); err != nil {
logger.Debugf("Failed to remove route before starting update for %s", modelName)
p.modelStore.UnlockModel(modelName)
return err
}

Expand All @@ -608,6 +616,7 @@ func (p *IncrementalProcessor) modelUpdate(modelName string) error {
logger.WithError(err).Errorf("Failed to add traffic for model %s", modelName)
if err := p.removeRouteForServerInEnvoyCache(modelName); err != nil {
logger.WithError(err).Errorf("Failed to remove model route from envoy %s", modelName)
p.modelStore.UnlockModel(modelName)
return err
}
}
Expand All @@ -621,9 +630,14 @@ func (p *IncrementalProcessor) modelUpdate(modelName string) error {
version: latestModel.GetVersion(),
},
)
p.modelStore.UnlockModel(modelName)
triggered := p.triggerModelSyncIfNeeded()

if p.batchTrigger == nil && p.runEnvoyBatchUpdates {
p.batchTrigger = time.AfterFunc(p.batchWaitMillis, p.modelSync)
if !triggered {
// we still need to enable the cron timer as there is no guarantee that the manual trigger will be called
if p.batchTrigger == nil && p.runEnvoyBatchUpdates {
p.batchTrigger = time.AfterFunc(p.batchWaitMillis, p.modelSyncWithLock)
}
}

return nil
Expand All @@ -640,10 +654,33 @@ func (p *IncrementalProcessor) callVersionCleanupIfNeeded(modelName string) {
}
}

func (p *IncrementalProcessor) modelSync() {
logger := p.logger.WithField("func", "modelSync")
func (p *IncrementalProcessor) triggerModelSyncIfNeeded() bool {
// the first time we trigger the batch update we need to set the time
if p.batchTriggerManual == nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add comment on why we do this for clarity

p.batchTriggerManual = new(time.Time)
*p.batchTriggerManual = time.Now()
}
if time.Since(*p.batchTriggerManual) > p.batchWaitMillis {
// we have waited long enough so we can trigger the batch update
// we do this inline so that we do not require to release and reacquire the lock
// which under heavy load there is no guarantee of order and therefore could lead
// to starvation of the batch update
p.modelSync()
*p.batchTriggerManual = time.Now()
return true
}
return false
}

// modelSyncWithLock acquires the processor mutex for the duration of a
// modelSync. It is the entry point used by the batch timer
// (time.AfterFunc in modelUpdate), which fires on its own goroutine and so
// does not already hold p.mu; in-lock callers invoke modelSync directly.
func (p *IncrementalProcessor) modelSyncWithLock() {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.modelSync()
}

func (p *IncrementalProcessor) modelSync() {
logger := p.logger.WithField("func", "modelSync")
logger.Debugf("Calling model sync")

envoyErr := p.updateEnvoy()
serverReplicaState := store.Available
Expand Down Expand Up @@ -728,4 +765,5 @@ func (p *IncrementalProcessor) modelSync() {
// Reset
p.batchTrigger = nil
p.pendingModelVersions = nil
logger.Debugf("Done modelSync")
}
2 changes: 1 addition & 1 deletion scheduler/pkg/envoy/processor/incremental_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -766,7 +766,7 @@ func TestModelSync(t *testing.T) {
for _, op := range test.ops {
op(inc, g)
}
inc.modelSync()
inc.modelSyncWithLock()
for modelName, modelReplicas := range test.expectedReplicaStats {
model, err := inc.modelStore.GetModel(modelName)
g.Expect(err).To(BeNil())
Expand Down
6 changes: 4 additions & 2 deletions scheduler/pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,10 @@ func (s *SimpleScheduler) scheduleToServer(modelName string) error {
filteredServers, debugTrail = s.filterServers(latestModel, servers, debugTrail)
s.sortServers(latestModel, filteredServers)
ok := false
logger.Debugf("Model %s candidate servers %v", modelName, filteredServers)
logger.Debugf("Model %s with desired replicas %d candidate servers %v", modelName, latestModel.DesiredReplicas(), filteredServers)
// For each server filter and sort replicas and attempt schedule if enough replicas
for _, candidateServer := range filteredServers {
logger.Debugf("Candidate server %s", candidateServer.Name)
var candidateReplicas *sorters.CandidateServer

// we need a lock here, we could have many goroutines at sorting
Expand All @@ -175,7 +176,8 @@ func (s *SimpleScheduler) scheduleToServer(modelName string) error {
}
if !ok {
failureErrMsg := fmt.Sprintf("failed to schedule model %s. %v", modelName, debugTrail)
s.store.FailedScheduling(latestModel, failureErrMsg)
// we do not want to reset the server if it has live replicas
s.store.FailedScheduling(latestModel, failureErrMsg, !latestModel.HasLiveReplicas())
return fmt.Errorf(failureErrMsg)
}
}
Expand Down
2 changes: 1 addition & 1 deletion scheduler/pkg/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type mockStore struct {

var _ store.ModelStore = (*mockStore)(nil)

func (f mockStore) FailedScheduling(modelVersion *store.ModelVersion, reason string) {
// FailedScheduling is a no-op stub satisfying the store.ModelStore interface
// for these tests; the failure reason and the reset flag are ignored.
func (f mockStore) FailedScheduling(modelVersion *store.ModelVersion, reason string, reset bool) {
}

func (f mockStore) UnloadVersionModels(modelKey string, version uint32) (bool, error) {
Expand Down
2 changes: 1 addition & 1 deletion scheduler/pkg/store/experiment/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
)

const (
pendingSyncsQueueSize int = 100
pendingSyncsQueueSize int = 1000
experimentStartEventSource = "experiment.store.start"
experimentStopEventSource = "experiment.store.stop"
modelEventHandlerName = "experiment.store.models"
Expand Down
2 changes: 1 addition & 1 deletion scheduler/pkg/store/experiment/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ func (f fakeModelStore) DrainServerReplica(serverName string, replicaIdx int) ([
panic("implement me")
}

func (f fakeModelStore) FailedScheduling(modelVersion *store.ModelVersion, reason string) {
// FailedScheduling exists only to satisfy the store.ModelStore interface;
// these tests never exercise it, so reaching it is a test error and panics.
func (f fakeModelStore) FailedScheduling(modelVersion *store.ModelVersion, reason string, reset bool) {
	panic("implement me")
}

Expand Down
8 changes: 5 additions & 3 deletions scheduler/pkg/store/memory_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,16 +110,18 @@ func updateModelState(isLatest bool, modelVersion *ModelVersion, prevModelVersio
}
}

func (m *MemoryStore) FailedScheduling(modelVersion *ModelVersion, reason string) {
func (m *MemoryStore) FailedScheduling(modelVersion *ModelVersion, reason string, reset bool) {
modelVersion.state = ModelStatus{
State: ScheduleFailed,
Reason: reason,
Timestamp: time.Now(),
AvailableReplicas: modelVersion.state.AvailableReplicas,
UnavailableReplicas: modelVersion.GetModel().GetDeploymentSpec().GetReplicas() - modelVersion.state.AvailableReplicas,
}
// make sure we reset server
modelVersion.server = ""
// make sure we reset server but only if there are no available replicas
if reset {
modelVersion.server = ""
}
m.eventHub.PublishModelEvent(
modelFailureEventSource,
coordinator.ModelEventMsg{
Expand Down
2 changes: 1 addition & 1 deletion scheduler/pkg/store/pipeline/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (f fakeModelStore) RemoveServerReplica(serverName string, replicaIdx int) (
panic("implement me")
}

func (f fakeModelStore) FailedScheduling(modelVersion *store.ModelVersion, reason string) {
// FailedScheduling exists only to satisfy the store.ModelStore interface;
// these tests never exercise it, so reaching it is a test error and panics.
func (f fakeModelStore) FailedScheduling(modelVersion *store.ModelVersion, reason string, reset bool) {
	panic("implement me")
}

Expand Down
2 changes: 1 addition & 1 deletion scheduler/pkg/store/pipeline/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
)

const (
pendingSyncsQueueSize int = 100
pendingSyncsQueueSize int = 1000
addPipelineEventSource = "pipeline.store.addpipeline"
removePipelineEventSource = "pipeline.store.removepipeline"
setStatusPipelineEventSource = "pipeline.store.setstatus"
Expand Down
2 changes: 1 addition & 1 deletion scheduler/pkg/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,6 @@ type ModelStore interface {
ServerNotify(request *pb.ServerNotifyRequest) error
RemoveServerReplica(serverName string, replicaIdx int) ([]string, error) // return previously loaded models
DrainServerReplica(serverName string, replicaIdx int) ([]string, error) // return previously loaded models
FailedScheduling(modelVersion *ModelVersion, reason string)
FailedScheduling(modelVersion *ModelVersion, reason string, reset bool)
GetAllModels() []string
}
Loading