feat: allow gathering the stats per location
0x416e746f6e committed Jun 5, 2024
1 parent e73d237 commit b2126ae
Showing 8 changed files with 112 additions and 28 deletions.
10 changes: 10 additions & 0 deletions Makefile
@@ -8,6 +8,16 @@ build:
-o ./bin/latency-monitor \
github.com/flashbots/latency-monitor/cmd

.PHONY: run
run:
@go run github.com/flashbots/latency-monitor/cmd

.PHONY: local-test
local-test:
@go run github.com/flashbots/latency-monitor/cmd serve \
--transponder-peer localhost=127.0.0.1:32123 \
--transponder-interval 1s

.PHONY: snapshot
snapshot:
@goreleaser release --snapshot --clean
30 changes: 24 additions & 6 deletions cmd/serve.go
@@ -9,6 +9,7 @@ import (
"github.com/flashbots/latency-monitor/config"
"github.com/flashbots/latency-monitor/server"
"github.com/flashbots/latency-monitor/types"
"github.com/google/uuid"
"github.com/urfave/cli/v2"
)

@@ -31,6 +32,15 @@ func CommandServe(cfg *config.Config) *cli.Command {
Usage: "extra metrics labels in the format `label=value`",
},

&cli.IntFlag{
Category: categoryMetrics,
Destination: &cfg.Metrics.LatencyBucketsCount,
EnvVars: []string{envPrefix + "METRICS_LATENCY_BUCKETS_COUNT"},
Name: "metrics-latency-buckets-count",
Usage: "`count` of latency histogram buckets",
Value: 33,
},

&cli.StringFlag{
Category: categoryMetrics,
Destination: &cfg.Metrics.ListenAddress,
@@ -40,13 +50,13 @@ func CommandServe(cfg *config.Config) *cli.Command {
Value: "0.0.0.0:8080",
},

&cli.IntFlag{
&cli.StringFlag{
Category: categoryMetrics,
Destination: &cfg.Metrics.LatencyBucketsCount,
EnvVars: []string{envPrefix + "METRICS_LATENCY_BUCKETS_COUNT"},
Name: "metrics-latency-buckets-count",
Usage: "`count` of latency histogram buckets",
Value: 33,
Destination: &cfg.Metrics.Location,
EnvVars: []string{envPrefix + "METRICS_LOCATION"},
Name: "metrics-location",
Usage: fmt.Sprintf("`location` to be reported as 'from' and 'to' labels (max %d bytes)", types.LocationSize()),
Value: uuid.Must(uuid.NewRandom()).String(),
},

&cli.IntFlag{
@@ -110,6 +120,14 @@ func CommandServe(cfg *config.Config) *cli.Command {
Flags: flags,

Before: func(ctx *cli.Context) error {
// location
loc := []byte(cfg.Metrics.Location)
if len(loc) > types.LocationSize() {
return fmt.Errorf("byte representation of location must not exceed %d bytes: %s",
types.LocationSize(), cfg.Metrics.Location,
)
}

// metrics labels
l := metricsLabels.Value()
labels := make(map[string]string, len(l))
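The 36-byte cap on the location string matches the canonical textual form of a UUID (8-4-4-4-12 characters), which is why the flag's default value (a freshly generated random UUID) always passes the check in the Before hook. A minimal stand-alone sketch of that validation (hypothetical helper, not part of this commit; only the 36-byte limit and the UUID default are taken from the diff above):

package main

import (
	"fmt"

	"github.com/google/uuid"
)

// locationSize mirrors types.LocationSize() from this commit.
const locationSize = 36

// validateLocation reproduces the length check done in the serve command's Before hook.
func validateLocation(s string) error {
	if len(s) > locationSize {
		return fmt.Errorf("location must not exceed %d bytes, got %d: %s", locationSize, len(s), s)
	}
	return nil
}

func main() {
	// A canonical UUID string is exactly 36 bytes, so the default always fits.
	fmt.Println(validateLocation(uuid.Must(uuid.NewRandom()).String()))
	// Anything longer than 36 bytes is rejected.
	fmt.Println(validateLocation("this-location-label-is-definitely-longer-than-thirty-six-bytes"))
}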
3 changes: 2 additions & 1 deletion config/metrics.go
@@ -3,7 +3,8 @@ package config
type Metrics struct {
ListenAddress string `yaml:"metrics_listen_address"`

Labels map[string]string
Labels map[string]string
Location string

LatencyBucketsCount int `yaml:"metrics_latency_buckets_count"`
MaxLatencyUs int `yaml:"metrics_max_latency_us"`
12 changes: 9 additions & 3 deletions server/probes.go
@@ -40,9 +40,10 @@ func (s *Server) sendProbes(ctx context.Context, t *transponder.Transponder) {
}

p := types.Probe{
Sequence: peer.Sequence(),
SrcUUID: s.uuid,
DstUUID: peerUUID,
Sequence: peer.Sequence(),
SrcUUID: s.uuid,
SrcLocation: s.location,
DstUUID: peerUUID,
}
p.SrcTimestamp = time.Now()

@@ -97,6 +98,7 @@ func (s *Server) receiveProbes(ctx context.Context) transponder.Receive {
switch {
case p.DstTimestamp.IsZero(): // reply to the others' probes
p.DstTimestamp = ts
p.DstLocation = s.location
output, err := p.MarshalBinary()
if err != nil {
metrics.CounterFailedProbeRespond.Add(ctx, 1, s.labels, otelapi.WithAttributes(
@@ -138,11 +140,15 @@ func (s *Server) receiveProbes(ctx context.Context) transponder.Receive {
forwardLatency := float64(p.DstTimestamp.Sub(p.SrcTimestamp).Microseconds())
metrics.HistogramLatencyForwardTrip.Record(ctx, forwardLatency, s.labels, otelapi.WithAttributes(
otelattr.String("peer", peer.Name()),
otelattr.String("from", p.SrcLocation.String()),
otelattr.String("to", p.DstLocation.String()),
))

returnLatency := float64(ts.Sub(p.DstTimestamp).Microseconds())
metrics.HistogramLatencyReturnTrip.Record(ctx, returnLatency, s.labels, otelapi.WithAttributes(
otelattr.String("peer", peer.Name()),
otelattr.String("to", p.SrcLocation.String()),
otelattr.String("from", p.DstLocation.String()),
))

metrics.CountProbeReturned.Add(ctx, 1, s.labels, otelapi.WithAttributes(
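Note how the labels are oriented: the forward-trip histogram is recorded with from = SrcLocation and to = DstLocation, while the return-trip histogram swaps them (from = the replying peer's location, to = this instance's location). A minimal sketch of the same labelling pattern against the upstream OpenTelemetry metric API (instrument names, peer name, and location values are made up here; the real instruments live in the repo's metrics package):

package main

import (
	"context"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/metric"
)

func main() {
	ctx := context.Background()

	// With no SDK configured this meter is a no-op, which is enough for a sketch.
	meter := otel.Meter("latency-monitor-example")
	forward, _ := meter.Float64Histogram("latency_forward_trip_us")
	ret, _ := meter.Float64Histogram("latency_return_trip_us")

	src, dst := "eu-west-1", "us-east-1" // placeholder locations

	// Forward trip: source -> destination.
	forward.Record(ctx, 1234, metric.WithAttributes(
		attribute.String("peer", "example-peer"),
		attribute.String("from", src),
		attribute.String("to", dst),
	))

	// Return trip: destination -> source, so from/to are swapped.
	ret.Record(ctx, 1432, metric.WithAttributes(
		attribute.String("peer", "example-peer"),
		attribute.String("from", dst),
		attribute.String("to", src),
	))
}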
18 changes: 12 additions & 6 deletions server/server.go
@@ -29,7 +29,8 @@ type Server struct {
uuid uuid.UUID
peers map[uuid.UUID]*types.Peer

labels otelapi.MeasurementOption
labels otelapi.MeasurementOption
location types.Location
}

func New(cfg *config.Config) (*Server, error) {
@@ -45,6 +46,9 @@ func New(cfg *config.Config) (*Server, error) {
labels = append(labels, otelattr.String(k, v))
}

location := types.Location{}
copy(location[:], []byte(cfg.Metrics.Location))

peers := make(map[uuid.UUID]*types.Peer, len(cfg.Transponder.Peers))
for _, peer := range cfg.Transponder.Peers {
peerUUID := srvUUID
@@ -59,12 +63,14 @@
}

return &Server{
cfg: cfg,
log: l,
uuid: srvUUID,
cfg: cfg,
log: l,

uuid: srvUUID,
peers: peers,

labels: otelapi.WithAttributeSet(otelattr.NewSet(labels...)),
peers: peers,
labels: otelapi.WithAttributeSet(otelattr.NewSet(labels...)),
location: location,
}, nil
}

18 changes: 18 additions & 0 deletions types/location.go
@@ -0,0 +1,18 @@
package types

type Location [36]byte

func LocationSize() int {
return 36
}

func (l Location) String() string {
n := 0
for n < len(l) {
if l[n] == 0 {
break
}
n += 1
}
return string(l[:n])
}
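A short usage sketch of the fixed-size Location type (the String method below is the same logic as above, just condensed; the region name is a made-up example):

package main

import "fmt"

// Location mirrors types.Location from this commit: a fixed 36-byte array
// whose String() stops at the first zero byte.
type Location [36]byte

func (l Location) String() string {
	n := 0
	for n < len(l) && l[n] != 0 {
		n++
	}
	return string(l[:n])
}

func main() {
	var loc Location
	copy(loc[:], "eu-west-1") // shorter values leave trailing zero bytes in the array
	fmt.Println(loc.String()) // prints "eu-west-1", not the zero padding
	fmt.Println(len(loc))     // always 36 on the wire
}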
36 changes: 24 additions & 12 deletions types/probe.go
@@ -13,19 +13,21 @@ type Probe struct {
Sequence uint64
SrcUUID uuid.UUID
SrcTimestamp time.Time
SrcLocation Location
DstUUID uuid.UUID
DstTimestamp time.Time
DstLocation Location
}

func ProbeSize() int {
return 142
}

var (
ErrProbeFailedToEncodeBinaryRepresentation = errors.New("failed to encode probe into its binary representation")
ErrProbeFailedToDecodeBinaryRepresentation = errors.New("failed to decode probe from its binary representation")
)

func ProbeSize() int {
return 72
}

func (p Probe) MarshalBinary() ([]byte, error) {
rawSrcTimestamp, err := p.SrcTimestamp.MarshalBinary()
if err != nil {
@@ -40,13 +42,15 @@ func (p Probe) MarshalBinary() ([]byte, error) {
)
}

data := make([]byte, 72)
data := make([]byte, ProbeSize())

binary.LittleEndian.PutUint64(data[0:8], p.Sequence) // 00..07 : 8 bytes
copy(data[8:24], p.SrcUUID[:]) // 08..23 : 16 bytes
copy(data[24:39], rawSrcTimestamp) // 24..38 : 15 bytes
copy(data[39:55], p.DstUUID[:]) // 39..54 : 16 bytes
copy(data[55:70], rawDstTimestamp) // 55..71 : 15 bytes
binary.LittleEndian.PutUint64(data[0:8], p.Sequence) // 000..007 : 8 bytes
copy(data[8:24], p.SrcUUID[:]) // 008..023 : 16 bytes
copy(data[24:39], rawSrcTimestamp) // 024..038 : 15 bytes
copy(data[39:75], p.SrcLocation[:]) // 039..074 : 36 bytes
copy(data[75:91], p.DstUUID[:]) // 075..090 : 16 bytes
copy(data[91:106], rawDstTimestamp) // 091..105 : 15 bytes
copy(data[106:142], p.DstLocation[:]) // 106..141 : 36 bytes

return data, nil
}
@@ -72,26 +76,34 @@ func (p *Probe) UnmarshalBinary(data []byte) error {
)
}

dstUUID, err := uuid.FromBytes(data[39:55])
srcLocation := Location{}
copy(srcLocation[:], data[39:75])

dstUUID, err := uuid.FromBytes(data[75:91])
if err != nil {
return fmt.Errorf("%w: DstUUID: %w",
ErrProbeFailedToDecodeBinaryRepresentation, err,
)
}

dstTimestamp := &time.Time{}
if err := dstTimestamp.UnmarshalBinary(data[55:70]); err != nil {
if err := dstTimestamp.UnmarshalBinary(data[91:106]); err != nil {
return fmt.Errorf("%w: DstTimestamp: %w",
ErrProbeFailedToDecodeBinaryRepresentation, err,
)
}

dstLocation := Location{}
copy(dstLocation[:], data[106:142])

*p = Probe{
Sequence: binary.LittleEndian.Uint64(data[:8]),
SrcUUID: srcUUID,
SrcTimestamp: *srcTimestamp,
SrcLocation: srcLocation,
DstUUID: dstUUID,
DstTimestamp: *dstTimestamp,
DstLocation: dstLocation,
}

return nil
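The new ProbeSize of 142 bytes is simply the sum of the fixed-width fields in the layout above: 8 (Sequence) + 16 (SrcUUID) + 15 (SrcTimestamp) + 36 (SrcLocation) + 16 (DstUUID) + 15 (DstTimestamp) + 36 (DstLocation). A quick sanity-check sketch of those offsets (the 15-byte figure assumes time.Time.MarshalBinary's version-1 encoding, which is what the fixed offsets above already rely on):

package main

import "fmt"

func main() {
	fields := []struct {
		name string
		size int
	}{
		{"Sequence", 8},
		{"SrcUUID", 16},
		{"SrcTimestamp", 15}, // version byte + 8B seconds + 4B nanoseconds + 2B zone offset
		{"SrcLocation", 36},
		{"DstUUID", 16},
		{"DstTimestamp", 15},
		{"DstLocation", 36},
	}

	offset := 0
	for _, f := range fields {
		fmt.Printf("%-12s %3d..%3d : %2d bytes\n", f.name, offset, offset+f.size-1, f.size)
		offset += f.size
	}
	fmt.Println("total:", offset) // 142, i.e. ProbeSize()
}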
13 changes: 13 additions & 0 deletions types/probe_test.go
@@ -10,12 +10,20 @@ import (
)

func TestProbeEncodeDecode(t *testing.T) {
srcLocation := types.Location{}
dstLocation := types.Location{}

copy(srcLocation[:], []byte("sourceLocation"))
copy(dstLocation[:], []byte("destinationLocation"))

pOrg := types.Probe{
Sequence: 42,
SrcUUID: uuid.New(),
SrcTimestamp: time.Now(),
SrcLocation: types.Location(srcLocation),
DstUUID: uuid.New(),
DstTimestamp: time.Now(),
DstLocation: types.Location(dstLocation),
}

b, err := pOrg.MarshalBinary()
@@ -28,6 +36,11 @@ func TestProbeEncodeDecode(t *testing.T) {
require.Equal(t, pOrg.Sequence, pRes.Sequence)
require.Equal(t, pOrg.SrcUUID, pRes.SrcUUID)
require.Equal(t, pOrg.SrcTimestamp.UnixNano(), pRes.SrcTimestamp.UnixNano()) // otherwise, monotonic clock will drift
require.Equal(t, pOrg.SrcLocation, pRes.SrcLocation)
require.Equal(t, pOrg.DstUUID, pRes.DstUUID)
require.Equal(t, pOrg.DstTimestamp.UnixNano(), pRes.DstTimestamp.UnixNano()) // otherwise, monotonic clock will drift
require.Equal(t, pOrg.DstLocation, pRes.DstLocation)

t.Logf("Src: %s", pRes.SrcLocation.String())
t.Logf("Dst: %s", pRes.DstLocation.String())
}
