From 11efa4f5ebe56e6d8bd9f0856dcc4a018d27b7c7 Mon Sep 17 00:00:00 2001 From: Keshava Munegowda Date: Fri, 6 Sep 2019 15:41:42 +0530 Subject: [PATCH 01/15] Code Cleanup. Signed-off-by: Keshava Munegowda --- src/main/java/io/pravega/perf/PerfStats.java | 3 ++- src/main/java/io/pravega/perf/PravegaPerfTest.java | 3 ++- src/main/java/io/pravega/perf/WriterWorker.java | 2 +- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/src/main/java/io/pravega/perf/PerfStats.java b/src/main/java/io/pravega/perf/PerfStats.java index 30008c43a..2e584b988 100644 --- a/src/main/java/io/pravega/perf/PerfStats.java +++ b/src/main/java/io/pravega/perf/PerfStats.java @@ -16,6 +16,7 @@ import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Callable; import java.util.concurrent.Future; +import java.util.concurrent.ExecutorService; import java.util.concurrent.locks.LockSupport; import java.nio.file.Files; import java.nio.file.Paths; @@ -38,7 +39,7 @@ public class PerfStats { final private int messageSize; final private int windowInterval; final private ConcurrentLinkedQueue queue; - final private ForkJoinPool executor; + final private ExecutorService executor; @GuardedBy("this") private Future ret; diff --git a/src/main/java/io/pravega/perf/PravegaPerfTest.java b/src/main/java/io/pravega/perf/PravegaPerfTest.java index e90cc9638..5b24a3954 100644 --- a/src/main/java/io/pravega/perf/PravegaPerfTest.java +++ b/src/main/java/io/pravega/perf/PravegaPerfTest.java @@ -39,6 +39,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; /** * Performance benchmark for Pravega. @@ -102,7 +103,7 @@ public static void main(String[] args) { System.exit(0); } - final ForkJoinPool executor = new ForkJoinPool(); + final ExecutorService executor = new ForkJoinPool(); try { final List producers = perfTest.getProducers(); diff --git a/src/main/java/io/pravega/perf/WriterWorker.java b/src/main/java/io/pravega/perf/WriterWorker.java index 7a87f5b9c..54bce4a6a 100644 --- a/src/main/java/io/pravega/perf/WriterWorker.java +++ b/src/main/java/io/pravega/perf/WriterWorker.java @@ -194,7 +194,7 @@ private void EventsWriterTimeRW() throws InterruptedException, IOException { for (int i = 0; (time - startTime) < msToRun; i++) { time = System.currentTimeMillis(); - byte[] bytes = timeBuffer.putLong(0, System.currentTimeMillis()).array(); + byte[] bytes = timeBuffer.putLong(0, time).array(); System.arraycopy(bytes, 0, payload, 0, bytes.length); writeData(payload); /* From 5c0fc88a66d448abda931643af2592d851c17ceb Mon Sep 17 00:00:00 2001 From: Keshava Munegowda Date: Fri, 6 Sep 2019 18:17:36 +0530 Subject: [PATCH 02/15] Implementation of -fork option. Signed-off-by: Keshava Munegowda --- src/main/java/io/pravega/perf/PerfStats.java | 4 +-- .../java/io/pravega/perf/PravegaPerfTest.java | 28 +++++++++++++++++-- 2 files changed, 27 insertions(+), 5 deletions(-) diff --git a/src/main/java/io/pravega/perf/PerfStats.java b/src/main/java/io/pravega/perf/PerfStats.java index 2e584b988..f85e63037 100644 --- a/src/main/java/io/pravega/perf/PerfStats.java +++ b/src/main/java/io/pravega/perf/PerfStats.java @@ -67,13 +67,13 @@ private boolean isEnd() { } } - public PerfStats(String action, int reportingInterval, int messageSize, String csvFile) { + public PerfStats(String action, int reportingInterval, int messageSize, String csvFile, ExecutorService executor) { this.action = action; this.messageSize = messageSize; this.windowInterval = reportingInterval; this.csvFile = csvFile; + this.executor = executor; this.queue = new ConcurrentLinkedQueue<>(); - this.executor = new ForkJoinPool(1); this.ret = null; } diff --git a/src/main/java/io/pravega/perf/PravegaPerfTest.java b/src/main/java/io/pravega/perf/PravegaPerfTest.java index 5b24a3954..d906cdf12 100644 --- a/src/main/java/io/pravega/perf/PravegaPerfTest.java +++ b/src/main/java/io/pravega/perf/PravegaPerfTest.java @@ -81,6 +81,7 @@ public static void main(String[] args) { "if -1, get the maximum throughput"); options.addOption("writecsv", true, "CSV file to record write latencies"); options.addOption("readcsv", true, "CSV file to record read latencies"); + options.addOption("fork", true, "Use Fork join Pool"); options.addOption("help", false, "Help message"); @@ -103,7 +104,7 @@ public static void main(String[] args) { System.exit(0); } - final ExecutorService executor = new ForkJoinPool(); + final ExecutorService executor = perfTest.getExecutor(); try { final List producers = perfTest.getProducers(); @@ -172,6 +173,7 @@ static private abstract class Test { static final int TIMEOUT = 1000; static final String SCOPE = "Scope"; + final ExecutorService executor; final String controllerUri; final int messageSize; final String streamName; @@ -179,6 +181,7 @@ static private abstract class Test { final String scopeName; final boolean recreate; final boolean writeAndRead; + final boolean fork; final int producerCount; final int consumerCount; final int segmentCount; @@ -196,6 +199,7 @@ static private abstract class Test { final PerfStats consumeStats; final long startTime; + Test(long startTime, CommandLine commandline) throws IllegalArgumentException { this.startTime = startTime; if (commandline.hasOption("controller")) { @@ -277,6 +281,12 @@ static private abstract class Test { recreate = producerCount > 0 && consumerCount > 0; } + if (commandline.hasOption("fork")) { + fork = Boolean.parseBoolean(commandline.getOptionValue("fork")); + } else { + fork = false; + } + if (commandline.hasOption("throughput")) { throughput = Double.parseDouble(commandline.getOptionValue("throughput")); } else { @@ -306,6 +316,14 @@ static private abstract class Test { throw new IllegalArgumentException("Error: Must specify the number of producers or Consumers"); } + final int threadCount = producerCount + consumerCount + 6; + + if (fork) { + executor = new ForkJoinPool(threadCount); + } else { + executor = Executors.newScheduledThreadPool(threadCount); + } + if (recreate) { rdGrpName = streamName + startTime; } else { @@ -322,7 +340,7 @@ static private abstract class Test { if (writeAndRead) { produceStats = null; } else { - produceStats = new PerfStats("Writing", REPORTINGINTERVAL, messageSize, writeFile); + produceStats = new PerfStats("Writing", REPORTINGINTERVAL, messageSize, writeFile, executor); } eventsPerProducer = (events + producerCount - 1) / producerCount; @@ -347,7 +365,7 @@ static private abstract class Test { } else { action = "Reading"; } - consumeStats = new PerfStats(action, REPORTINGINTERVAL, messageSize, readFile); + consumeStats = new PerfStats(action, REPORTINGINTERVAL, messageSize, readFile, executor); eventsPerConsumer = events / consumerCount; } else { consumeStats = null; @@ -377,6 +395,10 @@ public void shutdown(long endTime) { } } + public ExecutorService getExecutor() { + return executor; + } + public abstract void closeReaderGroup(); public abstract List getProducers(); From 1b04b12b8f3282a466d6a53acc3db159bc284f87 Mon Sep 17 00:00:00 2001 From: Keshava Munegowda Date: Fri, 6 Sep 2019 18:29:14 +0530 Subject: [PATCH 03/15] Code cleanup. Signed-off-by: Keshava Munegowda --- src/main/java/io/pravega/perf/PravegaPerfTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/pravega/perf/PravegaPerfTest.java b/src/main/java/io/pravega/perf/PravegaPerfTest.java index d906cdf12..e23a8478b 100644 --- a/src/main/java/io/pravega/perf/PravegaPerfTest.java +++ b/src/main/java/io/pravega/perf/PravegaPerfTest.java @@ -321,7 +321,7 @@ static private abstract class Test { if (fork) { executor = new ForkJoinPool(threadCount); } else { - executor = Executors.newScheduledThreadPool(threadCount); + executor = Executors.newFixedThreadPool(threadCount); } if (recreate) { From d245ee37a6fdfa3d49f19a8e02ad125d34499123 Mon Sep 17 00:00:00 2001 From: Keshava Munegowda Date: Fri, 6 Sep 2019 18:45:27 +0530 Subject: [PATCH 04/15] Use fork join pool as default. Signed-off-by: Keshava Munegowda --- src/main/java/io/pravega/perf/PravegaPerfTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/pravega/perf/PravegaPerfTest.java b/src/main/java/io/pravega/perf/PravegaPerfTest.java index e23a8478b..3781ca631 100644 --- a/src/main/java/io/pravega/perf/PravegaPerfTest.java +++ b/src/main/java/io/pravega/perf/PravegaPerfTest.java @@ -284,7 +284,7 @@ static private abstract class Test { if (commandline.hasOption("fork")) { fork = Boolean.parseBoolean(commandline.getOptionValue("fork")); } else { - fork = false; + fork = true; } if (commandline.hasOption("throughput")) { From 3ed25adc0389d5042207d2ce2156047e625887e5 Mon Sep 17 00:00:00 2001 From: Keshava Munegowda Date: Fri, 6 Sep 2019 21:09:51 +0530 Subject: [PATCH 05/15] Add Kafka benchmarking. Signed-off-by: Keshava Munegowda --- build.gradle | 3 +- .../io/pravega/perf/KafkaReaderWorker.java | 46 ++++++++ .../io/pravega/perf/KafkaWriterWorker.java | 58 ++++++++++ .../java/io/pravega/perf/PravegaPerfTest.java | 109 +++++++++++++++++- 4 files changed, 214 insertions(+), 2 deletions(-) create mode 100644 src/main/java/io/pravega/perf/KafkaReaderWorker.java create mode 100644 src/main/java/io/pravega/perf/KafkaWriterWorker.java diff --git a/build.gradle b/build.gradle index 101678b48..e103b400b 100644 --- a/build.gradle +++ b/build.gradle @@ -31,7 +31,8 @@ buildscript { compile "io.pravega:pravega-client:0.5.0", "io.pravega:pravega-common:0.5.0", "commons-cli:commons-cli:1.3.1", - "org.apache.commons:commons-csv:1.5" + "org.apache.commons:commons-csv:1.5", + "org.apache.kafka:kafka-clients:2.2.0" runtime "org.slf4j:slf4j-simple:1.7.14" } diff --git a/src/main/java/io/pravega/perf/KafkaReaderWorker.java b/src/main/java/io/pravega/perf/KafkaReaderWorker.java new file mode 100644 index 000000000..c17222a16 --- /dev/null +++ b/src/main/java/io/pravega/perf/KafkaReaderWorker.java @@ -0,0 +1,46 @@ +/** + * Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.pravega.perf; + +import java.util.Properties; +import java.util.Arrays; + +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; + +/** + * Class for Kafka reader/consumer. + */ +public class KafkaReaderWorker extends ReaderWorker { + final private KafkaConsumer consumer; + + KafkaReaderWorker(int readerId, int events, int secondsToRun, + long start, PerfStats stats, String partition, + int timeout, boolean writeAndRead, Properties consumerProps) { + super(readerId, events, secondsToRun, start, stats, partition, timeout, writeAndRead); + + this.consumer = new KafkaConsumer<>(consumerProps); + this.consumer.subscribe(Arrays.asList(partition)); + } + + @Override + public byte[] readData() { + final ConsumerRecords records = consumer.poll(timeout); + if (records.isEmpty()) { + return null; + } + return records.iterator().next().value(); + } + + @Override + public void close() { + consumer.close(); + } +} \ No newline at end of file diff --git a/src/main/java/io/pravega/perf/KafkaWriterWorker.java b/src/main/java/io/pravega/perf/KafkaWriterWorker.java new file mode 100644 index 000000000..2500d1b49 --- /dev/null +++ b/src/main/java/io/pravega/perf/KafkaWriterWorker.java @@ -0,0 +1,58 @@ +/** + * Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.pravega.perf; + +import java.util.Properties; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; + +/** + * Class for Kafka writer/producer. + */ +public class KafkaWriterWorker extends WriterWorker { + final private KafkaProducer producer; + + KafkaWriterWorker(int sensorId, int events, int flushEvents, + int secondsToRun, boolean isRandomKey, int messageSize, + long start, PerfStats stats, String streamName, + int eventsPerSec, boolean writeAndRead, Properties producerProps) { + + super(sensorId, events, flushEvents, + secondsToRun, isRandomKey, messageSize, + start, stats, streamName, eventsPerSec, writeAndRead); + + this.producer = new KafkaProducer<>(producerProps); + } + + public long recordWrite(byte[] data, TriConsumer record) { + final long time = System.currentTimeMillis(); + producer.send(new ProducerRecord<>(streamName, data), (metadata, exception) -> { + record.accept(time, System.currentTimeMillis(), data.length); + }); + return time; + } + + @Override + public void writeData(byte[] data) { + producer.send(new ProducerRecord<>(streamName, data)); + } + + + @Override + public void flush() { + producer.flush(); + } + + @Override + public synchronized void close() { + producer.close(); + } +} \ No newline at end of file diff --git a/src/main/java/io/pravega/perf/PravegaPerfTest.java b/src/main/java/io/pravega/perf/PravegaPerfTest.java index 3781ca631..0be287e43 100644 --- a/src/main/java/io/pravega/perf/PravegaPerfTest.java +++ b/src/main/java/io/pravega/perf/PravegaPerfTest.java @@ -17,6 +17,12 @@ import io.pravega.client.stream.impl.ControllerImplConfig; import io.pravega.client.stream.impl.ClientFactoryImpl; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.requests.IsolationLevel; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; + import java.io.IOException; import java.net.URISyntaxException; @@ -40,6 +46,8 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ExecutorService; +import java.util.Properties; +import java.util.Locale; /** * Performance benchmark for Pravega. @@ -154,7 +162,17 @@ public void run() { public static Test createTest(long startTime, CommandLine commandline, Options options) { try { - return new PravegaTest(startTime, commandline); + boolean runKafka; + if (commandline.hasOption("kafka")) { + runKafka = Boolean.parseBoolean(commandline.getOptionValue("kafka")); + } else { + runKafka = false; + } + if (runKafka) { + return new KafkaTest(startTime, commandline); + } else { + return new PravegaTest(startTime, commandline); + } } catch (IllegalArgumentException ex) { ex.printStackTrace(); final HelpFormatter formatter = new HelpFormatter(); @@ -493,6 +511,95 @@ public void closeReaderGroup() { readerGroup.close(); } } + } + + static private class KafkaTest extends Test { + final private Properties producerConfig; + final private Properties consumerConfig; + + KafkaTest(long startTime, CommandLine commandline) throws + IllegalArgumentException, URISyntaxException, InterruptedException, Exception { + super(startTime, commandline); + producerConfig = createProducerConfig(); + consumerConfig = createConsumerConfig(); + } + private Properties createProducerConfig() { + if (producerCount < 1) { + return null; + } + final Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, controllerUri); + props.put(ProducerConfig.ACKS_CONFIG, "all"); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); + // Enabling the producer IDEMPOTENCE is must to compare between Kafka and Pravega + props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); + return props; + } + + private Properties createConsumerConfig() { + if (consumerCount < 1) { + return null; + } + final Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, controllerUri); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); + props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1); + // Enabling the consumer to READ_COMMITTED is must to compare between Kafka and Pravega + props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT)); + if (writeAndRead) { + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); + props.put(ConsumerConfig.GROUP_ID_CONFIG, streamName); + } else { + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.put(ConsumerConfig.GROUP_ID_CONFIG, Long.toString(startTime)); + } + return props; + } + + + public List getProducers() { + final List writers; + + if (producerCount > 0) { + if (transactionPerCommit > 0) { + throw new IllegalArgumentException("Kafka Transactions are not supported"); + } else { + writers = IntStream.range(0, producerCount) + .boxed() + .map(i -> new KafkaWriterWorker(i, eventsPerProducer, + EventsPerFlush, runtimeSec, false, + messageSize, startTime, produceStats, + streamName, eventsPerSec, writeAndRead, producerConfig)) + .collect(Collectors.toList()); + } + } else { + writers = null; + } + return writers; + } + + public List getConsumers() throws URISyntaxException { + final List readers; + if (consumerCount > 0) { + readers = IntStream.range(0, consumerCount) + .boxed() + .map(i -> new KafkaReaderWorker(i, eventsPerConsumer, + runtimeSec, startTime, consumeStats, + streamName, TIMEOUT, writeAndRead, consumerConfig)) + .collect(Collectors.toList()); + + } else { + readers = null; + } + return readers; + } + + @Override + public void closeReaderGroup() { + } } + } From 8c311ea51265e6d41743c5e7c323719b90ecf340 Mon Sep 17 00:00:00 2001 From: Keshava Munegowda Date: Fri, 6 Sep 2019 21:14:44 +0530 Subject: [PATCH 06/15] Code cleanup Signed-off-by: Keshava Munegowda --- src/main/java/io/pravega/perf/PravegaPerfTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/java/io/pravega/perf/PravegaPerfTest.java b/src/main/java/io/pravega/perf/PravegaPerfTest.java index 0be287e43..58eb0077c 100644 --- a/src/main/java/io/pravega/perf/PravegaPerfTest.java +++ b/src/main/java/io/pravega/perf/PravegaPerfTest.java @@ -90,6 +90,7 @@ public static void main(String[] args) { options.addOption("writecsv", true, "CSV file to record write latencies"); options.addOption("readcsv", true, "CSV file to record read latencies"); options.addOption("fork", true, "Use Fork join Pool"); + options.addOption("kafka", true, "Kafka Benchmarking"); options.addOption("help", false, "Help message"); From f741a63ca8ad74a3a10bca9c503ab7a8700327d6 Mon Sep 17 00:00:00 2001 From: Keshava Munegowda Date: Fri, 6 Sep 2019 21:17:41 +0530 Subject: [PATCH 07/15] Fix the Serializer class. Signed-off-by: Keshava Munegowda --- src/main/java/io/pravega/perf/PravegaPerfTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/pravega/perf/PravegaPerfTest.java b/src/main/java/io/pravega/perf/PravegaPerfTest.java index 58eb0077c..6561458de 100644 --- a/src/main/java/io/pravega/perf/PravegaPerfTest.java +++ b/src/main/java/io/pravega/perf/PravegaPerfTest.java @@ -533,7 +533,7 @@ private Properties createProducerConfig() { props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, controllerUri); props.put(ProducerConfig.ACKS_CONFIG, "all"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); // Enabling the producer IDEMPOTENCE is must to compare between Kafka and Pravega props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); return props; @@ -545,7 +545,7 @@ private Properties createConsumerConfig() { } final Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, controllerUri); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1); // Enabling the consumer to READ_COMMITTED is must to compare between Kafka and Pravega From 90c66d67c337e61685d0bcb974564300c90ba8ea Mon Sep 17 00:00:00 2001 From: Keshava Munegowda Date: Fri, 6 Sep 2019 22:49:25 +0530 Subject: [PATCH 08/15] Tuning the idle count for performance task. Signed-off-by: Keshava Munegowda --- src/main/java/io/pravega/perf/PerfStats.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/pravega/perf/PerfStats.java b/src/main/java/io/pravega/perf/PerfStats.java index f85e63037..fd8d90e86 100644 --- a/src/main/java/io/pravega/perf/PerfStats.java +++ b/src/main/java/io/pravega/perf/PerfStats.java @@ -84,7 +84,7 @@ final private class QueueProcessor implements Callable { final private static int NS_PER_MICRO = 1000; final private static int MICROS_PER_MS = 1000; final private static int NS_PER_MS = NS_PER_MICRO * MICROS_PER_MS; - final private static int PARK_NS = NS_PER_MICRO; + final private static int PARK_NS = NS_PER_MS; final private long startTime; private QueueProcessor(long startTime) { @@ -95,7 +95,7 @@ public Void call() throws IOException { final TimeWindow window = new TimeWindow(action, startTime); final LatencyWriter latencyRecorder = csvFile == null ? new LatencyWriter(action, messageSize, startTime) : new CSVLatencyWriter(action, messageSize, startTime, csvFile); - final int minWaitTimeMS = windowInterval / 50; + final int minWaitTimeMS = windowInterval / 10; final long totalIdleCount = (NS_PER_MS / PARK_NS) * minWaitTimeMS; boolean doWork = true; long time = startTime; From d6796f3b7c7cb3342033d6a5e64b6aa3e41b6fad Mon Sep 17 00:00:00 2001 From: Keshava Munegowda Date: Sat, 7 Sep 2019 13:03:32 +0530 Subject: [PATCH 09/15] Code Cleanup. Signed-off-by: Keshava Munegowda --- src/main/java/io/pravega/perf/PerfStats.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/java/io/pravega/perf/PerfStats.java b/src/main/java/io/pravega/perf/PerfStats.java index fd8d90e86..227628ea3 100644 --- a/src/main/java/io/pravega/perf/PerfStats.java +++ b/src/main/java/io/pravega/perf/PerfStats.java @@ -352,7 +352,6 @@ public synchronized void shutdown(long endTime) throws ExecutionException, Inter if (this.ret != null) { queue.add(new TimeStamp(endTime)); ret.get(); - executor.shutdownNow(); queue.clear(); this.ret = null; } From 61f68c87c606b50c1fd4fbee5ed4de65b23d95f2 Mon Sep 17 00:00:00 2001 From: Keshava Munegowda Date: Sat, 7 Sep 2019 14:33:59 +0530 Subject: [PATCH 10/15] Code cleanup. Signed-off-by: Keshava Munegowda --- .../java/io/pravega/perf/PravegaPerfTest.java | 106 ++++-------------- 1 file changed, 21 insertions(+), 85 deletions(-) diff --git a/src/main/java/io/pravega/perf/PravegaPerfTest.java b/src/main/java/io/pravega/perf/PravegaPerfTest.java index 6561458de..93379fe81 100644 --- a/src/main/java/io/pravega/perf/PravegaPerfTest.java +++ b/src/main/java/io/pravega/perf/PravegaPerfTest.java @@ -163,12 +163,7 @@ public void run() { public static Test createTest(long startTime, CommandLine commandline, Options options) { try { - boolean runKafka; - if (commandline.hasOption("kafka")) { - runKafka = Boolean.parseBoolean(commandline.getOptionValue("kafka")); - } else { - runKafka = false; - } + boolean runKafka = Boolean.parseBoolean(commandline.getOptionValue("kafka", "false")); if (runKafka) { return new KafkaTest(startTime, commandline); } else { @@ -221,37 +216,33 @@ static private abstract class Test { Test(long startTime, CommandLine commandline) throws IllegalArgumentException { this.startTime = startTime; - if (commandline.hasOption("controller")) { - controllerUri = commandline.getOptionValue("controller"); - } else { - controllerUri = null; - } + controllerUri = commandline.getOptionValue("controller", null); + streamName = commandline.getOptionValue("stream", null); + producerCount = Integer.parseInt(commandline.getOptionValue("producers", "0")); + consumerCount = Integer.parseInt(commandline.getOptionValue("consumers", "0")); - if (commandline.hasOption("producers")) { - producerCount = Integer.parseInt(commandline.getOptionValue("producers")); - } else { - producerCount = 0; + if (controllerUri == null) { + throw new IllegalArgumentException("Error: Must specify Controller IP address"); } - if (commandline.hasOption("consumers")) { - consumerCount = Integer.parseInt(commandline.getOptionValue("consumers")); - } else { - consumerCount = 0; + if (streamName == null) { + throw new IllegalArgumentException("Error: Must specify stream Name"); } - if (commandline.hasOption("events")) { - events = Integer.parseInt(commandline.getOptionValue("events")); - } else { - events = 0; + if (producerCount == 0 && consumerCount == 0) { + throw new IllegalArgumentException("Error: Must specify the number of producers or Consumers"); } - if (commandline.hasOption("flush")) { - int flushEvents = Integer.parseInt(commandline.getOptionValue("flush")); - if (flushEvents > 0) { - EventsPerFlush = flushEvents; - } else { - EventsPerFlush = Integer.MAX_VALUE; - } + events = Integer.parseInt(commandline.getOptionValue("events", "0")); + messageSize = Integer.parseInt(commandline.getOptionValue("size","0")); + scopeName = commandline.getOptionValue("scope",SCOPE); + transactionPerCommit = Integer.parseInt(commandline.getOptionValue("transactionspercommit","0")); + fork = Boolean.parseBoolean(commandline.getOptionValue("fork", "true")); + writeFile = commandline.getOptionValue("writecsv",null); + readFile = commandline.getOptionValue("readcsv", null); + int flushEvents = Integer.parseInt(commandline.getOptionValue("flush", "0")); + if (flushEvents > 0) { + EventsPerFlush = flushEvents; } else { EventsPerFlush = Integer.MAX_VALUE; } @@ -264,30 +255,6 @@ static private abstract class Test { runtimeSec = MAXTIME; } - if (commandline.hasOption("size")) { - messageSize = Integer.parseInt(commandline.getOptionValue("size")); - } else { - messageSize = 0; - } - - if (commandline.hasOption("stream")) { - streamName = commandline.getOptionValue("stream"); - } else { - streamName = null; - } - - if (commandline.hasOption("scope")) { - scopeName = commandline.getOptionValue("scope"); - } else { - scopeName = SCOPE; - } - - if (commandline.hasOption("transactionspercommit")) { - transactionPerCommit = Integer.parseInt(commandline.getOptionValue("transactionspercommit")); - } else { - transactionPerCommit = 0; - } - if (commandline.hasOption("segments")) { segmentCount = Integer.parseInt(commandline.getOptionValue("segments")); } else { @@ -300,43 +267,12 @@ static private abstract class Test { recreate = producerCount > 0 && consumerCount > 0; } - if (commandline.hasOption("fork")) { - fork = Boolean.parseBoolean(commandline.getOptionValue("fork")); - } else { - fork = true; - } - if (commandline.hasOption("throughput")) { throughput = Double.parseDouble(commandline.getOptionValue("throughput")); } else { throughput = -1; } - - if (commandline.hasOption("writecsv")) { - writeFile = commandline.getOptionValue("writecsv"); - } else { - writeFile = null; - } - if (commandline.hasOption("readcsv")) { - readFile = commandline.getOptionValue("readcsv"); - } else { - readFile = null; - } - - if (controllerUri == null) { - throw new IllegalArgumentException("Error: Must specify Controller IP address"); - } - - if (streamName == null) { - throw new IllegalArgumentException("Error: Must specify stream Name"); - } - - if (producerCount == 0 && consumerCount == 0) { - throw new IllegalArgumentException("Error: Must specify the number of producers or Consumers"); - } - final int threadCount = producerCount + consumerCount + 6; - if (fork) { executor = new ForkJoinPool(threadCount); } else { From be44734ea7babd9a350c001b5a0bb65ac302b99f Mon Sep 17 00:00:00 2001 From: Keshava Munegowda Date: Sat, 7 Sep 2019 14:42:40 +0530 Subject: [PATCH 11/15] Code cleanup. Signed-off-by: Keshava Munegowda --- src/main/java/io/pravega/perf/PerfStats.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/java/io/pravega/perf/PerfStats.java b/src/main/java/io/pravega/perf/PerfStats.java index 227628ea3..c5ba4a6b8 100644 --- a/src/main/java/io/pravega/perf/PerfStats.java +++ b/src/main/java/io/pravega/perf/PerfStats.java @@ -13,7 +13,6 @@ import java.util.ArrayList; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Callable; import java.util.concurrent.Future; import java.util.concurrent.ExecutorService; From 2d3de4c3d31b444a0370a36c2dde11355023af92 Mon Sep 17 00:00:00 2001 From: Keshava Munegowda Date: Sat, 7 Sep 2019 17:46:34 +0530 Subject: [PATCH 12/15] Code cleanup. Signed-off-by: Keshava Munegowda --- src/main/java/io/pravega/perf/ReaderWorker.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/java/io/pravega/perf/ReaderWorker.java b/src/main/java/io/pravega/perf/ReaderWorker.java index f6d1d8b19..aa2faa4de 100644 --- a/src/main/java/io/pravega/perf/ReaderWorker.java +++ b/src/main/java/io/pravega/perf/ReaderWorker.java @@ -12,7 +12,6 @@ import java.io.IOException; import java.nio.ByteBuffer; -import java.util.Arrays; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; From 5c9c4b52770d4e31c4ef3b1bf480c361250d4f17 Mon Sep 17 00:00:00 2001 From: Keshava Munegowda Date: Sun, 8 Sep 2019 12:22:57 +0530 Subject: [PATCH 13/15] Update README.md --- README.md | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index d4db67fbf..8be33f49a 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ You may obtain a copy of the License at # Pravega Benchmark Tool -The Pravega benchmark tool used for the performance benchmarking of pravega streaming storage cluster. +The Pravega benchmark tool used for the performance benchmarking of pravega and Kafka streaming storage clusters. This tool performs the throughput and latency analysis for the multi producers/writers and consumers/readers of pravega. it also validates the end to end latency. The write and/or read latencies can be stored in a CSV file for later analysis. At the end of the performance benchmarking, this tool outputs the 50th, 75th, 95th , 99th, 99.9th and 99.99th latency percentiles. @@ -46,7 +46,6 @@ Running Pravega benchmark tool locally: ``` /pravega-benchmark$ ./run/pravega-benchmark/bin/pravega-benchmark -help -usage: pravega-benchmark -consumers Number of consumers -controller Controller URI -events Number of events/records if 'time' not @@ -58,7 +57,9 @@ usage: pravega-benchmark number of of events/records; Not applicable, if both producers and consumers are specified + -fork Use Fork join Pool -help Help message + -kafka Kafka Benchmarking -producers Number of producers -readcsv CSV file to record read latencies -recreate If the stream is already existing, delete @@ -172,3 +173,7 @@ The -throughput -1 specifies the writes tries to write the events at the maximum ### Recording the latencies to CSV files User can use the options "-writecsv " to record the latencies of writers and "-readcsv " for readers. in case of End to End latency mode, if the user can supply only -readcsv to get the end to end latency in to the csv file. + +### Kafka Benchmarking +User can set the option "-kafka true" for Kafka Benchmarking. User should create the topics manually before running this for kafka benchmarking. Unlike Pravega benchmarking, this tool does not create the topic automatically. This tools treats stream name as a topic name. + From 6df6be3bcf32273f2eabb872e1b6e9db1a27ad02 Mon Sep 17 00:00:00 2001 From: Keshava Munegowda Date: Tue, 10 Sep 2019 11:48:13 +0530 Subject: [PATCH 14/15] Code cleanup Signed-off-by: Keshava Munegowda --- src/main/java/io/pravega/perf/KafkaWriterWorker.java | 3 ++- src/main/java/io/pravega/perf/PravegaWriterWorker.java | 3 ++- src/main/java/io/pravega/perf/ReaderWorker.java | 6 ++++-- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/src/main/java/io/pravega/perf/KafkaWriterWorker.java b/src/main/java/io/pravega/perf/KafkaWriterWorker.java index 2500d1b49..98a95e434 100644 --- a/src/main/java/io/pravega/perf/KafkaWriterWorker.java +++ b/src/main/java/io/pravega/perf/KafkaWriterWorker.java @@ -35,7 +35,8 @@ public class KafkaWriterWorker extends WriterWorker { public long recordWrite(byte[] data, TriConsumer record) { final long time = System.currentTimeMillis(); producer.send(new ProducerRecord<>(streamName, data), (metadata, exception) -> { - record.accept(time, System.currentTimeMillis(), data.length); + final long endTime = System.currentTimeMillis(); + record.accept(time, endTime, data.length); }); return time; } diff --git a/src/main/java/io/pravega/perf/PravegaWriterWorker.java b/src/main/java/io/pravega/perf/PravegaWriterWorker.java index 9ad293daf..5206006a8 100644 --- a/src/main/java/io/pravega/perf/PravegaWriterWorker.java +++ b/src/main/java/io/pravega/perf/PravegaWriterWorker.java @@ -43,7 +43,8 @@ public long recordWrite(byte[] data, TriConsumer record) { final long time = System.currentTimeMillis(); ret = producer.writeEvent(data); ret.thenAccept(d -> { - record.accept(time, System.currentTimeMillis(), data.length); + final long endTime = System.currentTimeMillis(); + record.accept(time, endTime, data.length); }); return time; } diff --git a/src/main/java/io/pravega/perf/ReaderWorker.java b/src/main/java/io/pravega/perf/ReaderWorker.java index aa2faa4de..12229c19a 100644 --- a/src/main/java/io/pravega/perf/ReaderWorker.java +++ b/src/main/java/io/pravega/perf/ReaderWorker.java @@ -73,7 +73,8 @@ public void EventsReader() throws IOException { final long startTime = System.currentTimeMillis(); ret = readData(); if (ret != null) { - stats.recordTime(startTime, System.currentTimeMillis(), ret.length); + final long endTime = System.currentTimeMillis(); + stats.recordTime(startTime, endTime, ret.length); i++; } } @@ -115,7 +116,8 @@ public void EventsTimeReader() throws IOException { time = System.currentTimeMillis(); ret = readData(); if (ret != null) { - stats.recordTime(time, System.currentTimeMillis(), ret.length); + final long endTime = System.currentTimeMillis(); + stats.recordTime(time, endTime, ret.length); } } } finally { From 976b7600c628ac2fb9242d168146ecf51a2002ea Mon Sep 17 00:00:00 2001 From: Keshava Munegowda Date: Tue, 10 Sep 2019 19:11:59 +0530 Subject: [PATCH 15/15] User the kafka client 2.3.0 Signed-off-by: Keshava Munegowda --- build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index e103b400b..79a7516dc 100644 --- a/build.gradle +++ b/build.gradle @@ -32,7 +32,7 @@ buildscript { "io.pravega:pravega-common:0.5.0", "commons-cli:commons-cli:1.3.1", "org.apache.commons:commons-csv:1.5", - "org.apache.kafka:kafka-clients:2.2.0" + "org.apache.kafka:kafka-clients:2.3.0" runtime "org.slf4j:slf4j-simple:1.7.14" }