Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 57: Add "-fork" option #58

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions src/main/java/io/pravega/perf/PerfStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
import java.util.ArrayList;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.locks.LockSupport;
import java.nio.file.Files;
import java.nio.file.Paths;
Expand All @@ -38,7 +38,7 @@ public class PerfStats {
final private int messageSize;
final private int windowInterval;
final private ConcurrentLinkedQueue<TimeStamp> queue;
final private ForkJoinPool executor;
final private ExecutorService executor;

@GuardedBy("this")
private Future<Void> ret;
Expand Down Expand Up @@ -66,13 +66,13 @@ private boolean isEnd() {
}
}

public PerfStats(String action, int reportingInterval, int messageSize, String csvFile) {
public PerfStats(String action, int reportingInterval, int messageSize, String csvFile, ExecutorService executor) {
this.action = action;
this.messageSize = messageSize;
this.windowInterval = reportingInterval;
this.csvFile = csvFile;
this.executor = executor;
this.queue = new ConcurrentLinkedQueue<>();
this.executor = new ForkJoinPool(1);
this.ret = null;
}

Expand Down Expand Up @@ -351,7 +351,6 @@ public synchronized void shutdown(long endTime) throws ExecutionException, Inter
if (this.ret != null) {
queue.add(new TimeStamp(endTime));
ret.get();
executor.shutdownNow();
queue.clear();
this.ret = null;
}
Expand Down
109 changes: 36 additions & 73 deletions src/main/java/io/pravega/perf/PravegaPerfTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;

/**
* Performance benchmark for Pravega.
Expand Down Expand Up @@ -80,6 +81,7 @@ public static void main(String[] args) {
"if -1, get the maximum throughput");
options.addOption("writecsv", true, "CSV file to record write latencies");
options.addOption("readcsv", true, "CSV file to record read latencies");
options.addOption("fork", true, "Use Fork join Pool");

options.addOption("help", false, "Help message");

Expand All @@ -102,7 +104,7 @@ public static void main(String[] args) {
System.exit(0);
}

final ForkJoinPool executor = new ForkJoinPool();
final ExecutorService executor = perfTest.getExecutor();

try {
final List<WriterWorker> producers = perfTest.getProducers();
Expand Down Expand Up @@ -171,13 +173,15 @@ static private abstract class Test {
static final int TIMEOUT = 1000;
static final String SCOPE = "Scope";

final ExecutorService executor;
final String controllerUri;
final int messageSize;
final String streamName;
final String rdGrpName;
final String scopeName;
final boolean recreate;
final boolean writeAndRead;
final boolean fork;
final int producerCount;
final int consumerCount;
final int segmentCount;
Expand All @@ -195,39 +199,35 @@ static private abstract class Test {
final PerfStats consumeStats;
final long startTime;


Test(long startTime, CommandLine commandline) throws IllegalArgumentException {
this.startTime = startTime;
if (commandline.hasOption("controller")) {
controllerUri = commandline.getOptionValue("controller");
} else {
controllerUri = null;
}
controllerUri = commandline.getOptionValue("controller", null);
streamName = commandline.getOptionValue("stream", null);
producerCount = Integer.parseInt(commandline.getOptionValue("producers", "0"));
consumerCount = Integer.parseInt(commandline.getOptionValue("consumers", "0"));

if (commandline.hasOption("producers")) {
producerCount = Integer.parseInt(commandline.getOptionValue("producers"));
} else {
producerCount = 0;
if (controllerUri == null) {
throw new IllegalArgumentException("Error: Must specify Controller IP address");
}

if (commandline.hasOption("consumers")) {
consumerCount = Integer.parseInt(commandline.getOptionValue("consumers"));
} else {
consumerCount = 0;
if (streamName == null) {
throw new IllegalArgumentException("Error: Must specify stream Name");
}

if (commandline.hasOption("events")) {
events = Integer.parseInt(commandline.getOptionValue("events"));
} else {
events = 0;
if (producerCount == 0 && consumerCount == 0) {
throw new IllegalArgumentException("Error: Must specify the number of producers or Consumers");
}

if (commandline.hasOption("flush")) {
int flushEvents = Integer.parseInt(commandline.getOptionValue("flush"));
if (flushEvents > 0) {
EventsPerFlush = flushEvents;
} else {
EventsPerFlush = Integer.MAX_VALUE;
}
events = Integer.parseInt(commandline.getOptionValue("events", "0"));
messageSize = Integer.parseInt(commandline.getOptionValue("size","0"));
scopeName = commandline.getOptionValue("scope",SCOPE);
transactionPerCommit = Integer.parseInt(commandline.getOptionValue("transactionspercommit","0"));
fork = Boolean.parseBoolean(commandline.getOptionValue("fork", "true"));
writeFile = commandline.getOptionValue("writecsv",null);
readFile = commandline.getOptionValue("readcsv", null);
int flushEvents = Integer.parseInt(commandline.getOptionValue("flush", "0"));
if (flushEvents > 0) {
EventsPerFlush = flushEvents;
} else {
EventsPerFlush = Integer.MAX_VALUE;
}
Expand All @@ -240,30 +240,6 @@ static private abstract class Test {
runtimeSec = MAXTIME;
}

if (commandline.hasOption("size")) {
messageSize = Integer.parseInt(commandline.getOptionValue("size"));
} else {
messageSize = 0;
}

if (commandline.hasOption("stream")) {
streamName = commandline.getOptionValue("stream");
} else {
streamName = null;
}

if (commandline.hasOption("scope")) {
scopeName = commandline.getOptionValue("scope");
} else {
scopeName = SCOPE;
}

if (commandline.hasOption("transactionspercommit")) {
transactionPerCommit = Integer.parseInt(commandline.getOptionValue("transactionspercommit"));
} else {
transactionPerCommit = 0;
}

if (commandline.hasOption("segments")) {
segmentCount = Integer.parseInt(commandline.getOptionValue("segments"));
} else {
Expand All @@ -281,28 +257,11 @@ static private abstract class Test {
} else {
throughput = -1;
}

if (commandline.hasOption("writecsv")) {
writeFile = commandline.getOptionValue("writecsv");
final int threadCount = producerCount + consumerCount + 6;
if (fork) {
executor = new ForkJoinPool(threadCount);
} else {
writeFile = null;
}
if (commandline.hasOption("readcsv")) {
readFile = commandline.getOptionValue("readcsv");
} else {
readFile = null;
}

if (controllerUri == null) {
throw new IllegalArgumentException("Error: Must specify Controller IP address");
}

if (streamName == null) {
throw new IllegalArgumentException("Error: Must specify stream Name");
}

if (producerCount == 0 && consumerCount == 0) {
throw new IllegalArgumentException("Error: Must specify the number of producers or Consumers");
executor = Executors.newFixedThreadPool(threadCount);
}

if (recreate) {
Expand All @@ -321,7 +280,7 @@ static private abstract class Test {
if (writeAndRead) {
produceStats = null;
} else {
produceStats = new PerfStats("Writing", REPORTINGINTERVAL, messageSize, writeFile);
produceStats = new PerfStats("Writing", REPORTINGINTERVAL, messageSize, writeFile, executor);
}

eventsPerProducer = (events + producerCount - 1) / producerCount;
Expand All @@ -346,7 +305,7 @@ static private abstract class Test {
} else {
action = "Reading";
}
consumeStats = new PerfStats(action, REPORTINGINTERVAL, messageSize, readFile);
consumeStats = new PerfStats(action, REPORTINGINTERVAL, messageSize, readFile, executor);
eventsPerConsumer = events / consumerCount;
} else {
consumeStats = null;
Expand Down Expand Up @@ -376,6 +335,10 @@ public void shutdown(long endTime) {
}
}

public ExecutorService getExecutor() {
return executor;
}

public abstract void closeReaderGroup();

public abstract List<WriterWorker> getProducers();
Expand Down
1 change: 0 additions & 1 deletion src/main/java/io/pravega/perf/ReaderWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/pravega/perf/WriterWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ private void EventsWriterTimeRW() throws InterruptedException, IOException {

for (int i = 0; (time - startTime) < msToRun; i++) {
time = System.currentTimeMillis();
byte[] bytes = timeBuffer.putLong(0, System.currentTimeMillis()).array();
byte[] bytes = timeBuffer.putLong(0, time).array();
System.arraycopy(bytes, 0, payload, 0, bytes.length);
writeData(payload);
/*
Expand Down