Skip to content

Commit

Permalink
Reuse connections
Browse files Browse the repository at this point in the history
  • Loading branch information
bgokden committed Aug 3, 2021
1 parent 684ea7f commit 07eb307
Showing 1 changed file with 9 additions and 8 deletions.
17 changes: 9 additions & 8 deletions util/connpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"log"
"sync"
"sync/atomic"
"time"

"github.com/goburrow/cache"
Expand Down Expand Up @@ -53,11 +54,8 @@ func (cc *ConnectionCache) Get(address string) *Connection {
func (cc *ConnectionCache) Put(c *Connection) {
if cpInterface, _ := cc.Provider.Get(c.Address); cpInterface != nil {
if cp, ok2 := cpInterface.(*ConnectionPool); ok2 {
if c.Counter < 20 {
cp.PutIfHealthy(c)
} else {
cp.Close(c)
}
atomic.AddInt64(&c.Counter, -1)
cp.PutIfHealthy(c)
}
}
}
Expand Down Expand Up @@ -89,7 +87,7 @@ type ConnectionPool struct {
type Connection struct {
Address string
Conn *grpc.ClientConn
Counter int
Counter int64
}

func (c *Connection) Close() {
Expand Down Expand Up @@ -125,7 +123,10 @@ func (cp *ConnectionPool) NewConnection(address string) *Connection {
func (cp *ConnectionPool) Get() *Connection {
c := cp.GetWithRetry(0)
if c != nil {
c.Counter++
atomic.AddInt64(&c.Counter, 1)
if atomic.LoadInt64(&c.Counter) < 10 {
cp.Put(c) // if there are less than 10 concurrent users put it back
}
}
return c
}
Expand Down Expand Up @@ -174,7 +175,7 @@ func (cp *ConnectionPool) PutIfHealthy(conn *Connection) {
}

func (cp *ConnectionPool) Close(conn *Connection) {
if conn != nil && conn.Conn != nil {
if conn != nil && conn.Conn != nil && atomic.LoadInt64(&conn.Counter) <= 0 {
err := conn.Conn.Close()
if err != nil {
log.Printf("Connection Close Error: %v\n", err.Error())
Expand Down

0 comments on commit 07eb307

Please sign in to comment.