diff --git a/ballerina/caller.bal b/ballerina/caller.bal index 633b1546..6e880750 100644 --- a/ballerina/caller.bal +++ b/ballerina/caller.bal @@ -24,6 +24,6 @@ public isolated client class Caller { # + message - JMS message record # + return - `jms:Error` if there is an error in the execution or else nil isolated remote function acknowledge(Message message) returns Error? = @java:Method { - 'class: "io.ballerina.stdlib.java.jms.JmsConsumer" + 'class: "io.ballerina.stdlib.java.jms.consumer.Actions" } external; } diff --git a/ballerina/message_consumer.bal b/ballerina/message_consumer.bal index 87704a36..3a4928b6 100644 --- a/ballerina/message_consumer.bal +++ b/ballerina/message_consumer.bal @@ -70,7 +70,7 @@ public isolated client class MessageConsumer { isolated function externInit(Session session, ConsumerOptions consumerOptions) returns Error? = @java:Method { name: "init", - 'class: "io.ballerina.stdlib.java.jms.JmsConsumer" + 'class: "io.ballerina.stdlib.java.jms.consumer.Actions" } external; # Receives the next message that arrives within the specified timeout interval. @@ -78,14 +78,14 @@ public isolated client class MessageConsumer { # + timeoutMillis - Message receive timeout # + return - `jms:JmsMessage` or `jsm:Error` if there is an error in the execution isolated remote function receive(int timeoutMillis = 0) returns Message|Error? = @java:Method { - 'class: "io.ballerina.stdlib.java.jms.JmsConsumer" + 'class: "io.ballerina.stdlib.java.jms.consumer.Actions" } external; # Receives the next message if one is immediately available. # # + return - `jms:JmsMessage` or `jsm:Error` if there is an error in the execution isolated remote function receiveNoWait() returns Message|Error? = @java:Method { - 'class: "io.ballerina.stdlib.java.jms.JmsConsumer" + 'class: "io.ballerina.stdlib.java.jms.consumer.Actions" } external; # Mark a JMS message as received. @@ -93,13 +93,13 @@ public isolated client class MessageConsumer { # + message - JMS message record # + return - `jms:Error` if there is an error in the execution or else nil isolated remote function acknowledge(Message message) returns Error? = @java:Method { - 'class: "io.ballerina.stdlib.java.jms.JmsConsumer" + 'class: "io.ballerina.stdlib.java.jms.consumer.Actions" } external; # Closes the message consumer. # # + return - `jms:Error` if there is an error or else nil isolated remote function close() returns Error? = @java:Method { - 'class: "io.ballerina.stdlib.java.jms.JmsConsumer" + 'class: "io.ballerina.stdlib.java.jms.consumer.Actions" } external; } diff --git a/ballerina/message_listener.bal b/ballerina/message_listener.bal index 7fd947fa..29be9961 100644 --- a/ballerina/message_listener.bal +++ b/ballerina/message_listener.bal @@ -63,7 +63,7 @@ public isolated class Listener { # ``` # # + 'service - The service to be detached - # + return - A `kafka:Error` if an error is encountered while detaching a service or else `()` + # + return - A `jms:Error` if an error is encountered while detaching a service or else `()` public isolated function detach(Service 'service) returns Error? {} # Starts the endpoint. @@ -94,5 +94,5 @@ public isolated class Listener { } isolated function setMessageListener(MessageConsumer consumer, Service 'service) returns Error? = @java:Method { - 'class: "io.ballerina.stdlib.java.jms.JmsMessageListenerUtils" + 'class: "io.ballerina.stdlib.java.jms.listener.Utils" } external; diff --git a/ballerina/message_producer.bal b/ballerina/message_producer.bal index 70c18b34..6c5e3c22 100644 --- a/ballerina/message_producer.bal +++ b/ballerina/message_producer.bal @@ -27,7 +27,7 @@ public isolated client class MessageProducer { isolated function externInit(Session session, Destination? destination) returns Error? = @java:Method { name: "init", - 'class: "io.ballerina.stdlib.java.jms.JmsProducer" + 'class: "io.ballerina.stdlib.java.jms.producer.Actions" } external; # Sends a message to the JMS provider. @@ -41,7 +41,7 @@ public isolated client class MessageProducer { isolated function externSend(handle message) returns Error? = @java:Method { name: "send", - 'class: "io.ballerina.stdlib.java.jms.JmsProducer" + 'class: "io.ballerina.stdlib.java.jms.producer.Actions" } external; # Sends a message to a given destination of the JMS provider. @@ -57,14 +57,14 @@ public isolated client class MessageProducer { isolated function externSendTo(Session session, Destination destination, handle message) returns Error? = @java:Method { name: "sendTo", - 'class: "io.ballerina.stdlib.java.jms.JmsProducer" + 'class: "io.ballerina.stdlib.java.jms.producer.Actions" } external; # Closes the message producer. # # + return - `jms:Error` if there is an error or else nil isolated remote function close() returns Error? = @java:Method { - 'class: "io.ballerina.stdlib.java.jms.JmsProducer" + 'class: "io.ballerina.stdlib.java.jms.producer.Actions" } external; }; diff --git a/ballerina/tests/jms_message_listener_tests.bal b/ballerina/tests/jms_message_listener_tests.bal index 43d31c18..42f0835c 100644 --- a/ballerina/tests/jms_message_listener_tests.bal +++ b/ballerina/tests/jms_message_listener_tests.bal @@ -430,6 +430,32 @@ isolated function testMessageListenerImmediateStop() returns error? { check msgListener.immediateStop(); } +@test:Config { + groups: ["messageListener"] +} +isolated function testMessageListenerAttachWithoutSvcPath() returns error? { + Listener msgListener = check new ( + connectionConfig = { + initialContextFactory: "org.apache.activemq.jndi.ActiveMQInitialContextFactory", + providerUrl: "tcp://localhost:61616" + }, + acknowledgementMode = CLIENT_ACKNOWLEDGE, + consumerOptions = { + destination: { + 'type: QUEUE, + name: "test-caller" + } + } + ); + Service consumerSvc = service object { + remote function onMessage(Message message, Caller caller) returns error? {} + }; + check msgListener.attach(consumerSvc); + check msgListener.'start(); + runtime:sleep(2); + check msgListener.immediateStop(); +} + @test:AfterGroups { value: ["messageListener"] } diff --git a/native/src/main/java/io/ballerina/stdlib/java.jms/BallerinaJmsException.java b/native/src/main/java/io/ballerina/stdlib/java.jms/BallerinaJmsException.java index 1c3f2b4a..c113b922 100644 --- a/native/src/main/java/io/ballerina/stdlib/java.jms/BallerinaJmsException.java +++ b/native/src/main/java/io/ballerina/stdlib/java.jms/BallerinaJmsException.java @@ -23,7 +23,7 @@ */ public class BallerinaJmsException extends Exception { - BallerinaJmsException(String message) { + public BallerinaJmsException(String message) { super(message); } diff --git a/native/src/main/java/io/ballerina/stdlib/java.jms/Constants.java b/native/src/main/java/io/ballerina/stdlib/java.jms/Constants.java index 88cbf6c7..85d64e98 100644 --- a/native/src/main/java/io/ballerina/stdlib/java.jms/Constants.java +++ b/native/src/main/java/io/ballerina/stdlib/java.jms/Constants.java @@ -106,12 +106,12 @@ public class Constants { static final String SESSION_TRANSACTED_MODE = "SESSION_TRANSACTED"; // Native properties in respective ballerina objects - static final String NATIVE_CONNECTION = "connection"; - static final String NATIVE_SESSION = "session"; - static final String NATIVE_PRODUCER = "producer"; - static final String NATIVE_CONSUMER = "consumer"; + public static final String NATIVE_CONNECTION = "connection"; + public static final String NATIVE_SESSION = "session"; + public static final String NATIVE_PRODUCER = "producer"; + public static final String NATIVE_CONSUMER = "consumer"; - static final String NATIVE_MESSAGE = "message"; + public static final String NATIVE_MESSAGE = "message"; // Ballerina JMS message types static final String MESSAGE_BAL_RECORD_NAME = "Message"; @@ -124,7 +124,7 @@ public class Constants { static final BString TIMESTAMP = StringUtils.fromString("timestamp"); static final BString CORRELATION_ID = StringUtils.fromString("correlationId"); static final BString REPLY_TO = StringUtils.fromString("replyTo"); - static final BString DESTINATION = StringUtils.fromString("destination"); + public static final BString DESTINATION = StringUtils.fromString("destination"); static final BString DELIVERY_MODE = StringUtils.fromString("deliveryMode"); static final BString REDELIVERED = StringUtils.fromString("redelivered"); static final BString JMS_TYPE = StringUtils.fromString("jmsType"); @@ -139,14 +139,14 @@ public class Constants { static final BString TOPIC = StringUtils.fromString("TOPIC"); static final BString TEMPORARY_TOPIC = StringUtils.fromString("TEMPORARY_TOPIC"); - static final String SERVICE_RESOURCE_ON_MESSAGE = "onMessage"; + public static final String SERVICE_RESOURCE_ON_MESSAGE = "onMessage"; static final String SERVICE_RESOURCE_ON_TEXT_MESSAGE = "onTextMessage"; static final String SERVICE_RESOURCE_ON_MAP_MESSAGE = "onMapMessage"; static final String SERVICE_RESOURCE_ON_BYTES_MESSAGE = "onBytesMessage"; static final String SERVICE_RESOURCE_ON_STREAM_MESSAGE = "onStreamMessage"; static final String SERVICE_RESOURCE_ON_OTHER_MESSAGE = "onOtherMessage"; - static final String CALLER = "Caller"; + public static final String CALLER = "Caller"; private Constants() { } diff --git a/native/src/main/java/io/ballerina/stdlib/java.jms/JmsConsumer.java b/native/src/main/java/io/ballerina/stdlib/java.jms/consumer/Actions.java similarity index 69% rename from native/src/main/java/io/ballerina/stdlib/java.jms/JmsConsumer.java rename to native/src/main/java/io/ballerina/stdlib/java.jms/consumer/Actions.java index 7e70b817..20b01159 100644 --- a/native/src/main/java/io/ballerina/stdlib/java.jms/JmsConsumer.java +++ b/native/src/main/java/io/ballerina/stdlib/java.jms/consumer/Actions.java @@ -16,15 +16,21 @@ * under the License. */ -package io.ballerina.stdlib.java.jms; +package io.ballerina.stdlib.java.jms.consumer; +import io.ballerina.runtime.api.Environment; +import io.ballerina.runtime.api.Future; import io.ballerina.runtime.api.utils.StringUtils; +import io.ballerina.runtime.api.values.BError; import io.ballerina.runtime.api.values.BMap; import io.ballerina.runtime.api.values.BObject; import io.ballerina.runtime.api.values.BString; +import io.ballerina.stdlib.java.jms.BallerinaJmsException; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import javax.jms.Destination; import javax.jms.JMSException; @@ -40,12 +46,15 @@ import static io.ballerina.stdlib.java.jms.Constants.DESTINATION; import static io.ballerina.stdlib.java.jms.Constants.JMS_ERROR; import static io.ballerina.stdlib.java.jms.Constants.NATIVE_CONSUMER; +import static io.ballerina.stdlib.java.jms.Constants.NATIVE_MESSAGE; import static io.ballerina.stdlib.java.jms.Constants.NATIVE_SESSION; /** * Represents {@link javax.jms.MessageConsumer} related utility functions. */ -public class JmsConsumer { +public class Actions { + private static final ExecutorService executorService = Executors.newCachedThreadPool(new ConsumerThreadFactory()); + private static final BString CONSUMER_TYPE = StringUtils.fromString("type"); private static final BString MESSAGE_SELECTOR = StringUtils.fromString("messageSelector"); private static final BString NO_LOCAL = StringUtils.fromString("noLocal"); @@ -111,56 +120,76 @@ private static MessageConsumer createConsumer(Session session, BMap { + try { + Message message = nativeConsumer.receive(timeout); + if (Objects.isNull(message)) { + balFuture.complete(null); + } else { + BMap ballerinaMsg = getBallerinaMessage(message); + balFuture.complete(ballerinaMsg); + } + } catch (JMSException exception) { + BError bError = createError(JMS_ERROR, + String.format("Error occurred while receiving messages: %s", exception.getMessage()), + exception); + balFuture.complete(bError); + } catch (BallerinaJmsException exception) { + balFuture.complete(createError(JMS_ERROR, exception.getMessage(), exception)); + } catch (Exception exception) { + BError bError = createError(JMS_ERROR, + String.format("Unknown error occurred while processing the received messages: %s", + exception.getMessage()), exception); + balFuture.complete(bError); } - return getBallerinaMessage(message); - } catch (JMSException exception) { - return createError(JMS_ERROR, - String.format("Error occurred while receiving messages: %s", exception.getMessage()), exception); - } catch (BallerinaJmsException exception) { - return createError(JMS_ERROR, exception.getMessage(), exception); - } catch (Exception exception) { - return createError(JMS_ERROR, - String.format("Unknown error occurred while processing the received messages: %s", - exception.getMessage()), exception); - } + }); + return null; } /** * Receives the next message if one is immediately available. * + * @param env Ballerina runtime environment * @param consumer Ballerina consumer object * @return A Ballerina `jms:Error` if the JMS MessageConsumer fails to receive the message due to some error * or else the next message produced for this message consumer, or null */ - public static Object receiveNoWait(BObject consumer) { + public static Object receiveNoWait(Environment env, BObject consumer) { MessageConsumer nativeConsumer = (MessageConsumer) consumer.getNativeData(NATIVE_CONSUMER); - try { - Message message = nativeConsumer.receiveNoWait(); - if (Objects.isNull(message)) { - return null; + Future balFuture = env.markAsync(); + executorService.execute(() -> { + try { + Message message = nativeConsumer.receiveNoWait(); + if (Objects.isNull(message)) { + balFuture.complete(null); + } else { + BMap ballerinaMsg = getBallerinaMessage(message); + balFuture.complete(ballerinaMsg); + } + } catch (JMSException exception) { + BError bError = createError(JMS_ERROR, + String.format("Error occurred while receiving messages: %s", exception.getMessage()), + exception); + balFuture.complete(bError); + } catch (BallerinaJmsException exception) { + balFuture.complete(createError(JMS_ERROR, exception.getMessage(), exception)); + } catch (Exception exception) { + BError bError = createError(JMS_ERROR, + String.format("Unknown error occurred while processing the received messages: %s", + exception.getMessage()), exception); + balFuture.complete(bError); } - return getBallerinaMessage(message); - } catch (JMSException exception) { - return createError(JMS_ERROR, - String.format("Error occurred while receiving messages: %s", exception.getMessage()), exception); - } catch (BallerinaJmsException exception) { - return createError(JMS_ERROR, exception.getMessage(), exception); - } catch (Exception exception) { - return createError(JMS_ERROR, - String.format("Unknown error occurred while processing the received messages: %s", - exception.getMessage()), exception); - } + }); + return null; } /** @@ -189,7 +218,7 @@ public static Object close(BObject consumer) { */ public static Object acknowledge(BMap message) { try { - Object nativeMessage = message.getNativeData(Constants.NATIVE_MESSAGE); + Object nativeMessage = message.getNativeData(NATIVE_MESSAGE); if (Objects.nonNull(nativeMessage)) { ((Message) nativeMessage).acknowledge(); } diff --git a/native/src/main/java/io/ballerina/stdlib/java.jms/consumer/ConsumerThreadFactory.java b/native/src/main/java/io/ballerina/stdlib/java.jms/consumer/ConsumerThreadFactory.java new file mode 100644 index 00000000..18ffa80d --- /dev/null +++ b/native/src/main/java/io/ballerina/stdlib/java.jms/consumer/ConsumerThreadFactory.java @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.ballerina.stdlib.java.jms.consumer; + +import java.util.concurrent.ThreadFactory; + +/** + * A {@link ThreadFactory} object that creates new threads on demand for JMS consumer network actions. + */ +class ConsumerThreadFactory implements ThreadFactory { + + @Override + public Thread newThread(Runnable runnable) { + Thread jmsConsumerThread = new Thread(runnable); + jmsConsumerThread.setName("balx-jms-consumer-network-thread"); + return jmsConsumerThread; + } +} diff --git a/native/src/main/java/io/ballerina/stdlib/java.jms/ConsumerCallback.java b/native/src/main/java/io/ballerina/stdlib/java.jms/listener/ListenerCallback.java similarity index 92% rename from native/src/main/java/io/ballerina/stdlib/java.jms/ConsumerCallback.java rename to native/src/main/java/io/ballerina/stdlib/java.jms/listener/ListenerCallback.java index c4dcf13a..e8d18041 100644 --- a/native/src/main/java/io/ballerina/stdlib/java.jms/ConsumerCallback.java +++ b/native/src/main/java/io/ballerina/stdlib/java.jms/listener/ListenerCallback.java @@ -16,7 +16,7 @@ * under the License. */ -package io.ballerina.stdlib.java.jms; +package io.ballerina.stdlib.java.jms.listener; import io.ballerina.runtime.api.async.Callback; import io.ballerina.runtime.api.values.BError; @@ -24,7 +24,7 @@ /** * Callback code to be executed when the message-listener complete a message producing cycle to the ballerina service. */ -public class ConsumerCallback implements Callback { +public class ListenerCallback implements Callback { @Override public void notifySuccess(Object o) { if (o instanceof BError) { @@ -37,4 +37,5 @@ public void notifyFailure(BError bError) { bError.printStackTrace(); System.exit(1); } + } diff --git a/native/src/main/java/io/ballerina/stdlib/java.jms/JmsListener.java b/native/src/main/java/io/ballerina/stdlib/java.jms/listener/ListenerImpl.java similarity index 83% rename from native/src/main/java/io/ballerina/stdlib/java.jms/JmsListener.java rename to native/src/main/java/io/ballerina/stdlib/java.jms/listener/ListenerImpl.java index 41c7abc6..7033d138 100644 --- a/native/src/main/java/io/ballerina/stdlib/java.jms/JmsListener.java +++ b/native/src/main/java/io/ballerina/stdlib/java.jms/listener/ListenerImpl.java @@ -16,7 +16,7 @@ * under the License. */ -package io.ballerina.stdlib.java.jms; +package io.ballerina.stdlib.java.jms.listener; import io.ballerina.runtime.api.Module; import io.ballerina.runtime.api.PredefinedTypes; @@ -30,6 +30,9 @@ import io.ballerina.runtime.api.types.Type; import io.ballerina.runtime.api.utils.TypeUtils; import io.ballerina.runtime.api.values.BObject; +import io.ballerina.stdlib.java.jms.BallerinaJmsException; +import io.ballerina.stdlib.java.jms.Constants; +import io.ballerina.stdlib.java.jms.ModuleUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,18 +46,19 @@ import static io.ballerina.runtime.api.TypeTags.OBJECT_TYPE_TAG; import static io.ballerina.runtime.api.TypeTags.RECORD_TYPE_TAG; import static io.ballerina.stdlib.java.jms.CommonUtils.getBallerinaMessage; +import static io.ballerina.stdlib.java.jms.Constants.SERVICE_RESOURCE_ON_MESSAGE; /** * A {@link javax.jms.MessageListener} implementation. */ -public class JmsListener implements MessageListener { - private static final Logger LOGGER = LoggerFactory.getLogger(JmsListener.class); +public class ListenerImpl implements MessageListener { + private static final Logger LOGGER = LoggerFactory.getLogger(ListenerImpl.class); private final BObject consumerService; private final Runtime ballerinaRuntime; - private final Callback callback = new ConsumerCallback(); + private final Callback callback = new ListenerCallback(); - public JmsListener(BObject consumerService, Runtime ballerinaRuntime) { + public ListenerImpl(BObject consumerService, Runtime ballerinaRuntime) { this.consumerService = consumerService; this.ballerinaRuntime = ballerinaRuntime; } @@ -64,16 +68,16 @@ public void onMessage(Message message) { try { Module module = ModuleUtils.getModule(); StrandMetadata metadata = new StrandMetadata( - module.getOrg(), module.getName(), module.getVersion(), Constants.SERVICE_RESOURCE_ON_MESSAGE); + module.getOrg(), module.getName(), module.getVersion(), SERVICE_RESOURCE_ON_MESSAGE); ObjectType serviceType = (ObjectType) TypeUtils.getReferredType(TypeUtils.getType(consumerService)); Object[] params = methodParameters(serviceType, message); - if (serviceType.isIsolated() && serviceType.isIsolated(Constants.SERVICE_RESOURCE_ON_MESSAGE)) { + if (serviceType.isIsolated() && serviceType.isIsolated(SERVICE_RESOURCE_ON_MESSAGE)) { ballerinaRuntime.invokeMethodAsyncConcurrently( - consumerService, Constants.SERVICE_RESOURCE_ON_MESSAGE, null, metadata, callback, + consumerService, SERVICE_RESOURCE_ON_MESSAGE, null, metadata, callback, null, PredefinedTypes.TYPE_NULL, params); } else { ballerinaRuntime.invokeMethodAsyncSequentially( - consumerService, Constants.SERVICE_RESOURCE_ON_MESSAGE, null, metadata, callback, + consumerService, SERVICE_RESOURCE_ON_MESSAGE, null, metadata, callback, null, PredefinedTypes.TYPE_NULL, params); } } catch (JMSException | BallerinaJmsException e) { @@ -84,7 +88,7 @@ public void onMessage(Message message) { private Object[] methodParameters(ObjectType serviceType, Message message) throws JMSException, BallerinaJmsException { Optional onMessageFuncOpt = Stream.of(serviceType.getMethods()) - .filter(methodType -> Constants.SERVICE_RESOURCE_ON_MESSAGE.equals(methodType.getName())) + .filter(methodType -> SERVICE_RESOURCE_ON_MESSAGE.equals(methodType.getName())) .findFirst(); if (onMessageFuncOpt.isPresent()) { MethodType onMessageFunction = onMessageFuncOpt.get(); diff --git a/native/src/main/java/io/ballerina/stdlib/java.jms/JmsMessageListenerUtils.java b/native/src/main/java/io/ballerina/stdlib/java.jms/listener/Utils.java similarity index 85% rename from native/src/main/java/io/ballerina/stdlib/java.jms/JmsMessageListenerUtils.java rename to native/src/main/java/io/ballerina/stdlib/java.jms/listener/Utils.java index 71733383..a8730b16 100644 --- a/native/src/main/java/io/ballerina/stdlib/java.jms/JmsMessageListenerUtils.java +++ b/native/src/main/java/io/ballerina/stdlib/java.jms/listener/Utils.java @@ -16,7 +16,7 @@ * under the License. */ -package io.ballerina.stdlib.java.jms; +package io.ballerina.stdlib.java.jms.listener; import io.ballerina.runtime.api.Environment; import io.ballerina.runtime.api.Runtime; @@ -30,16 +30,17 @@ import static io.ballerina.stdlib.java.jms.Constants.NATIVE_CONSUMER; /** - * Representation of {@link javax.jms.MessageListener} with utility methods to invoke as inter-op functions. + * Provides utility functionalities related to setting up a {@link javax.jms.MessageListener} for a given + * message consumer. */ -public class JmsMessageListenerUtils { +public class Utils { public static Object setMessageListener(Environment environment, BObject consumer, BObject serviceObject) { MessageConsumer nativeConsumer = (MessageConsumer) consumer.getNativeData(NATIVE_CONSUMER); Runtime bRuntime = environment.getRuntime(); try { - nativeConsumer.setMessageListener(new JmsListener(serviceObject, bRuntime)); + nativeConsumer.setMessageListener(new ListenerImpl(serviceObject, bRuntime)); } catch (JMSException e) { return createError(JMS_ERROR, String.format("Error occurred while setting the message listener: %s", e.getMessage()), e); diff --git a/native/src/main/java/io/ballerina/stdlib/java.jms/JmsProducer.java b/native/src/main/java/io/ballerina/stdlib/java.jms/producer/Actions.java similarity index 68% rename from native/src/main/java/io/ballerina/stdlib/java.jms/JmsProducer.java rename to native/src/main/java/io/ballerina/stdlib/java.jms/producer/Actions.java index aee1e781..0a63e8de 100644 --- a/native/src/main/java/io/ballerina/stdlib/java.jms/JmsProducer.java +++ b/native/src/main/java/io/ballerina/stdlib/java.jms/producer/Actions.java @@ -16,11 +16,18 @@ * under the License. */ -package io.ballerina.stdlib.java.jms; +package io.ballerina.stdlib.java.jms.producer; +import io.ballerina.runtime.api.Environment; +import io.ballerina.runtime.api.Future; +import io.ballerina.runtime.api.values.BError; import io.ballerina.runtime.api.values.BMap; import io.ballerina.runtime.api.values.BObject; import io.ballerina.runtime.api.values.BString; +import io.ballerina.stdlib.java.jms.BallerinaJmsException; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import javax.jms.Destination; import javax.jms.JMSException; @@ -38,7 +45,8 @@ /** * Representation of {@link javax.jms.MessageProducer} with utility methods to invoke as inter-op functions. */ -public class JmsProducer { +public class Actions { + private static final ExecutorService executorService = Executors.newCachedThreadPool(new ProducerThreadFactory()); /** * Creates a {@link javax.jms.MessageProducer} object with given {@link javax.jms.Session}. @@ -68,19 +76,25 @@ public static Object init(BObject producer, BObject session, Object destination) /** * Sends a message using the {@code MessageProducer}'s default delivery mode, priority, and time to live. * + * @param env Ballerina runtime environment * @param producer Ballerina producer object * @param message The JMS message * @return A Ballerina `jms:Error` if the JMS MessageProducer fails to send the message due to some error */ - public static Object send(BObject producer, Message message) { + public static Object send(Environment env, BObject producer, Message message) { MessageProducer nativeProducer = (MessageProducer) producer.getNativeData(NATIVE_PRODUCER); - try { - nativeProducer.send(message); - } catch (UnsupportedOperationException | JMSException exception) { - return createError(JMS_ERROR, - String.format("Error occurred while sending a message to the JMS provider: %s", - exception.getMessage()), exception); - } + Future balFuture = env.markAsync(); + executorService.execute(() -> { + try { + nativeProducer.send(message); + balFuture.complete(null); + } catch (UnsupportedOperationException | JMSException exception) { + BError bError = createError(JMS_ERROR, + String.format("Error occurred while sending a message to the JMS provider: %s", + exception.getMessage()), exception); + balFuture.complete(bError); + } + }); return null; } @@ -88,26 +102,33 @@ public static Object send(BObject producer, Message message) { * Sends a message to a destination for an unidentified message producer using the {@code MessageProducer}'s * default delivery mode, priority, and time to live. * + * @param env Ballerina runtime environment * @param producer Ballerina producer object * @param session Ballerina session object * @param destination Relevant JMS destination * @param message The JMS message * @return A Ballerina `jms:Error` if the JMS MessageProducer fails to send the message due to some error */ - public static Object sendTo(BObject producer, BObject session, BMap destination, + public static Object sendTo(Environment env, BObject producer, BObject session, BMap destination, Message message) { MessageProducer nativeProducer = (MessageProducer) producer.getNativeData(NATIVE_PRODUCER); Session nativeSession = (Session) session.getNativeData(NATIVE_SESSION); - try { - Destination jmsDestination = getDestination(nativeSession, destination); - nativeProducer.send(jmsDestination, message); - } catch (BallerinaJmsException exception) { - return createError(JMS_ERROR, exception.getMessage(), exception); - } catch (UnsupportedOperationException | JMSException exception) { - return createError(JMS_ERROR, - String.format("Error occurred while sending a message to the JMS provider: %s", - exception.getMessage()), exception); - } + Future balFuture = env.markAsync(); + executorService.execute(() -> { + try { + Destination jmsDestination = getDestination(nativeSession, destination); + nativeProducer.send(jmsDestination, message); + balFuture.complete(null); + } catch (BallerinaJmsException exception) { + BError bError = createError(JMS_ERROR, exception.getMessage(), exception); + balFuture.complete(bError); + } catch (UnsupportedOperationException | JMSException exception) { + BError bError = createError(JMS_ERROR, + String.format("Error occurred while sending a message to the JMS provider: %s", + exception.getMessage()), exception); + balFuture.complete(bError); + } + }); return null; } diff --git a/native/src/main/java/io/ballerina/stdlib/java.jms/producer/ProducerThreadFactory.java b/native/src/main/java/io/ballerina/stdlib/java.jms/producer/ProducerThreadFactory.java new file mode 100644 index 00000000..e9d9a5c8 --- /dev/null +++ b/native/src/main/java/io/ballerina/stdlib/java.jms/producer/ProducerThreadFactory.java @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.ballerina.stdlib.java.jms.producer; + +import java.util.concurrent.ThreadFactory; + +/** + * A {@link ThreadFactory} object that creates new threads on demand for JMS producer network actions. + */ +class ProducerThreadFactory implements ThreadFactory { + + @Override + public Thread newThread(Runnable runnable) { + Thread jmsProducerThread = new Thread(runnable); + jmsProducerThread.setName("balx-jms-producer-network-thread"); + return jmsProducerThread; + } +} diff --git a/spotbugs-exclude.xml b/spotbugs-exclude.xml index 6814254c..039d48bc 100644 --- a/spotbugs-exclude.xml +++ b/spotbugs-exclude.xml @@ -17,7 +17,7 @@ --> - + - \ No newline at end of file +