Issue 57 & 60: Add "-fork" and "-kafka" options #61

Open · wants to merge 15 commits into master
9 changes: 7 additions & 2 deletions README.md
@@ -10,7 +10,7 @@ You may obtain a copy of the License at

# Pravega Benchmark Tool

The Pravega benchmark tool used for the performance benchmarking of pravega streaming storage cluster.
The Pravega benchmark tool is used for performance benchmarking of Pravega and Kafka streaming storage clusters.
This tool performs throughput and latency analysis for multiple producers/writers and consumers/readers of Pravega.
It also validates the end-to-end latency. The write and/or read latencies can be stored in a CSV file for later analysis.
At the end of the performance benchmarking, this tool outputs the 50th, 75th, 95th, 99th, 99.9th and 99.99th latency percentiles.
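The percentile reporting described above can be sketched with a nearest-rank lookup on the sorted latencies. This is only a minimal illustration of the idea, not the tool's actual implementation; the class and method names here are hypothetical:

```java
import java.util.Arrays;

public class PercentileSketch {
    // Compute a latency percentile by nearest-rank lookup on a sorted copy.
    static long percentile(long[] latencies, double pct) {
        long[] sorted = latencies.clone();
        Arrays.sort(sorted);
        int index = (int) Math.ceil((pct / 100.0) * sorted.length) - 1;
        return sorted[Math.max(index, 0)];
    }

    public static void main(String[] args) {
        long[] latenciesMs = {5, 7, 3, 9, 12, 4, 6, 8, 10, 11};
        System.out.println(percentile(latenciesMs, 50));   // median
        System.out.println(percentile(latenciesMs, 99.9)); // tail latency
    }
}
```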
@@ -46,7 +46,6 @@ Running Pravega benchmark tool locally:

```
<dir>/pravega-benchmark$ ./run/pravega-benchmark/bin/pravega-benchmark -help
usage: pravega-benchmark
-consumers <arg> Number of consumers
-controller <arg> Controller URI
-events <arg> Number of events/records if 'time' not
@@ -58,7 +57,9 @@ usage: pravega-benchmark
 <arg> number of events/records; Not
applicable, if both producers and
consumers are specified
-fork <arg> Use fork-join pool
-help Help message
-kafka <arg> Enable Kafka benchmarking
-producers <arg> Number of producers
-readcsv <arg> CSV file to record read latencies
-recreate <arg> If the stream is already existing, delete
@@ -172,3 +173,7 @@ The -throughput -1 specifies that the writers try to write the events at the maximum possible speed.
### Recording the latencies to CSV files
Users can use the option "-writecsv <file name>" to record the latencies of writers and "-readcsv <file name>" for readers.
In end-to-end latency mode, the user can supply only -readcsv to record the end-to-end latencies in the CSV file.

### Kafka Benchmarking
Users can set the option "-kafka true" for Kafka benchmarking. Topics must be created manually before running Kafka benchmarks; unlike Pravega benchmarking, this tool does not create the topic automatically. The tool treats the stream name as the topic name.
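Since the tool does not create topics, they can be created up front with Kafka's own CLI. This is an illustrative fragment only; the script path, broker address, partition count, and replication factor are assumptions to adapt to your cluster:

```
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 \
    --replication-factor 3 --partitions 16 --topic <stream/topic name>
```

The topic name passed here must match the stream name given to the benchmark tool.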

3 changes: 2 additions & 1 deletion build.gradle
@@ -31,7 +31,8 @@ buildscript {
    compile "io.pravega:pravega-client:0.5.0",
            "io.pravega:pravega-common:0.5.0",
            "commons-cli:commons-cli:1.3.1",
            "org.apache.commons:commons-csv:1.5"
            "org.apache.commons:commons-csv:1.5",
            "org.apache.kafka:kafka-clients:2.3.0"

    runtime "org.slf4j:slf4j-simple:1.7.14"
}
46 changes: 46 additions & 0 deletions src/main/java/io/pravega/perf/KafkaReaderWorker.java
@@ -0,0 +1,46 @@
/**
* Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*/
package io.pravega.perf;

import java.util.Properties;
import java.util.Arrays;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
* Class for Kafka reader/consumer.
*/
public class KafkaReaderWorker extends ReaderWorker {
    private final KafkaConsumer<byte[], byte[]> consumer;

    KafkaReaderWorker(int readerId, int events, int secondsToRun,
                      long start, PerfStats stats, String partition,
                      int timeout, boolean writeAndRead, Properties consumerProps) {
        super(readerId, events, secondsToRun, start, stats, partition, timeout, writeAndRead);

        this.consumer = new KafkaConsumer<>(consumerProps);
        this.consumer.subscribe(Arrays.asList(partition));
    }

    @Override
    public byte[] readData() {
        // poll() returns a batch of records; only the first record is
        // consumed here and the rest of the batch is discarded.
        final ConsumerRecords<byte[], byte[]> records = consumer.poll(timeout);
        if (records.isEmpty()) {
            return null;
        }
        return records.iterator().next().value();
    }

    @Override
    public void close() {
        consumer.close();
    }
}
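Because poll() returns a whole batch while readData() hands back a single event, records after the first one in each batch are dropped. A common fix is to buffer each batch and drain it one record at a time. A stdlib-only sketch of that pattern follows; the Kafka consumer is replaced by a hypothetical batch supplier, so this is an illustration of the buffering idea rather than a drop-in patch:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.function.Supplier;

public class BatchedReader {
    private final Supplier<List<byte[]>> poller; // stands in for consumer.poll()
    private final Deque<byte[]> buffer = new ArrayDeque<>();

    BatchedReader(Supplier<List<byte[]>> poller) {
        this.poller = poller;
    }

    // Return one record per call; refill the buffer from the next batch
    // only once the previous batch is fully drained.
    public byte[] readData() {
        if (buffer.isEmpty()) {
            buffer.addAll(poller.get());
        }
        return buffer.poll(); // null when the poll returned no records
    }
}
```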
59 changes: 59 additions & 0 deletions src/main/java/io/pravega/perf/KafkaWriterWorker.java
@@ -0,0 +1,59 @@
/**
* Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*/
package io.pravega.perf;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
* Class for Kafka writer/producer.
*/
public class KafkaWriterWorker extends WriterWorker {
    private final KafkaProducer<byte[], byte[]> producer;

    KafkaWriterWorker(int sensorId, int events, int flushEvents,
                      int secondsToRun, boolean isRandomKey, int messageSize,
                      long start, PerfStats stats, String streamName,
                      int eventsPerSec, boolean writeAndRead, Properties producerProps) {

        super(sensorId, events, flushEvents,
                secondsToRun, isRandomKey, messageSize,
                start, stats, streamName, eventsPerSec, writeAndRead);

        this.producer = new KafkaProducer<>(producerProps);
    }

    public long recordWrite(byte[] data, TriConsumer record) {
        // The send is asynchronous; the latency is recorded in the
        // completion callback once the broker acknowledges the record.
        final long time = System.currentTimeMillis();
        producer.send(new ProducerRecord<>(streamName, data), (metadata, exception) -> {
            final long endTime = System.currentTimeMillis();
            record.accept(time, endTime, data.length);
        });
        return time;
    }

    @Override
    public void writeData(byte[] data) {
        producer.send(new ProducerRecord<>(streamName, data));
    }

    @Override
    public void flush() {
        producer.flush();
    }

    @Override
    public synchronized void close() {
        producer.close();
    }
}
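recordWrite() measures latency from an asynchronous completion callback rather than around a blocking call. The same pattern, with the Kafka producer replaced by a stand-in async operation (names here are illustrative, not part of the tool), can be sketched as:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

public class AsyncLatencySketch {
    // Record the start time, fire the async operation, and compute the
    // latency inside the completion callback.
    static long timedSend(Runnable operation, AtomicLong latencyMs) {
        final long start = System.currentTimeMillis();
        CompletableFuture.runAsync(operation).thenRun(() ->
                latencyMs.set(System.currentTimeMillis() - start));
        return start;
    }

    public static void main(String[] args) throws Exception {
        AtomicLong latency = new AtomicLong(-1);
        timedSend(() -> { /* simulated send */ }, latency);
        Thread.sleep(100); // wait for the callback to fire
        System.out.println(latency.get() >= 0);
    }
}
```

Note that, as in recordWrite(), the callback runs on another thread, so the sink for the measured latency must be thread-safe.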
13 changes: 6 additions & 7 deletions src/main/java/io/pravega/perf/PerfStats.java
@@ -13,9 +13,9 @@
import java.util.ArrayList;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.locks.LockSupport;
import java.nio.file.Files;
import java.nio.file.Paths;
@@ -38,7 +38,7 @@ public class PerfStats {
final private int messageSize;
final private int windowInterval;
final private ConcurrentLinkedQueue<TimeStamp> queue;
final private ForkJoinPool executor;
final private ExecutorService executor;

@GuardedBy("this")
private Future<Void> ret;
@@ -66,13 +66,13 @@ private boolean isEnd() {
}
}

public PerfStats(String action, int reportingInterval, int messageSize, String csvFile) {
public PerfStats(String action, int reportingInterval, int messageSize, String csvFile, ExecutorService executor) {
this.action = action;
this.messageSize = messageSize;
this.windowInterval = reportingInterval;
this.csvFile = csvFile;
this.executor = executor;
this.queue = new ConcurrentLinkedQueue<>();
this.executor = new ForkJoinPool(1);
this.ret = null;
}

@@ -83,7 +83,7 @@ final private class QueueProcessor implements Callable {
final private static int NS_PER_MICRO = 1000;
final private static int MICROS_PER_MS = 1000;
final private static int NS_PER_MS = NS_PER_MICRO * MICROS_PER_MS;
final private static int PARK_NS = NS_PER_MICRO;
final private static int PARK_NS = NS_PER_MS;
final private long startTime;

private QueueProcessor(long startTime) {
@@ -94,7 +94,7 @@ public Void call() throws IOException {
final TimeWindow window = new TimeWindow(action, startTime);
final LatencyWriter latencyRecorder = csvFile == null ? new LatencyWriter(action, messageSize, startTime) :
new CSVLatencyWriter(action, messageSize, startTime, csvFile);
final int minWaitTimeMS = windowInterval / 50;
final int minWaitTimeMS = windowInterval / 10;
final long totalIdleCount = (NS_PER_MS / PARK_NS) * minWaitTimeMS;
boolean doWork = true;
long time = startTime;
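With the new constants, the idle budget works out as follows. This is a quick check of the arithmetic only; the 5000 ms reporting window is an assumed example value, not something fixed by the patch:

```java
public class IdleBudget {
    public static void main(String[] args) {
        final int NS_PER_MICRO = 1000;
        final int MICROS_PER_MS = 1000;
        final int NS_PER_MS = NS_PER_MICRO * MICROS_PER_MS; // 1_000_000
        final int PARK_NS = NS_PER_MS;                      // park ~1 ms per spin

        final int windowInterval = 5000;                    // assumed reporting interval, ms
        final int minWaitTimeMS = windowInterval / 10;      // 500 ms
        final long totalIdleCount = (long) (NS_PER_MS / PARK_NS) * minWaitTimeMS;

        System.out.println(totalIdleCount); // 500 parks of ~1 ms each
    }
}
```

Raising PARK_NS from one microsecond to one millisecond shrinks the spin count for the same wall-clock wait by a factor of 1000, trading polling granularity for less CPU burn.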
@@ -351,7 +351,6 @@ public synchronized void shutdown(long endTime) throws ExecutionException, InterruptedException {
if (this.ret != null) {
queue.add(new TimeStamp(endTime));
ret.get();
executor.shutdownNow();
queue.clear();
this.ret = null;
}