diff --git a/providers/coordinator.go b/providers/coordinator.go index 2d00ebd8fc..9d8500f1a1 100644 --- a/providers/coordinator.go +++ b/providers/coordinator.go @@ -64,29 +64,18 @@ type RunningProvider struct { } // initialize the heartbeat with the provider -func (p *RunningProvider) heartbeat() { +func (p *RunningProvider) heartbeat() error { interval := 2 * time.Second gracePeriod := 3 * time.Second + + if err := p.doOneHeartbeat(interval + gracePeriod); err != nil { + p.Shutdown() + return err + } + go func() { for !p.isCloseOrShutdown() { - _, err := p.Plugin.Heartbeat(&pp.HeartbeatReq{ - Interval: uint64(interval + gracePeriod), - }) - - if status, ok := status.FromError(err); ok { - if status.Code() == 12 { - log.Error(). - Str("plugin-ID", p.ID). - Msg("please update the provider plugin for " + p.Name) - p.Shutdown() - break - } - } - - if err != nil { - log.Error(). - Str("plugin-ID", p.ID). - Msg("cannot establish heartbeat with the provider plugin for " + p.Name) + if err := p.doOneHeartbeat(interval + gracePeriod); err != nil { p.Shutdown() break } @@ -94,6 +83,23 @@ func (p *RunningProvider) heartbeat() { time.Sleep(interval) } }() + + return nil +} + +func (p *RunningProvider) doOneHeartbeat(t time.Duration) error { + _, err := p.Plugin.Heartbeat(&pp.HeartbeatReq{ + Interval: uint64(t), + }) + if err != nil { + if status, ok := status.FromError(err); ok { + if status.Code() == 12 { + return errors.New("please update the provider plugin for " + p.Name) + } + } + return errors.New("cannot establish heartbeat with the provider plugin for " + p.Name) + } + return nil } func (p *RunningProvider) isCloseOrShutdown() bool { @@ -229,7 +235,10 @@ func (c *coordinator) Start(id string, isEphemeral bool, update UpdateProvidersC Client: client, Schema: provider.Schema, } - res.heartbeat() + + if err := res.heartbeat(); err != nil { + return nil, err + } c.mutex.Lock() if isEphemeral {