Skip to content

Commit

Permalink
Replace time.After with time.NewTimer
Browse files Browse the repository at this point in the history
Improper use of time.After can lead to memory leaks if the timer never
gets a chance to fire.

Signed-off-by: Gabriel Adrian Samfira <gsamfira@cloudbasesolutions.com>
  • Loading branch information
gabriel-samfira committed Jul 5, 2024
1 parent 49f1b7a commit 2554f70
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 16 deletions.
4 changes: 3 additions & 1 deletion database/watcher/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,15 @@ func (w *consumer) Send(payload common.ChangePayload) {
}
}

timer := time.NewTimer(1 * time.Second)
defer timer.Stop()
slog.DebugContext(w.ctx, "sending payload")
select {
case <-w.quit:
slog.DebugContext(w.ctx, "consumer is closed")
case <-w.ctx.Done():
slog.DebugContext(w.ctx, "consumer is closed")
case <-time.After(1 * time.Second):
case <-timer.C:
slog.DebugContext(w.ctx, "timeout trying to send payload", "payload", payload)
case w.messages <- payload:
}
Expand Down
4 changes: 3 additions & 1 deletion database/watcher/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@ func (w *producer) Notify(payload common.ChangePayload) error {
return common.ErrProducerClosed
}

timer := time.NewTimer(1 * time.Second)
defer timer.Stop()
select {
case <-w.quit:
return common.ErrProducerClosed
case <-w.ctx.Done():
return common.ErrProducerClosed
case <-time.After(1 * time.Second):
case <-timer.C:
return common.ErrProducerTimeoutErr
case w.messages <- payload:
}
Expand Down
8 changes: 6 additions & 2 deletions runner/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -793,8 +793,10 @@ func (r *basePoolManager) Status() params.PoolManagerStatus {
func (r *basePoolManager) waitForTimeoutOrCancelled(timeout time.Duration) {
slog.DebugContext(
r.ctx, fmt.Sprintf("sleeping for %.2f minutes", timeout.Minutes()))
timer := time.NewTimer(timeout)
defer timer.Stop()
select {
case <-time.After(timeout):
case <-timer.C:
case <-r.ctx.Done():
case <-r.quit:
}
Expand Down Expand Up @@ -1471,13 +1473,15 @@ func (r *basePoolManager) addPendingInstances() error {

func (r *basePoolManager) Wait() error {
done := make(chan struct{})
timer := time.NewTimer(60 * time.Second)
go func() {
r.wg.Wait()
timer.Stop()
close(done)
}()
select {
case <-done:
case <-time.After(60 * time.Second):
case <-timer.C:
return errors.Wrap(runnerErrors.ErrTimeout, "waiting for pool to stop")
}
return nil
Expand Down
5 changes: 3 additions & 2 deletions runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,11 +431,12 @@ func (r *Runner) waitForErrorGroupOrTimeout(g *errgroup.Group) error {
go func() {
done <- g.Wait()
}()

timer := time.NewTimer(60 * time.Second)
defer timer.Stop()
select {
case err := <-done:
return err
case <-time.After(60 * time.Second):
case <-timer.C:
return fmt.Errorf("timed out waiting for pool manager start")
}
}
Expand Down
18 changes: 11 additions & 7 deletions websocket/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,11 @@ func (c *Client) Write(msg []byte) (int, error) {

tmp := make([]byte, len(msg))
copy(tmp, msg)
timer := time.NewTimer(5 * time.Second)
defer timer.Stop()

select {
case <-time.After(5 * time.Second):
case <-timer.C:
return 0, fmt.Errorf("timed out sending message to client")
case c.send <- tmp:
}
Expand Down Expand Up @@ -193,11 +195,6 @@ func (c *Client) writeMessage(messageType int, message []byte) error {

// clientWriter
func (c *Client) clientWriter() {
ticker := time.NewTicker(pingPeriod)
defer func() {
c.Stop()
ticker.Stop()
}()
// Set up expiration timer.
// NOTE: if a token is created without an expiration date
// this will be set to nil, which will close the loop bellow
Expand All @@ -208,6 +205,13 @@ func (c *Client) clientWriter() {
if expires != nil {
authExpires = *expires
}
authTimer := time.NewTimer(time.Until(authExpires))
ticker := time.NewTicker(pingPeriod)
defer func() {
c.Stop()
ticker.Stop()
authTimer.Stop()
}()
for {
select {
case message, ok := <-c.send:
Expand Down Expand Up @@ -236,7 +240,7 @@ func (c *Client) clientWriter() {
}
case <-c.ctx.Done():
return
case <-time.After(time.Until(authExpires)):
case <-authTimer.C:
// Auth has expired
slog.DebugContext(c.ctx, "auth expired, closing connection")
return
Expand Down
9 changes: 6 additions & 3 deletions websocket/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,10 @@ func (h *Hub) Unregister(client *Client) error {
func (h *Hub) Write(msg []byte) (int, error) {
tmp := make([]byte, len(msg))
copy(tmp, msg)

timer := time.NewTimer(5 * time.Second)
defer timer.Stop()
select {
case <-time.After(5 * time.Second):
case <-timer.C:
return 0, fmt.Errorf("timed out sending message to client")
case h.broadcast <- tmp:
}
Expand All @@ -134,9 +135,11 @@ func (h *Hub) Stop() error {
}

func (h *Hub) Wait() error {
timer := time.NewTimer(60 * time.Second)
defer timer.Stop()
select {
case <-h.closed:
case <-time.After(60 * time.Second):
case <-timer.C:
return fmt.Errorf("timed out waiting for hub stop")
}
return nil
Expand Down

0 comments on commit 2554f70

Please sign in to comment.