Skip to content

Commit

Permalink
Added attributes + Muti topic support
Browse files Browse the repository at this point in the history
  • Loading branch information
mi185051 committed Feb 27, 2021
1 parent ecbac2d commit 5301c71
Show file tree
Hide file tree
Showing 23 changed files with 250 additions and 116 deletions.
29 changes: 21 additions & 8 deletions README.MD
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,30 @@

## Introduction

This plugin adds feature to connect to a topic to publish/subscribe messages in GCP
This plugin adds feature to connect to GCP to publish/subscribe messages to the topic

## Required Components

1. Jmeter
1. Apache Jmeter/DI-Jmeter
2. GCP Pubsub configs


## Jar Dependencies Required

* google-cloud-pubsub-1.88.0.jar or above
* google-cloud-pubsub-1.111.2.jar
* gson-2.2.4.jar

## Jmeter Target

* Jmeter version 5.1.1 or above
* Jmeter version 5.3 or above
* Java 8 or above

## Installation Instructions

* Download the source code from the Github.
* Download the source code from the Gitlab.
* Just do a mvn clean install (Git bash is required)
* Jar will be generated under the target directory (jmeter-pubsub-sampler-1.0.jar)
* Copy the Jar to \<Jmeter Installed Directory\>/lib/ext/
* Jar will be generated under the target directory (jmeter-pubsub-sampler-1.1.jar).
* Copy the Jar to \<Jmeter Installed Directory\>/lib/ext/ for DI Jmeter \<Jmeter Installed Directory\>/di/plugins

## How to use it
Add required config element (Publisher config/ Subscriber config)
Expand All @@ -37,7 +37,9 @@ Add required config element (Publisher config/ Subscriber config)
# Publisher Info
Apart from config element, Publisher sampler has Gzip compression feature
* The Flag in the sampler will allow the mechanism to publish the message with/without Gzip compression
* On successfull publish of each message, GCP returns a unique Id which will be returned in response header.
* On successful publish of each message, GCP returns a unique Id which will be returned in response header.
* Supports attributes for messages being published
* Supports multiple topic publishing feature in the same test plan

# Subscriber Info
The subscriber works perfectly. The current mechanism of Subscriber is
Expand All @@ -49,6 +51,13 @@ The subscriber works perfectly. The current mechanism of Subscriber is
* each message retrieved from the subscriber will have the message ID and publish time info along with the message.
* The message ID and publish time info will be returned in response headers and message on response body
* The subscriber sampler supports De-Compression, If the message is in gzip compressed format.
* Supports multiple topic subscription feature in the same test plan

## Changes
* Upgraded pub-sub client version to 1.111.2
* Supports Attributes for sending messages
* Supports multi-topic publishing, subscribing on same test plan



## References
Expand All @@ -58,3 +67,7 @@ Below are the references which guided to build this plugin.
* https://cloud.google.com/pubsub/docs/quickstart-client-libraries
* https://cloud.google.com/pubsub/docs/publisher
* https://cloud.google.com/pubsub/docs/pull

Please rate a :star2: if you like it.
Please open up :bug: - If you experienced something.

Binary file added images/PublisherConfig.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added images/PublisherSampler.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added images/SubscriberConfig.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added images/SubscriberSampler.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
23 changes: 20 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.di.jmeter.sampler</groupId>
<artifactId>jmeter-pubsub-sampler</artifactId>
<version>1.1-SNAPSHOT</version>
<version>1.1</version>
<properties>
<di-jmeter-version>5.1.1</di-jmeter-version>
<jmeter.lib.scope>provided</jmeter.lib.scope>
<google.cloud.pubsub.version>1.86.0</google.cloud.pubsub.version>
<google.cloud.pubsub.version>1.111.2</google.cloud.pubsub.version>
<google.gson.version>2.8.6</google.gson.version>
</properties>
<dependencies>
<dependency>
Expand All @@ -26,6 +29,11 @@
<artifactId>guava</artifactId>
<version>28.1-android</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>${google.gson.version}</version>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub</artifactId>
Expand Down Expand Up @@ -94,11 +102,14 @@
<include>io.opencensus:*</include>
<include>com.google.protobuf:*</include>
<include>com.google.guava:*</include>
<include>com.google.gson:*</include>
<include>org.checkerframework:checker-compat-qual</include>
<include>com.google.errorprone:error_prone_annotations</include>
<include>com.google.j2objc:j2objc-annotations</include>
<include>org.codehaus.mojo:animal-sniffer-annotations</include>
<include>io.grpc:*</include>
<include>io.grpc.grpclb:*</include>
<include>io.perfmark:*</include>
<include>org.threeten:threetenbp</include>
<include>com.google.http-client:*</include>
<include>com.google.android:annotations</include>
Expand All @@ -116,6 +127,12 @@
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
</transformers>
</configuration>
</execution>
</executions>
</plugin>
Expand Down
36 changes: 26 additions & 10 deletions src/main/java/com/di/jmeter/pubsub/config/PublisherConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,20 @@
* limitations under the License.
*
*/

