Skip to content

Commit

Permalink
net: add multi listener impl for net.Listener
Browse files Browse the repository at this point in the history
This adds an implementation of net.Listener which listens
on and accepts connections from multiple addresses.

Signed-off-by: Daman Arora <aroradaman@gmail.com>
  • Loading branch information
aroradaman committed Jun 14, 2024
1 parent fe8a2dd commit 716fbb7
Show file tree
Hide file tree
Showing 2 changed files with 443 additions and 0 deletions.
161 changes: 161 additions & 0 deletions net/multi_listen.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package net

import (
"context"
"fmt"
"net"
"sync"
"sync/atomic"
)

// connAndErrPair packs results of accept call on actual listeners together along with the
// index of the listener. It is used for communication between main and listener goroutines.
type connAndErrPair struct {
index int
conn net.Conn
err error
}

// multiListener implements net.Listener
type multiListener struct {
latestAcceptedIndex atomic.Int32
addrs []net.Addr
listeners []net.Listener

wg sync.WaitGroup
connAndErrCh chan connAndErrPair
stopCh chan struct{}
}

// compile time check to ensure *multiListener implements net.Listener.
var _ net.Listener = &multiListener{}

// MultiListen returns net.Listener which can listen for and accept
// TCP connections on multiple addresses.
func MultiListen(ctx context.Context, addresses []string) (net.Listener, error) {
return multiListen(
ctx,
addresses,
net.ResolveTCPAddr,
func(ctx context.Context, network, address string) (net.Listener, error) {
var lc net.ListenConfig
return lc.Listen(ctx, network, address)
})
}

func multiListen(
ctx context.Context,
addresses []string,
resolveTCPAddrFunc func(network, address string) (*net.TCPAddr, error),
listenFunc func(ctx context.Context, network, address string) (net.Listener, error),
) (net.Listener, error) {
ml := &multiListener{
connAndErrCh: make(chan connAndErrPair),
stopCh: make(chan struct{}),
}

for _, address := range addresses {
addr, err := resolveTCPAddrFunc("tcp", address)
if err != nil {
// close all listeners
_ = ml.Close()
return nil, err
}

var network string
host, _, err := net.SplitHostPort(addr.String())
if err != nil {
// close all listeners
_ = ml.Close()
return nil, err
}
switch IPFamilyOf(ParseIPSloppy(host)) {
case IPv4:
network = "tcp4"
case IPv6:
network = "tcp6"
default:
// close all listeners
_ = ml.Close()
return nil, fmt.Errorf("failed to identify ip family of address '%s", addr.String())
}

l, err := listenFunc(ctx, network, addr.String())
if err != nil {
// close all listeners
_ = ml.Close()
return nil, err
}

ml.addrs = append(ml.addrs, addr)
ml.listeners = append(ml.listeners, l)
}

for i := range ml.listeners {
index := i
ml.wg.Add(1)
// spawn a go routine for every listener to wait for incoming connection requests
go func() {
defer ml.wg.Done()
for {
conn, err := ml.listeners[index].Accept()
if err != nil {
select {
// Accept() will throw "use of closed network connection" when listener is closed,
// we can ignore that error and break out of this goroutine.
case <-ml.stopCh:
return
default:
ml.connAndErrCh <- connAndErrPair{conn: conn, err: err, index: index}
}
}
ml.connAndErrCh <- connAndErrPair{conn: conn, err: err, index: index}
}
}()
}
return ml, nil
}

// Accept is part of net.Listener interface.
func (ml *multiListener) Accept() (net.Conn, error) {
connAndErr, ok := <-ml.connAndErrCh
if !ok {
return nil, fmt.Errorf("use of closed network connection")
}
// update latestAcceptedIndex with index of the listener which accepted the connection
ml.latestAcceptedIndex.Store(int32(connAndErr.index))
return connAndErr.conn, connAndErr.err

}

// Close is part of net.Listener interface.
func (ml *multiListener) Close() error {
close(ml.stopCh)
close(ml.connAndErrCh)
for i := range ml.listeners {
_ = ml.listeners[i].Close()
}
ml.wg.Wait()
return nil
}

// Addr is part of net.Listener interface.
func (ml *multiListener) Addr() net.Addr {
return ml.addrs[ml.latestAcceptedIndex.Load()]
}
Loading

0 comments on commit 716fbb7

Please sign in to comment.