Skip to content

Commit

Permalink
Add some bookkeeping to only keep the target connection open as long as
Browse files Browse the repository at this point in the history
there are connected clients.
  • Loading branch information
IngmarStein committed May 23, 2024
1 parent 55addf6 commit 053534c
Showing 1 changed file with 49 additions and 6 deletions.
55 changes: 49 additions & 6 deletions pkg/multiplexer/multiplexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,16 @@ import (
"time"
)

type messageType int

const (
Connection messageType = iota
Disconnection
Packet
)

type reqContainer struct {
typ messageType
message []byte
sender chan<- *respContainer
}
Expand Down Expand Up @@ -49,7 +58,7 @@ func (mux *Multiplexer) Start() error {
var wg sync.WaitGroup
mux.wg = &wg

requestQueue := make(chan *reqContainer, 1)
requestQueue := make(chan *reqContainer, 32)
mux.requestQueue = requestQueue

// target connection loop
Expand Down Expand Up @@ -85,12 +94,15 @@ L:
func (mux *Multiplexer) handleConnection(conn net.Conn, sender chan<- *reqContainer) {
defer func(c net.Conn) {
err := c.Close()
sender <- &reqContainer{typ: Disconnection}
if err != nil {
logrus.Errorf("%v", err)
logrus.Error(err)
}
}(conn)

sender <- &reqContainer{typ: Connection}
callback := make(chan *respContainer)

for {
msg, err := mux.messageReader.ReadMessage(conn)
if err == io.EOF {
Expand All @@ -109,6 +121,7 @@ func (mux *Multiplexer) handleConnection(conn net.Conn, sender chan<- *reqContai

// enqueue request msg to target conn loop
sender <- &reqContainer{
typ: Packet,
message: msg,
sender: callback,
}
Expand Down Expand Up @@ -146,9 +159,31 @@ func (mux *Multiplexer) createTargetConn() net.Conn {
}

func (mux *Multiplexer) targetConnLoop(requestQueue <-chan *reqContainer) {
conn := mux.createTargetConn()
var conn net.Conn
clients := 0

for container := range requestQueue {
switch container.typ {
case Connection:
clients++
continue
case Disconnection:
clients--
if clients == 0 && conn != nil {
err := conn.Close()
if err != nil {
logrus.Error(err)
}
}
continue
case Packet:
break
}

if conn == nil {
conn = mux.createTargetConn()
}

request := container.message
_, err := conn.Write(request)
if err != nil {
Expand All @@ -158,7 +193,11 @@ func (mux *Multiplexer) targetConnLoop(requestQueue <-chan *reqContainer) {

logrus.Errorf("target connection: %v", err)
// renew conn
conn = mux.createTargetConn()
err = conn.Close()
if err != nil {
logrus.Error(err)
}
conn = nil
continue
}

Expand All @@ -169,14 +208,18 @@ func (mux *Multiplexer) targetConnLoop(requestQueue <-chan *reqContainer) {
}

if logrus.IsLevelEnabled(logrus.DebugLevel) {
logrus.Debug("Message from Target Server...")
logrus.Debug("Message from target server...")
spew.Dump(msg)
}

if err != nil {
logrus.Errorf("target connection: %v", err)
// renew conn
conn = mux.createTargetConn()
err = conn.Close()
if err != nil {
logrus.Error(err)
}
conn = nil
continue
}
}
Expand Down

0 comments on commit 053534c

Please sign in to comment.