Skip to content

Commit

Permalink
added a connection ok check before returning connection to the pool (#11
Browse files Browse the repository at this point in the history
)

* added a connection ok check before returning connection to the pool

Testing Done
- reproduced reconnect issue when connection closed by server by killing server process
- verified the fix

resolves #4

* check buffer flush error

* remove vendor
  • Loading branch information
sandeepbhojwani authored and vikramraman committed Jun 19, 2019
1 parent 9a7e594 commit 102ede1
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 2 deletions.
20 changes: 19 additions & 1 deletion backend/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package backend

import (
"errors"
"io"
"net"
"time"

Expand Down Expand Up @@ -87,8 +88,25 @@ func (cp *Pool) release(conn net.Conn) {
}
}

func (cp *Pool) connOk(conn net.Conn) bool {
if conn == nil {
return false
}
if err := conn.SetReadDeadline(time.Now().Add(2 * time.Millisecond)); err != nil {
return false
}
// you have to try reading atleast 1 byte to detect closed connection
b1 := make([]byte, 1)
_, err := conn.Read(b1)
if err != nil && err == io.EOF {
log.Info("Connection is closed")
return false
}
return true
}

func (cp *Pool) Return(conn net.Conn, failed bool) {
if failed {
if failed || !cp.connOk(conn) {
cp.release(conn)
return
}
Expand Down
6 changes: 5 additions & 1 deletion backend/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,11 @@ func (w *MetricWriter) Write(rq prompb.WriteRequest) error {
fail := false
bw := bufio.NewWriter(out)
defer func() {
bw.Flush()
err := bw.Flush()
if err != nil {
log.Infof("failed to write to socket due to %v", err)
fail = true
}
w.pool.Return(out, fail)
}()
for _, ts := range rq.Timeseries {
Expand Down

0 comments on commit 102ede1

Please sign in to comment.