package com.di.jmeter.pubsub.config;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.util.concurrent.TimeUnit;

import org.apache.jmeter.config.ConfigElement;
import org.apache.jmeter.config.ConfigTestElement;
import org.apache.jmeter.testbeans.TestBean;
import org.apache.jmeter.testbeans.TestBeanHelper;
import org.apache.jmeter.testelement.TestStateListener;
import org.apache.jmeter.threads.JMeterContextService;
import org.apache.jmeter.threads.JMeterVariables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -39,7 +40,12 @@
import com.google.gson.JsonObject;
import com.google.pubsub.v1.ProjectTopicName;

public class PublisherConfig extends ConfigTestElement implements Serializable, ConfigElement, TestStateListener, TestBean {
/**
* @author Mohamed Ibrahim
*
*/
public class PublisherConfig extends ConfigTestElement
implements ConfigElement, TestStateListener, TestBean, Serializable {

private static Logger LOGGER = LoggerFactory.getLogger(PublisherConfig.class);
private static final long serialVersionUID = 7645049205276507368L;
Expand All @@ -60,7 +66,7 @@ public class PublisherConfig extends ConfigTestElement implements Serializable,
private String authProvider_x509CertUrl;
private String client_x509CertUrl;

private static final String PUBSUB_CONNECTION = "publisherConnection";
private String publisherConnection;

// Default Constructor
public PublisherConfig() {
Expand All @@ -83,16 +89,17 @@ public void testStarted() {
TestBeanHelper.prepare(this);
JMeterVariables variables = getThreadContext().getVariables();

if (variables.getObject(PUBSUB_CONNECTION) != null) {
if (variables.getObject(publisherConnection) != null) {
LOGGER.error("PubSub connection is already established and active !!");
} else {
synchronized (this) {
try {
publisherClient = Publisher.newBuilder(getGcpTopic())
.setCredentialsProvider(createCredentialsProviderUsingJson(getCredentials())).build();

variables.putObject(PUBSUB_CONNECTION, publisherClient);
LOGGER.info(String.format("Publisher connection established with the %s successfully !!", getTopic()));
variables.putObject(publisherConnection, publisherClient);
LOGGER.info(
String.format("Publisher connection established with the %s successfully !!", getTopic()));
} catch (IOException e) {
LOGGER.error("Exception Occured while establishing connection with GCP : ");
e.printStackTrace();
Expand Down Expand Up @@ -133,8 +140,13 @@ public void testEnded() {
synchronized (this) {
if (publisherClient != null) {
publisherClient.shutdown();
publisherClient = null;
LOGGER.info("Publisher connection Terminated successfully !!");
try {
publisherClient.awaitTermination(30, TimeUnit.SECONDS);
LOGGER.info("Publisher connection Terminated successfully !!");
} catch (InterruptedException e) {
LOGGER.info("Error occurred while terminating Publisher connection");
e.printStackTrace();
}
}
}
}
Expand Down Expand Up @@ -247,8 +259,12 @@ public void setClient_x509CertUrl(String client_x509CertUrl) {
this.client_x509CertUrl = client_x509CertUrl;
}

public static Publisher getPublisherClient() {
return (Publisher) JMeterContextService.getContext().getVariables().getObject(PUBSUB_CONNECTION);
public String getPublisherConnection() {
return publisherConnection;
}

public void setPublisherConnection(String publisherConnection) {
this.publisherConnection = publisherConnection;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public class PublisherConfigBeanInfo extends BeanInfoSupport {
public PublisherConfigBeanInfo() {
super(PublisherConfig.class);

createPropertyGroup("pubConfig", new String[] { "publisherConnection" });

createPropertyGroup("credentials", new String[] { "type", "projectId", "topic", "privateKey", "privateKeyId", "tokenUri",
"clientId", "clientEmail", "client_x509CertUrl", "authUri", "authProvider_x509CertUrl" });

Expand Down Expand Up @@ -80,6 +82,10 @@ public PublisherConfigBeanInfo() {
propertyDescriptor.setValue(NOT_UNDEFINED, Boolean.TRUE);
propertyDescriptor.setValue(DEFAULT, "https://www.googleapis.com/oauth2/v1/certs");

propertyDescriptor = property("publisherConnection");
propertyDescriptor.setValue(NOT_UNDEFINED, Boolean.TRUE);
propertyDescriptor.setValue(DEFAULT, "<PUBLISHER CONFIG OBJECT>");


if (LOGGER.isDebugEnabled()) {
String pubDescriptorsAsString = Arrays.stream(getPropertyDescriptors())
Expand Down
50 changes: 34 additions & 16 deletions src/main/java/com/di/jmeter/pubsub/config/SubscriberConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

import org.apache.jmeter.config.ConfigElement;
import org.apache.jmeter.config.ConfigTestElement;
Expand All @@ -44,6 +46,7 @@
import com.google.gson.JsonObject;
import com.google.pubsub.v1.ProjectSubscriptionName;


public class SubscriberConfig extends ConfigTestElement implements ConfigElement, TestStateListener, TestBean, Serializable {

private static final long serialVersionUID = -6527581818773236163L;
Expand All @@ -69,27 +72,28 @@ public class SubscriberConfig extends ConfigTestElement implements ConfigElement
private String maxAckExtensionPeriod;
private String maxOutStandingElementCount;
private String maxOutstandingRequestBytes;
private static MessagesQueue messagesQueue;

private static String SUBSCRIBER_CONNECTION = "subscriberConnection";
private static String MESSAGESQUEUE = "message";

private static Map<String, MessagesQueue> pubsubQueue = new HashMap<String, MessagesQueue>();
private String subscriberConnection;
//private static String MESSAGESQUEUE = "message";
private static String SUBSCRIBED_TOPIC="subTopic";

@Override
public void testStarted() {
this.setRunningVersion(true);
TestBeanHelper.prepare(this);

messagesQueue = new MessagesQueue(100000);

JMeterVariables variables = getThreadContext().getVariables();
ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(getProjectId(), getSubscriptionId());

if (variables.getObject(SUBSCRIBER_CONNECTION) != null) {
if (variables.getObject(subscriberConnection) != null) {
LOGGER.error("PubSub connection is already established and active !!");
} else {
synchronized (this) {
try {

MessagesQueue messagesQueue = new MessagesQueue(100000);

LOGGER.info("Attempting to subscribe to a topic");
if (isFlowControlSetting()) {
subscriber = Subscriber.newBuilder(subscriptionName, new SimpleMessageReceiver(messagesQueue))
Expand All @@ -109,8 +113,9 @@ public void testStarted() {
subscriber.startAsync().awaitRunning();
// subscriber.awaitTerminated();
// Allow the subscriber to run indefinitely unless an error occurs
variables.putObject(SUBSCRIBER_CONNECTION, subscriber);
variables.putObject(MESSAGESQUEUE, messagesQueue);
pubsubQueue.put(getSubscriberConnection(), messagesQueue);
//variables.putObject(subscriberConnection, subscriber);
//variables.putObject(MESSAGESQUEUE, pubsubQueue);
variables.putObject(SUBSCRIBED_TOPIC, topic);
LOGGER.info(String.format("Subscriber connection established with the %s successfully !!", getTopic()));

Expand Down Expand Up @@ -341,16 +346,29 @@ public void setMaxOutStandingElementCount(String maxOutStandingElementCount) {
}
}

public static Subscriber getSubscriber() {
return (Subscriber) JMeterContextService.getContext().getVariables().getObject(SUBSCRIBER_CONNECTION);
}

public static MessagesQueue getMessagesQueue() {
return (MessagesQueue) JMeterContextService.getContext().getVariables().getObject(MESSAGESQUEUE);
}
// public static MessagesQueue getMessagesQueue() {
// return (MessagesQueue) JMeterContextService.getContext().getVariables().getObject(MESSAGESQUEUE);
// }

public static String getSubscribedTopic() {
return (String) JMeterContextService.getContext().getVariables().getObject(SUBSCRIBED_TOPIC);
}

public String getSubscriberConnection() {
return subscriberConnection;
}

public void setSubscriberConnection(String subscriberConnection) {
this.subscriberConnection = subscriberConnection;
}

public static Map<String, MessagesQueue> getPubsubQueue() {
return pubsubQueue;
}

public static void setPubsubQueue(Map<String, MessagesQueue> pubsubQueue) {
SubscriberConfig.pubsubQueue = pubsubQueue;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ public class SubscriberConfigBeanInfo extends BeanInfoSupport {

public SubscriberConfigBeanInfo() {
super(SubscriberConfig.class);

createPropertyGroup("subConfig", new String[] { "subscriberConnection" });

createPropertyGroup("credentials",
new String[] { "type", "projectId", "topic", "subscriptionId", "privateKey", "privateKeyId", "tokenUri",
Expand Down Expand Up @@ -107,6 +109,10 @@ public SubscriberConfigBeanInfo() {
p = property("maxOutstandingRequestBytes");
p.setValue(NOT_UNDEFINED, Boolean.TRUE);
p.setValue(DEFAULT, "1_000_000_000L");

p = property("subscriberConnection");
p.setValue(NOT_UNDEFINED, Boolean.TRUE);
p.setValue(DEFAULT, "<SUBSCRIBER CONFIG OBJECT>");


if (LOGGER.isDebugEnabled()) {
Expand Down
Loading

0 comments on commit 5301c71

Please sign in to comment.