Skip to content

Commit

Permalink
Merge pull request #24 from raj-prince/warp_seq_read_script
Browse files Browse the repository at this point in the history
chore: modifying read script for warp-test
  • Loading branch information
raj-prince authored Nov 10, 2024
2 parents 4658a17 + a66070c commit 3366f03
Show file tree
Hide file tree
Showing 5 changed files with 435 additions and 17 deletions.
69 changes: 52 additions & 17 deletions benchmark-script/read_operation/main.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package main

import (
"bufio"
"flag"
"fmt"
"io"
Expand All @@ -11,27 +10,37 @@ import (
"syscall"
"time"


"golang.org/x/sync/errgroup"
)

var (
fDir = flag.String("dir", "", "Directory file to be opened.")
fNumOfThreads = flag.Int("threads", 1, "Number of threads to read parallel")

fBlockSize = flag.Int("block-size", 256, "Block size in KB")
fBlockSizeKB = flag.Int("block-size-kb", 1024, "Block size in KB")

fFileSizeMB = flag.Int64("file-size-mb", 1024, "File size in MB")

fileHandles []*os.File

eG errgroup.Group

// OneKB means 1024 bytes.
OneKB = 1024

Check failure on line 29 in benchmark-script/read_operation/main.go

View workflow job for this annotation

GitHub Actions / audit

exported var OneKB should have comment or be unexported

fNumberOfRead = flag.Int("read-count", 1, "number of read iteration")

fOutputDir = flag.String("output-dir", "", "Directory to dump the output")
fFilePrefix = flag.String("file-prefix", "", "Prefix file")
)

// gResult accumulates per-read latency/throughput metrics from all reader
// goroutines. Initialized directly rather than in init() to avoid an
// unnecessary init side effect.
var gResult = &Result{}

func openFile(index int) (err error) {
fileName := path.Join(*fDir, "file_"+strconv.Itoa(index))
fileName := path.Join(*fDir, *fFilePrefix+strconv.Itoa(index))
fileHandle, err := os.OpenFile(fileName, os.O_RDONLY|syscall.O_DIRECT, 0600)
if err != nil {
err = fmt.Errorf("while opening file: %w", err)
Expand All @@ -43,16 +52,30 @@ func openFile(index int) (err error) {

// Expect file is already opened, otherwise throws error
func readAlreadyOpenedFile(index int) (err error) {
b := make([]byte, *fBlockSizeKB*1024)
for i := 0; i < *fNumberOfRead; i++ {
r := bufio.NewReader(fileHandles[index])
b := make([]byte, *fBlockSize*1024)
readStart := time.Now()
_, _ = fileHandles[index].Seek(0, 0)

for {
_, err = fileHandles[index].Read(b)
if err != nil {
if err == io.EOF {
err = nil
}
break
}
}

_, err = io.CopyBuffer(io.Discard, r, b)
if err != nil {
return fmt.Errorf("while reading and discarding content: %v", err)
}
}

readLatency := time.Since(readStart)

throughput := float64(*fFileSizeMB) / readLatency.Seconds()
gResult.Append(readLatency.Seconds(), throughput)
}
return
}

Expand All @@ -67,6 +90,11 @@ func runReadFileOperations() (err error) {
return
}

if *fFileSizeMB <= 0 {
err = fmt.Errorf("file size is not valid")
return
}

fileHandles = make([]*os.File, *fNumOfThreads)

for i := 0; i < *fNumOfThreads; i++ {
Expand All @@ -89,14 +117,7 @@ func runReadFileOperations() (err error) {
})
}

err = eG.Wait()

if err == nil {
fmt.Println("read benchmark completed successfully!")
fmt.Println("Waiting for 10 seconds")

time.Sleep(10 * time.Second)
}
groupError := eG.Wait()

for i := 0; i < *fNumOfThreads; i++ {
if err = fileHandles[i].Close(); err != nil {
Expand All @@ -105,16 +126,30 @@ func runReadFileOperations() (err error) {
}
}

if groupError != nil {
err = groupError
} else {
fmt.Println("read benchmark completed successfully!")
}
return
}

// main parses flags, echoes them for the benchmark log, runs the read
// benchmark, then dumps per-read metrics as CSV and prints summary stats.
func main() {
	flag.Parse()
	fmt.Println("\n******* Passed flags: *******")
	flag.VisitAll(func(f *flag.Flag) {
		fmt.Printf("Flag: %s, Value: %v\n", f.Name, f.Value)
	})

	if err := runReadFileOperations(); err != nil {
		fmt.Printf("while performing read: %v\n", err)
		os.Exit(1)
	}

	// Default the output directory to the current working directory.
	if *fOutputDir == "" {
		*fOutputDir, _ = os.Getwd()
	}

	// Surface CSV-dump failures instead of silently dropping the metrics.
	if err := gResult.DumpMetricsCSV(path.Join(*fOutputDir, "metrics.csv")); err != nil {
		fmt.Printf("while dumping metrics CSV: %v\n", err)
		os.Exit(1)
	}
	gResult.PrintStats()
}
159 changes: 159 additions & 0 deletions benchmark-script/read_operation/result_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package main

import (
"encoding/csv"
"encoding/json"
"fmt"
"os"
"sort"
"sync"
"time"
)

// Metric is a single read measurement: the observed latency in seconds, the
// derived throughput in MiB/s, and the Unix timestamp when it was recorded.
type Metric struct {
	latency    float64
	throughput float64
	unixTime   int64
}

type Result struct {

Check failure on line 19 in benchmark-script/read_operation/result_metrics.go

View workflow job for this annotation

GitHub Actions / audit

exported type Result should have comment or be unexported
metrics []Metric // List to store metrics
mutex sync.Mutex
}

func (r *Result) Append(latency float64, throughput float64) {

Check failure on line 24 in benchmark-script/read_operation/result_metrics.go

View workflow job for this annotation

GitHub Actions / audit

exported method Result.Append should have comment or be unexported
r.mutex.Lock()
defer r.mutex.Unlock()

newMetric := Metric{
latency: latency,
throughput: throughput,
unixTime: time.Now().Unix(),
}
r.metrics = append(r.metrics, newMetric)
}

func (r *Result) GetMetrics() []Metric {

Check failure on line 36 in benchmark-script/read_operation/result_metrics.go

View workflow job for this annotation

GitHub Actions / audit

exported method Result.GetMetrics should have comment or be unexported
r.mutex.Lock()
defer r.mutex.Unlock()

metricsCopy := make([]Metric, len(r.metrics))
copy(metricsCopy, r.metrics)
return metricsCopy
}

func (r *Result) DumpMetricsJson(filePath string) error {

Check failure on line 45 in benchmark-script/read_operation/result_metrics.go

View workflow job for this annotation

GitHub Actions / audit

exported method Result.DumpMetricsJson should have comment or be unexported

Check failure on line 45 in benchmark-script/read_operation/result_metrics.go

View workflow job for this annotation

GitHub Actions / audit

method DumpMetricsJson should be DumpMetricsJSON
r.mutex.Lock()
defer r.mutex.Unlock()
fmt.Print(r.metrics)

// Marshal the metrics int64o JSON format
jsonData, err := json.Marshal(r.metrics)
if err != nil {
return err
}

// Write the JSON data to the file
err = os.WriteFile(filePath, jsonData, 0644)
if err != nil {
return err
}

return nil
}

func (r *Result) DumpMetricsCSV(filePath string) error {

Check failure on line 65 in benchmark-script/read_operation/result_metrics.go

View workflow job for this annotation

GitHub Actions / audit

exported method Result.DumpMetricsCSV should have comment or be unexported
r.mutex.Lock()
defer r.mutex.Unlock()

// Create the CSV file
file, err := os.Create(filePath)
if err != nil {
return err
}
defer file.Close()

// Create a CSV writer
writer := csv.NewWriter(file)
defer writer.Flush()


// Write the CSV header
err = writer.Write([]string{"Timestamp", "ReadLatency(s)", "Throughput(MiB/s)"})
if err != nil {
return err
}

for _, metric := range r.metrics {
err = writer.Write([]string{
fmt.Sprintf("%d", metric.unixTime),
fmt.Sprintf("%.3f", metric.latency),
fmt.Sprintf("%.3f", metric.throughput),
})
if err != nil {
return err
}
}

return nil
}

func (r *Result) PrintStats() {

Check failure on line 101 in benchmark-script/read_operation/result_metrics.go

View workflow job for this annotation

GitHub Actions / audit

exported method Result.PrintStats should have comment or be unexported
r.mutex.Lock()
defer r.mutex.Unlock()

if len(r.metrics) == 0 {
fmt.Println("No metrics collected yet.")
return
}

// Calculate averages
var totalLatency, totalThroughput float64
for _, metric := range r.metrics {
totalLatency += metric.latency
totalThroughput += metric.throughput
}
avgLatency := totalLatency / float64(len(r.metrics))
avgThroughput := totalThroughput / float64(len(r.metrics))

// Calculate percentiles (e.g., 50th, 90th, 95th, 99th)
latencyValues := make([]float64, len(r.metrics))
throughputValues := make([]float64, len(r.metrics))
for i, metric := range r.metrics {
latencyValues[i] = metric.latency
throughputValues[i] = metric.throughput
}

sort.Float64s(latencyValues)
sort.Float64s(throughputValues)

fmt.Println("\n******* Metrics Summary: ReadLatency (s) *******")
fmt.Printf("Average Latency: %.2f\n", avgLatency)
fmt.Printf("p0: %.2f\n", percentileFloat64(latencyValues, 0))
fmt.Printf("p50: %.2f\n", percentileFloat64(latencyValues, 50))
fmt.Printf("p90: %.2f\n", percentileFloat64(latencyValues, 90))
fmt.Printf("p95: %.2f\n", percentileFloat64(latencyValues, 95))
fmt.Printf("p99: %.2f\n", percentileFloat64(latencyValues, 99))
fmt.Printf("p100: %.2f\n", percentileFloat64(latencyValues, 100))

fmt.Println("\n******* Metrics Summary: Throughput (MiB/s): *******")
fmt.Printf("Average Throughput: %.2f\n", avgThroughput)
fmt.Printf("p0: %.2f\n", percentileFloat64(throughputValues, 0))
fmt.Printf("p50: %.2f\n", percentileFloat64(throughputValues, 50))
fmt.Printf("p90: %.2f\n", percentileFloat64(throughputValues, 90))
fmt.Printf("p95: %.2f\n", percentileFloat64(throughputValues, 95))
fmt.Printf("p99: %.2f\n", percentileFloat64(throughputValues, 99))
fmt.Printf("p100: %.2f\n", percentileFloat64(throughputValues, 100))
}

// percentileFloat64 returns the p-th percentile of values, which must be
// sorted in ascending order. p must be in [0, 100]; out-of-range p panics.
// An empty slice yields 0 (the original indexed values[-1] and panicked).
// Uses the nearest-rank index floor(p/100 * n), clamped to the last element
// so p=100 maps to the maximum.
func percentileFloat64(values []float64, p int) float64 {
	if p < 0 || p > 100 {
		panic("Percentile must be between 0 and 100")
	}
	if len(values) == 0 {
		return 0
	}

	index := int((float64(p) / 100.0) * float64(len(values)))
	if index >= len(values) {
		index = len(values) - 1
	}
	return values[index]
}
6 changes: 6 additions & 0 deletions benchmark-script/read_operation/run_warp_test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#!/bin/bash

# Abort on the first failing command.
set -e
# Allow alias expansion in this non-interactive shell.
shopt -s expand_aliases

# Sequential-read benchmark against the warp-test dataset: 64 threads, one
# read pass each, 64 MiB files named "experiment.<thread-index>".
time go run . --threads 64 --read-count 1 --file-size-mb 64 --dir /home/princer_google_com/warp-test/gcs/64M/ --file-prefix "experiment."
Loading

0 comments on commit 3366f03

Please sign in to comment.