From f025a899aade7f931eb4721331446f5f82d61ee1 Mon Sep 17 00:00:00 2001 From: Dmitry Kropachev Date: Mon, 28 Feb 2022 09:32:01 +0300 Subject: [PATCH] scylla-bench: non-blocking logging Fix following things: 1. Get rid of 2 histograms per thread to reduce memory usage 2. Make latency printing non-blocking 3. Reorganize code 4. Get rid of histogram merging fixes #80, #49, #73, #69 --- main.go | 37 ++- modes.go | 118 ++----- pkg/config/config.go | 136 +++++++++ pkg/rate_limiter/rate_limiter.go | 43 +++ pkg/results/merged_result.go | 148 --------- pkg/results/result.go | 306 +++++++++++++------ pkg/results/thread_result.go | 106 ------- pkg/results/thread_results.go | 136 --------- pkg/stack/stack.go | 84 +++++ pkg/test_run/test_run.go | 157 ++++++++++ pkg/{results/auxiliary.go => tools/tools.go} | 2 +- pkg/worker/worker.go | 100 ++++++ pkg/workloads/workloads.go | 4 + 13 files changed, 777 insertions(+), 600 deletions(-) create mode 100644 pkg/config/config.go create mode 100644 pkg/rate_limiter/rate_limiter.go delete mode 100644 pkg/results/merged_result.go delete mode 100644 pkg/results/thread_result.go delete mode 100644 pkg/results/thread_results.go create mode 100644 pkg/stack/stack.go create mode 100644 pkg/test_run/test_run.go rename pkg/{results/auxiliary.go => tools/tools.go} (98%) create mode 100644 pkg/worker/worker.go diff --git a/main.go b/main.go index 58c5020..0ead577 100644 --- a/main.go +++ b/main.go @@ -4,6 +4,10 @@ import ( "crypto/tls" "flag" "fmt" + "github.com/scylladb/scylla-bench/pkg/config" + "github.com/scylladb/scylla-bench/pkg/rate_limiter" + "github.com/scylladb/scylla-bench/pkg/test_run" + "github.com/scylladb/scylla-bench/pkg/worker" "log" "os" "os/signal" @@ -164,7 +168,7 @@ func GetWorkload(name string, threadId int, partitionOffset int64, mode string, panic("unreachable") } -func GetMode(name string) func(session *gocql.Session, testResult *results.TestThreadResult, workload WorkloadGenerator, rateLimiter RateLimiter) { +func GetMode(name string) func(session *gocql.Session, testResult *worker.Worker, workload WorkloadGenerator, rateLimiter rate_limiter.RateLimiter) { switch name { case "write": if rowsPerRequest == 1 { @@ -363,7 +367,7 @@ func main() { errorToTimeoutCutoffTime = time.Second } - if err := results.ValidateGlobalLatencyType(latencyType); err != nil { + if err := config.ValidateGlobalLatencyType(latencyType); err != nil { log.Fatal(errors.Wrap(err, "Bad value for latency-type")) } @@ -521,15 +525,20 @@ func main() { } setResultsConfiguration() - fmt.Println("Hdr memory consumption:\t", results.GetHdrMemoryConsumption(concurrency), "bytes") + if config.GetGlobalMeasureLatency() { + fmt.Println("Hdr memory consumption:\t", results.GetHdrMemoryConsumption(), "bytes") + } - testResult := RunConcurrently(maximumRate, func(i int, testResult *results.TestThreadResult, rateLimiter RateLimiter) { + totalResults := test_run.NewTestRun(concurrency, maximumRate) + totalResults.SetStartTime() + totalResults.PrintResultsHeader() + totalResults.RunTest(func(i int, testResult *worker.Worker, rateLimiter rate_limiter.RateLimiter) { GetMode(mode)(session, testResult, GetWorkload(workload, i, partitionOffset, mode, writeRate, distribution), rateLimiter) }) - - testResult.GetTotalResults() - testResult.PrintTotalResults() - os.Exit(testResult.GetFinalStatus()) + totalResults.StartPrintingPartialResult() + totalResults.GetTotalResults() + totalResults.PrintTotalResults() + os.Exit(totalResults.GetFinalStatus()) } func newHostSelectionPolicy(policy string, hosts 
[]string) (gocql.HostSelectionPolicy, error) { @@ -546,14 +555,14 @@ func newHostSelectionPolicy(policy string, hosts []string) (gocql.HostSelectionP } func setResultsConfiguration() { - results.SetGlobalMeasureLatency(measureLatency) - results.SetGlobalHdrLatencyFile(hdrLatencyFile) - results.SetGlobalHdrLatencyUnits(hdrLatencyUnits) - results.SetGlobalHistogramConfiguration( + config.SetGlobalMeasureLatency(measureLatency) + config.SetGlobalHdrLatencyFile(hdrLatencyFile) + config.SetGlobalHdrLatencyUnits(hdrLatencyUnits) + config.SetGlobalHistogramConfiguration( time.Microsecond.Nanoseconds()*50, (timeout * 3).Nanoseconds(), hdrLatencySigFig, ) - results.SetGlobalConcurrency(concurrency) - results.SetGlobalLatencyTypeFromString(latencyType) + config.SetGlobalConcurrency(concurrency) + config.SetGlobalLatencyTypeFromString(latencyType) } diff --git a/modes.go b/modes.go index eb430e2..cc1a557 100644 --- a/modes.go +++ b/modes.go @@ -5,6 +5,7 @@ import ( "crypto/sha256" "encoding/binary" "fmt" + "github.com/scylladb/scylla-bench/pkg/rate_limiter" "log" "math/rand" "strings" @@ -13,75 +14,10 @@ import ( "github.com/gocql/gocql" "github.com/pkg/errors" - "github.com/scylladb/scylla-bench/pkg/results" + "github.com/scylladb/scylla-bench/pkg/worker" . "github.com/scylladb/scylla-bench/pkg/workloads" ) -type RateLimiter interface { - Wait() - Expected() time.Time -} - -type UnlimitedRateLimiter struct{} - -func (*UnlimitedRateLimiter) Wait() {} - -func (*UnlimitedRateLimiter) Expected() time.Time { - return time.Time{} -} - -type MaximumRateLimiter struct { - Period time.Duration - StartTime time.Time - CompletedOperations int64 -} - -func (mxrl *MaximumRateLimiter) Wait() { - mxrl.CompletedOperations++ - nextRequest := mxrl.StartTime.Add(mxrl.Period * time.Duration(mxrl.CompletedOperations)) - now := time.Now() - if now.Before(nextRequest) { - time.Sleep(nextRequest.Sub(now)) - } -} - -func (mxrl *MaximumRateLimiter) Expected() time.Time { - return mxrl.StartTime.Add(mxrl.Period * time.Duration(mxrl.CompletedOperations)) -} - -func NewRateLimiter(maximumRate int, timeOffset time.Duration) RateLimiter { - if maximumRate == 0 { - return &UnlimitedRateLimiter{} - } - period := time.Duration(int64(time.Second) / int64(maximumRate)) - return &MaximumRateLimiter{period, time.Now(), 0} -} - -func RunConcurrently(maximumRate int, workload func(id int, testResult *results.TestThreadResult, rateLimiter RateLimiter)) *results.TestResults { - var timeOffsetUnit int64 - if maximumRate != 0 { - timeOffsetUnit = int64(time.Second) / int64(maximumRate) - maximumRate /= concurrency - } else { - timeOffsetUnit = 0 - } - - totalResults := results.TestResults{} - totalResults.Init(concurrency) - totalResults.SetStartTime() - totalResults.PrintResultsHeader() - - for i := 0; i < concurrency; i++ { - testResult := totalResults.GetTestResult(i) - go func(i int) { - timeOffset := time.Duration(timeOffsetUnit * int64(i)) - workload(i, testResult, NewRateLimiter(maximumRate, timeOffset)) - }(i) - } - - return &totalResults -} - type TestIterator struct { iteration uint workload WorkloadGenerator @@ -109,10 +45,8 @@ func (ti *TestIterator) IsDone() bool { } } -func RunTest(threadResult *results.TestThreadResult, workload WorkloadGenerator, rateLimiter RateLimiter, test func(rb *results.TestThreadResult) (error, time.Duration)) { - - start := time.Now() - partialStart := start +func RunTest(threadResult *worker.Worker, workload WorkloadGenerator, rateLimiter rate_limiter.RateLimiter, test func(rb *worker.Worker) (error, 
time.Duration)) { + defer threadResult.StopReporting() iter := NewTestIterator(workload) errorsAtRow := 0 for !iter.IsDone() { @@ -140,26 +74,14 @@ func RunTest(threadResult *results.TestThreadResult, workload WorkloadGenerator, threadResult.RecordCoFixedLatency(endTime.Sub(expectedStartTime)) } - now := time.Now() if maxErrorsAtRow > 0 && errorsAtRow >= maxErrorsAtRow { threadResult.SubmitCriticalError(errors.New(fmt.Sprintf("Error limit (maxErrorsAtRow) of %d errors is reached", errorsAtRow))) - } - if results.GlobalErrorFlag { - threadResult.ResultChannel <- *threadResult.PartialResult - threadResult.ResetPartialResult() break } - if now.Sub(partialStart) > time.Second { - threadResult.ResultChannel <- *threadResult.PartialResult - threadResult.ResetPartialResult() - partialStart = now + if worker.GlobalErrorFlag { + break } } - end := time.Now() - - threadResult.FullResult.ElapsedTime = end.Sub(start) - threadResult.ResultChannel <- *threadResult.FullResult - threadResult.StopReporting() } const ( @@ -298,10 +220,10 @@ func ValidateData(pk int64, ck int64, data []byte) error { return nil } -func DoWrites(session *gocql.Session, threadResult *results.TestThreadResult, workload WorkloadGenerator, rateLimiter RateLimiter) { +func DoWrites(session *gocql.Session, threadResult *worker.Worker, workload WorkloadGenerator, rateLimiter rate_limiter.RateLimiter) { query := session.Query("INSERT INTO " + keyspaceName + "." + tableName + " (pk, ck, v) VALUES (?, ?, ?)") - RunTest(threadResult, workload, rateLimiter, func(rb *results.TestThreadResult) (error, time.Duration) { + RunTest(threadResult, workload, rateLimiter, func(rb *worker.Worker) (error, time.Duration) { pk := workload.NextPartitionKey() ck := workload.NextClusteringKey() value := GenerateData(pk, ck, clusteringRowSizeDist.Generate()) @@ -322,15 +244,15 @@ func DoWrites(session *gocql.Session, threadResult *results.TestThreadResult, wo }) } -func DoBatchedWrites(session *gocql.Session, threadResult *results.TestThreadResult, workload WorkloadGenerator, rateLimiter RateLimiter) { +func DoBatchedWrites(session *gocql.Session, threadResult *worker.Worker, workload WorkloadGenerator, rateLimiter rate_limiter.RateLimiter) { request := fmt.Sprintf("INSERT INTO %s.%s (pk, ck, v) VALUES (?, ?, ?)", keyspaceName, tableName) - RunTest(threadResult, workload, rateLimiter, func(rb *results.TestThreadResult) (error, time.Duration) { + RunTest(threadResult, workload, rateLimiter, func(rb *worker.Worker) (error, time.Duration) { batch := session.NewBatch(gocql.UnloggedBatch) - batchSize := 0 + batchSize := uint64(0) currentPk := workload.NextPartitionKey() - for !workload.IsPartitionDone() && atomic.LoadUint32(&stopAll) == 0 && batchSize < rowsPerRequest { + for !workload.IsPartitionDone() && atomic.LoadUint32(&stopAll) == 0 && batchSize < uint64(rowsPerRequest) { ck := workload.NextClusteringKey() batchSize++ value := GenerateData(currentPk, ck, clusteringRowSizeDist.Generate()) @@ -352,11 +274,11 @@ func DoBatchedWrites(session *gocql.Session, threadResult *results.TestThreadRes }) } -func DoCounterUpdates(session *gocql.Session, threadResult *results.TestThreadResult, workload WorkloadGenerator, rateLimiter RateLimiter) { +func DoCounterUpdates(session *gocql.Session, threadResult *worker.Worker, workload WorkloadGenerator, rateLimiter rate_limiter.RateLimiter) { query := session.Query("UPDATE " + keyspaceName + "." + counterTableName + " SET c1 = c1 + ?, c2 = c2 + ?, c3 = c3 + ?, c4 = c4 + ?, c5 = c5 + ? WHERE pk = ? 
AND ck = ?") - RunTest(threadResult, workload, rateLimiter, func(rb *results.TestThreadResult) (error, time.Duration) { + RunTest(threadResult, workload, rateLimiter, func(rb *worker.Worker) (error, time.Duration) { pk := workload.NextPartitionKey() ck := workload.NextClusteringKey() bound := query.Bind(ck, ck+1, ck+2, ck+3, ck+4, pk, ck) @@ -376,11 +298,11 @@ func DoCounterUpdates(session *gocql.Session, threadResult *results.TestThreadRe }) } -func DoReads(session *gocql.Session, threadResult *results.TestThreadResult, workload WorkloadGenerator, rateLimiter RateLimiter) { +func DoReads(session *gocql.Session, threadResult *worker.Worker, workload WorkloadGenerator, rateLimiter rate_limiter.RateLimiter) { DoReadsFromTable(tableName, session, threadResult, workload, rateLimiter) } -func DoCounterReads(session *gocql.Session, threadResult *results.TestThreadResult, workload WorkloadGenerator, rateLimiter RateLimiter) { +func DoCounterReads(session *gocql.Session, threadResult *worker.Worker, workload WorkloadGenerator, rateLimiter rate_limiter.RateLimiter) { DoReadsFromTable(counterTableName, session, threadResult, workload, rateLimiter) } @@ -415,7 +337,7 @@ func BuildReadQuery(table string, orderBy string, session *gocql.Session) *gocql return session.Query(request) } -func DoReadsFromTable(table string, session *gocql.Session, threadResult *results.TestThreadResult, workload WorkloadGenerator, rateLimiter RateLimiter) { +func DoReadsFromTable(table string, session *gocql.Session, threadResult *worker.Worker, workload WorkloadGenerator, rateLimiter rate_limiter.RateLimiter) { var queries []*gocql.Query queryIdx := 0 queryIdxMax := len(selectOrderByParsed) - 1 @@ -424,7 +346,7 @@ func DoReadsFromTable(table string, session *gocql.Session, threadResult *result queries = append(queries, BuildReadQuery(table, orderBy, session)) } - RunTest(threadResult, workload, rateLimiter, func(rb *results.TestThreadResult) (error, time.Duration) { + RunTest(threadResult, workload, rateLimiter, func(rb *worker.Worker) (error, time.Duration) { pk := workload.NextPartitionKey() query := queries[queryIdx] @@ -507,11 +429,11 @@ func DoReadsFromTable(table string, session *gocql.Session, threadResult *result }) } -func DoScanTable(session *gocql.Session, threadResult *results.TestThreadResult, workload WorkloadGenerator, rateLimiter RateLimiter) { +func DoScanTable(session *gocql.Session, threadResult *worker.Worker, workload WorkloadGenerator, rateLimiter rate_limiter.RateLimiter) { request := fmt.Sprintf("SELECT * FROM %s.%s WHERE token(pk) >= ? 
AND token(pk) <= ?", keyspaceName, tableName) query := session.Query(request) - RunTest(threadResult, workload, rateLimiter, func(rb *results.TestThreadResult) (error, time.Duration) { + RunTest(threadResult, workload, rateLimiter, func(rb *worker.Worker) (error, time.Duration) { requestStart := time.Now() currentRange := workload.NextTokenRange() bound := query.Bind(currentRange.Start, currentRange.End) diff --git a/pkg/config/config.go b/pkg/config/config.go new file mode 100644 index 0000000..c5d1cf2 --- /dev/null +++ b/pkg/config/config.go @@ -0,0 +1,136 @@ +package config + +import ( + "errors" + "fmt" + "time" +) + +type LatencyType int + +const ( + LatencyTypeCoordinatedOmissionFixed LatencyType = iota + LatencyTypeRaw LatencyType = iota +) + +var LatencyTypes = map[string]LatencyType{ + "raw": LatencyTypeRaw, + "fixed-coordinated-omission": LatencyTypeCoordinatedOmissionFixed, +} + +type HistogramConfiguration struct { + MinValue int64 + MaxValue int64 + SigFig int +} + +type Configuration struct { + concurrency int + measureLatency bool + hdrLatencyFile string + hdrLatencyScale int64 + latencyTypeToPrint LatencyType + latencyHistogramConfiguration HistogramConfiguration + reportingCycle time.Duration +} + + +var Config Configuration + + +func SetGlobalHistogramConfiguration(minValue int64, maxValue int64, sigFig int) { + Config.latencyHistogramConfiguration.MinValue = minValue / Config.hdrLatencyScale + Config.latencyHistogramConfiguration.MaxValue = maxValue / Config.hdrLatencyScale + Config.latencyHistogramConfiguration.SigFig = sigFig +} + +func GetGlobalHistogramConfiguration() *HistogramConfiguration { + return &Config.latencyHistogramConfiguration +} + +func SetGlobalLatencyType(latencyType LatencyType) { + Config.latencyTypeToPrint = latencyType +} + +func GetGlobalLatencyType() LatencyType { + return Config.latencyTypeToPrint +} + +func SetGlobalLatencyTypeFromString(latencyType string) { + SetGlobalLatencyType(LatencyTypes[latencyType]) +} + +func ValidateGlobalLatencyType(latencyType string) error { + _, ok := LatencyTypes[latencyType] + if !ok { + return errors.New(fmt.Sprintf("unknown value %s, supported values are: raw, fixed-coordinated-omission", latencyType)) + } + return nil +} + +func SetGlobalMeasureLatency(value bool) { + Config.measureLatency = value +} + +func GetGlobalMeasureLatency() bool { + return Config.measureLatency +} + +func SetGlobalHdrLatencyFile(value string) { + Config.hdrLatencyFile = value +} + +func GetGlobalHdrLatencyFile() string { + return Config.hdrLatencyFile +} + +func SetGlobalHdrLatencyUnits(value string) { + switch value { + case "ns": + Config.hdrLatencyScale = 1 + break + case "us": + Config.hdrLatencyScale = 1000 + break + case "ms": + Config.hdrLatencyScale = 1000000 + break + default: + panic("Wrong value for hdr-latency-scale, only supported values are: ns, us and ms") + } +} + +func GetGlobalHdrLatencyScale() int64 { + return Config.hdrLatencyScale +} + +func SetGlobalConcurrency(value int) { + Config.concurrency = value +} + +func GetGlobalConcurrency() int { + return Config.concurrency +} + +func NumberOfLatencyResultsInPartialReportCycle() int64 { + // Minimum delay is considered to be 0.5ms + result := int64(Config.concurrency) * int64(Config.reportingCycle) / int64(time.Millisecond/2) * 2 + if result >= 10000000 { + return 10000000 + } + return result +} + + +func init() { + Config = Configuration{ + measureLatency: false, + latencyHistogramConfiguration: HistogramConfiguration{ + MinValue: 0, + MaxValue: 1<<63 - 1, +
SigFig: 3, + }, + latencyTypeToPrint: LatencyTypeRaw, + reportingCycle: time.Second, + } +} diff --git a/pkg/rate_limiter/rate_limiter.go b/pkg/rate_limiter/rate_limiter.go new file mode 100644 index 0000000..8a1d860 --- /dev/null +++ b/pkg/rate_limiter/rate_limiter.go @@ -0,0 +1,43 @@ +package rate_limiter + +import "time" + +type RateLimiter interface { + Wait() + Expected() time.Time +} + +type UnlimitedRateLimiter struct{} + +func (*UnlimitedRateLimiter) Wait() {} + +func (*UnlimitedRateLimiter) Expected() time.Time { + return time.Time{} +} + +type MaximumRateLimiter struct { + Period time.Duration + StartTime time.Time + CompletedOperations int64 +} + +func (mxrl *MaximumRateLimiter) Wait() { + mxrl.CompletedOperations++ + nextRequest := mxrl.StartTime.Add(mxrl.Period * time.Duration(mxrl.CompletedOperations)) + now := time.Now() + if now.Before(nextRequest) { + time.Sleep(nextRequest.Sub(now)) + } +} + +func (mxrl *MaximumRateLimiter) Expected() time.Time { + return mxrl.StartTime.Add(mxrl.Period * time.Duration(mxrl.CompletedOperations)) +} + +func NewRateLimiter(maximumRate int, timeOffset time.Duration) RateLimiter { + if maximumRate == 0 { + return &UnlimitedRateLimiter{} + } + period := time.Duration(int64(time.Second) / int64(maximumRate)) + return &MaximumRateLimiter{period, time.Now(), 0} +} diff --git a/pkg/results/merged_result.go b/pkg/results/merged_result.go deleted file mode 100644 index b399574..0000000 --- a/pkg/results/merged_result.go +++ /dev/null @@ -1,148 +0,0 @@ -package results - -import ( - "fmt" - "log" - "os" - "path/filepath" - "time" - - "github.com/HdrHistogram/hdrhistogram-go" -) - -type MergedResult struct { - Time time.Duration - Operations int - ClusteringRows int - OperationsPerSecond float64 - ClusteringRowsPerSecond float64 - Errors int - CriticalErrors []error - HistogramStartTime int64 - RawLatency *hdrhistogram.Histogram - CoFixedLatency *hdrhistogram.Histogram -} - -func NewMergedResult() *MergedResult { - result := &MergedResult{} - if globalResultConfiguration.measureLatency { - result.HistogramStartTime = time.Now().UnixNano() - result.RawLatency = NewHistogram(&globalResultConfiguration.latencyHistogramConfiguration, "raw") - result.CoFixedLatency = NewHistogram(&globalResultConfiguration.latencyHistogramConfiguration, "co-fixed") - } - return result -} - -func (mr *MergedResult) AddResult(result Result) { - mr.Time += result.ElapsedTime - mr.Operations += result.Operations - mr.ClusteringRows += result.ClusteringRows - mr.OperationsPerSecond += float64(result.Operations) / result.ElapsedTime.Seconds() - mr.ClusteringRowsPerSecond += float64(result.ClusteringRows) / result.ElapsedTime.Seconds() - mr.Errors += result.Errors - if result.CriticalErrors != nil { - if mr.CriticalErrors == nil { - mr.CriticalErrors = result.CriticalErrors - } else { - for _, err := range result.CriticalErrors { - mr.CriticalErrors = append(mr.CriticalErrors, err) - } - } - } - if globalResultConfiguration.measureLatency { - dropped := mr.RawLatency.Merge(result.RawLatency) - if dropped > 0 { - log.Print("dropped: ", dropped) - } - dropped = mr.CoFixedLatency.Merge(result.CoFixedLatency) - if dropped > 0 { - log.Print("dropped: ", dropped) - } - } -} - -func InitHdrLogWriter(fileName string, baseTime int64) *hdrhistogram.HistogramLogWriter { - fileNameAbs, err := filepath.Abs(fileName) - if err != nil { - panic(err) - } - dirName := filepath.Dir(fileNameAbs) - err = os.MkdirAll(dirName, os.ModePerm) - if err != nil { - if ! 
os.IsExist(err) { - panic(err) - } - } - - file, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE, os.ModePerm) - if err != nil { - panic(err) - } - - writer := hdrhistogram.NewHistogramLogWriter(file) - if err = writer.OutputLogFormatVersion(); err != nil { - panic(err) - } - - if err = writer.OutputComment("Logging op latencies for Cassandra Stress"); err != nil { - panic(err) - } - baseTimeMsec := baseTime / 1000000 - writer.SetBaseTime(baseTimeMsec) - if err = writer.OutputBaseTime(baseTimeMsec); err != nil { - panic(err) - } - if err = writer.OutputStartTime(baseTimeMsec); err != nil { - panic(err) - } - if err = writer.OutputLegend(); err != nil { - panic(err) - } - return writer -} - -func (mr *MergedResult) getLatencyHistogram() *hdrhistogram.Histogram { - if globalResultConfiguration.latencyTypeToPrint == LatencyTypeCoordinatedOmissionFixed { - return mr.CoFixedLatency - } - return mr.RawLatency -} - -func (mr *MergedResult) SaveLatenciesToHdrHistogram(hdrLogWriter *hdrhistogram.HistogramLogWriter) { - startTimeMs := mr.HistogramStartTime / 1000000000 - endTimeMs := time.Now().UnixNano() / 1000000000 - mr.CoFixedLatency.SetStartTimeMs(startTimeMs) - mr.CoFixedLatency.SetEndTimeMs(endTimeMs) - if err := hdrLogWriter.OutputIntervalHistogram(mr.CoFixedLatency); err != nil { - fmt.Printf("Failed to write co-fixed hdr histogram: %s\n", err.Error()) - } - mr.RawLatency.SetStartTimeMs(startTimeMs) - mr.RawLatency.SetEndTimeMs(endTimeMs) - if err := hdrLogWriter.OutputIntervalHistogram(mr.RawLatency); err != nil { - fmt.Printf("Failed to write raw hdr histogram: %s\n", err.Error()) - } -} - -func (mr *MergedResult) PrintPartialResult() { - latencyError := "" - if globalResultConfiguration.measureLatency { - scale := globalResultConfiguration.hdrLatencyScale - var latencyHist = mr.getLatencyHistogram() - fmt.Printf(withLatencyLineFmt, Round(mr.Time), mr.Operations, mr.ClusteringRows, mr.Errors, - Round(time.Duration(latencyHist.Max() * scale)), Round(time.Duration(latencyHist.ValueAtQuantile(99.9) * scale)), Round(time.Duration(latencyHist.ValueAtQuantile(99) * scale)), - Round(time.Duration(latencyHist.ValueAtQuantile(95) * scale)), Round(time.Duration(latencyHist.ValueAtQuantile(90) * scale)), - Round(time.Duration(latencyHist.ValueAtQuantile(50) * scale)), Round(time.Duration(latencyHist.Mean() * float64(scale))), - latencyError) - } else { - fmt.Printf(withoutLatencyLineFmt, Round(mr.Time), mr.Operations, mr.ClusteringRows, mr.Errors) - } -} - -func (mr *MergedResult) PrintCriticalErrors() { - if mr.CriticalErrors != nil { - fmt.Printf("\nFollowing critical errors where caught during the run:\n") - for _, err := range mr.CriticalErrors { - fmt.Printf(" %s\n", err.Error()) - } - } -} diff --git a/pkg/results/result.go b/pkg/results/result.go index 2862456..00c2925 100644 --- a/pkg/results/result.go +++ b/pkg/results/result.go @@ -1,138 +1,250 @@ package results import ( - "errors" "fmt" - "time" - "github.com/HdrHistogram/hdrhistogram-go" + "github.com/scylladb/scylla-bench/pkg/config" + "github.com/scylladb/scylla-bench/pkg/stack" + "github.com/scylladb/scylla-bench/pkg/tools" + "os" + "path/filepath" + "sync/atomic" + "time" ) -type histogramConfiguration struct { - minValue int64 - maxValue int64 - sigFig int -} - const ( - LatencyTypeCoordinatedOmissionFixed = iota - LatencyTypeRaw = iota + withLatencyLineFmt = "\n%5v %7v %7v %6v %-6v %-6v %-6v %-6v %-6v %-6v %-6v" + withoutLatencyLineFmt = "\n%5v %7v %7v %6v" ) -var LatencyTypes = map[string]int{ - "raw": LatencyTypeRaw, - 
"fixed-coordinated-omission": LatencyTypeCoordinatedOmissionFixed, +type PartialResult struct { + hdrLogWriter *hdrhistogram.HistogramLogWriter + histogramStartTime int64 + Operations uint64 + ClusteringRows uint64 + Errors uint64 + RawLatencyStack *stack.Stack + CoFixedLatencyStack *stack.Stack + LatencyToPrint *hdrhistogram.Histogram + RawLatency *hdrhistogram.Histogram + CoFixedLatency *hdrhistogram.Histogram +} + +type TotalResult struct { + PartialResult + criticalErrors []error + criticalNum uint32 +} + +func initStack() *stack.Stack { + return stack.New(config.NumberOfLatencyResultsInPartialReportCycle()) +} + +func NewPartialResult() *PartialResult { + var hdrLogWriter *hdrhistogram.HistogramLogWriter + var RawLatencyStack, CoFixedLatencyStack *stack.Stack + var RawLatency, CoFixedLatency, LatencyToPrint *hdrhistogram.Histogram + if config.GetGlobalMeasureLatency() { + RawLatency = NewHistogram(config.GetGlobalHistogramConfiguration(), "raw") + CoFixedLatency = NewHistogram(config.GetGlobalHistogramConfiguration(), "co-fixed") + + // TODO: Link histograms to stacks via callbacks + RawLatencyStack = initStack() + CoFixedLatencyStack = initStack() + if config.GetGlobalLatencyType() == config.LatencyTypeRaw { + LatencyToPrint = RawLatency + } else if config.GetGlobalLatencyType() == config.LatencyTypeCoordinatedOmissionFixed { + LatencyToPrint = CoFixedLatency + } + if config.GetGlobalHdrLatencyFile() != "" { + // We need this rounding since hdr histogram rounding up baseTime dividing it by 1000 + // before reducing it from start time, which is divided by 1000000000 before applied to histogram + // Which gives small chance that rounded baseTime would be greater than histogram start time, which will lead to + // negative time in the histogram log + hdrLogWriter = InitHdrLogWriter( + config.GetGlobalHdrLatencyFile(), + (time.Now().UnixNano() / 1000000000) * 1000000000) + } + } + result := &PartialResult{ + hdrLogWriter: hdrLogWriter, + Operations: 0, + ClusteringRows: 0, + Errors: 0, + RawLatencyStack: RawLatencyStack, + CoFixedLatencyStack: CoFixedLatencyStack, + RawLatency: RawLatency, + CoFixedLatency: CoFixedLatency, + LatencyToPrint: LatencyToPrint, + } + if RawLatencyStack != nil { + RawLatencyStack.SetDataCallBack(func(data *[]int64, dataLength uint64){ + dataResolved := *data + for i := uint64(0); i < dataLength; i ++ { + RawLatency.RecordValue(dataResolved[i]) + } + }) + } + if CoFixedLatencyStack != nil { + result.CoFixedLatencyStack.SetDataCallBack(func(data *[]int64, dataLength uint64) { + dataResolved := *data + for i := uint64(0); i < dataLength; i++ { + CoFixedLatency.RecordValue(dataResolved[i]) + } + }) + } + return result } -type Configuration struct { - concurrency int - measureLatency bool - hdrLatencyFile string - hdrLatencyScale int64 - latencyTypeToPrint int - latencyHistogramConfiguration histogramConfiguration -} +func InitHdrLogWriter(fileName string, baseTime int64) *hdrhistogram.HistogramLogWriter { + fileNameAbs, err := filepath.Abs(fileName) + if err != nil { + panic(err) + } + dirName := filepath.Dir(fileNameAbs) + err = os.MkdirAll(dirName, os.ModePerm) + if err != nil { + if ! 
os.IsExist(err) { + panic(err) + } + } -type Result struct { - Final bool - ElapsedTime time.Duration - Operations int - ClusteringRows int - Errors int - CriticalErrors []error - RawLatency *hdrhistogram.Histogram - CoFixedLatency *hdrhistogram.Histogram -} + file, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE, os.ModePerm) + if err != nil { + panic(err) + } -func SetGlobalHistogramConfiguration(minValue int64, maxValue int64, sigFig int) { - globalResultConfiguration.latencyHistogramConfiguration.minValue = minValue / globalResultConfiguration.hdrLatencyScale - globalResultConfiguration.latencyHistogramConfiguration.maxValue = maxValue / globalResultConfiguration.hdrLatencyScale - globalResultConfiguration.latencyHistogramConfiguration.sigFig = sigFig -} + writer := hdrhistogram.NewHistogramLogWriter(file) + if err = writer.OutputLogFormatVersion(); err != nil { + panic(err) + } -func GetGlobalHistogramConfiguration() (int64, int64, int) { - return globalResultConfiguration.latencyHistogramConfiguration.minValue, - globalResultConfiguration.latencyHistogramConfiguration.maxValue, - globalResultConfiguration.latencyHistogramConfiguration.sigFig + if err = writer.OutputComment("Logging op latencies for Cassandra Stress"); err != nil { + panic(err) + } + baseTimeMsec := baseTime / 1000000 + writer.SetBaseTime(baseTimeMsec) + if err = writer.OutputBaseTime(baseTimeMsec); err != nil { + panic(err) + } + if err = writer.OutputStartTime(baseTimeMsec); err != nil { + panic(err) + } + if err = writer.OutputLegend(); err != nil { + panic(err) + } + return writer } -func SetGlobalLatencyType(latencyType int) { - globalResultConfiguration.latencyTypeToPrint = latencyType -} -func GetGlobalLatencyType(latencyType int) { - globalResultConfiguration.latencyTypeToPrint = latencyType +func NewTotalResult(concurrency int) *TotalResult { + return &TotalResult{ + *NewPartialResult(), + make([]error, concurrency * 100), + 0, + } } -func SetGlobalLatencyTypeFromString(latencyType string) { - SetGlobalLatencyType(LatencyTypes[latencyType]) +func (tr *PartialResult) Reset() { + tr.histogramStartTime = time.Now().UnixNano() + tr.Operations = 0 + tr.ClusteringRows = 0 + tr.CoFixedLatency.Reset() + tr.RawLatency.Reset() } -func ValidateGlobalLatencyType(latencyType string) error { - _, ok := LatencyTypes[latencyType] - if !ok { - return errors.New(fmt.Sprintf("unkown value %s, supported values are: raw, fixed-coordinated-omission", latencyType)) - } - return nil +func (tr *PartialResult) FlushDataToHistogram() bool { + isThereNewData := tr.RawLatencyStack.Swap(false) + isThereNewCoFixedData := tr.CoFixedLatencyStack.Swap(false) + return isThereNewData || isThereNewCoFixedData } -func SetGlobalMeasureLatency(value bool) { - globalResultConfiguration.measureLatency = value +func (tr *PartialResult) SaveLatenciesToHdrHistogram() { + if tr.hdrLogWriter == nil { + return + } + startTimeMs := tr.histogramStartTime / 1000000000 + endTimeMs := time.Now().UnixNano() / 1000000000 + tr.CoFixedLatency.SetStartTimeMs(startTimeMs) + tr.CoFixedLatency.SetEndTimeMs(endTimeMs) + if err := tr.hdrLogWriter.OutputIntervalHistogram(tr.CoFixedLatency); err != nil { + fmt.Printf("Failed to write co-fixed hdr histogram: %s\n", err.Error()) + } + tr.RawLatency.SetStartTimeMs(startTimeMs) + tr.RawLatency.SetEndTimeMs(endTimeMs) + if err := tr.hdrLogWriter.OutputIntervalHistogram(tr.RawLatency); err != nil { + fmt.Printf("Failed to write raw hdr histogram: %s\n", err.Error()) + } } -func GetGlobalMeasureLatency() bool { - return 
globalResultConfiguration.measureLatency +func (tr *PartialResult) PrintPartialResult(printTime time.Duration) { + if tr.LatencyToPrint == nil { + fmt.Printf(withoutLatencyLineFmt, tools.Round(printTime), tr.Operations, tr.ClusteringRows, tr.Errors) + return + } + scale := config.GetGlobalHdrLatencyScale() + var latencyHist = tr.LatencyToPrint + fmt.Printf( + withLatencyLineFmt, + tools.Round(printTime), tr.Operations, tr.ClusteringRows, tr.Errors, + tools.Round(time.Duration(latencyHist.Max() * scale)), + tools.Round(time.Duration(latencyHist.ValueAtQuantile(99.9) * scale)), + tools.Round(time.Duration(latencyHist.ValueAtQuantile(99) * scale)), + tools.Round(time.Duration(latencyHist.ValueAtQuantile(95) * scale)), + tools.Round(time.Duration(latencyHist.ValueAtQuantile(90) * scale)), + tools.Round(time.Duration(latencyHist.ValueAtQuantile(50) * scale)), + tools.Round(time.Duration(latencyHist.Mean() * float64(scale))), + ) +} + +func (tr *PartialResult) PrintPartialResultHeader() { + if tr.LatencyToPrint == nil { + fmt.Printf(withoutLatencyLineFmt, "time", "ops/s", "rows/s", "errors") + } else { + fmt.Printf(withLatencyLineFmt, "time", "ops/s", "rows/s", "errors", "max", "99.9th", "99th", "95th", "90th", "median", "mean") + } } -func SetGlobalHdrLatencyFile(value string) { - globalResultConfiguration.hdrLatencyFile = value +func (tr *TotalResult) SubmitCriticalError(err *error) { + idx := atomic.AddUint32(&tr.criticalNum, 1) + tr.criticalErrors[idx] = *err } -func SetGlobalHdrLatencyUnits(value string) { - switch value { - case "ns": - globalResultConfiguration.hdrLatencyScale = 1 - break - case "us": - globalResultConfiguration.hdrLatencyScale = 1000 - break - case "ms": - globalResultConfiguration.hdrLatencyScale = 1000000 - break - default: - panic("Wrong value for hdr-latency-scale, only supported values are: ns, us and ms") +func (tr *TotalResult) PrintCriticalErrors() { + if ! 
tr.IsCriticalErrorsFound() { + return + } + fmt.Printf("\nThe following critical errors were caught during the run:\n") + for _, err := range tr.criticalErrors { + if err != nil { + fmt.Printf(" %s\n", err.Error()) + } } } -func SetGlobalConcurrency(value int) { - globalResultConfiguration.concurrency = value -} - -func GetGlobalConcurrency() int { - return globalResultConfiguration.concurrency +func (tr *TotalResult) IsCriticalErrorsFound() bool { + if tr.criticalErrors == nil || len(tr.criticalErrors) == 0 { + return false + } + for _, err := range tr.criticalErrors { + if err != nil { + return true + } + } + return false } -func NewHistogram(config *histogramConfiguration, name string) *hdrhistogram.Histogram { - histogram := hdrhistogram.New(config.minValue, config.maxValue, config.sigFig) +func NewHistogram(config *config.HistogramConfiguration, name string) *hdrhistogram.Histogram { + histogram := hdrhistogram.New(config.MinValue, config.MaxValue, config.SigFig) histogram.SetTag(name) return histogram } -var globalResultConfiguration Configuration - -func GetHdrMemoryConsumption(concurrency int) int { - hdr := NewHistogram(&globalResultConfiguration.latencyHistogramConfiguration, "example_hdr") - return hdr.ByteSize() * concurrency * 4 -} -func init() { - globalResultConfiguration = Configuration{ - measureLatency: false, - latencyHistogramConfiguration: histogramConfiguration{ - minValue: 0, - maxValue: 2 ^ 63 - 1, - sigFig: 3, - }, - latencyTypeToPrint: LatencyTypeRaw, - } +func GetHdrMemoryConsumption() int { + // Size of four HDR Histograms + hdrSize := NewHistogram(config.GetGlobalHistogramConfiguration(), "example_hdr").ByteSize() * 4 + // Size of four Stacks + stackSize := int(initStack().GetMemoryConsumption()) * 4 + return hdrSize + stackSize } diff --git a/pkg/results/thread_result.go b/pkg/results/thread_result.go deleted file mode 100644 index 68c89f9..0000000 --- a/pkg/results/thread_result.go +++ /dev/null @@ -1,106 +0,0 @@ -package results - -import ( - "time" -) - -type TestThreadResult struct { - FullResult *Result - PartialResult *Result - ResultChannel chan Result - partialStart time.Time -} - -var GlobalErrorFlag = false - -func NewTestThreadResult() *TestThreadResult { - r := &TestThreadResult{} - r.FullResult = &Result{} - r.PartialResult = &Result{} - r.FullResult.Final = true - if globalResultConfiguration.measureLatency { - r.FullResult.RawLatency = NewHistogram(&globalResultConfiguration.latencyHistogramConfiguration, "raw-latency") - r.FullResult.CoFixedLatency = NewHistogram(&globalResultConfiguration.latencyHistogramConfiguration, "co-fixed-lantecy") - r.PartialResult.RawLatency = NewHistogram(&globalResultConfiguration.latencyHistogramConfiguration, "raw-latency") - r.PartialResult.CoFixedLatency = NewHistogram(&globalResultConfiguration.latencyHistogramConfiguration, "co-fixed-lantecy") - } - r.ResultChannel = make(chan Result, 10000) - return r -} - -func (r *TestThreadResult) IncOps() { - r.FullResult.Operations++ - r.PartialResult.Operations++ -} - -func (r *TestThreadResult) IncRows() { - r.FullResult.ClusteringRows++ - r.PartialResult.ClusteringRows++ -} - -func (r *TestThreadResult) AddRows(n int) { - r.FullResult.ClusteringRows += n - r.PartialResult.ClusteringRows += n -} - -func (r *TestThreadResult) IncErrors() { - r.FullResult.Errors++ - r.PartialResult.Errors++ -} - -func (r *TestThreadResult) SubmitCriticalError(err error) { - if r.FullResult.CriticalErrors == nil { - r.FullResult.CriticalErrors = []error{err} - } else { -
r.FullResult.CriticalErrors = append(r.FullResult.CriticalErrors, err) - } - GlobalErrorFlag = true -} - -func (r *TestThreadResult) ResetPartialResult() { - r.PartialResult = &Result{} - if globalResultConfiguration.measureLatency { - r.PartialResult.RawLatency = NewHistogram(&globalResultConfiguration.latencyHistogramConfiguration, "raw-latency") - r.PartialResult.CoFixedLatency = NewHistogram(&globalResultConfiguration.latencyHistogramConfiguration, "co-fixed-lantecy") - } -} - -func (r *TestThreadResult) RecordRawLatency(latency time.Duration) { - if !globalResultConfiguration.measureLatency { - return - } - lv := latency.Nanoseconds() / globalResultConfiguration.hdrLatencyScale - - if lv >= globalResultConfiguration.latencyHistogramConfiguration.maxValue { - lv = globalResultConfiguration.latencyHistogramConfiguration.maxValue - } - _ = r.FullResult.RawLatency.RecordValue(lv) - _ = r.PartialResult.RawLatency.RecordValue(lv) -} - -func (r *TestThreadResult) RecordCoFixedLatency(latency time.Duration) { - if !globalResultConfiguration.measureLatency { - return - } - lv := latency.Nanoseconds() / globalResultConfiguration.hdrLatencyScale - - if lv >= globalResultConfiguration.latencyHistogramConfiguration.maxValue { - lv = globalResultConfiguration.latencyHistogramConfiguration.maxValue - } - _ = r.FullResult.CoFixedLatency.RecordValue(lv) - _ = r.PartialResult.CoFixedLatency.RecordValue(lv) -} - -func (r *TestThreadResult) SubmitResult() { - now := time.Now() - if now.Sub(r.partialStart) > time.Second { - r.ResultChannel <- *r.PartialResult - r.ResetPartialResult() - r.partialStart = now - } - -} - -func (r *TestThreadResult) StopReporting() { - close(r.ResultChannel) -} diff --git a/pkg/results/thread_results.go b/pkg/results/thread_results.go deleted file mode 100644 index e322a17..0000000 --- a/pkg/results/thread_results.go +++ /dev/null @@ -1,136 +0,0 @@ -package results - -import ( - "fmt" - "github.com/HdrHistogram/hdrhistogram-go" - "time" -) - -const ( - withLatencyLineFmt = "\n%5v %7v %7v %6v %-6v %-6v %-6v %-6v %-6v %-6v %-6v %v" - withoutLatencyLineFmt = "\n%5v %7v %7v %6v" -) - -type TestResults struct { - threadResults []*TestThreadResult - numberOfThreads int - startTime time.Time - totalResult *MergedResult -} - -func (tr *TestResults) Init(concurrency int) { - tr.threadResults = make([]*TestThreadResult, concurrency) - tr.numberOfThreads = concurrency - for i := range tr.threadResults { - tr.threadResults[i] = NewTestThreadResult() - } -} - -func (tr *TestResults) SetStartTime() { - tr.startTime = time.Now() -} - -func (tr *TestResults) GetTestResult(idx int) *TestThreadResult { - return tr.threadResults[idx] -} - -func (tr *TestResults) GetTestResults() []*TestThreadResult { - return tr.threadResults -} - -func (tr *TestResults) GetResultsFromThreadsAndMerge() (bool, *MergedResult) { - result := NewMergedResult() - final := false - for i, ch := range tr.threadResults { - res := <-ch.ResultChannel - if !final && res.Final { - final = true - for _, ch2 := range tr.threadResults[0:i] { - res = <-ch2.ResultChannel - for !res.Final { - res = <-ch2.ResultChannel - } - result.AddResult(res) - } - } else if final && !res.Final { - for !res.Final { - res = <-ch.ResultChannel - } - } - result.AddResult(res) - } - result.Time /= time.Duration(globalResultConfiguration.concurrency) - return final, result -} - -func (tr *TestResults) GetTotalResults() { - var final bool - var result *MergedResult - - // We need this rounding since hdr histogram round up baseTime dividing by 1000 - 
// before reducing it from start time, which is divided by 1000000000 before applied to histogram - // giving small chance that rounded baseTime would be greater than histogram start time and negative - // times in the histogram log - baseTime := (time.Now().UnixNano() / 1000000000) * 1000000000 - - var hdrLogWriter *hdrhistogram.HistogramLogWriter - if globalResultConfiguration.hdrLatencyFile != "" { - hdrLogWriter = InitHdrLogWriter(globalResultConfiguration.hdrLatencyFile, baseTime) - } - - for { - final, result = tr.GetResultsFromThreadsAndMerge() - if final { - break - } - result.Time = time.Since(tr.startTime) - result.PrintPartialResult() - if hdrLogWriter != nil { - result.SaveLatenciesToHdrHistogram(hdrLogWriter) - } - } - tr.totalResult = result -} - -func (tr *TestResults) PrintResultsHeader() { - if globalResultConfiguration.measureLatency { - fmt.Printf(withLatencyLineFmt, "time", "ops/s", "rows/s", "errors", "max", "99.9th", "99th", "95th", "90th", "median", "mean", "") - } else { - fmt.Printf(withoutLatencyLineFmt, "time", "ops/s", "rows/s", "errors") - } -} - -func (tr *TestResults) PrintTotalResults() { - fmt.Println("\nResults") - fmt.Println("Time (avg):\t", tr.totalResult.Time) - fmt.Println("Total ops:\t", tr.totalResult.Operations) - fmt.Println("Total rows:\t", tr.totalResult.ClusteringRows) - if tr.totalResult.Errors != 0 { - fmt.Println("Total errors:\t", tr.totalResult.Errors) - } - fmt.Println("Operations/s:\t", tr.totalResult.OperationsPerSecond) - fmt.Println("Rows/s:\t\t", tr.totalResult.ClusteringRowsPerSecond) - if globalResultConfiguration.measureLatency { - printLatencyResults("raw latency", tr.totalResult.RawLatency) - printLatencyResults("c-o fixed latency", tr.totalResult.CoFixedLatency) - } - tr.totalResult.PrintCriticalErrors() -} - -func printLatencyResults(name string, latency *hdrhistogram.Histogram) { - scale := globalResultConfiguration.hdrLatencyScale - fmt.Println(name, ":\n max:\t\t", time.Duration(latency.Max() * scale), - "\n 99.9th:\t", time.Duration(latency.ValueAtQuantile(99.9) * scale), - "\n 99th:\t\t", time.Duration(latency.ValueAtQuantile(99) * scale), - "\n 95th:\t\t", time.Duration(latency.ValueAtQuantile(95) * scale), - "\n 90th:\t\t", time.Duration(latency.ValueAtQuantile(90) * scale), - "\n median:\t", time.Duration(latency.ValueAtQuantile(50) * scale), - "\n mean:\t\t", time.Duration(latency.Mean() * float64(scale))) -} - -func (tr *TestResults) GetFinalStatus() int { - if tr.totalResult.CriticalErrors != nil { - return 1 - } - return 0 -} diff --git a/pkg/stack/stack.go b/pkg/stack/stack.go new file mode 100644 index 0000000..93f4241 --- /dev/null +++ b/pkg/stack/stack.go @@ -0,0 +1,84 @@ +package stack + +import ( + "log" + "sync" + "sync/atomic" +) + +type CbFunction func(*[]int64, uint64) + +// Stack is a stack implementation that is designed to be able to push data into it and process it at the same time +// with no locking time +type Stack struct { + data *[]int64 + swap *[]int64 + current uint64 + upperLimit uint64 + cbInProgress sync.Mutex + swapLock sync.Mutex + swapCb *CbFunction +} + +func New(elementsLimit int64) *Stack { + data := make([]int64, elementsLimit+1) + swap := make([]int64, elementsLimit+1) + return &Stack{ + data: &data, + swap: &swap, + swapLock: sync.Mutex{}, + cbInProgress: sync.Mutex{}, + current: uint64(0), + upperLimit: uint64(elementsLimit), + } +} + +func (s *Stack) SetDataCallBack(cb CbFunction) { + s.swapCb = &cb +} + +func (s *Stack) Swap(runCbInParallel bool) bool { + s.swapLock.Lock() + 
isThereNewData := s.swapData(runCbInParallel, s.current) + s.swapLock.Unlock() + return isThereNewData +} + +//go:norace +func (s *Stack) swapData(runCbInParallel bool, current uint64) bool { + s.cbInProgress.Lock() + s.data, s.swap, s.current = s.swap, s.data, 0 + isThereNewData := len(*s.swap) > 0 + if runCbInParallel { + go s.runCb(s.swap, current) + } else { + s.runCb(s.swap, current) + } + return isThereNewData +} + +func (s *Stack) runCb(values *[]int64, itemCount uint64) { + defer s.cbInProgress.Unlock() + if s.swapCb != nil { + (*s.swapCb)(values, itemCount) + } +} + +//go:norace +func (s *Stack) Push(value int64) { + idx := atomic.AddUint64(&s.current, 1) + if idx >= s.upperLimit { + s.swapLock.Lock() + if s.current >= s.upperLimit { + log.Println("Stack is full, flush it") + s.swapData(true, idx-1) + } + idx = atomic.AddUint64(&s.current, 1) + s.swapLock.Unlock() + } + (*s.data)[idx-1] = value +} + +func (s *Stack) GetMemoryConsumption() uint64 { + return s.upperLimit * 8 * 2 +} diff --git a/pkg/test_run/test_run.go b/pkg/test_run/test_run.go new file mode 100644 index 0000000..2c17a09 --- /dev/null +++ b/pkg/test_run/test_run.go @@ -0,0 +1,157 @@ +package test_run + +import ( + "fmt" + "github.com/HdrHistogram/hdrhistogram-go" + "github.com/scylladb/scylla-bench/pkg/config" + "github.com/scylladb/scylla-bench/pkg/rate_limiter" + "github.com/scylladb/scylla-bench/pkg/results" + "github.com/scylladb/scylla-bench/pkg/tools" + "github.com/scylladb/scylla-bench/pkg/worker" + "github.com/scylladb/scylla-bench/pkg/workloads" + "sync" + "time" +) + + +type TestRun struct { + workers []*worker.Worker + numberOfThreads int + startTime time.Time + stopTime *time.Time + partialResult results.PartialResult + totalResult results.TotalResult + waitGroup sync.WaitGroup + measureLatency bool + hdrLatencyScale int64 + hdrLatencyMaxValue int64 + timeOffsetUnit int64 + maximumRate int +} + +func NewTestRun(concurrency int, maximumRate int) *TestRun { + var timeOffsetUnit int64 + if maximumRate != 0 { + timeOffsetUnit = int64(time.Second) / int64(maximumRate) + maximumRate /= concurrency + } else { + timeOffsetUnit = 0 + } + tr := &TestRun{ + timeOffsetUnit: timeOffsetUnit, + maximumRate: maximumRate, + workers: make([]*worker.Worker, concurrency), + numberOfThreads: concurrency, + partialResult: *results.NewPartialResult(), + totalResult: *results.NewTotalResult(concurrency), + measureLatency: config.GetGlobalMeasureLatency(), + hdrLatencyScale: config.GetGlobalHdrLatencyScale(), + hdrLatencyMaxValue: config.GetGlobalHistogramConfiguration().MaxValue, + } + tr.waitGroup.Add(concurrency) + for i := 0; i < concurrency; i++ { + tr.workers[i] = worker.NewWorker( + &tr.partialResult, + &tr.totalResult, + &tr.waitGroup, + tr.measureLatency, + tr.hdrLatencyScale, + tr.hdrLatencyMaxValue, + ) + } + return tr +} + +func (tr *TestRun) NumberOfDataPointsInPartialReportCycle() int64 { + nsInReportCycleTime := int64(time.Second) + nsPerDataPoint := int64(1000) + return nsInReportCycleTime / nsPerDataPoint +} + +func (tr *TestRun) SetStartTime() { + tr.startTime = time.Now() +} + +func (tr *TestRun) GetTestResult(idx int) *worker.Worker { + return tr.workers[idx] +} + +func (tr *TestRun) GetElapsedTime() time.Duration { + return tr.stopTime.Sub(tr.startTime) +} + +func (tr *TestRun) GetTestResults() []*worker.Worker { + return tr.workers +} + +func (tr *TestRun) GetTotalResults() { + tr.waitGroup.Wait() + timeNow := time.Now() + tr.stopTime = &timeNow + tr.partialResult.FlushDataToHistogram() + 
tr.totalResult.FlushDataToHistogram() +} + +func (tr *TestRun) StartPrintingPartialResult() { + go func() { + for range time.Tick(time.Second) { + tr.partialResult.FlushDataToHistogram() + tr.partialResult.PrintPartialResult(time.Now().Sub(tr.startTime)) + tr.partialResult.Reset() + if tr.stopTime != nil { + if tr.partialResult.FlushDataToHistogram() { + tr.partialResult.PrintPartialResult(time.Now().Sub(tr.startTime)) + } + break + } + } + }() +} + +func (tr *TestRun) PrintResultsHeader() { + tr.partialResult.PrintPartialResultHeader() +} + +func (tr *TestRun) RunTest(workload workloads.WorkloadFunction) { + for i := 0; i < tr.numberOfThreads; i++ { + testResult := tr.GetTestResult(i) + go func(i int) { + timeOffset := time.Duration(tr.timeOffsetUnit * int64(i)) + workload(i, testResult, rate_limiter.NewRateLimiter(tr.maximumRate, timeOffset)) + }(i) + } +} + +func (tr *TestRun) PrintTotalResults() { + fmt.Println("\nResults") + fmt.Println("Time (avg):\t", tools.Round(tr.GetElapsedTime())) + fmt.Println("Total ops:\t", tr.totalResult.Operations) + fmt.Println("Total rows:\t", tr.totalResult.ClusteringRows) + if tr.totalResult.Errors != 0 { + fmt.Println("Total errors:\t", tr.totalResult.Errors) + } + fmt.Println("Operations/s:\t", tr.totalResult.Operations / uint64(tr.GetElapsedTime().Seconds())) + fmt.Println("Rows/s:\t\t", tr.totalResult.ClusteringRows / uint64(tr.GetElapsedTime().Seconds())) + if tr.measureLatency { + printLatencyResults("raw latency", tr.totalResult.RawLatency, tr.hdrLatencyScale) + printLatencyResults("c-o fixed latency", tr.totalResult.CoFixedLatency, tr.hdrLatencyScale) + } + tr.totalResult.PrintCriticalErrors() +} + +func printLatencyResults(name string, latency *hdrhistogram.Histogram, scale int64) { + fmt.Println(name, ":\n max:\t\t", time.Duration(latency.Max() * scale), + "\n 99.9th:\t", time.Duration(latency.ValueAtQuantile(99.9) * scale), + "\n 99th:\t\t", time.Duration(latency.ValueAtQuantile(99) * scale), + "\n 95th:\t\t", time.Duration(latency.ValueAtQuantile(95) * scale), + "\n 90th:\t\t", time.Duration(latency.ValueAtQuantile(90) * scale), + "\n median:\t", time.Duration(latency.ValueAtQuantile(50) * scale), + "\n mean:\t\t", time.Duration(latency.Mean() * float64(scale))) +} + +func (tr *TestRun) GetFinalStatus() int { + if tr.totalResult.IsCriticalErrorsFound() { + return 1 + } + return 0 +} diff --git a/pkg/results/auxiliary.go b/pkg/tools/tools.go similarity index 98% rename from pkg/results/auxiliary.go rename to pkg/tools/tools.go index 530abd1..32d9332 100644 --- a/pkg/results/auxiliary.go +++ b/pkg/tools/tools.go @@ -1,4 +1,4 @@ -package results +package tools import "time" diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go new file mode 100644 index 0000000..61a231c --- /dev/null +++ b/pkg/worker/worker.go @@ -0,0 +1,100 @@ +package worker + +import ( + "github.com/scylladb/scylla-bench/pkg/results" + "sync" + "sync/atomic" + "time" +) + +type Worker struct { + partialResult *results.PartialResult + totalResult *results.TotalResult + waitGroup *sync.WaitGroup + measureLatency bool + hdrLatencyScale int64 + hdrLatencyMaxValue int64 +} + +var GlobalErrorFlag = false + +func NewWorker( + partialResult *results.PartialResult, + totalResult *results.TotalResult, + waitGroup *sync.WaitGroup, + measureLatency bool, + hdrLatencyScale int64, + hdrLatencyMaxValue int64, + ) *Worker { + return &Worker{ + measureLatency: measureLatency, + partialResult: partialResult, + totalResult: totalResult, + waitGroup: waitGroup, + hdrLatencyScale: 
hdrLatencyScale, + hdrLatencyMaxValue: hdrLatencyMaxValue, + } +} + +func (r *Worker) IncOps() { + atomic.AddUint64(&r.partialResult.Operations, 1) + atomic.AddUint64(&r.totalResult.Operations, 1) +} + +func (r *Worker) IncRows() { + atomic.AddUint64(&r.partialResult.ClusteringRows, 1) + atomic.AddUint64(&r.totalResult.ClusteringRows, 1) +} + +func (r *Worker) AddRows(n uint64) { + atomic.AddUint64(&r.partialResult.ClusteringRows, n) + atomic.AddUint64(&r.totalResult.ClusteringRows, n) +} + +func (r *Worker) IncErrors() { + atomic.AddUint64(&r.partialResult.Errors, 1) + atomic.AddUint64(&r.totalResult.Errors, 1) +} + +func (r *Worker) SubmitCriticalError(err error) { + r.totalResult.SubmitCriticalError(&err) + GlobalErrorFlag = true +} + +func (r *Worker) RecordRawLatency(latency time.Duration) { + if !r.measureLatency { + return + } + lv := latency.Nanoseconds() / r.hdrLatencyScale + + if lv >= r.hdrLatencyMaxValue { + lv = r.hdrLatencyMaxValue + } + // Instead of submitting the value to the Histogram directly, we push it to temporary storage, i.e. the Stack, + // which occasionally triggers flushing the data into the Histogram. + // This is done to avoid data corruption in the Histogram (it is not thread safe) while still avoiding locking, + // since locking here would interrupt the load generation, which is not acceptable. + r.partialResult.RawLatencyStack.Push(lv) + r.totalResult.RawLatencyStack.Push(lv) +} + +func (r *Worker) RecordCoFixedLatency(latency time.Duration) { + if !r.measureLatency { + return + } + lv := latency.Nanoseconds() / r.hdrLatencyScale + + if lv >= r.hdrLatencyMaxValue { + lv = r.hdrLatencyMaxValue + } + // Instead of submitting the value to the Histogram directly, we push it to temporary storage, i.e. the Stack, + // which occasionally triggers flushing the data into the Histogram. + // This is done to avoid data corruption in the Histogram (it is not thread safe) while still avoiding locking, + // since locking here would interrupt the load generation, which is not acceptable. + r.partialResult.CoFixedLatencyStack.Push(lv) + r.totalResult.CoFixedLatencyStack.Push(lv) +} + +func (r *Worker) StopReporting() { + r.waitGroup.Done() +} diff --git a/pkg/workloads/workloads.go b/pkg/workloads/workloads.go index 6213c34..f3cbdf6 100644 --- a/pkg/workloads/workloads.go +++ b/pkg/workloads/workloads.go @@ -1,6 +1,8 @@ package workloads import ( + "github.com/scylladb/scylla-bench/pkg/rate_limiter" + "github.com/scylladb/scylla-bench/pkg/worker" "log" "math" "math/rand" @@ -20,6 +22,8 @@ const ( maxToken int64 = (1 << 63) - 1 ) +type WorkloadFunction func(i int, testResult *worker.Worker, rateLimiter rate_limiter.RateLimiter) + // Bounds are inclusive type TokenRange struct { Start int64
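
The non-blocking latency path introduced by this patch works as follows: each pkg/worker pushes latency samples into a pkg/stack buffer on the hot path, and a reporter goroutine periodically swaps the buffer and flushes the drained samples into the HDR histogram, so workers never wait on histogram updates. Below is a minimal, self-contained sketch of that pattern, assuming Go and the hdrhistogram-go dependency that scylla-bench already uses; every other name (sampleBuffer, Push, Flush, the capacities and timings) is illustrative and not the actual scylla-bench API. The real pkg/stack avoids even the read-lock used here by claiming slots with an atomic index and swapping slices under //go:norace; the RWMutex keeps this sketch free of data races while preserving the same property.

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"

    "github.com/HdrHistogram/hdrhistogram-go"
)

// sampleBuffer collects latency samples from many workers without ever
// touching the histogram on the hot path.
type sampleBuffer struct {
    mu   sync.RWMutex
    data []int64
    next uint64 // next free slot, claimed with atomic.AddUint64
}

func newSampleBuffer(capacity int) *sampleBuffer {
    return &sampleBuffer{data: make([]int64, capacity)}
}

// Push records one sample; samples arriving while the buffer is full are
// dropped in this sketch (the patch instead forces an early swap).
func (b *sampleBuffer) Push(v int64) {
    b.mu.RLock()
    defer b.mu.RUnlock()
    idx := atomic.AddUint64(&b.next, 1) - 1
    if idx < uint64(len(b.data)) {
        b.data[idx] = v
    }
}

// Flush swaps in a fresh slice and hands the drained samples to sink, which is
// the only place the (not thread-safe) HDR histogram is updated.
func (b *sampleBuffer) Flush(sink func([]int64)) {
    b.mu.Lock()
    n := b.next
    if n > uint64(len(b.data)) {
        n = uint64(len(b.data))
    }
    drained := b.data[:n]
    b.data = make([]int64, len(b.data))
    b.next = 0
    b.mu.Unlock()
    sink(drained)
}

func main() {
    buf := newSampleBuffer(1 << 16)
    hist := hdrhistogram.New(1, int64(10*time.Second), 3) // latencies in ns, 3 significant figures

    var wg sync.WaitGroup
    for w := 0; w < 4; w++ { // stand-ins for the benchmark workers
        wg.Add(1)
        go func() {
            defer wg.Done()
            for i := 0; i < 200000; i++ {
                buf.Push(int64(50*time.Microsecond) + int64(i%1000)) // fake latency sample
            }
        }()
    }
    done := make(chan struct{})
    go func() { wg.Wait(); close(done) }()

    flush := func() {
        buf.Flush(func(samples []int64) {
            for _, s := range samples {
                _ = hist.RecordValue(s)
            }
        })
    }
    ticker := time.NewTicker(250 * time.Millisecond) // the patch flushes once per second
    defer ticker.Stop()
    for {
        select {
        case <-done:
            flush()
            fmt.Printf("median: %v  p99: %v  max: %v\n",
                time.Duration(hist.ValueAtQuantile(50)),
                time.Duration(hist.ValueAtQuantile(99)),
                time.Duration(hist.Max()))
            return
        case <-ticker.C:
            flush()
        }
    }
}

Flushing once per reporting cycle bounds the extra memory to roughly two fixed-size sample slices per histogram, which is what the patch's GetHdrMemoryConsumption accounts for alongside the histograms themselves.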