Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scylla-bench: non-blocking logging #93

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 23 additions & 14 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ import (
"crypto/tls"
"flag"
"fmt"
"github.com/scylladb/scylla-bench/pkg/config"
"github.com/scylladb/scylla-bench/pkg/rate_limiter"
"github.com/scylladb/scylla-bench/pkg/test_run"
"github.com/scylladb/scylla-bench/pkg/worker"
"log"
"os"
"os/signal"
Expand Down Expand Up @@ -164,7 +168,7 @@ func GetWorkload(name string, threadId int, partitionOffset int64, mode string,
panic("unreachable")
}

func GetMode(name string) func(session *gocql.Session, testResult *results.TestThreadResult, workload WorkloadGenerator, rateLimiter RateLimiter) {
func GetMode(name string) func(session *gocql.Session, testResult *worker.Worker, workload WorkloadGenerator, rateLimiter rate_limiter.RateLimiter) {
switch name {
case "write":
if rowsPerRequest == 1 {
Expand Down Expand Up @@ -363,7 +367,7 @@ func main() {
errorToTimeoutCutoffTime = time.Second
}

if err := results.ValidateGlobalLatencyType(latencyType); err != nil {
if err := config.ValidateGlobalLatencyType(latencyType); err != nil {
log.Fatal(errors.Wrap(err, "Bad value for latency-type"))
}

Expand Down Expand Up @@ -521,15 +525,20 @@ func main() {
}
setResultsConfiguration()

fmt.Println("Hdr memory consumption:\t", results.GetHdrMemoryConsumption(concurrency), "bytes")
if config.GetGlobalMeasureLatency() {
fmt.Println("Hdr memory consumption:\t", results.GetHdrMemoryConsumption(), "bytes")
}

testResult := RunConcurrently(maximumRate, func(i int, testResult *results.TestThreadResult, rateLimiter RateLimiter) {
totalResults := test_run.NewTestRun(concurrency, maximumRate)
totalResults.SetStartTime()
totalResults.PrintResultsHeader()
totalResults.RunTest(func(i int, testResult *worker.Worker, rateLimiter rate_limiter.RateLimiter) {
GetMode(mode)(session, testResult, GetWorkload(workload, i, partitionOffset, mode, writeRate, distribution), rateLimiter)
})

testResult.GetTotalResults()
testResult.PrintTotalResults()
os.Exit(testResult.GetFinalStatus())
totalResults.StartPrintingPartialResult()
totalResults.GetTotalResults()
totalResults.PrintTotalResults()
os.Exit(totalResults.GetFinalStatus())
}

func newHostSelectionPolicy(policy string, hosts []string) (gocql.HostSelectionPolicy, error) {
Expand All @@ -546,14 +555,14 @@ func newHostSelectionPolicy(policy string, hosts []string) (gocql.HostSelectionP
}

func setResultsConfiguration() {
results.SetGlobalMeasureLatency(measureLatency)
results.SetGlobalHdrLatencyFile(hdrLatencyFile)
results.SetGlobalHdrLatencyUnits(hdrLatencyUnits)
results.SetGlobalHistogramConfiguration(
config.SetGlobalMeasureLatency(measureLatency)
config.SetGlobalHdrLatencyFile(hdrLatencyFile)
config.SetGlobalHdrLatencyUnits(hdrLatencyUnits)
config.SetGlobalHistogramConfiguration(
time.Microsecond.Nanoseconds()*50,
(timeout * 3).Nanoseconds(),
hdrLatencySigFig,
)
results.SetGlobalConcurrency(concurrency)
results.SetGlobalLatencyTypeFromString(latencyType)
config.SetGlobalConcurrency(concurrency)
config.SetGlobalLatencyTypeFromString(latencyType)
}
118 changes: 20 additions & 98 deletions modes.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/sha256"
"encoding/binary"
"fmt"
"github.com/scylladb/scylla-bench/pkg/rate_limiter"
"log"
"math/rand"
"strings"
Expand All @@ -13,75 +14,10 @@ import (

"github.com/gocql/gocql"
"github.com/pkg/errors"
"github.com/scylladb/scylla-bench/pkg/results"
"github.com/scylladb/scylla-bench/pkg/worker"
. "github.com/scylladb/scylla-bench/pkg/workloads"
)

type RateLimiter interface {
Wait()
Expected() time.Time
}

type UnlimitedRateLimiter struct{}

func (*UnlimitedRateLimiter) Wait() {}

func (*UnlimitedRateLimiter) Expected() time.Time {
return time.Time{}
}

type MaximumRateLimiter struct {
Period time.Duration
StartTime time.Time
CompletedOperations int64
}

func (mxrl *MaximumRateLimiter) Wait() {
mxrl.CompletedOperations++
nextRequest := mxrl.StartTime.Add(mxrl.Period * time.Duration(mxrl.CompletedOperations))
now := time.Now()
if now.Before(nextRequest) {
time.Sleep(nextRequest.Sub(now))
}
}

func (mxrl *MaximumRateLimiter) Expected() time.Time {
return mxrl.StartTime.Add(mxrl.Period * time.Duration(mxrl.CompletedOperations))
}

func NewRateLimiter(maximumRate int, timeOffset time.Duration) RateLimiter {
if maximumRate == 0 {
return &UnlimitedRateLimiter{}
}
period := time.Duration(int64(time.Second) / int64(maximumRate))
return &MaximumRateLimiter{period, time.Now(), 0}
}

func RunConcurrently(maximumRate int, workload func(id int, testResult *results.TestThreadResult, rateLimiter RateLimiter)) *results.TestResults {
var timeOffsetUnit int64
if maximumRate != 0 {
timeOffsetUnit = int64(time.Second) / int64(maximumRate)
maximumRate /= concurrency
} else {
timeOffsetUnit = 0
}

totalResults := results.TestResults{}
totalResults.Init(concurrency)
totalResults.SetStartTime()
totalResults.PrintResultsHeader()

for i := 0; i < concurrency; i++ {
testResult := totalResults.GetTestResult(i)
go func(i int) {
timeOffset := time.Duration(timeOffsetUnit * int64(i))
workload(i, testResult, NewRateLimiter(maximumRate, timeOffset))
}(i)
}

return &totalResults
}

type TestIterator struct {
iteration uint
workload WorkloadGenerator
Expand Down Expand Up @@ -109,10 +45,8 @@ func (ti *TestIterator) IsDone() bool {
}
}

func RunTest(threadResult *results.TestThreadResult, workload WorkloadGenerator, rateLimiter RateLimiter, test func(rb *results.TestThreadResult) (error, time.Duration)) {

start := time.Now()
partialStart := start
func RunTest(threadResult *worker.Worker, workload WorkloadGenerator, rateLimiter rate_limiter.RateLimiter, test func(rb *worker.Worker) (error, time.Duration)) {
defer threadResult.StopReporting()
iter := NewTestIterator(workload)
errorsAtRow := 0
for !iter.IsDone() {
Expand Down Expand Up @@ -140,26 +74,14 @@ func RunTest(threadResult *results.TestThreadResult, workload WorkloadGenerator,
threadResult.RecordCoFixedLatency(endTime.Sub(expectedStartTime))
}

now := time.Now()
if maxErrorsAtRow > 0 && errorsAtRow >= maxErrorsAtRow {
threadResult.SubmitCriticalError(errors.New(fmt.Sprintf("Error limit (maxErrorsAtRow) of %d errors is reached", errorsAtRow)))
}
if results.GlobalErrorFlag {
threadResult.ResultChannel <- *threadResult.PartialResult
threadResult.ResetPartialResult()
break
}
if now.Sub(partialStart) > time.Second {
threadResult.ResultChannel <- *threadResult.PartialResult
threadResult.ResetPartialResult()
partialStart = now
if worker.GlobalErrorFlag {
break
}
}
end := time.Now()

threadResult.FullResult.ElapsedTime = end.Sub(start)
threadResult.ResultChannel <- *threadResult.FullResult
threadResult.StopReporting()
}

const (
Expand Down Expand Up @@ -298,10 +220,10 @@ func ValidateData(pk int64, ck int64, data []byte) error {
return nil
}

func DoWrites(session *gocql.Session, threadResult *results.TestThreadResult, workload WorkloadGenerator, rateLimiter RateLimiter) {
func DoWrites(session *gocql.Session, threadResult *worker.Worker, workload WorkloadGenerator, rateLimiter rate_limiter.RateLimiter) {
query := session.Query("INSERT INTO " + keyspaceName + "." + tableName + " (pk, ck, v) VALUES (?, ?, ?)")

RunTest(threadResult, workload, rateLimiter, func(rb *results.TestThreadResult) (error, time.Duration) {
RunTest(threadResult, workload, rateLimiter, func(rb *worker.Worker) (error, time.Duration) {
pk := workload.NextPartitionKey()
ck := workload.NextClusteringKey()
value := GenerateData(pk, ck, clusteringRowSizeDist.Generate())
Expand All @@ -322,15 +244,15 @@ func DoWrites(session *gocql.Session, threadResult *results.TestThreadResult, wo
})
}

func DoBatchedWrites(session *gocql.Session, threadResult *results.TestThreadResult, workload WorkloadGenerator, rateLimiter RateLimiter) {
func DoBatchedWrites(session *gocql.Session, threadResult *worker.Worker, workload WorkloadGenerator, rateLimiter rate_limiter.RateLimiter) {
request := fmt.Sprintf("INSERT INTO %s.%s (pk, ck, v) VALUES (?, ?, ?)", keyspaceName, tableName)

RunTest(threadResult, workload, rateLimiter, func(rb *results.TestThreadResult) (error, time.Duration) {
RunTest(threadResult, workload, rateLimiter, func(rb *worker.Worker) (error, time.Duration) {
batch := session.NewBatch(gocql.UnloggedBatch)
batchSize := 0
batchSize := uint64(0)

currentPk := workload.NextPartitionKey()
for !workload.IsPartitionDone() && atomic.LoadUint32(&stopAll) == 0 && batchSize < rowsPerRequest {
for !workload.IsPartitionDone() && atomic.LoadUint32(&stopAll) == 0 && batchSize < uint64(rowsPerRequest) {
ck := workload.NextClusteringKey()
batchSize++
value := GenerateData(currentPk, ck, clusteringRowSizeDist.Generate())
Expand All @@ -352,11 +274,11 @@ func DoBatchedWrites(session *gocql.Session, threadResult *results.TestThreadRes
})
}

func DoCounterUpdates(session *gocql.Session, threadResult *results.TestThreadResult, workload WorkloadGenerator, rateLimiter RateLimiter) {
func DoCounterUpdates(session *gocql.Session, threadResult *worker.Worker, workload WorkloadGenerator, rateLimiter rate_limiter.RateLimiter) {
query := session.Query("UPDATE " + keyspaceName + "." + counterTableName +
" SET c1 = c1 + ?, c2 = c2 + ?, c3 = c3 + ?, c4 = c4 + ?, c5 = c5 + ? WHERE pk = ? AND ck = ?")

RunTest(threadResult, workload, rateLimiter, func(rb *results.TestThreadResult) (error, time.Duration) {
RunTest(threadResult, workload, rateLimiter, func(rb *worker.Worker) (error, time.Duration) {
pk := workload.NextPartitionKey()
ck := workload.NextClusteringKey()
bound := query.Bind(ck, ck+1, ck+2, ck+3, ck+4, pk, ck)
Expand All @@ -376,11 +298,11 @@ func DoCounterUpdates(session *gocql.Session, threadResult *results.TestThreadRe
})
}

func DoReads(session *gocql.Session, threadResult *results.TestThreadResult, workload WorkloadGenerator, rateLimiter RateLimiter) {
func DoReads(session *gocql.Session, threadResult *worker.Worker, workload WorkloadGenerator, rateLimiter rate_limiter.RateLimiter) {
DoReadsFromTable(tableName, session, threadResult, workload, rateLimiter)
}

func DoCounterReads(session *gocql.Session, threadResult *results.TestThreadResult, workload WorkloadGenerator, rateLimiter RateLimiter) {
func DoCounterReads(session *gocql.Session, threadResult *worker.Worker, workload WorkloadGenerator, rateLimiter rate_limiter.RateLimiter) {
DoReadsFromTable(counterTableName, session, threadResult, workload, rateLimiter)
}

Expand Down Expand Up @@ -415,7 +337,7 @@ func BuildReadQuery(table string, orderBy string, session *gocql.Session) *gocql
return session.Query(request)
}

func DoReadsFromTable(table string, session *gocql.Session, threadResult *results.TestThreadResult, workload WorkloadGenerator, rateLimiter RateLimiter) {
func DoReadsFromTable(table string, session *gocql.Session, threadResult *worker.Worker, workload WorkloadGenerator, rateLimiter rate_limiter.RateLimiter) {
var queries []*gocql.Query
queryIdx := 0
queryIdxMax := len(selectOrderByParsed) - 1
Expand All @@ -424,7 +346,7 @@ func DoReadsFromTable(table string, session *gocql.Session, threadResult *result
queries = append(queries, BuildReadQuery(table, orderBy, session))
}

RunTest(threadResult, workload, rateLimiter, func(rb *results.TestThreadResult) (error, time.Duration) {
RunTest(threadResult, workload, rateLimiter, func(rb *worker.Worker) (error, time.Duration) {
pk := workload.NextPartitionKey()

query := queries[queryIdx]
Expand Down Expand Up @@ -507,11 +429,11 @@ func DoReadsFromTable(table string, session *gocql.Session, threadResult *result
})
}

func DoScanTable(session *gocql.Session, threadResult *results.TestThreadResult, workload WorkloadGenerator, rateLimiter RateLimiter) {
func DoScanTable(session *gocql.Session, threadResult *worker.Worker, workload WorkloadGenerator, rateLimiter rate_limiter.RateLimiter) {
request := fmt.Sprintf("SELECT * FROM %s.%s WHERE token(pk) >= ? AND token(pk) <= ?", keyspaceName, tableName)
query := session.Query(request)

RunTest(threadResult, workload, rateLimiter, func(rb *results.TestThreadResult) (error, time.Duration) {
RunTest(threadResult, workload, rateLimiter, func(rb *worker.Worker) (error, time.Duration) {
requestStart := time.Now()
currentRange := workload.NextTokenRange()
bound := query.Bind(currentRange.Start, currentRange.End)
Expand Down
Loading