Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

feat(scheduler): Report lack of dataflow engines in pipeline statuses #5080

8 changes: 4 additions & 4 deletions operator/scheduler/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,26 +203,26 @@ func (s *SchedulerClient) SubscribePipelineEvents(ctx context.Context, conn *grp
case scheduler.PipelineVersionState_PipelineReady:
logger.Info(
"Setting pipeline to ready",
"pipeline", event.PipelineName,
"pipeline", pipeline.Name,
"generation", pipeline.Generation,
)
pipeline.Status.CreateAndSetCondition(
v1alpha1.PipelineReady,
true,
pv.State.Status.String(),
pv.State.Reason,
pv.State.Status.String(),
)
default:
logger.Info(
"Setting pipeline to not ready",
"pipeline", event.PipelineName,
"pipeline", pipeline.Name,
"generation", pipeline.Generation,
)
pipeline.Status.CreateAndSetCondition(
v1alpha1.PipelineReady,
false,
pv.State.Status.String(),
pv.State.Reason,
pv.State.Status.String(),
)
}
// Set models ready
Expand Down
1 change: 1 addition & 0 deletions scheduler/pkg/coordinator/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type PipelineEventMsg struct {
UID string
ExperimentUpdate bool
ModelStatusChange bool
Source string
}

func (p PipelineEventMsg) String() string {
Expand Down
46 changes: 34 additions & 12 deletions scheduler/pkg/kafka/dataflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ const (
grpcMaxConcurrentStreams = 1_000_000
pipelineEventHandlerName = "kafka.dataflow.server.pipelines"
pendingEventsQueueSize int = 10
sourceChainerServer = "chainer-server"
)

type ChainerServer struct {
Expand Down Expand Up @@ -117,7 +118,7 @@ func (c *ChainerServer) PipelineUpdateEvent(ctx context.Context, message *chaine
if !message.Success {
statusVal = pipeline.PipelineFailed
}
err := c.pipelineHandler.SetPipelineState(message.Update.Pipeline, message.Update.Version, message.Update.Uid, statusVal, message.Reason)
err := c.pipelineHandler.SetPipelineState(message.Update.Pipeline, message.Update.Version, message.Update.Uid, statusVal, message.Reason, sourceChainerServer)
if err != nil {
logger.WithError(err).Errorf("Failed to update pipeline status for %s:%d (%s)", message.Update.Pipeline, message.Update.Version, message.Update.Uid)
return nil, status.Error(codes.Internal, err.Error())
Expand Down Expand Up @@ -340,7 +341,14 @@ func (c *ChainerServer) rebalance() {
}
c.mu.Lock()
if len(c.streams) == 0 {
if err := c.pipelineHandler.SetPipelineState(pv.Name, pv.Version, pv.UID, pipeline.PipelineCreate, "No servers available"); err != nil {
if err := c.pipelineHandler.SetPipelineState(
pv.Name,
pv.Version,
pv.UID,
pipeline.PipelineCreate,
"no dataflow engines available to handle pipeline",
sourceChainerServer,
); err != nil {
logger.WithError(err).Errorf("Failed to set pipeline state to creating for %s", pv.String())
}
} else {
Expand All @@ -349,7 +357,7 @@ func (c *ChainerServer) rebalance() {
for server, subscription := range c.streams {
if contains(servers, server) {
msg.Op = chainer.PipelineUpdateMessage_Create
if err := c.pipelineHandler.SetPipelineState(pv.Name, pv.Version, pv.UID, pipeline.PipelineCreating, "Rebalance"); err != nil {
if err := c.pipelineHandler.SetPipelineState(pv.Name, pv.Version, pv.UID, pipeline.PipelineCreating, "Rebalance", sourceChainerServer); err != nil {
logger.WithError(err).Errorf("Failed to set pipeline state to creating for %s", pv.String())
}
if err := subscription.stream.Send(msg); err != nil {
Expand All @@ -372,36 +380,50 @@ func (c *ChainerServer) handlePipelineEvent(event coordinator.PipelineEventMsg)
if event.ExperimentUpdate {
return
}
if sourceChainerServer == event.Source {
return
}

go func() {
c.mu.Lock()
defer c.mu.Unlock()

// Handle case where we have no subscribers
if len(c.streams) == 0 {
logger.Warnf("Can't handle event as no streams available for pipeline %s", event.PipelineName)
return
}

pv, err := c.pipelineHandler.GetPipelineVersion(event.PipelineName, event.PipelineVersion, event.UID)
if err != nil {
logger.WithError(err).Errorf("Failed to get pipeline from event %s", event.String())
return
}

logger.Debugf("Received event %s with state %s", event.String(), pv.State.Status.String())

// Handle case where we have no subscribers
if len(c.streams) == 0 {
errMsg := "no dataflow engines available to handle pipeline"
logger.WithField("pipeline", event.PipelineName).Warn(errMsg)

err := c.pipelineHandler.SetPipelineState(pv.Name, pv.Version, pv.UID, pv.State.Status, errMsg, sourceChainerServer)
if err != nil {
logger.
WithError(err).
WithField("pipeline", pv.String()).
WithField("status", pv.State.Status).
Error("failed to set pipeline state")
}

return
}

switch pv.State.Status {
case pipeline.PipelineCreate:
err := c.pipelineHandler.SetPipelineState(pv.Name, pv.Version, pv.UID, pipeline.PipelineCreating, "")
err := c.pipelineHandler.SetPipelineState(pv.Name, pv.Version, pv.UID, pipeline.PipelineCreating, "", sourceChainerServer)
if err != nil {
logger.WithError(err).Errorf("Failed to set pipeline state to creating for %s", pv.String())
}

msg := c.createPipelineMessage(pv)
c.sendPipelineMsgToSelectedServers(msg, pv)

case pipeline.PipelineTerminate:
err := c.pipelineHandler.SetPipelineState(pv.Name, pv.Version, pv.UID, pipeline.PipelineTerminating, "")
err := c.pipelineHandler.SetPipelineState(pv.Name, pv.Version, pv.UID, pipeline.PipelineTerminating, "", sourceChainerServer)
if err != nil {
logger.WithError(err).Errorf("Failed to set pipeline state to terminating for %s", pv.String())
}
Expand Down
2 changes: 1 addition & 1 deletion scheduler/pkg/store/experiment/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -742,7 +742,7 @@ func (f fakePipelineStore) GetPipelines() ([]*pipeline.Pipeline, error) {
panic("implement me")
}

func (f fakePipelineStore) SetPipelineState(name string, version uint32, uid string, state pipeline.PipelineStatus, reason string) error {
func (f fakePipelineStore) SetPipelineState(name string, version uint32, uid string, state pipeline.PipelineStatus, reason string, source string) error {
panic("implement me")
}

Expand Down
5 changes: 3 additions & 2 deletions scheduler/pkg/store/pipeline/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type PipelineHandler interface {
GetPipelineVersion(name string, version uint32, uid string) (*PipelineVersion, error)
GetPipeline(name string) (*Pipeline, error)
GetPipelines() ([]*Pipeline, error)
SetPipelineState(name string, version uint32, uid string, state PipelineStatus, reason string) error
SetPipelineState(name string, version uint32, uid string, state PipelineStatus, reason string, source string) error
GetAllRunningPipelineVersions() []coordinator.PipelineEventMsg
}

Expand Down Expand Up @@ -349,7 +349,7 @@ func (ps *PipelineStore) terminateOldUnterminatedPipelinesIfNeeded(pipeline *Pip
return evts
}

func (ps *PipelineStore) SetPipelineState(name string, versionNumber uint32, uid string, status PipelineStatus, reason string) error {
func (ps *PipelineStore) SetPipelineState(name string, versionNumber uint32, uid string, status PipelineStatus, reason string, source string) error {
logger := ps.logger.WithField("func", "SetPipelineState")
logger.Debugf("Attempt to set state on pipeline %s:%d status:%s", name, versionNumber, status.String())
evts, err := ps.setPipelineStateImpl(name, versionNumber, uid, status, reason)
Expand All @@ -358,6 +358,7 @@ func (ps *PipelineStore) SetPipelineState(name string, versionNumber uint32, uid
}
if ps.eventHub != nil {
for _, evt := range evts {
evt.Source = source
ps.eventHub.PublishPipelineEvent(setStatusPipelineEventSource, *evt)
}
}
Expand Down
2 changes: 1 addition & 1 deletion scheduler/pkg/store/pipeline/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -726,7 +726,7 @@ func TestSetPipelineState(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
err := test.store.SetPipelineState(test.pipelineName, test.pipelineVersion, test.uid, test.status, test.reason)
err := test.store.SetPipelineState(test.pipelineName, test.pipelineVersion, test.uid, test.status, test.reason, "")
if test.err == nil {
g.Expect(err).To(BeNil())
pv, err := test.store.GetPipelineVersion(test.pipelineName, test.pipelineVersion, test.uid)
Expand Down
Loading