-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathbench.go
332 lines (279 loc) · 9.78 KB
/
bench.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
package raft
import (
"encoding/json"
"errors"
"fmt"
"math/rand"
"strings"
"sync"
"time"
)
// NewBenchmark creates either a blast or a simple benchmark depending on the
// blast boolean flag. In blast mode, N operations is executed simultaneously
// against the cluster putting a unique key with a random value of size S.
// In simple mode, C workers executes N requests each, putting a unique key
// with a random value of size S. Note that C is ignored in blast mode.
func NewBenchmark(options *Config, addr string, blast bool, N, S, C uint) (bench Benchmark, err error) {
if blast {
bench = &BlastBenchmark{
opts: options,
operations: N, dataSize: S,
benchmark: benchmark{method: "blast"},
}
} else {
bench = &SimpleBenchmark{
benchmark: benchmark{method: "simple"},
opts: options,
operations: N, dataSize: S, concurrency: C,
}
}
if err := bench.Run(addr); err != nil {
return nil, err
}
return bench, nil
}
// Benchmark defines the interface for all benchmark runners, both for
// execution as well as the delivery of results. A single benchmark is
// executed once and stores its internal results to be saved to disk.
type Benchmark interface {
Run(addr string) error // execute the benchmark, may return an error if already run
CSV(header bool) (string, error) // returns a CSV representation of the results
JSON(indent int) ([]byte, error) // returns a JSON representation of the results
}
//===========================================================================
// benchmark
//===========================================================================
// This embedded struct implements shared functionality between many of the
// implemented benchmarks, keeping track of the throughput and the numbber of
// successful or unsuccessful requests.
type benchmark struct {
method string // the name of the benchmark type
requests uint64 // the number of successful requests
failures uint64 // the number of failed requests
started time.Time // the time the benchmark was started
duration time.Duration // the duration of the benchmark period
latencies []time.Duration // observed latencies in the number of requests
}
// Complete returns true if requests and duration is greater than 0.
func (b *benchmark) Complete() bool {
return b.requests > 0 && b.duration > 0
}
// Throughput computes the number of requests (excluding failures) by the
// total duration of the experiment, e.g. the operations per second.
func (b *benchmark) Throughput() float64 {
if b.duration == 0 {
return 0.0
}
return float64(b.requests) / b.duration.Seconds()
}
// CSV returns a results row delimited by commas as:
//
// requests,failures,duration,throughput,version,benchmark
//
// If header is specified then string contains two rows with the header first.
func (b *benchmark) CSV(header bool) (string, error) {
if !b.Complete() {
return "", errors.New("benchmark has not been run yet")
}
row := fmt.Sprintf(
"%d,%d,%s,%0.4f,%s,%s",
b.requests, b.failures, b.duration, b.Throughput(), Version(), b.method,
)
if header {
return fmt.Sprintf("requests,failures,duration,throughput,version,benchmark\n%s", row), nil
}
return row, nil
}
// JSON returns a results row as a json object, formatted with or without the
// number of spaces specified by indent. Use no indent for JSON lines format.
func (b *benchmark) JSON(indent int) ([]byte, error) {
data := b.serialize()
if indent > 0 {
indent := strings.Repeat(" ", indent)
return json.MarshalIndent(data, "", indent)
}
return json.Marshal(data)
}
// serialize converts the benchmark into a map[string]interface{} -- useful
// for dumping the benchmark as JSON and used from structs that embed benchmark
// to include more data in the results.
func (b *benchmark) serialize() map[string]interface{} {
data := make(map[string]interface{})
data["requests"] = b.requests
data["failures"] = b.failures
data["duration"] = b.duration.String()
data["throughput"] = b.Throughput()
data["version"] = Version()
data["benchmark"] = b.method
return data
}
//===========================================================================
// Blast
//===========================================================================
// BlastBenchmark implements Benchmark by sending n Put requests to the specified server
// each in its own thread. It then records the total time it takes to complete
// all n requests and uses that to compute the throughput. Additionally, each
// thread records the latency of each request, so that outlier requests can
// be removed from the blast computation.
//
// Note: this benchmark Puts a unique key and short value to the server, its
// intent is to compute pedal to the metal write throughput.
type BlastBenchmark struct {
benchmark
opts *Config
operations uint
dataSize uint
}
// Run the blast benchmark against the system by putting a unique key and
// small value to the server as fast as possible and measuring the duration.
func (b *BlastBenchmark) Run(addr string) (err error) {
// N is the number of operations being run
N := b.operations
// S is the size of the value to put to the server
S := b.dataSize
// Initialize the blast latencies and results (resetting if rerun)
b.requests = 0
b.failures = 0
b.latencies = make([]time.Duration, N)
results := make([]bool, N)
// Initialize the keys and values so that it's not part of throughput.
keys := make([]string, N)
vals := make([][]byte, N)
for i := uint(0); i < N; i++ {
keys[i] = fmt.Sprintf("%X", i)
vals[i] = make([]byte, S)
rand.Read(vals[i])
}
// Create the wait group for all threads
group := new(sync.WaitGroup)
group.Add(int(N))
// Initialize a single client for all operations and connect.
var clients *Client
if clients, err = NewClient(addr, b.opts); err != nil {
return fmt.Errorf("could not create client: %s", err)
}
// Execute the blast operation against the server
b.started = time.Now()
for i := uint(0); i < N; i++ {
go func(k uint) {
// Make Put request and if there is no error, store true!
start := time.Now()
if _, err := clients.Commit(keys[k], vals[k]); err == nil {
results[k] = true
}
// Record the latency of the result, success or failure
b.latencies[k] = time.Since(start)
group.Done()
}(i)
}
group.Wait()
b.duration = time.Since(b.started)
// Compute successes and failures
for _, r := range results {
if r {
b.requests++
} else {
b.failures++
}
}
return nil
}
//===========================================================================
// Simple
//===========================================================================
// SimpleBenchmark implements benchmark by having concurrent workers continuously
// sending requests at the server for a fixed number of requests.
type SimpleBenchmark struct {
benchmark
opts *Config
operations uint
dataSize uint
concurrency uint
}
// Run the simple benchmark against the system such that each client puts a
// unique key and small value to the server as quickly as possible.
func (b *SimpleBenchmark) Run(addr string) (err error) {
n := b.operations // number of operations per client
C := b.concurrency // total number of clients
N := n * C // total number of operations
S := b.dataSize // size of the value to put to the server
// Initialize benchmark latencies and results (resetting if necessary)
b.requests, b.failures = 0, 0
b.latencies = make([]time.Duration, N)
results := make([]bool, N)
// Create the wait group for all threads
group := new(sync.WaitGroup)
group.Add(int(C))
// Initialize the clients per routine and connect to server. Note this
// should be done as late as possible to ensure connections are open for
// as little time as possible.
clients := make([]*Client, C)
for i := uint(0); i < C; i++ {
if clients[i], err = NewClient(addr, b.opts); err != nil {
return fmt.Errorf("could not create client %d: %s", i, err)
}
}
// Execute the concurrent workers against the system
b.started = time.Now()
for i := uint(0); i < C; i++ {
go func(k uint) {
// Execute n requests against the server
for j := uint(0); j < n; j++ {
// Compute storage index
idx := k*C + j
// Compute unique key and value based on client and request
key := fmt.Sprintf("%04X-%04X", k, j)
val := make([]byte, S)
rand.Read(val)
// Make Put request, and if no error, store true.
start := time.Now()
if _, err := clients[k].Commit(key, val); err == nil {
results[idx] = true
}
// Record the latency of the result
b.latencies[idx] = time.Since(start)
}
// Signal the main thread that we're done
group.Done()
}(i)
}
// Wait until benchmark is complete
group.Wait()
b.duration = time.Since(b.started)
// Compute successes and failures
for _, r := range results {
if r {
b.requests++
} else {
b.failures++
}
}
return nil
}
// CSV returns a results row delimited by commas as:
//
// concurrency,requests,failures,duration,throughput,version,benchmark
func (b *SimpleBenchmark) CSV(header bool) (csv string, err error) {
if csv, err = b.benchmark.CSV(header); err != nil {
return "", err
}
if header {
parts := strings.Split(csv, "\n")
if len(parts) != 2 {
return "", errors.New("could not parse benchmark header")
}
return fmt.Sprintf("concurrency,%s\n%d,%s", parts[0], b.concurrency, parts[1]), nil
}
return fmt.Sprintf("%d,%s", b.concurrency, csv), nil
}
// JSON returns a results row as a json object, formatted with or without the
// number of spaces specified by indent. Use no indent for JSON lines format.
func (b *SimpleBenchmark) JSON(indent int) ([]byte, error) {
data := b.benchmark.serialize()
data["concurrency"] = b.concurrency
if indent > 0 {
indent := strings.Repeat(" ", indent)
return json.MarshalIndent(data, "", indent)
}
return json.Marshal(data)
}