diff --git a/README.MD b/README.MD index b02fddd..67673a7 100644 --- a/README.MD +++ b/README.MD @@ -2,30 +2,30 @@ ## Introduction -This plugin adds feature to connect to a topic to publish/subscribe messages in GCP +This plugin adds feature to connect to GCP to publish/subscribe messages to the topic ## Required Components -1. Jmeter +1. Apache Jmeter/DI-Jmeter 2. GCP Pubsub configs ## Jar Dependencies Required -* google-cloud-pubsub-1.88.0.jar or above +* google-cloud-pubsub-1.111.2.jar * gson-2.2.4.jar ## Jmeter Target -* Jmeter version 5.1.1 or above +* Jmeter version 5.3 or above * Java 8 or above ## Installation Instructions -* Download the source code from the Github. +* Download the source code from the Gitlab. * Just do a mvn clean install (Git bash is required) -* Jar will be generated under the target directory (jmeter-pubsub-sampler-1.0.jar) -* Copy the Jar to \/lib/ext/ +* Jar will be generated under the target directory (jmeter-pubsub-sampler-1.1.jar). +* Copy the Jar to \/lib/ext/ for DI Jmeter \/di/plugins ## How to use it Add required config element (Publisher config/ Subscriber config) @@ -37,7 +37,9 @@ Add required config element (Publisher config/ Subscriber config) # Publisher Info Apart from config element, Publisher sampler has Gzip compression feature * The Flag in the sampler will allow the mechanism to publish the message with/without Gzip compression -* On successfull publish of each message, GCP returns a unique Id which will be returned in response header. +* On successful publish of each message, GCP returns a unique Id which will be returned in response header. +* Supports attributes for messages being published +* Supports multiple topic publishing feature in the same test plan # Subscriber Info The subscriber works perfectly. The current mechanism of Subscriber is @@ -49,6 +51,13 @@ The subscriber works perfectly. The current mechanism of Subscriber is * each message retrieved from the subscriber will have the message ID and publish time info along with the message. * The message ID and publish time info will be returned in response headers and message on response body * The subscriber sampler supports De-Compression, If the message is in gzip compressed format. +* Supports multiple topic subscription feature in the same test plan + +## Changes +* Upgraded pub-sub client version to 1.111.2 +* Supports Attributes for sending messages +* Supports multi-topic publishing, subscribing on same test plan + ## References @@ -58,3 +67,7 @@ Below are the references which guided to build this plugin. * https://cloud.google.com/pubsub/docs/quickstart-client-libraries * https://cloud.google.com/pubsub/docs/publisher * https://cloud.google.com/pubsub/docs/pull + +Please rate a :star2: if you like it. +Please open up :bug: - If you experienced something. + diff --git a/images/PublisherConfig.png b/images/PublisherConfig.png new file mode 100644 index 0000000..a68b701 Binary files /dev/null and b/images/PublisherConfig.png differ diff --git a/images/PublisherSampler.png b/images/PublisherSampler.png new file mode 100644 index 0000000..674962b Binary files /dev/null and b/images/PublisherSampler.png differ diff --git a/images/SubscriberConfig.png b/images/SubscriberConfig.png new file mode 100644 index 0000000..57801b7 Binary files /dev/null and b/images/SubscriberConfig.png differ diff --git a/images/SubscriberSampler.png b/images/SubscriberSampler.png new file mode 100644 index 0000000..3cee0ee Binary files /dev/null and b/images/SubscriberSampler.png differ diff --git a/pom.xml b/pom.xml index a7f6cd3..73adbd5 100644 --- a/pom.xml +++ b/pom.xml @@ -1,12 +1,15 @@ - + 4.0.0 com.di.jmeter.sampler jmeter-pubsub-sampler - 1.1-SNAPSHOT + 1.1 5.1.1 provided - 1.86.0 + 1.111.2 + 2.8.6 @@ -26,6 +29,11 @@ guava 28.1-android + + com.google.code.gson + gson + ${google.gson.version} + com.google.cloud google-cloud-pubsub @@ -94,11 +102,14 @@ io.opencensus:* com.google.protobuf:* com.google.guava:* + com.google.gson:* org.checkerframework:checker-compat-qual com.google.errorprone:error_prone_annotations com.google.j2objc:j2objc-annotations org.codehaus.mojo:animal-sniffer-annotations io.grpc:* + io.grpc.grpclb:* + io.perfmark:* org.threeten:threetenbp com.google.http-client:* com.google.android:annotations @@ -116,6 +127,12 @@ shade + + + + + diff --git a/src/main/java/com/di/jmeter/pubsub/config/PublisherConfig.java b/src/main/java/com/di/jmeter/pubsub/config/PublisherConfig.java index fcf1771..2d30237 100644 --- a/src/main/java/com/di/jmeter/pubsub/config/PublisherConfig.java +++ b/src/main/java/com/di/jmeter/pubsub/config/PublisherConfig.java @@ -15,19 +15,20 @@ * limitations under the License. * */ + package com.di.jmeter.pubsub.config; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.io.Serializable; +import java.util.concurrent.TimeUnit; import org.apache.jmeter.config.ConfigElement; import org.apache.jmeter.config.ConfigTestElement; import org.apache.jmeter.testbeans.TestBean; import org.apache.jmeter.testbeans.TestBeanHelper; import org.apache.jmeter.testelement.TestStateListener; -import org.apache.jmeter.threads.JMeterContextService; import org.apache.jmeter.threads.JMeterVariables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,7 +40,12 @@ import com.google.gson.JsonObject; import com.google.pubsub.v1.ProjectTopicName; -public class PublisherConfig extends ConfigTestElement implements Serializable, ConfigElement, TestStateListener, TestBean { +/** + * @author Mohamed Ibrahim + * + */ +public class PublisherConfig extends ConfigTestElement + implements ConfigElement, TestStateListener, TestBean, Serializable { private static Logger LOGGER = LoggerFactory.getLogger(PublisherConfig.class); private static final long serialVersionUID = 7645049205276507368L; @@ -60,7 +66,7 @@ public class PublisherConfig extends ConfigTestElement implements Serializable, private String authProvider_x509CertUrl; private String client_x509CertUrl; - private static final String PUBSUB_CONNECTION = "publisherConnection"; + private String publisherConnection; // Default Constructor public PublisherConfig() { @@ -83,7 +89,7 @@ public void testStarted() { TestBeanHelper.prepare(this); JMeterVariables variables = getThreadContext().getVariables(); - if (variables.getObject(PUBSUB_CONNECTION) != null) { + if (variables.getObject(publisherConnection) != null) { LOGGER.error("PubSub connection is already established and active !!"); } else { synchronized (this) { @@ -91,8 +97,9 @@ public void testStarted() { publisherClient = Publisher.newBuilder(getGcpTopic()) .setCredentialsProvider(createCredentialsProviderUsingJson(getCredentials())).build(); - variables.putObject(PUBSUB_CONNECTION, publisherClient); - LOGGER.info(String.format("Publisher connection established with the %s successfully !!", getTopic())); + variables.putObject(publisherConnection, publisherClient); + LOGGER.info( + String.format("Publisher connection established with the %s successfully !!", getTopic())); } catch (IOException e) { LOGGER.error("Exception Occured while establishing connection with GCP : "); e.printStackTrace(); @@ -133,8 +140,13 @@ public void testEnded() { synchronized (this) { if (publisherClient != null) { publisherClient.shutdown(); - publisherClient = null; - LOGGER.info("Publisher connection Terminated successfully !!"); + try { + publisherClient.awaitTermination(30, TimeUnit.SECONDS); + LOGGER.info("Publisher connection Terminated successfully !!"); + } catch (InterruptedException e) { + LOGGER.info("Error occurred while terminating Publisher connection"); + e.printStackTrace(); + } } } } @@ -247,8 +259,12 @@ public void setClient_x509CertUrl(String client_x509CertUrl) { this.client_x509CertUrl = client_x509CertUrl; } - public static Publisher getPublisherClient() { - return (Publisher) JMeterContextService.getContext().getVariables().getObject(PUBSUB_CONNECTION); + public String getPublisherConnection() { + return publisherConnection; + } + + public void setPublisherConnection(String publisherConnection) { + this.publisherConnection = publisherConnection; } } diff --git a/src/main/java/com/di/jmeter/pubsub/config/PublisherConfigBeanInfo.java b/src/main/java/com/di/jmeter/pubsub/config/PublisherConfigBeanInfo.java index 1f30a54..f17a588 100644 --- a/src/main/java/com/di/jmeter/pubsub/config/PublisherConfigBeanInfo.java +++ b/src/main/java/com/di/jmeter/pubsub/config/PublisherConfigBeanInfo.java @@ -33,6 +33,8 @@ public class PublisherConfigBeanInfo extends BeanInfoSupport { public PublisherConfigBeanInfo() { super(PublisherConfig.class); + createPropertyGroup("pubConfig", new String[] { "publisherConnection" }); + createPropertyGroup("credentials", new String[] { "type", "projectId", "topic", "privateKey", "privateKeyId", "tokenUri", "clientId", "clientEmail", "client_x509CertUrl", "authUri", "authProvider_x509CertUrl" }); @@ -80,6 +82,10 @@ public PublisherConfigBeanInfo() { propertyDescriptor.setValue(NOT_UNDEFINED, Boolean.TRUE); propertyDescriptor.setValue(DEFAULT, "https://www.googleapis.com/oauth2/v1/certs"); + propertyDescriptor = property("publisherConnection"); + propertyDescriptor.setValue(NOT_UNDEFINED, Boolean.TRUE); + propertyDescriptor.setValue(DEFAULT, ""); + if (LOGGER.isDebugEnabled()) { String pubDescriptorsAsString = Arrays.stream(getPropertyDescriptors()) diff --git a/src/main/java/com/di/jmeter/pubsub/config/SubscriberConfig.java b/src/main/java/com/di/jmeter/pubsub/config/SubscriberConfig.java index f303cca..28ab469 100644 --- a/src/main/java/com/di/jmeter/pubsub/config/SubscriberConfig.java +++ b/src/main/java/com/di/jmeter/pubsub/config/SubscriberConfig.java @@ -22,6 +22,8 @@ import java.io.IOException; import java.io.InputStream; import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; import org.apache.jmeter.config.ConfigElement; import org.apache.jmeter.config.ConfigTestElement; @@ -44,6 +46,7 @@ import com.google.gson.JsonObject; import com.google.pubsub.v1.ProjectSubscriptionName; + public class SubscriberConfig extends ConfigTestElement implements ConfigElement, TestStateListener, TestBean, Serializable { private static final long serialVersionUID = -6527581818773236163L; @@ -69,10 +72,10 @@ public class SubscriberConfig extends ConfigTestElement implements ConfigElement private String maxAckExtensionPeriod; private String maxOutStandingElementCount; private String maxOutstandingRequestBytes; - private static MessagesQueue messagesQueue; - - private static String SUBSCRIBER_CONNECTION = "subscriberConnection"; - private static String MESSAGESQUEUE = "message"; + + private static Map pubsubQueue = new HashMap(); + private String subscriberConnection; + //private static String MESSAGESQUEUE = "message"; private static String SUBSCRIBED_TOPIC="subTopic"; @Override @@ -80,16 +83,17 @@ public void testStarted() { this.setRunningVersion(true); TestBeanHelper.prepare(this); - messagesQueue = new MessagesQueue(100000); + JMeterVariables variables = getThreadContext().getVariables(); ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(getProjectId(), getSubscriptionId()); - if (variables.getObject(SUBSCRIBER_CONNECTION) != null) { + if (variables.getObject(subscriberConnection) != null) { LOGGER.error("PubSub connection is already established and active !!"); } else { synchronized (this) { try { - + MessagesQueue messagesQueue = new MessagesQueue(100000); + LOGGER.info("Attempting to subscribe to a topic"); if (isFlowControlSetting()) { subscriber = Subscriber.newBuilder(subscriptionName, new SimpleMessageReceiver(messagesQueue)) @@ -109,8 +113,9 @@ public void testStarted() { subscriber.startAsync().awaitRunning(); // subscriber.awaitTerminated(); // Allow the subscriber to run indefinitely unless an error occurs - variables.putObject(SUBSCRIBER_CONNECTION, subscriber); - variables.putObject(MESSAGESQUEUE, messagesQueue); + pubsubQueue.put(getSubscriberConnection(), messagesQueue); + //variables.putObject(subscriberConnection, subscriber); + //variables.putObject(MESSAGESQUEUE, pubsubQueue); variables.putObject(SUBSCRIBED_TOPIC, topic); LOGGER.info(String.format("Subscriber connection established with the %s successfully !!", getTopic())); @@ -341,16 +346,29 @@ public void setMaxOutStandingElementCount(String maxOutStandingElementCount) { } } - public static Subscriber getSubscriber() { - return (Subscriber) JMeterContextService.getContext().getVariables().getObject(SUBSCRIBER_CONNECTION); - } - - public static MessagesQueue getMessagesQueue() { - return (MessagesQueue) JMeterContextService.getContext().getVariables().getObject(MESSAGESQUEUE); - } +// public static MessagesQueue getMessagesQueue() { +// return (MessagesQueue) JMeterContextService.getContext().getVariables().getObject(MESSAGESQUEUE); +// } public static String getSubscribedTopic() { return (String) JMeterContextService.getContext().getVariables().getObject(SUBSCRIBED_TOPIC); } + public String getSubscriberConnection() { + return subscriberConnection; + } + + public void setSubscriberConnection(String subscriberConnection) { + this.subscriberConnection = subscriberConnection; + } + + public static Map getPubsubQueue() { + return pubsubQueue; + } + + public static void setPubsubQueue(Map pubsubQueue) { + SubscriberConfig.pubsubQueue = pubsubQueue; + } + + } diff --git a/src/main/java/com/di/jmeter/pubsub/config/SubscriberConfigBeanInfo.java b/src/main/java/com/di/jmeter/pubsub/config/SubscriberConfigBeanInfo.java index 804aea7..16d5b77 100644 --- a/src/main/java/com/di/jmeter/pubsub/config/SubscriberConfigBeanInfo.java +++ b/src/main/java/com/di/jmeter/pubsub/config/SubscriberConfigBeanInfo.java @@ -31,6 +31,8 @@ public class SubscriberConfigBeanInfo extends BeanInfoSupport { public SubscriberConfigBeanInfo() { super(SubscriberConfig.class); + + createPropertyGroup("subConfig", new String[] { "subscriberConnection" }); createPropertyGroup("credentials", new String[] { "type", "projectId", "topic", "subscriptionId", "privateKey", "privateKeyId", "tokenUri", @@ -107,6 +109,10 @@ public SubscriberConfigBeanInfo() { p = property("maxOutstandingRequestBytes"); p.setValue(NOT_UNDEFINED, Boolean.TRUE); p.setValue(DEFAULT, "1_000_000_000L"); + + p = property("subscriberConnection"); + p.setValue(NOT_UNDEFINED, Boolean.TRUE); + p.setValue(DEFAULT, ""); if (LOGGER.isDebugEnabled()) { diff --git a/src/main/java/com/di/jmeter/pubsub/sampler/PublisherSampler.java b/src/main/java/com/di/jmeter/pubsub/sampler/PublisherSampler.java index a79f7b6..8f9dc42 100644 --- a/src/main/java/com/di/jmeter/pubsub/sampler/PublisherSampler.java +++ b/src/main/java/com/di/jmeter/pubsub/sampler/PublisherSampler.java @@ -18,15 +18,17 @@ package com.di.jmeter.pubsub.sampler; -import com.google.api.core.ApiFuture; -import com.google.cloud.pubsub.v1.Publisher; -import com.google.gson.Gson; -import com.google.gson.JsonSyntaxException; -import com.google.gson.reflect.TypeToken; -import com.google.protobuf.ByteString; -import com.google.pubsub.v1.PubsubMessage; - -import com.di.jmeter.pubsub.config.PublisherConfig; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.zip.GZIPOutputStream; import org.apache.jmeter.config.ConfigTestElement; import org.apache.jmeter.engine.util.ConfigMergabilityIndicator; @@ -35,28 +37,22 @@ import org.apache.jmeter.samplers.Sampler; import org.apache.jmeter.testbeans.TestBean; import org.apache.jmeter.testelement.TestElement; +import org.apache.jmeter.threads.JMeterContextService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.lang.reflect.Type; -import java.nio.charset.StandardCharsets; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.ExecutionException; -import java.util.zip.GZIPOutputStream; +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.databind.JsonMappingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.api.core.ApiFuture; +import com.google.cloud.pubsub.v1.Publisher; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.PubsubMessage; public class PublisherSampler extends PublisherTestElement implements Sampler, TestBean, ConfigMergabilityIndicator { private static final long serialVersionUID = -2509242423429019193L; private static final Logger LOGGER = LoggerFactory.getLogger(PublisherSampler.class); - private static final Gson gson = new Gson(); - private static final Type TYPED_MAP = new TypeToken>() {}.getType(); private Publisher publisher = null; private static final Set APPLIABLE_CONFIG_CLASSES = new HashSet<>( @@ -65,25 +61,37 @@ public class PublisherSampler extends PublisherTestElement implements Sampler, T @Override public SampleResult sample(Entry e) { PubsubMessage template = null; + Map attributes = null; + byte[] byteMsg; SampleResult result = new SampleResult(); result.setSampleLabel(getName()); result.setSamplerData(request()); result.setDataType(SampleResult.TEXT); result.setContentType("text/plain"); result.setDataEncoding(StandardCharsets.UTF_8.name()); - - Map attributes = convertStringToAttributesMap(getAttributes()); + + try { + attributes = convertStringToAttributesMap(getAttributes()); + } catch (JsonParseException e1) { + e1.printStackTrace(); + } catch (JsonMappingException e1) { + e1.printStackTrace(); + } catch (IOException e1) { + e1.printStackTrace(); + } if (isGzipCompression()) { - template = createPubsubMessage(createEventCompressed(getMessage()), attributes); + byteMsg = createEventCompressed(getMessage()); } else { - template = convertStringToPubSubMessage(getMessage(), attributes); + byteMsg = ByteString.copyFromUtf8(getMessage()).toByteArray(); } result.sampleStart(); try { + template = createPubsubMessage(byteMsg, attributes); publish(template, result); + } catch (Exception ex) { LOGGER.info("Exception occurred while publishing message"); result = handleException(result, ex); @@ -93,29 +101,20 @@ public SampleResult sample(Entry e) { return result; } - private Map convertStringToAttributesMap(String attributesAsString){ - return Optional.ofNullable(attributesAsString) - - .map(str -> { - Map result = null; - try { - result = gson.fromJson(str, TYPED_MAP); - } catch (JsonSyntaxException sse) { - LOGGER.error("Failed to convert string attributes: {} to Map", attributesAsString, sse); - } - return result; - }) + @SuppressWarnings("unchecked") + private Map convertStringToAttributesMap(String attributes) throws JsonParseException, JsonMappingException, IOException { + + if(!attributes.equals("")) { + return (HashMap) new ObjectMapper().readValue(attributes, Map.class); + + } - .orElse(Collections.emptyMap()); + return Collections.emptyMap(); } // Returns Modified templates/Message as template for publishing - private PubsubMessage convertStringToPubSubMessage(String message, Map attributes) { - return PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(message)).putAllAttributes(attributes).build(); - } - private byte[] createEventCompressed(String message) { - //BufferedWriter zipWriter = null; + // BufferedWriter zipWriter = null; try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(message.length())) { try (GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream)) { gzipOutputStream.write(message.getBytes(StandardCharsets.UTF_8)); @@ -131,28 +130,31 @@ private String publish(PubsubMessage template, SampleResult result) { String resp = null; ApiFuture future = null; if (this.publisher == null) { - this.publisher = PublisherConfig.getPublisherClient(); + this.publisher = (Publisher) JMeterContextService.getContext().getVariables() + .getObject(getPublisherClientObject()); } try { + future = publisher.publish(template); - result.setResponseHeaders("MessagePublishedID: "+ future.get()); + result.setResponseHeaders("MessagePublishedID: " + future.get()); result.setResponseData(template.toString(), StandardCharsets.UTF_8.name()); result.setSuccessful(true); result.setResponseCode("200"); result.setResponseMessageOK(); - + } catch (ExecutionException e) { LOGGER.info("Publisher config not initialized properly.. Check the config element"); handleException(result, e); } catch (InterruptedException e) { e.printStackTrace(); } - + return resp; } - public static PubsubMessage createPubsubMessage(byte[] msg, Map attributes) { + public static PubsubMessage createPubsubMessage(byte[] msg, Map attributes) { + return PubsubMessage.newBuilder().setData(ByteString.copyFrom(msg)).putAllAttributes(attributes).build(); } @@ -165,7 +167,8 @@ private String request() { private SampleResult handleException(SampleResult result, Exception ex) { result.setResponseMessage("Message Publish Error"); result.setResponseCode("500"); - result.setResponseData(String.format("Error in publishing message to PubSub topic : %s", ex.toString()).getBytes());// PublisherConfig.getTopic() + result.setResponseData( + String.format("Error in publishing message to PubSub topic : %s", ex.toString()).getBytes()); result.setSuccessful(false); return result; } diff --git a/src/main/java/com/di/jmeter/pubsub/sampler/PublisherTestElement.java b/src/main/java/com/di/jmeter/pubsub/sampler/PublisherTestElement.java index fb42fb3..9e1bf9e 100644 --- a/src/main/java/com/di/jmeter/pubsub/sampler/PublisherTestElement.java +++ b/src/main/java/com/di/jmeter/pubsub/sampler/PublisherTestElement.java @@ -15,30 +15,42 @@ * limitations under the License. * */ + package com.di.jmeter.pubsub.sampler; +import java.io.Serializable; + import org.apache.jmeter.gui.Searchable; import org.apache.jmeter.testelement.AbstractTestElement; import org.apache.jmeter.testelement.TestElement; import org.apache.jmeter.testelement.TestStateListener; -import java.io.Serializable; - public abstract class PublisherTestElement extends AbstractTestElement implements TestStateListener, TestElement, Serializable, Searchable { + private static final long serialVersionUID = 7027549399338665744L; - private boolean gzipCompression = false; - private String message = ""; - private String attributes = ""; - + private boolean gzipCompression; + private String message; + private String publisherClientObject; + private String attributes; + public String getMessage() { return message; } + public String getAttributes() { + return attributes; + } + + public void setAttributes(String attributes) { + this.attributes = attributes; + } + public void setMessage(String message) { this.message = message; } + public boolean isGzipCompression() { return gzipCompression; @@ -48,12 +60,14 @@ public void setGzipCompression(boolean gzipCompression) { this.gzipCompression = gzipCompression; } - public String getAttributes() { - return attributes; + public String getPublisherClientObject() { + return publisherClientObject; } - public void setAttributes(String attributes) { - this.attributes = attributes; + public void setPublisherClientObject(String publisherClientObject) { + this.publisherClientObject = publisherClientObject; } + + } diff --git a/src/main/java/com/di/jmeter/pubsub/sampler/PublisherTestElementBeanInfoSupport.java b/src/main/java/com/di/jmeter/pubsub/sampler/PublisherTestElementBeanInfoSupport.java index a624418..8819175 100644 --- a/src/main/java/com/di/jmeter/pubsub/sampler/PublisherTestElementBeanInfoSupport.java +++ b/src/main/java/com/di/jmeter/pubsub/sampler/PublisherTestElementBeanInfoSupport.java @@ -15,32 +15,37 @@ * limitations under the License. * */ + package com.di.jmeter.pubsub.sampler; +import java.beans.PropertyDescriptor; + import org.apache.jmeter.testbeans.BeanInfoSupport; import org.apache.jmeter.testbeans.TestBean; import org.apache.jmeter.testbeans.gui.TypeEditor; -import java.beans.PropertyDescriptor; - public class PublisherTestElementBeanInfoSupport extends BeanInfoSupport { protected PublisherTestElementBeanInfoSupport(Class beanClass) { super(beanClass); - createPropertyGroup("Message to publish", new String[] { "gzipCompression", "message", "attributes"}); + createPropertyGroup("Message to publish", new String[] { "publisherClientObject", "gzipCompression", "attributes", "message"}); PropertyDescriptor propertyDescriptor = property("message", TypeEditor.TextAreaEditor); propertyDescriptor.setValue(NOT_UNDEFINED, Boolean.TRUE); - propertyDescriptor.setValue(DEFAULT, "{\"demoMessage\":\"Hello World!\"}"); - + propertyDescriptor.setValue(DEFAULT, "{\"demoMessage\":\"Hello World from DI pubsub sampler!\"}"); + propertyDescriptor = property("gzipCompression"); propertyDescriptor.setValue(NOT_UNDEFINED, Boolean.TRUE); propertyDescriptor.setValue(DEFAULT, Boolean.FALSE); - - propertyDescriptor = property("attributes", TypeEditor.TextAreaEditor); + + propertyDescriptor = property("publisherClientObject", TypeEditor.ComboStringEditor); + propertyDescriptor.setValue(NOT_UNDEFINED, Boolean.TRUE); + propertyDescriptor.setValue(DEFAULT, ""); + + propertyDescriptor = property("attributes", TypeEditor.TextAreaEditor); propertyDescriptor.setValue(NOT_UNDEFINED, Boolean.TRUE); - propertyDescriptor.setValue(DEFAULT, "{\"attributeKey\":\"attributeValue\"}"); + propertyDescriptor.setValue("default", ""); } diff --git a/src/main/java/com/di/jmeter/pubsub/sampler/SubscriberSampler.java b/src/main/java/com/di/jmeter/pubsub/sampler/SubscriberSampler.java index de7097c..26cbdd2 100644 --- a/src/main/java/com/di/jmeter/pubsub/sampler/SubscriberSampler.java +++ b/src/main/java/com/di/jmeter/pubsub/sampler/SubscriberSampler.java @@ -15,6 +15,7 @@ * limitations under the License. * */ + package com.di.jmeter.pubsub.sampler; import java.io.BufferedReader; @@ -24,6 +25,7 @@ import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.HashSet; +import java.util.Map; import java.util.Set; import java.util.zip.GZIPInputStream; @@ -52,6 +54,9 @@ public class SubscriberSampler extends SubscriberTestElement implements Sampler, private static PubsubMessage reader; private String ackDelay; private boolean decompression; + private String subscriberObject; + + private Map pubsubQueue; @Override public SampleResult sample(Entry e) { @@ -81,18 +86,20 @@ public SampleResult sample(Entry e) { } private PubsubMessage readMessageFromTopic(SampleResult result) throws IOException { - - if (messagesQueue == null) { - this.messagesQueue = SubscriberConfig.getMessagesQueue(); + if(messagesQueue == null) { + if(pubsubQueue == null ) { + this.pubsubQueue = SubscriberConfig.getPubsubQueue(); + } + this.messagesQueue = pubsubQueue.get(getSubscriberObject()); } try { reader = messagesQueue.take(); if (isDecompression()) { - result.setResponseData(createDeCompressedMessage(reader.getData().toByteArray()), + result.setResponseData(createDeCompressedMessage(reader.toByteArray()), StandardCharsets.UTF_8.name()); } else { - result.setResponseData(reader.getData().toStringUtf8(), StandardCharsets.UTF_8.name()); + result.setResponseData(reader.toString(), StandardCharsets.UTF_8.name()); } result.setResponseHeaders("PublishedMessageID: " + reader.getMessageId() + "\npublish_time in "+reader.getPublishTime()); result.setSuccessful(true); @@ -175,4 +182,12 @@ public void setDecompression(boolean decompression) { this.decompression = decompression; } + public String getSubscriberObject() { + return subscriberObject; + } + + public void setSubscriberObject(String subscriberObject) { + this.subscriberObject = subscriberObject; + } + } diff --git a/src/main/java/com/di/jmeter/pubsub/sampler/SubscriberSamplerBeanInfo.java b/src/main/java/com/di/jmeter/pubsub/sampler/SubscriberSamplerBeanInfo.java index c454206..e9ec6ad 100644 --- a/src/main/java/com/di/jmeter/pubsub/sampler/SubscriberSamplerBeanInfo.java +++ b/src/main/java/com/di/jmeter/pubsub/sampler/SubscriberSamplerBeanInfo.java @@ -15,6 +15,7 @@ * limitations under the License. * */ + package com.di.jmeter.pubsub.sampler; public class SubscriberSamplerBeanInfo extends SubscriberTestElementBeanInfoSupport{ diff --git a/src/main/java/com/di/jmeter/pubsub/sampler/SubscriberTestElement.java b/src/main/java/com/di/jmeter/pubsub/sampler/SubscriberTestElement.java index 75e0adf..15446f1 100644 --- a/src/main/java/com/di/jmeter/pubsub/sampler/SubscriberTestElement.java +++ b/src/main/java/com/di/jmeter/pubsub/sampler/SubscriberTestElement.java @@ -15,6 +15,7 @@ * limitations under the License. * */ + package com.di.jmeter.pubsub.sampler; import java.io.Serializable; diff --git a/src/main/java/com/di/jmeter/pubsub/sampler/SubscriberTestElementBeanInfoSupport.java b/src/main/java/com/di/jmeter/pubsub/sampler/SubscriberTestElementBeanInfoSupport.java index e78707d..8c5d068 100644 --- a/src/main/java/com/di/jmeter/pubsub/sampler/SubscriberTestElementBeanInfoSupport.java +++ b/src/main/java/com/di/jmeter/pubsub/sampler/SubscriberTestElementBeanInfoSupport.java @@ -15,6 +15,7 @@ * limitations under the License. * */ + package com.di.jmeter.pubsub.sampler; import java.beans.PropertyDescriptor; @@ -27,7 +28,7 @@ public class SubscriberTestElementBeanInfoSupport extends BeanInfoSupport { protected SubscriberTestElementBeanInfoSupport(Class beanClass) { super(beanClass); - createPropertyGroup("SubscriberProperties", new String[] { "ackDelay", "decompression"}); + createPropertyGroup("Subscriber Properties", new String[] { "subscriberObject", "ackDelay", "decompression"}); PropertyDescriptor propertyDescriptor = property("ackDelay"); propertyDescriptor.setValue(NOT_UNDEFINED, Boolean.TRUE); @@ -37,6 +38,12 @@ protected SubscriberTestElementBeanInfoSupport(Class beanCla propertyDescriptor.setValue(NOT_UNDEFINED, Boolean.TRUE); propertyDescriptor.setValue(DEFAULT, Boolean.FALSE); + propertyDescriptor = property("subscriberObject"); + propertyDescriptor.setValue(NOT_UNDEFINED, Boolean.TRUE); + propertyDescriptor.setValue(DEFAULT, ""); + + + } } diff --git a/src/main/java/com/di/jmeter/pubsub/utils/MessagesQueue.java b/src/main/java/com/di/jmeter/pubsub/utils/MessagesQueue.java index 105fc78..17840b6 100644 --- a/src/main/java/com/di/jmeter/pubsub/utils/MessagesQueue.java +++ b/src/main/java/com/di/jmeter/pubsub/utils/MessagesQueue.java @@ -51,4 +51,3 @@ public boolean offer(PubsubMessage message){ } - diff --git a/src/main/java/com/di/jmeter/pubsub/utils/SimpleMessageReceiver.java b/src/main/java/com/di/jmeter/pubsub/utils/SimpleMessageReceiver.java index 11c1163..397ae1e 100644 --- a/src/main/java/com/di/jmeter/pubsub/utils/SimpleMessageReceiver.java +++ b/src/main/java/com/di/jmeter/pubsub/utils/SimpleMessageReceiver.java @@ -15,6 +15,7 @@ * limitations under the License. * */ + package com.di.jmeter.pubsub.utils; import com.google.cloud.pubsub.v1.AckReplyConsumer; diff --git a/src/main/resources/com/di/jmeter/pubsub/config/PublisherConfigResources.properties b/src/main/resources/com/di/jmeter/pubsub/config/PublisherConfigResources.properties index 51e777c..d79397c 100644 --- a/src/main/resources/com/di/jmeter/pubsub/config/PublisherConfigResources.properties +++ b/src/main/resources/com/di/jmeter/pubsub/config/PublisherConfigResources.properties @@ -16,8 +16,12 @@ # displayName=PubSub-PublisherConfiguration +pubConfig.displayName=Publisher Topic credentials.displayName=Credentials Configuration +publisherConnection.displayName=Publisher Topic +publisherConnection.shortDescription=Export the Publisher connection config to the Variable + projectId.displayName=projectId projectId.shortDescription=projectId type.displayName=type diff --git a/src/main/resources/com/di/jmeter/pubsub/config/SubscriberConfigResources.properties b/src/main/resources/com/di/jmeter/pubsub/config/SubscriberConfigResources.properties index 6350337..f704af0 100644 --- a/src/main/resources/com/di/jmeter/pubsub/config/SubscriberConfigResources.properties +++ b/src/main/resources/com/di/jmeter/pubsub/config/SubscriberConfigResources.properties @@ -16,8 +16,12 @@ # displayName=PubSub-SubscriberConfiguration +subConfig.displayName=Subscriber Topic credentials.displayName=Credentials Configuration +subscriberConnection.displayName=Subscriber Topic +subscriberConnection.shortDescription=Export the Subscriber connection config to the Variable + projectId.displayName=projectId projectId.shortDescription=projectId subscriptionId.displayName=subscriptionId diff --git a/src/main/resources/com/di/jmeter/pubsub/sampler/PublisherSamplerResources.properties b/src/main/resources/com/di/jmeter/pubsub/sampler/PublisherSamplerResources.properties index fad473f..8a9e92f 100644 --- a/src/main/resources/com/di/jmeter/pubsub/sampler/PublisherSamplerResources.properties +++ b/src/main/resources/com/di/jmeter/pubsub/sampler/PublisherSamplerResources.properties @@ -16,12 +16,13 @@ # displayName=Publisher Request -#template.displayName=Template +publisherClientObject.displayName=Publisher Topic +publisherClientObject.shortDescription=Set to import the config element for particular topic gzipCompression.displayName=Gzip-Compression gzipCompression.shortDescription=Set to enable compression for messages +attributes.displayName=Attributes +attributes.shortrDesscription=Attributes to be added in key value pair message.displayName=Message message.shortDescription=Message - This will supercedes template Type -attributes.displayName=Attributes -attributes.shortDescription=Attributes - Specify message attributes diff --git a/src/main/resources/com/di/jmeter/pubsub/sampler/SubscriberSamplerResources.properties b/src/main/resources/com/di/jmeter/pubsub/sampler/SubscriberSamplerResources.properties index 18246ff..4d87b26 100644 --- a/src/main/resources/com/di/jmeter/pubsub/sampler/SubscriberSamplerResources.properties +++ b/src/main/resources/com/di/jmeter/pubsub/sampler/SubscriberSamplerResources.properties @@ -16,6 +16,9 @@ # displayName=Subscriber Request + +subscriberObject.displayName=Subscriber Topic +subscriberObject.shortDescription=Set to import the config element for particular topic ackDelay.displayName=ACK Delay ackDelay.shortDescription= Time delay to acknowledge the received message decompression.displayName=Enable Decompression