Skip to content

Commit

Permalink
tcp: parameterize metrics collector
Browse files Browse the repository at this point in the history
  • Loading branch information
MarcoPolo committed Oct 31, 2024
1 parent 362e583 commit 3418f2a
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 14 deletions.
29 changes: 19 additions & 10 deletions p2p/transport/tcp/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ var (

const collectFrequency = 10 * time.Second

var collector *aggregatingCollector
var defaultCollector *aggregatingCollector

var initMetricsOnce sync.Once

Expand All @@ -34,8 +34,8 @@ func initMetrics() {
bytesSentDesc = prometheus.NewDesc("tcp_sent_bytes", "TCP bytes sent", nil, nil)
bytesRcvdDesc = prometheus.NewDesc("tcp_rcvd_bytes", "TCP bytes received", nil, nil)

collector = newAggregatingCollector()
prometheus.MustRegister(collector)
defaultCollector = newAggregatingCollector()
prometheus.MustRegister(defaultCollector)

const direction = "direction"

Expand Down Expand Up @@ -196,14 +196,16 @@ func (c *aggregatingCollector) Collect(metrics chan<- prometheus.Metric) {

func (c *aggregatingCollector) ClosedConn(conn *tracingConn, direction string) {
c.mutex.Lock()
collector.removeConn(conn.id)
c.removeConn(conn.id)
c.mutex.Unlock()
closedConns.WithLabelValues(direction).Inc()
}

type tracingConn struct {
id uint64

collector *aggregatingCollector

startTime time.Time
isClient bool

Expand All @@ -213,7 +215,8 @@ type tracingConn struct {
closeErr error
}

func newTracingConn(c manet.Conn, isClient bool) (*tracingConn, error) {
// newTracingConn wraps a manet.Conn with a tracingConn. A nil collector will use the default collector.
func newTracingConn(c manet.Conn, collector *aggregatingCollector, isClient bool) (*tracingConn, error) {
initMetricsOnce.Do(func() { initMetrics() })
conn, err := tcp.NewConn(c)
if err != nil {
Expand All @@ -224,8 +227,12 @@ func newTracingConn(c manet.Conn, isClient bool) (*tracingConn, error) {
isClient: isClient,
Conn: c,
tcpConn: conn,
collector: collector,
}
if tc.collector == nil {
tc.collector = defaultCollector
}
tc.id = collector.AddConn(tc)
tc.id = tc.collector.AddConn(tc)
newConns.WithLabelValues(tc.getDirection()).Inc()
return tc, nil
}
Expand All @@ -239,7 +246,7 @@ func (c *tracingConn) getDirection() string {

func (c *tracingConn) Close() error {
c.closeOnce.Do(func() {
collector.ClosedConn(c, c.getDirection())
c.collector.ClosedConn(c, c.getDirection())
c.closeErr = c.Conn.Close()
})
return c.closeErr
Expand All @@ -258,16 +265,18 @@ func (c *tracingConn) getTCPInfo() (*tcpinfo.Info, error) {

type tracingListener struct {
manet.Listener
collector *aggregatingCollector
}

func newTracingListener(l manet.Listener) *tracingListener {
return &tracingListener{Listener: l}
// newTracingListener wraps a manet.Listener with a tracingListener. A nil collector will use the default collector.
func newTracingListener(l manet.Listener, collector *aggregatingCollector) *tracingListener {
return &tracingListener{Listener: l, collector: collector}
}

func (l *tracingListener) Accept() (manet.Conn, error) {
conn, err := l.Listener.Accept()
if err != nil {
return nil, err
}
return newTracingConn(conn, false)
return newTracingConn(conn, l.collector, false)
}
8 changes: 6 additions & 2 deletions p2p/transport/tcp/metrics_none.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,9 @@ package tcp

import manet "github.com/multiformats/go-multiaddr/net"

func newTracingConn(c manet.Conn, _ bool) (manet.Conn, error) { return c, nil }
func newTracingListener(l manet.Listener) manet.Listener { return l }
type aggregatingCollector struct{}

func newTracingConn(c manet.Conn, collector *aggregatingCollector, isClient bool) (manet.Conn, error) {
return c, nil
}
func newTracingListener(l manet.Listener, collector *aggregatingCollector) manet.Listener { return l }
6 changes: 4 additions & 2 deletions p2p/transport/tcp/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ type TcpTransport struct {
rcmgr network.ResourceManager

reuse reuseport.Transport

metricsCollector *aggregatingCollector
}

var _ transport.Transport = &TcpTransport{}
Expand Down Expand Up @@ -212,7 +214,7 @@ func (t *TcpTransport) dialWithScope(ctx context.Context, raddr ma.Multiaddr, p
c := conn
if t.enableMetrics {
var err error
c, err = newTracingConn(conn, true)
c, err = newTracingConn(conn, t.metricsCollector, true)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -250,7 +252,7 @@ func (t *TcpTransport) Listen(laddr ma.Multiaddr) (transport.Listener, error) {
return nil, err
}
if t.enableMetrics {
list = newTracingListener(&tcpListener{list, 0})
list = newTracingListener(&tcpListener{list, 0}, t.metricsCollector)
}
return t.upgrader.UpgradeListener(t, list), nil
}
Expand Down

0 comments on commit 3418f2a

Please sign in to comment.