Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 56 track unacked; added option to disable connection pooling #59

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
.git
.gradle
.idea
build
out
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,16 @@ build
.classpath
.DS_Store
bin/
ca-certificates/
!ca-certificates/README.txt
classes/
.settings
.gradle
output
.idea
target
lib/
!lib/README.txt
log/
out/
.metadata/
Expand Down
48 changes: 48 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Building Container

FROM gradle:4.10-jdk8 as GradleBuilder

USER 0

COPY ca-certificates/* /usr/local/share/ca-certificates/
RUN update-ca-certificates

RUN apt-get update \
&& apt-get install -y \
maven \
&& rm -rf /var/lib/apt/lists/*

USER gradle

COPY --chown=gradle:gradle build.gradle /home/gradle/src/build.gradle
COPY --chown=gradle:gradle gradle /home/gradle/src/gradle
COPY --chown=gradle:gradle gradle.properties /home/gradle/src/gradle.properties
COPY --chown=gradle:gradle settings.gradle /home/gradle/src/settings.gradle
COPY --chown=gradle:gradle lib /home/gradle/src/lib
COPY --chown=gradle:gradle src /home/gradle/src/src

WORKDIR /home/gradle/src

ENV GRADLE_USER_HOME=/home/gradle
ENV CREDENTIALS_VERSION=0.5.0-2306.a5a5cdf-0.11.10-002.985e705

RUN mvn install:install-file \
-Dfile=lib/pravega-keycloak-credentials-${CREDENTIALS_VERSION}-shadow.jar \
-DgroupId=io.pravega \
-DartifactId=pravega-keycloak-credentials \
-Dversion=${CREDENTIALS_VERSION} -Dpackaging=jar

RUN gradle installDist \
--no-daemon --info --stacktrace \
-PincludePravegaCredentials=true \
-PpravegaCredentialsVersion=${CREDENTIALS_VERSION}

# Runtime Container

FROM openjdk:8-jre

ENV APP_NAME=pravega-benchmark

COPY --from=GradleBuilder /home/gradle/src/build/install/${APP_NAME} /opt/${APP_NAME}

ENTRYPOINT ["/opt/pravega-benchmark/bin/pravega-benchmark"]
32 changes: 15 additions & 17 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,29 +13,27 @@ apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'application'

mainClassName = "io.pravega.perf.PravegaPerfTest"

buildscript {
repositories {
jcenter()
}
repositories {
jcenter()
mavenLocal()
mavenCentral()
}

repositories {
mavenLocal()
jcenter()
mavenCentral()
}
dependencies {
compile "io.pravega:pravega-client:${pravegaVersion}",
"io.pravega:pravega-common:${pravegaVersion}",
"commons-cli:commons-cli:${commonsCLIVersion}",
"org.apache.commons:commons-csv:1.5"

dependencies {
if (includePravegaCredentials.toBoolean()) {
compile "io.pravega:pravega-keycloak-credentials:${pravegaCredentialsVersion}"
}

compile "io.pravega:pravega-client:0.5.0",
"io.pravega:pravega-common:0.5.0",
"commons-cli:commons-cli:1.3.1",
"org.apache.commons:commons-csv:1.5"
runtime "org.slf4j:slf4j-simple:1.7.14"
}

runtime "org.slf4j:slf4j-simple:1.7.14"
}
mainClassName = "io.pravega.perf.PravegaPerfTest"
startScripts {
doLast {
unixScript.text = unixScript.text.replace('SERVER_APP_HOME', '\$APP_HOME')
Expand Down
16 changes: 16 additions & 0 deletions gradle.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#
# Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#

commonsCLIVersion=1.3.1
pravegaCredentialsVersion=0.5.0-2306.a5a5cdf-0.11.10-002.985e705
pravegaVersion=0.5.0

# Set below to true when using Pravega in Nautilus.
includePravegaCredentials=true
3 changes: 2 additions & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#Wed Sep 04 14:34:44 UTC 2019
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-4.10.2-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-4.10.2-all.zip
11 changes: 11 additions & 0 deletions scripts/build-docker.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#! /bin/bash
set -ex

: ${DOCKER_REPOSITORY?"You must export DOCKER_REPOSITORY"}
: ${IMAGE_TAG?"You must export IMAGE_TAG"}

ROOT_DIR=$(dirname $0)/..

docker build -f ${ROOT_DIR}/Dockerfile ${ROOT_DIR} --tag ${DOCKER_REPOSITORY}/pravega-benchmark:${IMAGE_TAG}

docker push ${DOCKER_REPOSITORY}/pravega-benchmark:${IMAGE_TAG}
6 changes: 6 additions & 0 deletions scripts/pravega-benchmark-k8s.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#!/usr/bin/env bash
NAMESPACE=${NAMESPACE:-examples}
kubectl delete -f pravega-benchmark.yaml -n ${NAMESPACE}
kubectl apply -f pravega-benchmark.yaml -n ${NAMESPACE}
sleep 5s
kubectl logs -f jobs/pravega-benchmark -n ${NAMESPACE}
40 changes: 40 additions & 0 deletions scripts/pravega-benchmark.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
kind: Job
apiVersion: batch/v1
metadata:
name: pravega-benchmark
spec:
parallelism: 3
template:
spec:
serviceAccount: examples3-pravega
restartPolicy: Never
containers:
- name: benchmark
image: claudiofahey/pravega-benchmark:0.5.0
imagePullPolicy: Always
resources:
limits:
cpu: "2"
memory: "4Gi"
requests:
cpu: "1"
memory: "4Gi"
args: [
"-controller", "tcp://nautilus-pravega-controller.nautilus-pravega.svc.cluster.local:9090",
"-scope", "examples3",
"-stream", "benchmark5",
#"-recreate", "1",
"-segments", "96",
"-producers", "1",
"-time", "3600", # Number of seconds to run
"-size", "524288",
"-throughput", "30",
"-enableConnectionPooling", "false",
]
env:
- name: JAVA_OPTS
value: "-Xmx2g -Xms1g"
- name: pravega_client_auth_method
value: Bearer
- name: pravega_client_auth_loadDynamic
value: "true"
59 changes: 41 additions & 18 deletions src/main/java/io/pravega/perf/PerfStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.commons.csv.CSVPrinter;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.NotThreadSafe;
Expand All @@ -33,6 +35,8 @@
* Class for Performance statistics.
*/
public class PerfStats {
private static Logger log = LoggerFactory.getLogger(PerfStats.class);

final private String action;
final private String csvFile;
final private int messageSize;
Expand Down Expand Up @@ -64,6 +68,10 @@ private TimeStamp(long endTime) {
private boolean isEnd() {
return this.bytes == -1 && this.startTime == -1;
}

private boolean isAcked() {
return this.endTime != -1;
}
}

public PerfStats(String action, int reportingInterval, int messageSize, String csvFile) {
Expand Down Expand Up @@ -100,21 +108,34 @@ public Void call() throws IOException {
long time = startTime;
long idleCount = 0;
TimeStamp t;
long sentEvents = 0;
long sentBytes = 0;
long ackedEvents = 0;
long ackedBytes = 0;

while (doWork) {
t = queue.poll();
if (t != null) {
if (t.isEnd()) {
doWork = false;
if (t.isAcked()) {
if (t.isEnd()) {
doWork = false;
} else {
ackedEvents++;
ackedBytes += t.bytes;
final int latency = (int) (t.endTime - t.startTime);
window.record(t.bytes, latency);
latencyRecorder.record(t.startTime, t.bytes, latency);
}
time = t.endTime;
if (window.windowTimeMS(time) > windowInterval) {
double unAckedMiB = (sentBytes - ackedBytes) / 1024.0 / 1024.0;
long unAckedEvents = sentEvents - ackedEvents;
log.info(String.format("%s, %.3f MiB in %d events unacked", window.toString(time), unAckedMiB, unAckedEvents));
window.reset(time);
}
} else {
final int latency = (int) (t.endTime - t.startTime);
window.record(t.bytes, latency);
latencyRecorder.record(t.startTime, t.bytes, latency);
}
time = t.endTime;
if (window.windowTimeMS(time) > windowInterval) {
window.print(time);
window.reset(time);
sentEvents++;
sentBytes += t.bytes;
}
} else {
LockSupport.parkNanos(PARK_NS);
Expand All @@ -123,7 +144,7 @@ public Void call() throws IOException {
time = System.currentTimeMillis();
idleCount = 0;
if (window.windowTimeMS(time) > windowInterval) {
window.print(time);
log.info(window.toString(time));
window.reset(time);
}
}
Expand Down Expand Up @@ -175,16 +196,18 @@ private void record(long bytes, int latency) {
}

/**
* Print the window statistics
* Get the window statistics as a string
*
* Note that record and byte counters are for acked events.
*/
private void print(long time) {
private String toString(long time) {
this.lastTime = time;
assert this.lastTime > this.startTime : "Invalid Start and EndTime";
final double elapsed = (this.lastTime - this.startTime) / 1000.0;
final double recsPerSec = count / elapsed;
final double mbPerSec = (this.bytes / (1024.0 * 1024.0)) / elapsed;

System.out.printf("%8d records %s, %9.1f records/sec, %6.2f MB/sec, %7.1f ms avg latency, %7.1f ms max latency\n",
return String.format("%8d records %s, %9.1f records/sec, %6.2f MiB/sec, %7.1f ms avg latency, %7.1f ms max latency",
count, action, recsPerSec, mbPerSec, totalLatency / (double) count, (double) maxLatency);
}

Expand Down Expand Up @@ -274,11 +297,11 @@ public void printTotal(long endTime) {
final double mbPerSec = (this.totalBytes / (1024.0 * 1024.0)) / elapsed;
int[] percs = getPercentiles();

System.out.printf(
log.info(String.format(
"%d records %s, %.3f records/sec, %d bytes record size, %.2f MB/sec, %.1f ms avg latency, %.1f ms max latency" +
", %d ms 50th, %d ms 75th, %d ms 95th, %d ms 99th, %d ms 99.9th, %d ms 99.99th.\n",
count, action, recsPerSec, messageSize, mbPerSec, totalLatency / ((double) count), (double) maxLatency,
percs[0], percs[1], percs[2], percs[3], percs[4], percs[5]);
percs[0], percs[1], percs[2], percs[3], percs[4], percs[5]));
}
}

Expand Down Expand Up @@ -361,10 +384,10 @@ public synchronized void shutdown(long endTime) throws ExecutionException, Inter
* Record the data write/read time of data.
*
* @param startTime starting time
* @param endTime End time
* @param endTime End time (-1 indicates that ack has not been received)
* @param bytes number of bytes written or read
**/
public void recordTime(long startTime, long endTime, int bytes) {
queue.add(new TimeStamp(startTime, endTime, bytes));
}
}
}
8 changes: 6 additions & 2 deletions src/main/java/io/pravega/perf/PravegaPerfTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public static void main(String[] args) {
"if -1, get the maximum throughput");
options.addOption("writecsv", true, "CSV file to record write latencies");
options.addOption("readcsv", true, "CSV file to record read latencies");
options.addOption("enableConnectionPooling", true, "enable connection pooling");

options.addOption("help", false, "Help message");

Expand Down Expand Up @@ -194,6 +195,7 @@ static private abstract class Test {
final PerfStats produceStats;
final PerfStats consumeStats;
final long startTime;
final boolean enableConnectionPooling;

Test(long startTime, CommandLine commandline) throws IllegalArgumentException {
this.startTime = startTime;
Expand Down Expand Up @@ -293,6 +295,8 @@ static private abstract class Test {
readFile = null;
}

enableConnectionPooling = Boolean.parseBoolean(commandline.getOptionValue("enableConnectionPooling", "true"));

if (controllerUri == null) {
throw new IllegalArgumentException("Error: Must specify Controller IP address");
}
Expand Down Expand Up @@ -431,15 +435,15 @@ public List<WriterWorker> getProducers() {
messageSize, startTime,
produceStats, streamName,
eventsPerSec, writeAndRead, factory,
transactionPerCommit))
transactionPerCommit, enableConnectionPooling))
.collect(Collectors.toList());
} else {
writers = IntStream.range(0, producerCount)
.boxed()
.map(i -> new PravegaWriterWorker(i, eventsPerProducer,
EventsPerFlush, runtimeSec, false,
messageSize, startTime, produceStats,
streamName, eventsPerSec, writeAndRead, factory))
streamName, eventsPerSec, writeAndRead, factory, enableConnectionPooling))
.collect(Collectors.toList());
}
} else {
Expand Down
1 change: 0 additions & 1 deletion src/main/java/io/pravega/perf/PravegaStreamHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ public class PravegaStreamHandler {
this.timeout = timeout;
this.bgexecutor = bgexecutor;
streamManager = StreamManager.create(new URI(uri));
streamManager.createScope(scope);
streamconfig = StreamConfiguration.builder().scope(scope).streamName(stream)
.scalingPolicy(ScalingPolicy.fixed(segCount))
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ public class PravegaTransactionWriterWorker extends PravegaWriterWorker {
int secondsToRun, boolean isRandomKey,
int messageSize, long start,
PerfStats stats, String streamName, int eventsPerSec, boolean writeAndRead,
ClientFactory factory, int transactionsPerCommit) {
ClientFactory factory, int transactionsPerCommit, boolean enableConnectionPooling) {

super(sensorId, events, Integer.MAX_VALUE, secondsToRun, isRandomKey,
messageSize, start, stats, streamName, eventsPerSec, writeAndRead, factory);
messageSize, start, stats, streamName, eventsPerSec, writeAndRead, factory, enableConnectionPooling);

this.transactionsPerCommit = transactionsPerCommit;
eventCount = 0;
Expand Down
Loading