From 5f7ec2e35d27ef49dc7182d79302dbf1fbdec0cf Mon Sep 17 00:00:00 2001 From: mibrahim Date: Thu, 9 Apr 2020 17:49:53 +0100 Subject: [PATCH] Initial commit --- README.MD | 60 ++++ pom.xml | 134 +++++++ .../jmeter/pubsub/config/PublisherConfig.java | 236 ++++++++++++ .../config/PublisherConfigBeanInfo.java | 74 ++++ .../pubsub/config/SubscriberConfig.java | 337 ++++++++++++++++++ .../config/SubscriberConfigBeanInfo.java | 100 ++++++ .../pubsub/sampler/PublisherSampler.java | 153 ++++++++ .../sampler/PublisherSamplerBeanInfo.java | 9 + .../pubsub/sampler/PublisherTestElement.java | 36 ++ .../PublisherTestElementBeanInfoSupport.java | 26 ++ .../pubsub/sampler/SubscriberSampler.java | 161 +++++++++ .../sampler/SubscriberSamplerBeanInfo.java | 9 + .../pubsub/sampler/SubscriberTestElement.java | 30 ++ .../SubscriberTestElementBeanInfoSupport.java | 25 ++ .../di/jmeter/pubsub/utils/MessagesQueue.java | 36 ++ .../pubsub/utils/SimpleMessageReceiver.java | 27 ++ .../PublisherConfigResources.properties | 42 +++ .../SubscriberConfigResources.properties | 57 +++ .../PublisherSamplerResources.properties | 25 ++ .../SubscriberSamplerResources.properties | 23 ++ 20 files changed, 1600 insertions(+) create mode 100644 README.MD create mode 100644 pom.xml create mode 100644 src/main/java/com/di/jmeter/pubsub/config/PublisherConfig.java create mode 100644 src/main/java/com/di/jmeter/pubsub/config/PublisherConfigBeanInfo.java create mode 100644 src/main/java/com/di/jmeter/pubsub/config/SubscriberConfig.java create mode 100644 src/main/java/com/di/jmeter/pubsub/config/SubscriberConfigBeanInfo.java create mode 100644 src/main/java/com/di/jmeter/pubsub/sampler/PublisherSampler.java create mode 100644 src/main/java/com/di/jmeter/pubsub/sampler/PublisherSamplerBeanInfo.java create mode 100644 src/main/java/com/di/jmeter/pubsub/sampler/PublisherTestElement.java create mode 100644 src/main/java/com/di/jmeter/pubsub/sampler/PublisherTestElementBeanInfoSupport.java create mode 100644 src/main/java/com/di/jmeter/pubsub/sampler/SubscriberSampler.java create mode 100644 src/main/java/com/di/jmeter/pubsub/sampler/SubscriberSamplerBeanInfo.java create mode 100644 src/main/java/com/di/jmeter/pubsub/sampler/SubscriberTestElement.java create mode 100644 src/main/java/com/di/jmeter/pubsub/sampler/SubscriberTestElementBeanInfoSupport.java create mode 100644 src/main/java/com/di/jmeter/pubsub/utils/MessagesQueue.java create mode 100644 src/main/java/com/di/jmeter/pubsub/utils/SimpleMessageReceiver.java create mode 100644 src/main/resources/com/di/jmeter/pubsub/config/PublisherConfigResources.properties create mode 100644 src/main/resources/com/di/jmeter/pubsub/config/SubscriberConfigResources.properties create mode 100644 src/main/resources/com/di/jmeter/pubsub/sampler/PublisherSamplerResources.properties create mode 100644 src/main/resources/com/di/jmeter/pubsub/sampler/SubscriberSamplerResources.properties diff --git a/README.MD b/README.MD new file mode 100644 index 0000000..70dd909 --- /dev/null +++ b/README.MD @@ -0,0 +1,60 @@ +# Jmeter-GCP-PubSub-Sampler + +## Introduction + +This plugin adds feature to connect to a topic to publish/subscribe messages in GCP + +## Required Components + +1. Jmeter +2. GCP Pubsub configs + + +## Jar Dependencies Required + +* google-cloud-pubsub-1.88.0.jar or above +* gson-2.2.4.jar + +## Jmeter Target + +* Jmeter version 5.1.1 or above +* Java 8 or above + +## Installation Instructions + +* Download the source code from the Gitlab. +* Just do a mvn clean install (Git bash is required) +* Jar will be generated under the target directory (jmeter-pubsub-sampler-1.0.jar) +* Copy the Jar to \/lib/ext/ + +## How to use it +Add required config element (Publisher config/ Subscriber config) + +* Provide the required credentials information in the Publisher config/ Subscriber config for the project +* To publish message, Add Publisher sampler to TG and then pass the message to publish +* To receive message, Add Subscriber sampler to TG to read the mesasges + +# Publisher Info +Apart from config element, Publisher sampler has Gzip compression feature +* The Flag in the sampler will allow the mechanism to publish the message with/without Gzip compression +* On successfull publish of each message, GCP returns a unique Id which will be returned in response header. + +# Subscriber Info +The subscriber works perfectly. The current mechanism of Subscriber is +* The config elements creats a local queue to store the messages received from the GCP. +* Currently the Queue size is hardcoded to 100K +* The Subscriber sampler is bound to read it form the queue, which is already subscribed by the Message receiver(which is a separate client library's thread) +* The subscriber sampler is reading and removing the message from the queue. In future, will change the ack to be done by the sampler. +* When the susbcriber sampler is triggered and If there is no messages in the queue, It will wait until a new message arrive to process it. +* each message retrieved from the subscriber will have the message ID and publish time info along with the message. +* The message ID and publish time info will be returned in response headers and message on response body +* The subscriber sampler supports De-Compression, If the message is in gzip compressed format. + + +## References + +Below are the references which guided to build this plugin. + +* https://cloud.google.com/pubsub/docs/quickstart-client-libraries +* https://cloud.google.com/pubsub/docs/publisher +* https://cloud.google.com/pubsub/docs/pull diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..f4b966f --- /dev/null +++ b/pom.xml @@ -0,0 +1,134 @@ + + 4.0.0 + com.di.jmeter.sampler + jmeter-pubsub-sampler + 1.0 + + 5.1.1 + provided + 1.86.0 + + + + org.apache.jmeter + ApacheJMeter_java + ${di-jmeter-version} + ${jmeter.lib.scope} + + + com.google.guava + guava + + + + + com.google.guava + guava + 28.1-android + + + com.google.cloud + google-cloud-pubsub + ${google.cloud.pubsub.version} + + + com.google.guava + guava + + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + copy + prepare-package + + copy + + + + + com.google.cloud + google-cloud-pubsub + ${google.cloud.pubsub.version} + ${project.build.directory}/lib + + + + + ${basedir}/src/main/resources/com/di/jmeter/pubsub/config/*.properties + false + + + ${basedir}/src/main/resources/com/di/jmeter/pubsub/sampler/*.properties + false + + + + + + + + org.apache.maven.plugins + maven-jar-plugin + 2.4 + + + org.apache.maven.plugins + maven-shade-plugin + 3.1.0 + + false + false + + + com.google.cloud:* + com.google.auth:* + com.google.api:* + com.google.api.grpc:* + io.opencensus:* + com.google.protobuf:* + com.google.guava:* + org.checkerframework:checker-compat-qual + com.google.errorprone:error_prone_annotations + com.google.j2objc:j2objc-annotations + org.codehaus.mojo:animal-sniffer-annotations + io.grpc:* + org.threeten:threetenbp + com.google.http-client:* + com.google.android:annotations + + + + + *:* + + + + + + package + + shade + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + 8 + 8 + + + + + \ No newline at end of file diff --git a/src/main/java/com/di/jmeter/pubsub/config/PublisherConfig.java b/src/main/java/com/di/jmeter/pubsub/config/PublisherConfig.java new file mode 100644 index 0000000..721d4e0 --- /dev/null +++ b/src/main/java/com/di/jmeter/pubsub/config/PublisherConfig.java @@ -0,0 +1,236 @@ +package com.di.jmeter.pubsub.config; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; + +import org.apache.jmeter.config.ConfigElement; +import org.apache.jmeter.testbeans.TestBean; +import org.apache.jmeter.testbeans.TestBeanHelper; +import org.apache.jmeter.testelement.AbstractTestElement; +import org.apache.jmeter.testelement.TestStateListener; +import org.apache.jmeter.threads.JMeterContextService; +import org.apache.jmeter.threads.JMeterVariables; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.core.FixedCredentialsProvider; +import com.google.auth.oauth2.GoogleCredentials; +import com.google.cloud.pubsub.v1.Publisher; +import com.google.gson.JsonObject; +import com.google.pubsub.v1.ProjectTopicName; + +public class PublisherConfig extends AbstractTestElement implements ConfigElement, TestStateListener, TestBean { + + private static Logger LOGGER = LoggerFactory.getLogger(PublisherConfig.class); + private static final long serialVersionUID = 7645049205276507368L; + + private Publisher publisherClient; + private JsonObject credentials = new JsonObject(); + private static GoogleCredentials gcpCredentials = null; + + private String type; + private String topic; + private String projectId; + private String privateKey; + private String privateKeyId; + private String clientEmail; + private String clientId; + private String authUri; + private String tokenUri; + private String authProvider_x509CertUrl; + private String client_x509CertUrl; + + private static final String PUBSUB_CONNECTION = "publisherConnection"; + + // Default Constructor + public PublisherConfig() { + + } + + @Override + public void addConfigElement(ConfigElement config) { + + } + + @Override + public boolean expectsModification() { + return false; + } + + @Override + public void testStarted() { + this.setRunningVersion(true); + TestBeanHelper.prepare(this); + JMeterVariables variables = getThreadContext().getVariables(); + + if (variables.getObject(PUBSUB_CONNECTION) != null) { + LOGGER.error("PubSub connection is already established and active !!"); + } else { + synchronized (this) { + try { + publisherClient = Publisher.newBuilder(getGcpTopic()) + .setCredentialsProvider(createCredentialsProviderUsingJson(getCredentials())).build(); + + variables.putObject(PUBSUB_CONNECTION, publisherClient); + LOGGER.info(String.format("Publisher connection established with the %s successfully !!", getTopic())); + } catch (IOException e) { + LOGGER.error("Exception Occured while establishing connection with GCP : "); + e.printStackTrace(); + } + } + } + } + + private ProjectTopicName getGcpTopic() { + return ProjectTopicName.of(getProjectId(), getTopic()); + } + + // Return credentials in JSON Object + private JsonObject getCredentials() { + + credentials.addProperty("type", getType()); + credentials.addProperty("topic", getTopic()); + credentials.addProperty("project_id", getProjectId()); + credentials.addProperty("private_key", getPrivateKey().replaceAll("\\\\n", "\n")); + credentials.addProperty("private_key_id", getPrivateKeyId()); + credentials.addProperty("client_email", getClientEmail()); + credentials.addProperty("client_id", getClientId()); + credentials.addProperty("auth_uri", getAuthUri()); + credentials.addProperty("token_uri", getTokenUri()); + credentials.addProperty("auth_provider_x509_cert_url", getAuthProvider_x509CertUrl()); + credentials.addProperty("client_x509_cert_url", getClient_x509CertUrl()); + + return credentials; + } + + @Override + public void testStarted(String host) { + testStarted(); + } + + @Override + public void testEnded() { + synchronized (this) { + if (publisherClient != null) { + publisherClient.shutdown(); + publisherClient = null; + LOGGER.info("Publisher connection Terminated successfully !!"); + } + } + } + + @Override + public void testEnded(String host) { + testEnded(); + } + + private static CredentialsProvider createCredentialsProviderUsingJson(JsonObject credentials) { + + InputStream configJson = new ByteArrayInputStream(credentials.toString().getBytes()); + + try { + gcpCredentials = GoogleCredentials.fromStream(configJson); + } catch (IOException e) { + System.out.println("Error occurred while creating GCPcredentials using Json"); + e.printStackTrace(); + } + return FixedCredentialsProvider.create(gcpCredentials); + } + + // =============== > Getters and setters < ================ + + public String getType() { + return type; + } + + public String getTopic() { + return topic; + } + + public String getProjectId() { + return projectId; + } + + public String getPrivateKey() { + return privateKey; + } + + public String getPrivateKeyId() { + return privateKeyId; + } + + public String getClientEmail() { + return clientEmail; + } + + public String getClientId() { + return clientId; + } + + public String getAuthUri() { + return authUri; + } + + public String getTokenUri() { + return tokenUri; + } + + public String getAuthProvider_x509CertUrl() { + return authProvider_x509CertUrl; + } + + public String getClient_x509CertUrl() { + return client_x509CertUrl; + } + + public void setType(String type) { + this.type = type; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public void setProjectId(String projectId) { + this.projectId = projectId; + } + + public void setPrivateKey(String privateKey) { + this.privateKey = privateKey; + } + + public void setPrivateKeyId(String privateKeyId) { + this.privateKeyId = privateKeyId; + } + + public void setClientEmail(String clientEmail) { + this.clientEmail = clientEmail; + } + + public void setClientId(String clientId) { + this.clientId = clientId; + } + + public void setAuthUri(String authUri) { + this.authUri = authUri; + } + + public void setTokenUri(String tokenUri) { + this.tokenUri = tokenUri; + } + + public void setAuthProvider_x509CertUrl(String authProvider_x509CertUrl) { + this.authProvider_x509CertUrl = authProvider_x509CertUrl; + } + + public void setClient_x509CertUrl(String client_x509CertUrl) { + this.client_x509CertUrl = client_x509CertUrl; + } + + public static Publisher getPublisherClient() { + return (Publisher) JMeterContextService.getContext().getVariables().getObject(PUBSUB_CONNECTION); + } + +} diff --git a/src/main/java/com/di/jmeter/pubsub/config/PublisherConfigBeanInfo.java b/src/main/java/com/di/jmeter/pubsub/config/PublisherConfigBeanInfo.java new file mode 100644 index 0000000..c938730 --- /dev/null +++ b/src/main/java/com/di/jmeter/pubsub/config/PublisherConfigBeanInfo.java @@ -0,0 +1,74 @@ +package com.di.jmeter.pubsub.config; + +import java.beans.PropertyDescriptor; +import java.util.Arrays; +import java.util.stream.Collectors; + +import org.apache.jmeter.testbeans.BeanInfoSupport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PublisherConfigBeanInfo extends BeanInfoSupport { + + private static Logger LOGGER = LoggerFactory.getLogger(PublisherConfigBeanInfo.class); + + public PublisherConfigBeanInfo() { + super(PublisherConfig.class); + + createPropertyGroup("credentials", new String[] { "type", "projectId", "topic", "privateKey", "privateKeyId", "tokenUri", + "clientId", "clientEmail", "client_x509CertUrl", "authUri", "authProvider_x509CertUrl" }); + + PropertyDescriptor propertyDescriptor = property("type"); + propertyDescriptor.setValue(NOT_UNDEFINED, Boolean.TRUE); + propertyDescriptor.setValue(DEFAULT, "service_account"); + + propertyDescriptor = property("projectId"); + propertyDescriptor.setValue(NOT_UNDEFINED, Boolean.TRUE); + propertyDescriptor.setValue(DEFAULT, ""); + + propertyDescriptor = property("topic"); + propertyDescriptor.setValue(NOT_UNDEFINED, Boolean.TRUE); + propertyDescriptor.setValue(DEFAULT, ""); + + propertyDescriptor = property("privateKey"); + propertyDescriptor.setValue(NOT_UNDEFINED, Boolean.TRUE); + propertyDescriptor.setValue(DEFAULT, ""); + + propertyDescriptor = property("privateKeyId"); + propertyDescriptor.setValue(NOT_UNDEFINED, Boolean.TRUE); + propertyDescriptor.setValue(DEFAULT, ""); + + propertyDescriptor = property("tokenUri"); + propertyDescriptor.setValue(NOT_UNDEFINED, Boolean.TRUE); + propertyDescriptor.setValue(DEFAULT, "https://oauth2.googleapis.com/token"); + + propertyDescriptor = property("clientId"); + propertyDescriptor.setValue(NOT_UNDEFINED, Boolean.TRUE); + propertyDescriptor.setValue(DEFAULT, ""); + + propertyDescriptor = property("clientEmail"); + propertyDescriptor.setValue(NOT_UNDEFINED, Boolean.TRUE); + propertyDescriptor.setValue(DEFAULT, ""); + + propertyDescriptor = property("client_x509CertUrl"); + propertyDescriptor.setValue(NOT_UNDEFINED, Boolean.TRUE); + propertyDescriptor.setValue(DEFAULT, ""); + + propertyDescriptor = property("authUri"); + propertyDescriptor.setValue(NOT_UNDEFINED, Boolean.TRUE); + propertyDescriptor.setValue(DEFAULT, "https://accounts.google.com/o/oauth2/auth"); + + propertyDescriptor = property("authProvider_x509CertUrl"); + propertyDescriptor.setValue(NOT_UNDEFINED, Boolean.TRUE); + propertyDescriptor.setValue(DEFAULT, "https://www.googleapis.com/oauth2/v1/certs"); + + + if (LOGGER.isDebugEnabled()) { + String pubDescriptorsAsString = Arrays.stream(getPropertyDescriptors()) + .map(pd -> pd.getName() + "=" + pd.getDisplayName()).collect(Collectors.joining(" ,")); + LOGGER.debug(pubDescriptorsAsString); + } + + } + +} diff --git a/src/main/java/com/di/jmeter/pubsub/config/SubscriberConfig.java b/src/main/java/com/di/jmeter/pubsub/config/SubscriberConfig.java new file mode 100644 index 0000000..ec83f29 --- /dev/null +++ b/src/main/java/com/di/jmeter/pubsub/config/SubscriberConfig.java @@ -0,0 +1,337 @@ +package com.di.jmeter.pubsub.config; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; + +import org.apache.jmeter.config.ConfigElement; +import org.apache.jmeter.testbeans.TestBean; +import org.apache.jmeter.testbeans.TestBeanHelper; +import org.apache.jmeter.testelement.AbstractTestElement; +import org.apache.jmeter.testelement.TestStateListener; +import org.apache.jmeter.threads.JMeterContextService; +import org.apache.jmeter.threads.JMeterVariables; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.threeten.bp.Duration; + +import com.di.jmeter.pubsub.utils.MessagesQueue; +import com.di.jmeter.pubsub.utils.SimpleMessageReceiver; +import com.google.api.gax.batching.FlowControlSettings; +import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.core.FixedCredentialsProvider; +import com.google.auth.oauth2.GoogleCredentials; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.gson.JsonObject; +import com.google.pubsub.v1.ProjectSubscriptionName; + +public class SubscriberConfig extends AbstractTestElement implements ConfigElement, TestStateListener, TestBean { + + private static final long serialVersionUID = -6527581818773236163L; + private static Logger LOGGER = LoggerFactory.getLogger(SubscriberConfig.class); + private Subscriber subscriber; + private JsonObject credentials = new JsonObject(); + private static GoogleCredentials gcpCredentials = null; + + private String type; + private String topic; + private String projectId; + private String privateKey; + private String privateKeyId; + private String clientEmail; + private String clientId; + private String authUri; + private String tokenUri; + private String authProvider_x509CertUrl; + private String client_x509CertUrl; + private String subscriptionId; + private boolean flowControlSetting; + private String parallelPullCount; + private String maxAckExtensionPeriod; + private String maxOutStandingElementCount; + private String maxOutstandingRequestBytes; + private static MessagesQueue messagesQueue; + + private static String SUBSCRIBER_CONNECTION = "subscriberConnection"; + private static String MESSAGESQUEUE = "message"; + private static String SUBSCRIBED_TOPIC="subTopic"; + + @Override + public void testStarted() { + this.setRunningVersion(true); + TestBeanHelper.prepare(this); + + messagesQueue = new MessagesQueue(100000); + JMeterVariables variables = getThreadContext().getVariables(); + ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(getProjectId(), getSubscriptionId()); + + if (variables.getObject(SUBSCRIBER_CONNECTION) != null) { + LOGGER.error("PubSub connection is already established and active !!"); + } else { + synchronized (this) { + try { + + LOGGER.info("Attempting to subscribe to a topic"); + if (isFlowControlSetting()) { + subscriber = Subscriber.newBuilder(subscriptionName, new SimpleMessageReceiver(messagesQueue)) + .setCredentialsProvider(createCredentialsProviderUsingJson(getCredentials())) + .setMaxAckExtensionPeriod(Duration.ofMillis(Long.parseLong(getMaxAckExtensionPeriod()))) + .setFlowControlSettings(flowControlSettings()) + .setParallelPullCount(Integer.parseInt(getParallelPullCount())).build(); + + } else { + subscriber = Subscriber.newBuilder(subscriptionName, new SimpleMessageReceiver(messagesQueue)) + .setCredentialsProvider(createCredentialsProviderUsingJson(getCredentials())) + .setMaxAckExtensionPeriod(Duration.ofMillis(Long.parseLong(getMaxAckExtensionPeriod()))) + .build(); + + } + + subscriber.startAsync().awaitRunning(); + // subscriber.awaitTerminated(); + // Allow the subscriber to run indefinitely unless an error occurs + variables.putObject(SUBSCRIBER_CONNECTION, subscriber); + variables.putObject(MESSAGESQUEUE, messagesQueue); + variables.putObject(SUBSCRIBED_TOPIC, topic); + LOGGER.info(String.format("Subscriber connection established with the %s successfully !!", getTopic())); + + } catch (IllegalStateException e) { + LOGGER.info("Error occurred while establishing subscriber connection with Pub/Sub: " + e); + } + } + } + } + + private FlowControlSettings flowControlSettings() { + return FlowControlSettings.newBuilder() + .setMaxOutstandingElementCount(Long.valueOf(getMaxOutStandingElementCount())) + .setMaxOutstandingRequestBytes(Long.valueOf(getMaxOutstandingRequestBytes())).build(); + } + + @Override + public void testStarted(String host) { + testStarted(); + } + + @Override + public void testEnded() { + synchronized (this) { + if (subscriber != null) { + subscriber.stopAsync(); + subscriber = null; + LOGGER.info("Subscriber connection Terminated successfully !!"); + } + } + } + + @Override + public void testEnded(String host) { + testEnded(); + } + + @Override + public void addConfigElement(ConfigElement config) { + // TODO Auto-generated method stub + + } + + @Override + public boolean expectsModification() { + // TODO Auto-generated method stub + return false; + } + + // Return credentials in JSON Object + private JsonObject getCredentials() { + + credentials.addProperty("type", getType()); + credentials.addProperty("topic", getTopic()); + credentials.addProperty("project_id", getProjectId()); + credentials.addProperty("private_key", getPrivateKey().replaceAll("\\\\n", "\n")); + credentials.addProperty("private_key_id", getPrivateKeyId()); + credentials.addProperty("client_email", getClientEmail()); + credentials.addProperty("client_id", getClientId()); + credentials.addProperty("auth_uri", getAuthUri()); + credentials.addProperty("token_uri", getTokenUri()); + credentials.addProperty("auth_provider_x509_cert_url", getAuthProvider_x509CertUrl()); + credentials.addProperty("client_x509_cert_url", getClient_x509CertUrl()); + + return credentials; + } + + private static CredentialsProvider createCredentialsProviderUsingJson(JsonObject credentials) { + + InputStream configJson = new ByteArrayInputStream(credentials.toString().getBytes()); + + try { + gcpCredentials = GoogleCredentials.fromStream(configJson); + } catch (IOException e) { + System.out.println("Error occurred while creating GCPcredentials using Json"); + e.printStackTrace(); + } + return FixedCredentialsProvider.create(gcpCredentials); + } + +// === Getters and Setters === + + public String getType() { + return type; + } + + public String getTopic() { + return topic; + } + + public String getProjectId() { + return projectId; + } + + public String getPrivateKey() { + return privateKey; + } + + public String getPrivateKeyId() { + return privateKeyId; + } + + public String getClientEmail() { + return clientEmail; + } + + public String getClientId() { + return clientId; + } + + public String getAuthUri() { + return authUri; + } + + public String getTokenUri() { + return tokenUri; + } + + public String getAuthProvider_x509CertUrl() { + return authProvider_x509CertUrl; + } + + public String getClient_x509CertUrl() { + return client_x509CertUrl; + } + + public String getSubscriptionId() { + return subscriptionId; + } + + public boolean isFlowControlSetting() { + return flowControlSetting; + } + + public String getParallelPullCount() { + return parallelPullCount; + } + + public String getMaxOutstandingRequestBytes() { + return maxOutstandingRequestBytes; + } + + public String getMaxAckExtensionPeriod() { + return maxAckExtensionPeriod; + } + + public String getMaxOutStandingElementCount() { + return maxOutStandingElementCount; + } + + public void setType(String type) { + this.type = type; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public void setProjectId(String projectId) { + this.projectId = projectId; + } + + public void setPrivateKey(String privateKey) { + this.privateKey = privateKey; + } + + public void setPrivateKeyId(String privateKeyId) { + this.privateKeyId = privateKeyId; + } + + public void setClientEmail(String clientEmail) { + this.clientEmail = clientEmail; + } + + public void setClientId(String clientId) { + this.clientId = clientId; + } + + public void setAuthUri(String authUri) { + this.authUri = authUri; + } + + public void setTokenUri(String tokenUri) { + this.tokenUri = tokenUri; + } + + public void setAuthProvider_x509CertUrl(String authProvider_x509CertUrl) { + this.authProvider_x509CertUrl = authProvider_x509CertUrl; + } + + public void setClient_x509CertUrl(String client_x509CertUrl) { + this.client_x509CertUrl = client_x509CertUrl; + } + + public void setSubscriptionId(String subscriptionId) { + this.subscriptionId = subscriptionId; + } + + public void setFlowControlSetting(boolean flowControlSetting) { + this.flowControlSetting = flowControlSetting; + } + + public void setParallelPullCount(String parallelPullCount) { + if (Integer.parseInt(parallelPullCount) > 1) { + this.parallelPullCount = parallelPullCount; + } else { + this.parallelPullCount = "1"; + } + } + + public void setMaxOutstandingRequestBytes(String maxOutstandingRequestBytes) { + if (maxOutstandingRequestBytes == null || maxOutstandingRequestBytes == "") { + this.maxOutstandingRequestBytes = "1_000_000_000L"; + } else { + this.maxOutstandingRequestBytes = maxOutstandingRequestBytes; + } + } + + public void setMaxAckExtensionPeriod(String maxAckExtensionPeriod) { + this.maxAckExtensionPeriod = maxAckExtensionPeriod; + } + + public void setMaxOutStandingElementCount(String maxOutStandingElementCount) { + if (maxOutStandingElementCount == null || maxOutStandingElementCount == "") { + this.maxOutStandingElementCount = "10_000L"; + } else { + this.maxOutStandingElementCount = maxOutStandingElementCount; + } + } + + public static Subscriber getSubscriber() { + return (Subscriber) JMeterContextService.getContext().getVariables().getObject(SUBSCRIBER_CONNECTION); + } + + public static MessagesQueue getMessagesQueue() { + return (MessagesQueue) JMeterContextService.getContext().getVariables().getObject(MESSAGESQUEUE); + } + + public static String getSubscribedTopic() { + return (String) JMeterContextService.getContext().getVariables().getObject(SUBSCRIBED_TOPIC); + } + +} diff --git a/src/main/java/com/di/jmeter/pubsub/config/SubscriberConfigBeanInfo.java b/src/main/java/com/di/jmeter/pubsub/config/SubscriberConfigBeanInfo.java new file mode 100644 index 0000000..3769e64 --- /dev/null +++ b/src/main/java/com/di/jmeter/pubsub/config/SubscriberConfigBeanInfo.java @@ -0,0 +1,100 @@ +package com.di.jmeter.pubsub.config; + +import java.beans.PropertyDescriptor; +import java.util.Arrays; +import java.util.stream.Collectors; + +import org.apache.jmeter.testbeans.BeanInfoSupport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SubscriberConfigBeanInfo extends BeanInfoSupport { + private static Logger LOGGER = LoggerFactory.getLogger(SubscriberConfigBeanInfo.class); + + public SubscriberConfigBeanInfo() { + super(SubscriberConfig.class); + + createPropertyGroup("credentials", + new String[] { "type", "projectId", "topic", "subscriptionId", "privateKey", "privateKeyId", "tokenUri", + "clientId", "clientEmail", "client_x509CertUrl", "authUri", "authProvider_x509CertUrl", + "maxAckExtensionPeriod" }); + + //FlowControl category + createPropertyGroup("fcSettings", new String[] {"flowControlSetting", "parallelPullCount", "maxOutStandingElementCount", "maxOutstandingRequestBytes"}); + + PropertyDescriptor p = property("projectId"); + p.setValue(NOT_UNDEFINED, Boolean.TRUE); + p.setValue(DEFAULT, ""); + + p = property("subscriptionId"); + p.setValue(NOT_UNDEFINED, Boolean.TRUE); + p.setValue(DEFAULT, ""); + + p = property("type"); + p.setValue(NOT_UNDEFINED, Boolean.TRUE); + p.setValue(DEFAULT, "service_account"); + + p = property("topic"); + p.setValue(NOT_UNDEFINED, Boolean.TRUE); + p.setValue(DEFAULT, ""); + + p = property("privateKey"); + p.setValue(NOT_UNDEFINED, Boolean.TRUE); + p.setValue(DEFAULT, ""); + + p = property("privateKeyId"); + p.setValue(NOT_UNDEFINED, Boolean.TRUE); + p.setValue(DEFAULT, ""); + + p = property("tokenUri"); + p.setValue(NOT_UNDEFINED, Boolean.TRUE); + p.setValue(DEFAULT, "https://oauth2.googleapis.com/token"); + + p = property("clientId"); + p.setValue(NOT_UNDEFINED, Boolean.TRUE); + p.setValue(DEFAULT, ""); + + p = property("clientEmail"); + p.setValue(NOT_UNDEFINED, Boolean.TRUE); + p.setValue(DEFAULT, ""); + + p = property("client_x509CertUrl"); + p.setValue(NOT_UNDEFINED, Boolean.TRUE); + p.setValue(DEFAULT, ""); + + p = property("authUri"); + p.setValue(NOT_UNDEFINED, Boolean.TRUE); + p.setValue(DEFAULT, "https://accounts.google.com/o/oauth2/auth"); + + p = property("authProvider_x509CertUrl"); + p.setValue(NOT_UNDEFINED, Boolean.TRUE); + p.setValue(DEFAULT, "https://www.googleapis.com/oauth2/v1/certs"); + + p = property("maxAckExtensionPeriod"); + p.setValue(NOT_UNDEFINED, Boolean.TRUE); + p.setValue(DEFAULT, "1"); + + p = property("flowControlSetting"); + p.setValue(NOT_UNDEFINED, Boolean.TRUE); + p.setValue(DEFAULT, Boolean.FALSE); + + p = property("parallelPullCount"); + p.setValue(NOT_UNDEFINED, Boolean.TRUE); + p.setValue(DEFAULT, "1"); + + p = property("maxOutStandingElementCount"); + p.setValue(NOT_UNDEFINED, Boolean.TRUE); + p.setValue(DEFAULT, "10_000L"); + + p = property("maxOutstandingRequestBytes"); + p.setValue(NOT_UNDEFINED, Boolean.TRUE); + p.setValue(DEFAULT, "1_000_000_000L"); + + + if (LOGGER.isDebugEnabled()) { + String subDescriptorsAsString = Arrays.stream(getPropertyDescriptors()) + .map(pd -> pd.getName() + "=" + pd.getDisplayName()).collect(Collectors.joining(" ,")); + LOGGER.debug(subDescriptorsAsString); + } + } +} diff --git a/src/main/java/com/di/jmeter/pubsub/sampler/PublisherSampler.java b/src/main/java/com/di/jmeter/pubsub/sampler/PublisherSampler.java new file mode 100644 index 0000000..e508298 --- /dev/null +++ b/src/main/java/com/di/jmeter/pubsub/sampler/PublisherSampler.java @@ -0,0 +1,153 @@ +package com.di.jmeter.pubsub.sampler; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.zip.GZIPOutputStream; + +import org.apache.jmeter.config.ConfigTestElement; +import org.apache.jmeter.engine.util.ConfigMergabilityIndicator; +import org.apache.jmeter.samplers.Entry; +import org.apache.jmeter.samplers.SampleResult; +import org.apache.jmeter.samplers.Sampler; +import org.apache.jmeter.testbeans.TestBean; +import org.apache.jmeter.testelement.TestElement; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.di.jmeter.pubsub.config.PublisherConfig; +import com.google.api.core.ApiFuture; +import com.google.cloud.pubsub.v1.Publisher; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.PubsubMessage; + +public class PublisherSampler extends PublisherTestElement implements Sampler, TestBean, ConfigMergabilityIndicator { + + private static final long serialVersionUID = -2509242423429019193L; + private static final Logger LOGGER = LoggerFactory.getLogger(PublisherSampler.class); + + private Publisher publisher = null; + private static final Set APPLIABLE_CONFIG_CLASSES = new HashSet<>( + Arrays.asList("org.apache.jmeter.config.gui.SimpleConfigGui")); + + @Override + public SampleResult sample(Entry e) { + PubsubMessage template = null; + SampleResult result = new SampleResult(); + result.setSampleLabel(getName()); + result.setSamplerData(request()); + result.setDataType(SampleResult.TEXT); + result.setContentType("text/plain"); + result.setDataEncoding(StandardCharsets.UTF_8.name()); + + if (isGzipCompression()) { + template = createPubsubMessage(createEventCompressed(getMessage())); + } else { + template = convertStringToPubSubMessage(getMessage()); + } + + result.sampleStart(); + + try { + publish(template, result); + } catch (Exception ex) { + LOGGER.info("Exception occurred while publishing message"); + result = handleException(result, ex); + } finally { + result.sampleEnd(); + } + return result; + } + + // Returns Modified templates/Message as template for publishing + private PubsubMessage convertStringToPubSubMessage(String message) { + return PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(message)).build(); + } + + private byte[] createEventCompressed(String message) { + //BufferedWriter zipWriter = null; + try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(message.length())) { + try (GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream)) { + gzipOutputStream.write(message.getBytes(StandardCharsets.UTF_8)); + gzipOutputStream.close(); + } + return byteArrayOutputStream.toByteArray(); + } catch (IOException e) { + throw new RuntimeException("Failed to zip content", e); + } + } + + private String publish(PubsubMessage template, SampleResult result) { + String resp = null; + ApiFuture future = null; + if (this.publisher == null) { + this.publisher = PublisherConfig.getPublisherClient(); + } + + try { + future = publisher.publish(template); + result.setResponseHeaders("MessagePublishedID: "+ future.get()); + result.setResponseData(template.toString(), StandardCharsets.UTF_8.name()); + result.setSuccessful(true); + result.setResponseCode("200"); + result.setResponseMessageOK(); + + } catch (ExecutionException e) { + LOGGER.info("Publisher config not initialized properly.. Check the config element"); + handleException(result, e); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + return resp; + } + + public static PubsubMessage createPubsubMessage(byte[] msg) { + return PubsubMessage.newBuilder().setData(ByteString.copyFrom(msg)).build(); + } + + private String request() { + StringBuilder requestBody = new StringBuilder(); + requestBody.append("PublishedMessage: \n").append(getMessage()).append("\n"); + return requestBody.toString(); + } + + private SampleResult handleException(SampleResult result, Exception ex) { + result.setResponseMessage("Message Publish Error"); + result.setResponseCode("500"); + result.setResponseData(String.format("Error in publishing message to PubSub topic : %s", ex.toString()).getBytes());// PublisherConfig.getTopic() + result.setSuccessful(false); + return result; + } + + @Override + public boolean applies(ConfigTestElement configElement) { + String guiClass = configElement.getProperty(TestElement.GUI_CLASS).getStringValue(); + return APPLIABLE_CONFIG_CLASSES.contains(guiClass); + } + + @Override + public void testStarted() { + // TODO Auto-generated method stub + } + + @Override + public void testStarted(String host) { + // TODO Auto-generated method stub + } + + @Override + public void testEnded() { + // TODO Auto-generated method stub + } + + @Override + public void testEnded(String host) { + // TODO Auto-generated method stub + } + +} diff --git a/src/main/java/com/di/jmeter/pubsub/sampler/PublisherSamplerBeanInfo.java b/src/main/java/com/di/jmeter/pubsub/sampler/PublisherSamplerBeanInfo.java new file mode 100644 index 0000000..c7bb085 --- /dev/null +++ b/src/main/java/com/di/jmeter/pubsub/sampler/PublisherSamplerBeanInfo.java @@ -0,0 +1,9 @@ +package com.di.jmeter.pubsub.sampler; + +public class PublisherSamplerBeanInfo extends PublisherTestElementBeanInfoSupport { + + public PublisherSamplerBeanInfo() { + super(PublisherSampler.class); + } + +} diff --git a/src/main/java/com/di/jmeter/pubsub/sampler/PublisherTestElement.java b/src/main/java/com/di/jmeter/pubsub/sampler/PublisherTestElement.java new file mode 100644 index 0000000..cd0010a --- /dev/null +++ b/src/main/java/com/di/jmeter/pubsub/sampler/PublisherTestElement.java @@ -0,0 +1,36 @@ +package com.di.jmeter.pubsub.sampler; + +import java.io.Serializable; + +import org.apache.jmeter.gui.Searchable; +import org.apache.jmeter.testelement.AbstractTestElement; +import org.apache.jmeter.testelement.TestElement; +import org.apache.jmeter.testelement.TestStateListener; + +public abstract class PublisherTestElement extends AbstractTestElement implements TestStateListener, TestElement, Serializable, Searchable { + + + private static final long serialVersionUID = 7027549399338665744L; + + private boolean gzipCompression; + private String message; + + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + + public boolean isGzipCompression() { + return gzipCompression; + } + + public void setGzipCompression(boolean gzipCompression) { + this.gzipCompression = gzipCompression; + } + + +} diff --git a/src/main/java/com/di/jmeter/pubsub/sampler/PublisherTestElementBeanInfoSupport.java b/src/main/java/com/di/jmeter/pubsub/sampler/PublisherTestElementBeanInfoSupport.java new file mode 100644 index 0000000..90a4504 --- /dev/null +++ b/src/main/java/com/di/jmeter/pubsub/sampler/PublisherTestElementBeanInfoSupport.java @@ -0,0 +1,26 @@ +package com.di.jmeter.pubsub.sampler; + +import java.beans.PropertyDescriptor; + +import org.apache.jmeter.testbeans.BeanInfoSupport; +import org.apache.jmeter.testbeans.TestBean; +import org.apache.jmeter.testbeans.gui.TypeEditor; + +public class PublisherTestElementBeanInfoSupport extends BeanInfoSupport { + + protected PublisherTestElementBeanInfoSupport(Class beanClass) { + super(beanClass); + + createPropertyGroup("Message to publish", new String[] { "gzipCompression","message"}); + + PropertyDescriptor propertyDescriptor = property("message", TypeEditor.TextAreaEditor); + propertyDescriptor.setValue(NOT_UNDEFINED, Boolean.TRUE); + propertyDescriptor.setValue(DEFAULT, "{\"demoMessage\":\"Hello World!\"}"); + + propertyDescriptor = property("gzipCompression"); + propertyDescriptor.setValue(NOT_UNDEFINED, Boolean.TRUE); + propertyDescriptor.setValue(DEFAULT, Boolean.FALSE); + + } + +} diff --git a/src/main/java/com/di/jmeter/pubsub/sampler/SubscriberSampler.java b/src/main/java/com/di/jmeter/pubsub/sampler/SubscriberSampler.java new file mode 100644 index 0000000..3e49654 --- /dev/null +++ b/src/main/java/com/di/jmeter/pubsub/sampler/SubscriberSampler.java @@ -0,0 +1,161 @@ +package com.di.jmeter.pubsub.sampler; + +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; +import java.util.zip.GZIPInputStream; + +import org.apache.jmeter.config.ConfigTestElement; +import org.apache.jmeter.engine.util.ConfigMergabilityIndicator; +import org.apache.jmeter.samplers.Entry; +import org.apache.jmeter.samplers.SampleResult; +import org.apache.jmeter.samplers.Sampler; +import org.apache.jmeter.testbeans.TestBean; +import org.apache.jmeter.testelement.TestElement; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.di.jmeter.pubsub.config.SubscriberConfig; +import com.di.jmeter.pubsub.utils.MessagesQueue; +import com.google.pubsub.v1.PubsubMessage; + +public class SubscriberSampler extends SubscriberTestElement implements Sampler, TestBean, ConfigMergabilityIndicator { + + private static final long serialVersionUID = 7189759970130154866L; + private static final Logger LOGGER = LoggerFactory.getLogger(SubscriberSampler.class); + private static final Set APPLIABLE_CONFIG_CLASSES = new HashSet<>( + Arrays.asList("org.apache.jmeter.config.gui.SimpleConfigGui")); + + private MessagesQueue messagesQueue; + private static PubsubMessage reader; + private String ackDelay; + private boolean decompression; + + @Override + public SampleResult sample(Entry e) { + SampleResult result = new SampleResult(); + result.setSampleLabel(getName()); + // result.setSamplerData(); + result.setDataType(SampleResult.TEXT); + result.setContentType("text/plain"); + result.setDataEncoding(StandardCharsets.UTF_8.name()); + + if (Integer.parseInt(getAckDelay()) > 1) { + ackDelay = getAckDelay(); + } + + result.sampleStart(); + + try { + reader = readMessageFromTopic(result); + + } catch (Exception ex) { + LOGGER.info("Exception Occurred while reading message"); + result = handleException(result, ex); + } finally { + result.sampleEnd(); + } + return result; + } + + private PubsubMessage readMessageFromTopic(SampleResult result) throws IOException { + + if (messagesQueue == null) { + this.messagesQueue = SubscriberConfig.getMessagesQueue(); + } + + try { + reader = messagesQueue.take(); + if (isDecompression()) { + result.setResponseData(createDeCompressedMessage(reader.getData().toByteArray()), + StandardCharsets.UTF_8.name()); + } else { + result.setResponseData(reader.getData().toString(), StandardCharsets.UTF_8.name()); + } + result.setResponseHeaders("PublishedMessageID: " + reader.getMessageId() + "\npublish_time in "+reader.getPublishTime()); + result.setSuccessful(true); + result.setResponseCode("200"); + result.setResponseMessageOK(); + + } catch (InterruptedException e) { + LOGGER.info(String.format("Error in reading message from the Message queue " + e)); + e.printStackTrace(); + } + + return reader; + } + + private String createDeCompressedMessage(byte[] message) throws IOException { + String output = null; + StringBuffer resultBuffer = new StringBuffer(); + try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(message)) { + try (GZIPInputStream gZIPInputStream = new GZIPInputStream(byteArrayInputStream)) { + BufferedReader br = new BufferedReader(new InputStreamReader(gZIPInputStream)); + while ((output = br.readLine()) != null) { + resultBuffer.append(output); + } + } + } catch (IOException e) { + e.printStackTrace(); + } + return resultBuffer.toString(); + } + + private SampleResult handleException(SampleResult result, Exception ex) { + result.setResponseMessage("Message Read Error"); + result.setResponseCode("500"); + result.setResponseData( + String.format("Error in Retrieving message from the receiever queue: " + ex.toString()).getBytes()); + result.setSuccessful(false); + return result; + } + + @Override + public void testStarted() { + // TODO Auto-generated method stub + } + + @Override + public void testStarted(String host) { + testStarted(); + } + + @Override + public void testEnded() { + // TODO Auto-generated method stub + + } + + @Override + public void testEnded(String host) { + testEnded(); + } + + @Override + public boolean applies(ConfigTestElement configElement) { + String guiClass = configElement.getProperty(TestElement.GUI_CLASS).getStringValue(); + return APPLIABLE_CONFIG_CLASSES.contains(guiClass); + } + + public String getAckDelay() { + return ackDelay; + } + + public void setAckDelay(String ackDelay) { + this.ackDelay = ackDelay; + } + + public boolean isDecompression() { + return decompression; + } + + public void setDecompression(boolean decompression) { + this.decompression = decompression; + } + +} diff --git a/src/main/java/com/di/jmeter/pubsub/sampler/SubscriberSamplerBeanInfo.java b/src/main/java/com/di/jmeter/pubsub/sampler/SubscriberSamplerBeanInfo.java new file mode 100644 index 0000000..499648c --- /dev/null +++ b/src/main/java/com/di/jmeter/pubsub/sampler/SubscriberSamplerBeanInfo.java @@ -0,0 +1,9 @@ +package com.di.jmeter.pubsub.sampler; + +public class SubscriberSamplerBeanInfo extends SubscriberTestElementBeanInfoSupport{ + + public SubscriberSamplerBeanInfo() { + super(SubscriberSampler.class); + } + +} diff --git a/src/main/java/com/di/jmeter/pubsub/sampler/SubscriberTestElement.java b/src/main/java/com/di/jmeter/pubsub/sampler/SubscriberTestElement.java new file mode 100644 index 0000000..39b3eb1 --- /dev/null +++ b/src/main/java/com/di/jmeter/pubsub/sampler/SubscriberTestElement.java @@ -0,0 +1,30 @@ +package com.di.jmeter.pubsub.sampler; + +import java.io.Serializable; + +import org.apache.jmeter.gui.Searchable; +import org.apache.jmeter.testelement.AbstractTestElement; +import org.apache.jmeter.testelement.TestElement; +import org.apache.jmeter.testelement.TestStateListener; + +public abstract class SubscriberTestElement extends AbstractTestElement implements TestStateListener, TestElement, Serializable, Searchable { + + private static final long serialVersionUID = -6951161193102820427L; + private static String ackDelay; + + +// ===== Getters and Setters ===== + + public String getAckDelay() { + return ackDelay; + } + + public void setAckDelay(String ackDelay) { + if(Integer.parseInt(ackDelay)>0) { + SubscriberTestElement.ackDelay = ackDelay; + }else { + SubscriberTestElement.ackDelay = "0"; + } + } + +} diff --git a/src/main/java/com/di/jmeter/pubsub/sampler/SubscriberTestElementBeanInfoSupport.java b/src/main/java/com/di/jmeter/pubsub/sampler/SubscriberTestElementBeanInfoSupport.java new file mode 100644 index 0000000..7efdd5b --- /dev/null +++ b/src/main/java/com/di/jmeter/pubsub/sampler/SubscriberTestElementBeanInfoSupport.java @@ -0,0 +1,25 @@ +package com.di.jmeter.pubsub.sampler; + +import java.beans.PropertyDescriptor; + +import org.apache.jmeter.testbeans.BeanInfoSupport; +import org.apache.jmeter.testbeans.TestBean; + +public class SubscriberTestElementBeanInfoSupport extends BeanInfoSupport { + + protected SubscriberTestElementBeanInfoSupport(Class beanClass) { + super(beanClass); + + createPropertyGroup("SubscriberProperties", new String[] { "ackDelay", "decompression"}); + + PropertyDescriptor propertyDescriptor = property("ackDelay"); + propertyDescriptor.setValue(NOT_UNDEFINED, Boolean.TRUE); + propertyDescriptor.setValue(DEFAULT, "0"); + + propertyDescriptor = property("decompression"); + propertyDescriptor.setValue(NOT_UNDEFINED, Boolean.TRUE); + propertyDescriptor.setValue(DEFAULT, Boolean.FALSE); + + } + +} diff --git a/src/main/java/com/di/jmeter/pubsub/utils/MessagesQueue.java b/src/main/java/com/di/jmeter/pubsub/utils/MessagesQueue.java new file mode 100644 index 0000000..9d424d0 --- /dev/null +++ b/src/main/java/com/di/jmeter/pubsub/utils/MessagesQueue.java @@ -0,0 +1,36 @@ +package com.di.jmeter.pubsub.utils; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; + +import com.google.pubsub.v1.PubsubMessage; + + +public class MessagesQueue { + + + private final BlockingQueue messages; + + public MessagesQueue(int maxQueueSize){ + this.messages = new LinkedBlockingDeque<>(maxQueueSize); + } + + protected MessagesQueue(BlockingQueue messages){ + this.messages = messages; + } + + public PubsubMessage take() throws InterruptedException { + return messages.take(); + } + + public int getSize(){ + return messages.size(); + } + + public boolean offer(PubsubMessage message){ + return messages.offer(message); + } + + +} + diff --git a/src/main/java/com/di/jmeter/pubsub/utils/SimpleMessageReceiver.java b/src/main/java/com/di/jmeter/pubsub/utils/SimpleMessageReceiver.java new file mode 100644 index 0000000..a45b18e --- /dev/null +++ b/src/main/java/com/di/jmeter/pubsub/utils/SimpleMessageReceiver.java @@ -0,0 +1,27 @@ +package com.di.jmeter.pubsub.utils; + +import com.google.cloud.pubsub.v1.AckReplyConsumer; +import com.google.cloud.pubsub.v1.MessageReceiver; +import com.google.pubsub.v1.PubsubMessage; + +public class SimpleMessageReceiver implements MessageReceiver { + private final MessagesQueue messagesQueue; + + public SimpleMessageReceiver(final MessagesQueue messagesQueue) { + this.messagesQueue = messagesQueue; + } + + @Override + public void receiveMessage(final PubsubMessage message, final AckReplyConsumer consumer) { +// System.out.println("Id : " + message.getMessageId()); +// System.out.println("Data : " + message.getData().toStringUtf8()); + + if(messagesQueue.offer(message)){ + consumer.ack(); + } + else{ + consumer.nack(); + } + } + +} diff --git a/src/main/resources/com/di/jmeter/pubsub/config/PublisherConfigResources.properties b/src/main/resources/com/di/jmeter/pubsub/config/PublisherConfigResources.properties new file mode 100644 index 0000000..51e777c --- /dev/null +++ b/src/main/resources/com/di/jmeter/pubsub/config/PublisherConfigResources.properties @@ -0,0 +1,42 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to you under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +displayName=PubSub-PublisherConfiguration +credentials.displayName=Credentials Configuration + +projectId.displayName=projectId +projectId.shortDescription=projectId +type.displayName=type +type.shortDescription=type +topic.displayName=topic +topic.shortDescription=topic +privateKey.displayName=privateKey +privateKey.shortDescription=privateKey +privateKeyId.displayName=privateKeyId +privateKeyId.shortDescription=privateKeyId +tokenUri.displayName=tokenUri +tokenUri.shortDescription=tokenUri +clientId.displayName=clientId +clientId.shortDescription=clientId +clientEmail.displayName=clientEmail +clientEmail.shortDescription=clientEmail +client_x509CertUrl.displayName=client_x509CertUrl +client_x509CertUrl.shortDescription=client_x509CertUrl +authUri.displayName=authUri +authUri.shortDescription=authUri +authProvider_x509CertUrl.displayName=authProvider_x509CertUrl +authProvider_x509CertUrl.shortDescription=authProvider_x509CertUrl diff --git a/src/main/resources/com/di/jmeter/pubsub/config/SubscriberConfigResources.properties b/src/main/resources/com/di/jmeter/pubsub/config/SubscriberConfigResources.properties new file mode 100644 index 0000000..6350337 --- /dev/null +++ b/src/main/resources/com/di/jmeter/pubsub/config/SubscriberConfigResources.properties @@ -0,0 +1,57 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to you under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +displayName=PubSub-SubscriberConfiguration +credentials.displayName=Credentials Configuration + +projectId.displayName=projectId +projectId.shortDescription=projectId +subscriptionId.displayName=subscriptionId +subscriptionId.shortDescription=subscriptionId +type.displayName=type +type.shortDescription=type +topic.displayName=topic +topic.shortDescription=topic +privateKey.displayName=privateKey +privateKey.shortDescription=privateKey +privateKeyId.displayName=privateKeyId +privateKeyId.shortDescription=privateKeyId +tokenUri.displayName=tokenUri +tokenUri.shortDescription=tokenUri +clientId.displayName=clientId +clientId.shortDescription=clientId +clientEmail.displayName=clientEmail +clientEmail.shortDescription=clientEmail +client_x509CertUrl.displayName=client_x509CertUrl +client_x509CertUrl.shortDescription=client_x509CertUrl +authUri.displayName=authUri +authUri.shortDescription=authUri +authProvider_x509CertUrl.displayName=authProvider_x509CertUrl +authProvider_x509CertUrl.shortDescription=authProvider_x509CertUrl +maxAckExtensionPeriod.displayName=MaxAckExtensionPeriod +maxAckExtensionPeriod.shortDescription=Set the maximum period a message ack deadline will be extended. Defaults to one day. +fcSettings.displayName=FlowControl Settings +flowControlSetting.displayName=Enable FlowControl +flowControlSetting.shortDescription=Defines how client pulls messages from pub/sub +parallelPullCount.displayName=ParallelPullCount +parallelPullCount.shortDescription=Defines how many subscriber streams will open to receive message(s) +maxOutStandingElementCount.displayName=MaxOutStandingElement +maxOutStandingElementCount.shortDescription=MaxOutStandingElement +maxOutStandingRequestBytes.displayName=MaxOutStandingRequestBytes +maxOutStandingRequestBytes.shortDescription=MaxOutStandingRequestBytes + + diff --git a/src/main/resources/com/di/jmeter/pubsub/sampler/PublisherSamplerResources.properties b/src/main/resources/com/di/jmeter/pubsub/sampler/PublisherSamplerResources.properties new file mode 100644 index 0000000..1a96e80 --- /dev/null +++ b/src/main/resources/com/di/jmeter/pubsub/sampler/PublisherSamplerResources.properties @@ -0,0 +1,25 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to you under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +displayName=Publisher Request +#template.displayName=Template + +gzipCompression.displayName=Gzip-Compression +gzipCompression.shortDescription=Set to enable compression for messages +message.displayName=Message +message.shortDescription=Message - This will supercedes template Type + diff --git a/src/main/resources/com/di/jmeter/pubsub/sampler/SubscriberSamplerResources.properties b/src/main/resources/com/di/jmeter/pubsub/sampler/SubscriberSamplerResources.properties new file mode 100644 index 0000000..18246ff --- /dev/null +++ b/src/main/resources/com/di/jmeter/pubsub/sampler/SubscriberSamplerResources.properties @@ -0,0 +1,23 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to you under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +displayName=Subscriber Request +ackDelay.displayName=ACK Delay +ackDelay.shortDescription= Time delay to acknowledge the received message +decompression.displayName=Enable Decompression +decompression.shortDescription=Flag to DeCompress the message, If the message received is in compressed format +