Skip to content

Commit

Permalink
scylla-bench: non-blocking logging
Browse files Browse the repository at this point in the history
Fix following things:
1. Get rid of 2 histograms per thread to reduce memory usage
2. Make latency printing non-blocking
3. Reorganize code
4. Get rid of histogram merging

fixes scylladb#80, scylladb#49, scylladb#73, scylladb#69
  • Loading branch information
dkropachev committed Feb 28, 2022
1 parent 3925639 commit f025a89
Show file tree
Hide file tree
Showing 13 changed files with 777 additions and 600 deletions.
37 changes: 23 additions & 14 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ import (
"crypto/tls"
"flag"
"fmt"
"github.com/scylladb/scylla-bench/pkg/config"
"github.com/scylladb/scylla-bench/pkg/rate_limiter"
"github.com/scylladb/scylla-bench/pkg/test_run"
"github.com/scylladb/scylla-bench/pkg/worker"
"log"
"os"
"os/signal"
Expand Down Expand Up @@ -164,7 +168,7 @@ func GetWorkload(name string, threadId int, partitionOffset int64, mode string,
panic("unreachable")
}

func GetMode(name string) func(session *gocql.Session, testResult *results.TestThreadResult, workload WorkloadGenerator, rateLimiter RateLimiter) {
func GetMode(name string) func(session *gocql.Session, testResult *worker.Worker, workload WorkloadGenerator, rateLimiter rate_limiter.RateLimiter) {
switch name {
case "write":
if rowsPerRequest == 1 {
Expand Down Expand Up @@ -363,7 +367,7 @@ func main() {
errorToTimeoutCutoffTime = time.Second
}

if err := results.ValidateGlobalLatencyType(latencyType); err != nil {
if err := config.ValidateGlobalLatencyType(latencyType); err != nil {
log.Fatal(errors.Wrap(err, "Bad value for latency-type"))
}

Expand Down Expand Up @@ -521,15 +525,20 @@ func main() {
}
setResultsConfiguration()

fmt.Println("Hdr memory consumption:\t", results.GetHdrMemoryConsumption(concurrency), "bytes")
if config.GetGlobalMeasureLatency() {
fmt.Println("Hdr memory consumption:\t", results.GetHdrMemoryConsumption(), "bytes")
}

testResult := RunConcurrently(maximumRate, func(i int, testResult *results.TestThreadResult, rateLimiter RateLimiter) {
totalResults := test_run.NewTestRun(concurrency, maximumRate)
totalResults.SetStartTime()
totalResults.PrintResultsHeader()
totalResults.RunTest(func(i int, testResult *worker.Worker, rateLimiter rate_limiter.RateLimiter) {
GetMode(mode)(session, testResult, GetWorkload(workload, i, partitionOffset, mode, writeRate, distribution), rateLimiter)
})

testResult.GetTotalResults()
testResult.PrintTotalResults()
os.Exit(testResult.GetFinalStatus())
totalResults.StartPrintingPartialResult()
totalResults.GetTotalResults()
totalResults.PrintTotalResults()
os.Exit(totalResults.GetFinalStatus())
}

func newHostSelectionPolicy(policy string, hosts []string) (gocql.HostSelectionPolicy, error) {
Expand All @@ -546,14 +555,14 @@ func newHostSelectionPolicy(policy string, hosts []string) (gocql.HostSelectionP
}

func setResultsConfiguration() {
results.SetGlobalMeasureLatency(measureLatency)
results.SetGlobalHdrLatencyFile(hdrLatencyFile)
results.SetGlobalHdrLatencyUnits(hdrLatencyUnits)
results.SetGlobalHistogramConfiguration(
config.SetGlobalMeasureLatency(measureLatency)
config.SetGlobalHdrLatencyFile(hdrLatencyFile)
config.SetGlobalHdrLatencyUnits(hdrLatencyUnits)
config.SetGlobalHistogramConfiguration(
time.Microsecond.Nanoseconds()*50,
(timeout * 3).Nanoseconds(),
hdrLatencySigFig,
)
results.SetGlobalConcurrency(concurrency)
results.SetGlobalLatencyTypeFromString(latencyType)
config.SetGlobalConcurrency(concurrency)
config.SetGlobalLatencyTypeFromString(latencyType)
}
118 changes: 20 additions & 98 deletions modes.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/sha256"
"encoding/binary"
"fmt"
"github.com/scylladb/scylla-bench/pkg/rate_limiter"
"log"
"math/rand"
"strings"
Expand All @@ -13,75 +14,10 @@ import (

"github.com/gocql/gocql"
"github.com/pkg/errors"
"github.com/scylladb/scylla-bench/pkg/results"
"github.com/scylladb/scylla-bench/pkg/worker"
. "github.com/scylladb/scylla-bench/pkg/workloads"
)

type RateLimiter interface {
Wait()
Expected() time.Time
}

type UnlimitedRateLimiter struct{}

func (*UnlimitedRateLimiter) Wait() {}

func (*UnlimitedRateLimiter) Expected() time.Time {
return time.Time{}
}

type MaximumRateLimiter struct {
Period time.Duration
StartTime time.Time
CompletedOperations int64
}

func (mxrl *MaximumRateLimiter) Wait() {
mxrl.CompletedOperations++
nextRequest := mxrl.StartTime.Add(mxrl.Period * time.Duration(mxrl.CompletedOperations))
now := time.Now()
if now.Before(nextRequest) {
time.Sleep(nextRequest.Sub(now))
}
}

func (mxrl *MaximumRateLimiter) Expected() time.Time {
return mxrl.StartTime.Add(mxrl.Period * time.Duration(mxrl.CompletedOperations))
}

func NewRateLimiter(maximumRate int, timeOffset time.Duration) RateLimiter {
if maximumRate == 0 {
return &UnlimitedRateLimiter{}
}
period := time.Duration(int64(time.Second) / int64(maximumRate))
return &MaximumRateLimiter{period, time.Now(), 0}
}

func RunConcurrently(maximumRate int, workload func(id int, testResult *results.TestThreadResult, rateLimiter RateLimiter)) *results.TestResults {
var timeOffsetUnit int64
if maximumRate != 0 {
timeOffsetUnit = int64(time.Second) / int64(maximumRate)
maximumRate /= concurrency
} else {
timeOffsetUnit = 0
}

totalResults := results.TestResults{}
totalResults.Init(concurrency)
totalResults.SetStartTime()
totalResults.PrintResultsHeader()

for i := 0; i < concurrency; i++ {
testResult := totalResults.GetTestResult(i)
go func(i int) {
timeOffset := time.Duration(timeOffsetUnit * int64(i))
workload(i, testResult, NewRateLimiter(maximumRate, timeOffset))
}(i)
}

return &totalResults
}

type TestIterator struct {
iteration uint
workload WorkloadGenerator
Expand Down Expand Up @@ -109,10 +45,8 @@ func (ti *TestIterator) IsDone() bool {
}
}

func RunTest(threadResult *results.TestThreadResult, workload WorkloadGenerator, rateLimiter RateLimiter, test func(rb *results.TestThreadResult) (error, time.Duration)) {

start := time.Now()
partialStart := start
func RunTest(threadResult *worker.Worker, workload WorkloadGenerator, rateLimiter rate_limiter.RateLimiter, test func(rb *worker.Worker) (error, time.Duration)) {
defer threadResult.StopReporting()
iter := NewTestIterator(workload)
errorsAtRow := 0
for !iter.IsDone() {
Expand Down Expand Up @@ -140,26 +74,14 @@ func RunTest(threadResult *results.TestThreadResult, workload WorkloadGenerator,
threadResult.RecordCoFixedLatency(endTime.Sub(expectedStartTime))
}

now := time.Now()
if maxErrorsAtRow > 0 && errorsAtRow >= maxErrorsAtRow {
threadResult.SubmitCriticalError(errors.New(fmt.Sprintf("Error limit (maxErrorsAtRow) of %d errors is reached", errorsAtRow)))
}
if results.GlobalErrorFlag {
threadResult.ResultChannel <- *threadResult.PartialResult
threadResult.ResetPartialResult()
break
}
if now.Sub(partialStart) > time.Second {
threadResult.ResultChannel <- *threadResult.PartialResult
threadResult.ResetPartialResult()
partialStart = now
if worker.GlobalErrorFlag {
break
}
}
end := time.Now()

threadResult.FullResult.ElapsedTime = end.Sub(start)
threadResult.ResultChannel <- *threadResult.FullResult
threadResult.StopReporting()
}

const (
Expand Down Expand Up @@ -298,10 +220,10 @@ func ValidateData(pk int64, ck int64, data []byte) error {
return nil
}

func DoWrites(session *gocql.Session, threadResult *results.TestThreadResult, workload WorkloadGenerator, rateLimiter RateLimiter) {
func DoWrites(session *gocql.Session, threadResult *worker.Worker, workload WorkloadGenerator, rateLimiter rate_limiter.RateLimiter) {
query := session.Query("INSERT INTO " + keyspaceName + "." + tableName + " (pk, ck, v) VALUES (?, ?, ?)")

RunTest(threadResult, workload, rateLimiter, func(rb *results.TestThreadResult) (error, time.Duration) {
RunTest(threadResult, workload, rateLimiter, func(rb *worker.Worker) (error, time.Duration) {
pk := workload.NextPartitionKey()
ck := workload.NextClusteringKey()
value := GenerateData(pk, ck, clusteringRowSizeDist.Generate())
Expand All @@ -322,15 +244,15 @@ func DoWrites(session *gocql.Session, threadResult *results.TestThreadResult, wo
})
}

func DoBatchedWrites(session *gocql.Session, threadResult *results.TestThreadResult, workload WorkloadGenerator, rateLimiter RateLimiter) {
func DoBatchedWrites(session *gocql.Session, threadResult *worker.Worker, workload WorkloadGenerator, rateLimiter rate_limiter.RateLimiter) {
request := fmt.Sprintf("INSERT INTO %s.%s (pk, ck, v) VALUES (?, ?, ?)", keyspaceName, tableName)

RunTest(threadResult, workload, rateLimiter, func(rb *results.TestThreadResult) (error, time.Duration) {
RunTest(threadResult, workload, rateLimiter, func(rb *worker.Worker) (error, time.Duration) {
batch := session.NewBatch(gocql.UnloggedBatch)
batchSize := 0
batchSize := uint64(0)

currentPk := workload.NextPartitionKey()
for !workload.IsPartitionDone() && atomic.LoadUint32(&stopAll) == 0 && batchSize < rowsPerRequest {
for !workload.IsPartitionDone() && atomic.LoadUint32(&stopAll) == 0 && batchSize < uint64(rowsPerRequest) {
ck := workload.NextClusteringKey()
batchSize++
value := GenerateData(currentPk, ck, clusteringRowSizeDist.Generate())
Expand All @@ -352,11 +274,11 @@ func DoBatchedWrites(session *gocql.Session, threadResult *results.TestThreadRes
})
}

func DoCounterUpdates(session *gocql.Session, threadResult *results.TestThreadResult, workload WorkloadGenerator, rateLimiter RateLimiter) {
func DoCounterUpdates(session *gocql.Session, threadResult *worker.Worker, workload WorkloadGenerator, rateLimiter rate_limiter.RateLimiter) {
query := session.Query("UPDATE " + keyspaceName + "." + counterTableName +
" SET c1 = c1 + ?, c2 = c2 + ?, c3 = c3 + ?, c4 = c4 + ?, c5 = c5 + ? WHERE pk = ? AND ck = ?")

RunTest(threadResult, workload, rateLimiter, func(rb *results.TestThreadResult) (error, time.Duration) {
RunTest(threadResult, workload, rateLimiter, func(rb *worker.Worker) (error, time.Duration) {
pk := workload.NextPartitionKey()
ck := workload.NextClusteringKey()
bound := query.Bind(ck, ck+1, ck+2, ck+3, ck+4, pk, ck)
Expand All @@ -376,11 +298,11 @@ func DoCounterUpdates(session *gocql.Session, threadResult *results.TestThreadRe
})
}

func DoReads(session *gocql.Session, threadResult *results.TestThreadResult, workload WorkloadGenerator, rateLimiter RateLimiter) {
func DoReads(session *gocql.Session, threadResult *worker.Worker, workload WorkloadGenerator, rateLimiter rate_limiter.RateLimiter) {
DoReadsFromTable(tableName, session, threadResult, workload, rateLimiter)
}

func DoCounterReads(session *gocql.Session, threadResult *results.TestThreadResult, workload WorkloadGenerator, rateLimiter RateLimiter) {
func DoCounterReads(session *gocql.Session, threadResult *worker.Worker, workload WorkloadGenerator, rateLimiter rate_limiter.RateLimiter) {
DoReadsFromTable(counterTableName, session, threadResult, workload, rateLimiter)
}

Expand Down Expand Up @@ -415,7 +337,7 @@ func BuildReadQuery(table string, orderBy string, session *gocql.Session) *gocql
return session.Query(request)
}

func DoReadsFromTable(table string, session *gocql.Session, threadResult *results.TestThreadResult, workload WorkloadGenerator, rateLimiter RateLimiter) {
func DoReadsFromTable(table string, session *gocql.Session, threadResult *worker.Worker, workload WorkloadGenerator, rateLimiter rate_limiter.RateLimiter) {
var queries []*gocql.Query
queryIdx := 0
queryIdxMax := len(selectOrderByParsed) - 1
Expand All @@ -424,7 +346,7 @@ func DoReadsFromTable(table string, session *gocql.Session, threadResult *result
queries = append(queries, BuildReadQuery(table, orderBy, session))
}

RunTest(threadResult, workload, rateLimiter, func(rb *results.TestThreadResult) (error, time.Duration) {
RunTest(threadResult, workload, rateLimiter, func(rb *worker.Worker) (error, time.Duration) {
pk := workload.NextPartitionKey()

query := queries[queryIdx]
Expand Down Expand Up @@ -507,11 +429,11 @@ func DoReadsFromTable(table string, session *gocql.Session, threadResult *result
})
}

func DoScanTable(session *gocql.Session, threadResult *results.TestThreadResult, workload WorkloadGenerator, rateLimiter RateLimiter) {
func DoScanTable(session *gocql.Session, threadResult *worker.Worker, workload WorkloadGenerator, rateLimiter rate_limiter.RateLimiter) {
request := fmt.Sprintf("SELECT * FROM %s.%s WHERE token(pk) >= ? AND token(pk) <= ?", keyspaceName, tableName)
query := session.Query(request)

RunTest(threadResult, workload, rateLimiter, func(rb *results.TestThreadResult) (error, time.Duration) {
RunTest(threadResult, workload, rateLimiter, func(rb *worker.Worker) (error, time.Duration) {
requestStart := time.Now()
currentRange := workload.NextTokenRange()
bound := query.Bind(currentRange.Start, currentRange.End)
Expand Down
Loading

0 comments on commit f025a89

Please sign in to comment.