Skip to content

Commit

Permalink
Merge pull request #97 from pinpt/PROD-740
Browse files Browse the repository at this point in the history
remove ping code
  • Loading branch information
Jeff Haynie authored Jun 27, 2020
2 parents ffe0e24 + 011c3d3 commit 6eeafc5
Showing 1 changed file with 1 addition and 52 deletions.
53 changes: 1 addition & 52 deletions event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,6 @@ func (c *SubscriptionChannel) Close() error {
}
close(c.done)
close(c.ch)
c.checkLastPingsStop()
}
return nil
}
Expand All @@ -430,45 +429,6 @@ var defaultInsecureWSDialer = &websocket.Dialer{
EnableCompression: true,
}

func (c *SubscriptionChannel) pingReceived() {
c.lastPingMu.Lock()
c.lastPing = time.Now()
c.lastPingMu.Unlock()
}

const expectedDurationBetweenPings = 30 * time.Second

func (c *SubscriptionChannel) checkLastPingsInit() {
c.pingReceived() // make initial checks right
c.lastPingTimer = time.NewTicker(expectedDurationBetweenPings)
c.lastPingsDone = make(chan bool)
}

func (c *SubscriptionChannel) checkLastPingsLoop() {
for {
select {
case <-c.lastPingsDone:
return
case <-c.lastPingTimer.C:
c.lastPingMu.Lock()
lastPing := c.lastPing
c.lastPingMu.Unlock()
// crash if 5 last pings were missed
// would be better to retry subscription, but can't figure out how to do this cleanly here, since it would be blocked on wch.ReadJSON()
if time.Since(lastPing) > 5*expectedDurationBetweenPings {
panic(fmt.Errorf("no pings received for %v, expecting pings every %v", time.Since(lastPing), expectedDurationBetweenPings))
}
}
}
}

func (c *SubscriptionChannel) checkLastPingsStop() {
if c.lastPingTimer != nil {
c.lastPingTimer.Stop()
c.lastPingsDone <- true
}
}

// Publish will send a message to the event api
func (c *SubscriptionChannel) Publish(event PublishEvent, options ...Option) error {
ts := time.Now()
Expand Down Expand Up @@ -659,12 +619,6 @@ func (c *SubscriptionChannel) run() {
c.mu.Unlock()
log.Debug(c.subscription.Logger, "connected")

pinghandler := wch.PingHandler()
wch.SetPingHandler(func(data string) error {
c.pingReceived()
return pinghandler(data)
})

var subid string

if len(c.subscription.Topics) > 0 {
Expand Down Expand Up @@ -695,11 +649,6 @@ func (c *SubscriptionChannel) run() {
c.mu.Unlock()
}

if !c.subscription.DisablePing {
c.checkLastPingsInit() // have it separately to avoid races with Close
go c.checkLastPingsLoop()
}

errors = 0
var errored bool
var closed bool
Expand Down Expand Up @@ -862,7 +811,7 @@ type Subscription struct {
HTTPHeaders map[string]string `json:"-"`
CloseTimeout time.Duration `json:"-"`
DispatchTimeout time.Duration `json:"-"`
DisablePing bool `json:"-"`
DisablePing bool `json:"-"` // Deprecated
}

// NewSubscription will create a subscription to the event server and will continously read events (as they arrive)
Expand Down

0 comments on commit 6eeafc5

Please sign in to comment.