forked from stripe/veneur
-
Notifications
You must be signed in to change notification settings - Fork 0
/
forward_test.go
185 lines (167 loc) · 5.22 KB
/
forward_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
package veneur
import (
"context"
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stripe/veneur/samplers"
"github.com/stripe/veneur/sinks"
"github.com/stripe/veneur/ssf"
)
type forwardFixture struct {
t testing.TB
proxy *Proxy
global *Server
globalTS *httptest.Server
proxyTS *httptest.Server
server *Server
}
// newForwardingFixture constructs and returns a chain of veneur
// servers that represent a fairly typical metrics pipeline:
// [local veneur] -> [veneur proxy] -> [global veneur]
//
// The globalSink argument is the global veneur's metric sink, and the
// localConfig argument is the local veneur agent's config, which will
// be amended to include the forwarder and proxy addresses.
func newForwardingFixture(t testing.TB, localConfig Config, transport http.RoundTripper, globalSink sinks.MetricSink) *forwardFixture {
ff := &forwardFixture{t: t}
// Make the global veneur:
ff.global = setupVeneurServer(t, globalConfig(), transport, globalSink, nil, nil)
ff.globalTS = httptest.NewServer(handleImport(ff.global))
// Make the proxy that sends to the global veneur:
proxyCfg := generateProxyConfig()
proxyCfg.ForwardAddress = ff.globalTS.URL
proxyCfg.ConsulTraceServiceName = ""
proxyCfg.ConsulForwardServiceName = ""
proxy, err := NewProxyFromConfig(logrus.New(), proxyCfg)
require.NoError(t, err)
ff.proxy = &proxy
ff.proxy.Start()
ff.proxyTS = httptest.NewServer(ff.proxy.Handler())
// Now make the local server, have it forward to the proxy:
localConfig.ForwardAddress = ff.proxyTS.URL
ff.server = setupVeneurServer(t, localConfig, transport, nil, nil, nil)
return ff
}
// Close shuts down the chain of veneur servers.
func (ff *forwardFixture) Close() {
ff.proxy.Shutdown()
ff.proxyTS.Close()
ff.global.Shutdown()
ff.globalTS.Close()
ff.server.Shutdown()
}
// Flush synchronously waits until all metrics have been flushed along
// the chain of veneur servers.
func (ff *forwardFixture) Flush(ctx context.Context) {
ff.server.Flush(ctx)
// The proxy proxies synchronously, so when the local server
// returns, we assume it has proxied everything to the global
// veneur.
ff.global.Flush(ctx)
}
// IngestSpan synchronously writes a span to the forwarding fixture's
// local veneur's span worker. The fixture must be flushed via the
// (*forwardFixture).Flush method so the ingestion effects can be
// observed.
func (ff *forwardFixture) IngestSpan(span *ssf.SSFSpan) {
ff.server.SpanChan <- span
}
// IngestMetric synchronously writes a metric to the forwarding
// fixture's local veneur span worker. The fixture must be flushed via
// the (*forwardFixture).Flush method so the ingestion effects can be
// observed.
func (ff *forwardFixture) IngestMetric(m *samplers.UDPMetric) {
ff.server.Workers[0].ProcessMetric(m)
}
// TestForwardingIndicatorMetrics ensures that the metrics extracted
// from indicator spans make it across the entire chain, from a local
// veneur, through a proxy, to a global veneur & get reported
// on the global veneur.
func TestE2EForwardingIndicatorMetrics(t *testing.T) {
t.Parallel()
ch := make(chan []samplers.InterMetric)
sink, _ := NewChannelMetricSink(ch)
cfg := localConfig()
cfg.IndicatorSpanTimerName = "indicator.span.timer"
ffx := newForwardingFixture(t, cfg, nil, sink)
defer ffx.Close()
start := time.Now()
end := start.Add(5 * time.Second)
span := &ssf.SSFSpan{
Id: 5,
TraceId: 5,
Name: "foo",
Service: "indicator_testing",
StartTimestamp: start.UnixNano(),
EndTimestamp: end.UnixNano(),
Indicator: true,
}
ffx.IngestSpan(span)
done := make(chan struct{})
go func() {
metrics := <-ch
for _, suffix := range []string{".50percentile", ".75percentile", ".99percentile"} {
mName := "indicator.span.timer" + suffix
found := false
for _, m := range metrics {
if m.Name == mName {
found = true
}
}
assert.True(t, found, "Metric named %s missing", mName)
}
close(done)
}()
ffx.Flush(context.TODO())
select {
case <-done:
case <-time.After(3 * time.Second):
t.Fatal("Timed out waiting for a metric after 5 seconds")
}
}
func TestE2EForwardMetric(t *testing.T) {
t.Parallel()
ch := make(chan []samplers.InterMetric)
sink, _ := NewChannelMetricSink(ch)
cfg := localConfig()
cfg.IndicatorSpanTimerName = "indicator.span.timer"
ffx := newForwardingFixture(t, cfg, nil, sink)
defer ffx.Close()
ffx.IngestMetric(&samplers.UDPMetric{
MetricKey: samplers.MetricKey{
Name: "a.b.c",
Type: "histogram",
},
Value: 20.0,
Digest: 12345,
SampleRate: 1.0,
Scope: samplers.MixedScope,
})
done := make(chan struct{})
go func() {
metrics := <-ch
require.Equal(t, 3, len(metrics), "metrics:\n%#v", metrics)
for _, suffix := range []string{".50percentile", ".75percentile", ".99percentile"} {
mName := "a.b.c" + suffix
found := false
for _, m := range metrics {
if m.Name == mName {
found = true
}
}
assert.True(t, found, "Metric named %s missing", mName)
}
close(done)
}()
ffx.Flush(context.TODO())
select {
case <-done:
case <-time.After(3 * time.Second):
t.Fatal("Timed out waiting for a metric after 5 seconds")
}
}