diff --git a/benchmark-script/read_operation/main.go b/benchmark-script/read_operation/main.go index 77e94cf..fb9d4f8 100644 --- a/benchmark-script/read_operation/main.go +++ b/benchmark-script/read_operation/main.go @@ -1,7 +1,6 @@ package main import ( - "bufio" "flag" "fmt" "io" @@ -11,6 +10,7 @@ import ( "syscall" "time" + "golang.org/x/sync/errgroup" ) @@ -18,20 +18,29 @@ var ( fDir = flag.String("dir", "", "Directory file to be opened.") fNumOfThreads = flag.Int("threads", 1, "Number of threads to read parallel") - fBlockSize = flag.Int("block-size", 256, "Block size in KB") + fBlockSizeKB = flag.Int("block-size-kb", 1024, "Block size in KB") + + fFileSizeMB = flag.Int64("file-size-mb", 1024, "File size in MB") fileHandles []*os.File eG errgroup.Group - // OneKB means 1024 bytes. OneKB = 1024 fNumberOfRead = flag.Int("read-count", 1, "number of read iteration") + + fOutputDir = flag.String("output-dir", "", "Directory to dump the output") + fFilePrefix = flag.String("file-prefix", "", "Prefix file") ) +var gResult *Result +func init() { + gResult = &Result{} +} + func openFile(index int) (err error) { - fileName := path.Join(*fDir, "file_"+strconv.Itoa(index)) + fileName := path.Join(*fDir, *fFilePrefix+strconv.Itoa(index)) fileHandle, err := os.OpenFile(fileName, os.O_RDONLY|syscall.O_DIRECT, 0600) if err != nil { err = fmt.Errorf("while opening file: %w", err) @@ -43,16 +52,30 @@ func openFile(index int) (err error) { // Expect file is already opened, otherwise throws error func readAlreadyOpenedFile(index int) (err error) { + b := make([]byte, *fBlockSizeKB*1024) for i := 0; i < *fNumberOfRead; i++ { - r := bufio.NewReader(fileHandles[index]) - b := make([]byte, *fBlockSize*1024) + readStart := time.Now() + _, _ = fileHandles[index].Seek(0, 0) + + for { + _, err = fileHandles[index].Read(b) + if err != nil { + if err == io.EOF { + err = nil + } + break + } + } - _, err = io.CopyBuffer(io.Discard, r, b) if err != nil { return fmt.Errorf("while reading and 
discarding content: %v", err) } - } + readLatency := time.Since(readStart) + + throughput := float64(*fFileSizeMB) / readLatency.Seconds() + gResult.Append(readLatency.Seconds(), throughput) + } return } @@ -67,6 +90,11 @@ func runReadFileOperations() (err error) { return } + if *fFileSizeMB <= 0 { + err = fmt.Errorf("file size is not valid") + return + } + fileHandles = make([]*os.File, *fNumOfThreads) for i := 0; i < *fNumOfThreads; i++ { @@ -89,14 +117,7 @@ func runReadFileOperations() (err error) { }) } - err = eG.Wait() - - if err == nil { - fmt.Println("read benchmark completed successfully!") - fmt.Println("Waiting for 10 seconds") - - time.Sleep(10 * time.Second) - } + groupError := eG.Wait() for i := 0; i < *fNumOfThreads; i++ { if err = fileHandles[i].Close(); err != nil { @@ -105,16 +126,30 @@ func runReadFileOperations() (err error) { } } + if groupError != nil { + err = groupError + } else { + fmt.Println("read benchmark completed successfully!") + } return } func main() { flag.Parse() + fmt.Println("\n******* Passed flags: *******") + flag.VisitAll(func(f *flag.Flag) { + fmt.Printf("Flag: %s, Value: %v\n", f.Name, f.Value) + }) err := runReadFileOperations() if err != nil { - fmt.Println(err) + fmt.Printf("while performing read: %v", err) os.Exit(1) } + if *fOutputDir == "" { + *fOutputDir, _ = os.Getwd() + } + gResult.DumpMetricsCSV(path.Join(*fOutputDir, "metrics.csv")) + gResult.PrintStats() } diff --git a/benchmark-script/read_operation/result_metrics.go b/benchmark-script/read_operation/result_metrics.go new file mode 100644 index 0000000..bef2fba --- /dev/null +++ b/benchmark-script/read_operation/result_metrics.go @@ -0,0 +1,159 @@ +package main + +import ( + "encoding/csv" + "encoding/json" + "fmt" + "os" + "sort" + "sync" + "time" +) + +type Metric struct { + latency float64 + throughput float64 + unixTime int64 +} + +type Result struct { + metrics []Metric // List to store metrics + mutex sync.Mutex +} + +func (r *Result) Append(latency 
float64, throughput float64) { + r.mutex.Lock() + defer r.mutex.Unlock() + + newMetric := Metric{ + latency: latency, + throughput: throughput, + unixTime: time.Now().Unix(), + } + r.metrics = append(r.metrics, newMetric) +} + +func (r *Result) GetMetrics() []Metric { + r.mutex.Lock() + defer r.mutex.Unlock() + + metricsCopy := make([]Metric, len(r.metrics)) + copy(metricsCopy, r.metrics) + return metricsCopy +} + +func (r *Result) DumpMetricsJson(filePath string) error { + r.mutex.Lock() + defer r.mutex.Unlock() + fmt.Print(r.metrics) + + // Marshal the metrics into JSON format + jsonData, err := json.Marshal(r.metrics) + if err != nil { + return err + } + + // Write the JSON data to the file + err = os.WriteFile(filePath, jsonData, 0644) + if err != nil { + return err + } + + return nil +} + +func (r *Result) DumpMetricsCSV(filePath string) error { + r.mutex.Lock() + defer r.mutex.Unlock() + + // Create the CSV file + file, err := os.Create(filePath) + if err != nil { + return err + } + defer file.Close() + + // Create a CSV writer + writer := csv.NewWriter(file) + defer writer.Flush() + + + // Write the CSV header + err = writer.Write([]string{"Timestamp", "ReadLatency(s)", "Throughput(MiB/s)"}) + if err != nil { + return err + } + + for _, metric := range r.metrics { + err = writer.Write([]string{ + fmt.Sprintf("%d", metric.unixTime), + fmt.Sprintf("%.3f", metric.latency), + fmt.Sprintf("%.3f", metric.throughput), + }) + if err != nil { + return err + } + } + + return nil +} + +func (r *Result) PrintStats() { + r.mutex.Lock() + defer r.mutex.Unlock() + + if len(r.metrics) == 0 { + fmt.Println("No metrics collected yet.") + return + } + + // Calculate averages + var totalLatency, totalThroughput float64 + for _, metric := range r.metrics { + totalLatency += metric.latency + totalThroughput += metric.throughput + } + avgLatency := totalLatency / float64(len(r.metrics)) + avgThroughput := totalThroughput / float64(len(r.metrics)) + + // Calculate percentiles 
(e.g., 50th, 90th, 95th, 99th) + latencyValues := make([]float64, len(r.metrics)) + throughputValues := make([]float64, len(r.metrics)) + for i, metric := range r.metrics { + latencyValues[i] = metric.latency + throughputValues[i] = metric.throughput + } + + sort.Float64s(latencyValues) + sort.Float64s(throughputValues) + + fmt.Println("\n******* Metrics Summary: ReadLatency (s) *******") + fmt.Printf("Average Latency: %.2f\n", avgLatency) + fmt.Printf("p0: %.2f\n", percentileFloat64(latencyValues, 0)) + fmt.Printf("p50: %.2f\n", percentileFloat64(latencyValues, 50)) + fmt.Printf("p90: %.2f\n", percentileFloat64(latencyValues, 90)) + fmt.Printf("p95: %.2f\n", percentileFloat64(latencyValues, 95)) + fmt.Printf("p99: %.2f\n", percentileFloat64(latencyValues, 99)) + fmt.Printf("p100: %.2f\n", percentileFloat64(latencyValues, 100)) + + fmt.Println("\n******* Metrics Summary: Throughput (MiB/s): *******") + fmt.Printf("Average Throughput: %.2f\n", avgThroughput) + fmt.Printf("p0: %.2f\n", percentileFloat64(throughputValues, 0)) + fmt.Printf("p50: %.2f\n", percentileFloat64(throughputValues, 50)) + fmt.Printf("p90: %.2f\n", percentileFloat64(throughputValues, 90)) + fmt.Printf("p95: %.2f\n", percentileFloat64(throughputValues, 95)) + fmt.Printf("p99: %.2f\n", percentileFloat64(throughputValues, 99)) + fmt.Printf("p100: %.2f\n", percentileFloat64(throughputValues, 100)) +} + +func percentileFloat64(values []float64, p int) float64 { + if p < 0 || p > 100 { + panic("Percentile must be between 0 and 100") + } + + index := int((float32(p) / float32(100)) * float32(len(values))) + if index == len(values) { + index-- + } + return values[index] +} diff --git a/benchmark-script/read_operation/run_warp_test.sh b/benchmark-script/read_operation/run_warp_test.sh new file mode 100755 index 0000000..7c543d3 --- /dev/null +++ b/benchmark-script/read_operation/run_warp_test.sh @@ -0,0 +1,6 @@ +#!/bin/bash + +set -e +shopt -s expand_aliases + +time go run . 
--threads 64 --read-count 1 --file-size-mb 64 --dir /home/princer_google_com/warp-test/gcs/64M/ --file-prefix "experiment." diff --git a/benchmark-script/warp_test/merge_csv_script.go b/benchmark-script/warp_test/merge_csv_script.go new file mode 100644 index 0000000..2bdabe3 --- /dev/null +++ b/benchmark-script/warp_test/merge_csv_script.go @@ -0,0 +1,217 @@ +package main + +import ( + "encoding/csv" + "fmt" + "math" + "os" + "path/filepath" + "sort" + "strconv" +) + +type DataRow struct { + Timestamp int64 + ReadLatency float64 + Throughput float64 +} + +func main() { + // Define the folder containing the CSV files + folder := "/usr/local/google/home/princer/csv/metrics" + + // Store all data rows from all files + var allDataRows []DataRow + + cmnStartTime := int64(0) + cmnEndTime := int64(math.MaxInt64) + + // Iterate over all CSV files in the folder + err := filepath.Walk(folder, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if filepath.Ext(path) == ".csv" { + dataRows, err := readCSV(path) + n := len(dataRows) + if n == 0 { + return fmt.Errorf("empty metrics-file") + } + + // Get the time interval when all the nodes are active. + if cmnStartTime < dataRows[0].Timestamp { + cmnStartTime = dataRows[0].Timestamp + } + if cmnEndTime > dataRows[n - 1].Timestamp { + cmnEndTime = dataRows[n - 1].Timestamp + } + + if err != nil { + return err + } + allDataRows = append(allDataRows, dataRows...) + } + return nil + }) + + fmt.Println(cmnStartTime) + fmt.Println(cmnEndTime) + if err != nil { + fmt.Printf("Error reading CSV files: %v\n", err) + return + } + + // Sort all data rows by Timestamp + sort.Slice(allDataRows, func(i, j int) bool { + return allDataRows[i].Timestamp < allDataRows[j].Timestamp + }) + + // Filter out the data which is not part of common time interval. 
+ var tmpFilteredRows []DataRow + var filteredCnt int + for _, data := range allDataRows { + if data.Timestamp < cmnStartTime || data.Timestamp > cmnEndTime { + filteredCnt++ + continue + } + tmpFilteredRows = append(tmpFilteredRows, data) + } + allDataRows = tmpFilteredRows + + fmt.Printf("total filtered: %v\n", filteredCnt) + printPercentiles(allDataRows) + + // Write the filtered data to a new CSV file + outputFile := "/usr/local/google/home/princer/csv/output/output.csv" + err = writeCSV(outputFile, allDataRows) + fmt.Printf("Len: %d", len(allDataRows)) + if err != nil { + fmt.Printf("Error writing CSV file: %v\n", err) + return + } + + fmt.Printf("Merged and filtered data written to %s\n", outputFile) +} + +// readCSV reads a CSV file and returns a slice of DataRow +func readCSV(path string) ([]DataRow, error) { + file, err := os.Open(path) + if err != nil { + return nil, err + } + defer file.Close() + + reader := csv.NewReader(file) + // Skip the header row + _, err = reader.Read() + if err != nil { + return nil, err + } + + var dataRows []DataRow + for { + record, err := reader.Read() + if err != nil { + break + } + timestamp, _ := strconv.ParseInt(record[0], 10, 64) + readLatency, _ := strconv.ParseFloat(record[1], 64) + throughput, _ := strconv.ParseFloat(record[2], 64) + + dataRows = append(dataRows, DataRow{ + Timestamp: timestamp, + ReadLatency: readLatency, + Throughput: throughput, + }) + } + + return dataRows, nil +} + +// writeCSV writes a slice of DataRow to a CSV file +func writeCSV(path string, data []DataRow) error { + file, err := os.Create(path) + if err != nil { + return err + } + defer file.Close() + + writer := csv.NewWriter(file) + defer writer.Flush() + + // Write header row + err = writer.Write([]string{"Timestamp", "ReadLatency(s)", "Throughput(MiB/s)"}) + if err != nil { + return err + } + + // Write data rows + for _, row := range data { + err := writer.Write([]string{ + strconv.FormatInt(row.Timestamp, 10), + 
strconv.FormatFloat(row.ReadLatency, 'f', 3, 64), + strconv.FormatFloat(row.Throughput, 'f', 3, 64), + }) + if err != nil { + return err + } + } + + return nil +} + +func printPercentiles(rows []DataRow) { + // Calculate averages + var totalLatency float64 + for _, metric := range rows { + totalLatency += metric.ReadLatency + } + avgLatency := totalLatency / float64(len(rows)) + + // Calculate standard deviation + var squaredDiffSum float64 + for _, metric := range rows { + squaredDiff := math.Pow(metric.ReadLatency-avgLatency, 2) + squaredDiffSum += squaredDiff + } + + // Calculate the variance + variance := squaredDiffSum / float64(len(rows)) + + // Calculate the standard deviation + standardDeviation := math.Sqrt(variance) + + // Calculate percentiles (e.g., 50th, 90th, 95th, 99th) + latencyValues := make([]float64, len(rows)) + for i, metric := range rows { + latencyValues[i] = metric.ReadLatency + } + + sort.Float64s(latencyValues) + + fmt.Println("\n******* Metrics Summary: ReadLatency (s) *******") + fmt.Printf("Average Latency: %.2f\n", avgLatency) + fmt.Printf("Count: %d\n", len(rows)) + fmt.Printf("Standard deviation: %.2f\n", standardDeviation) + fmt.Printf("p0: %.2f\n", percentileFloat64(latencyValues, 0)) + fmt.Printf("p50: %.2f\n", percentileFloat64(latencyValues, 50)) + fmt.Printf("p90: %.2f\n", percentileFloat64(latencyValues, 90)) + fmt.Printf("p95: %.2f\n", percentileFloat64(latencyValues, 95)) + fmt.Printf("p99: %.2f\n", percentileFloat64(latencyValues, 99)) + fmt.Printf("p99.9: %.2f\n", percentileFloat64(latencyValues, 99.9)) + fmt.Printf("p99.99: %.2f\n", percentileFloat64(latencyValues, 99.99)) + fmt.Printf("p100: %.2f\n", percentileFloat64(latencyValues, 100)) +} + +func percentileFloat64(values []float64, p float32) float64 { + if p < 0 || p > 100 { + panic("Percentile must be between 0 and 100") + } + + index := int((p / float32(100)) * float32(len(values))) + if index == len(values) { + index-- + } + return values[index] +} + diff --git 
a/go.sum b/go.sum index 68115dc..3b4b456 100644 --- a/go.sum +++ b/go.sum @@ -565,6 +565,7 @@ github.com/golang/mock v1.4.1/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt github.com/golang/mock v1.4.3/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw= github.com/golang/mock v1.4.4/go.mod h1:l3mdAwkq5BuhzHwde/uurv3sEJeZMXNpwsxVWU71h+4= github.com/golang/mock v1.5.0/go.mod h1:CWnOUgYIOo4TcNZ0wHX3YZCqsaM1I1Jvs6v3mP3KVu8= +github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=