diff --git a/ballerina/Dependencies.toml b/ballerina/Dependencies.toml index 3ce726a5..594d611f 100644 --- a/ballerina/Dependencies.toml +++ b/ballerina/Dependencies.toml @@ -7,15 +7,6 @@ dependencies-toml-version = "2" distribution-version = "2201.6.0" -[[package]] -org = "ballerina" -name = "io" -version = "1.4.1" -dependencies = [ - {org = "ballerina", name = "jballerina.java"}, - {org = "ballerina", name = "lang.value"} -] - [[package]] org = "ballerina" name = "jballerina.java" @@ -24,43 +15,12 @@ modules = [ {org = "ballerina", packageName = "jballerina.java", moduleName = "jballerina.java"} ] -[[package]] -org = "ballerina" -name = "lang.value" -version = "0.0.0" -dependencies = [ - {org = "ballerina", name = "jballerina.java"} -] - -[[package]] -org = "ballerina" -name = "log" -version = "2.7.1" -dependencies = [ - {org = "ballerina", name = "io"}, - {org = "ballerina", name = "jballerina.java"}, - {org = "ballerina", name = "lang.value"}, - {org = "ballerina", name = "observe"} -] -modules = [ - {org = "ballerina", packageName = "log", moduleName = "log"} -] - -[[package]] -org = "ballerina" -name = "observe" -version = "1.0.7" -dependencies = [ - {org = "ballerina", name = "jballerina.java"} -] - [[package]] org = "ballerinax" name = "java.jms" version = "0.1.3" dependencies = [ - {org = "ballerina", name = "jballerina.java"}, - {org = "ballerina", name = "log"} + {org = "ballerina", name = "jballerina.java"} ] modules = [ {org = "ballerinax", packageName = "java.jms", moduleName = "java.jms"} diff --git a/ballerina/caller.bal b/ballerina/caller.bal index 4ad5b5ef..633b1546 100644 --- a/ballerina/caller.bal +++ b/ballerina/caller.bal @@ -14,6 +14,8 @@ // specific language governing permissions and limitations // under the License. +import ballerina/jballerina.java; + # Represents a JMS caller, which can be used to commit the offsets consumed by the service. public isolated client class Caller { @@ -21,7 +23,7 @@ public isolated client class Caller { # # + message - JMS message record # + return - `jms:Error` if there is an error in the execution or else nil - isolated remote function acknowledge(Message message) returns Error? { - return externConsumerAcknowledge(message); - } + isolated remote function acknowledge(Message message) returns Error? = @java:Method { + 'class: "io.ballerina.stdlib.java.jms.JmsConsumer" + } external; } diff --git a/ballerina/connection.bal b/ballerina/connection.bal index 5b670038..97a304a0 100644 --- a/ballerina/connection.bal +++ b/ballerina/connection.bal @@ -15,58 +15,59 @@ // under the License. import ballerina/jballerina.java; -import ballerina/log; # Represents JMS Connection. -# -# + config - Used to store configurations related to a JMS Connection public isolated client class Connection { - public final readonly & ConnectionConfiguration config; - private final handle jmsConnection; + private final readonly & ConnectionConfiguration config; - # JMS Connection constructor - public isolated function init(*ConnectionConfiguration connectionConfig) returns error? { + # Initialize and starts a JMS connection. + # + # + connectionConfig - The configurations to be used when initializing the JMS connection + # + return - The `jms:Connection` or an `jms:Error` if the initialization failed + public isolated function init(*ConnectionConfiguration connectionConfig) returns Error? { self.config = connectionConfig.cloneReadOnly(); - string icf = self.config.initialContextFactory; - string providerUrl = self.config.providerUrl; - string factoryName = self.config.connectionFactoryName; - self.jmsConnection = check createJmsConnection( - icf, providerUrl, factoryName, connectionConfig.properties); + return self.externInit(connectionConfig); } - # Create a Session object, specifying transacted and acknowledgeMode + isolated function externInit(ConnectionConfiguration connectionConfig) returns Error? = @java:Method { + name: "init", + 'class: "io.ballerina.stdlib.java.jms.JmsConnection" + } external; + + # Create a Session object, specifying transacted and acknowledgeMode. # - # + acknowledgementMode - Configuration indicating how messages received by the session will be acknowledged + # + ackMode - Configuration indicating how messages received by the session will be acknowledged # + return - Returns the Session or an error if it fails. - isolated remote function createSession(AcknowledgementMode acknowledgementMode = AUTO_ACKNOWLEDGE) returns Session|error { - return new Session(self.jmsConnection, acknowledgementMode); + isolated remote function createSession(AcknowledgementMode ackMode = AUTO_ACKNOWLEDGE) returns Session|error { + return new Session(self, ackMode); } # Starts (or restarts) a connection's delivery of incoming messages. # A call to start on a connection that has already been started is ignored. - isolated remote function 'start() { - error? err = startJmsConnection(self.jmsConnection); - if (err is error) { - log:printError("Error starting connection", err); - } - - } + # + # + return - A `jms:Error` if threre is an error while starting the connection + isolated remote function 'start() returns Error? = @java:Method { + name: "start", + 'class: "io.ballerina.stdlib.java.jms.JmsConnection" + } external; # Temporarily stops a connection's delivery of incoming messages. # Delivery can be restarted using the connection's start method. - isolated remote function stop() { - error? err = stopJmsConnection(self.jmsConnection); - if (err is error) { - log:printError("Error stopping connection", err); - } - } + # + # + return - A `jms:Error` if threre is an error while stopping the connection + isolated remote function stop() returns Error? = @java:Method { + 'class: "io.ballerina.stdlib.java.jms.JmsConnection" + } external; - isolated function getJmsConnection() returns handle { - return self.jmsConnection; - } + # Closes the connection. + # + # + return - A `jms:Error` if threre is an error while closing the connection + isolated remote function close() returns Error? = @java:Method { + 'class: "io.ballerina.stdlib.java.jms.JmsConnection" + } external; } -# Configurations related to a JMS connection +# Configurations related to a JMS connection. # # + initialContextFactory - JMS provider specific inital context factory # + providerUrl - JMS provider specific provider URL used to configure a connection @@ -78,22 +79,7 @@ public type ConnectionConfiguration record {| string initialContextFactory = "wso2mbInitialContextFactory"; string providerUrl = "amqp://admin:admin@ballerina/default?brokerlist='tcp://localhost:5672'"; string connectionFactoryName = "ConnectionFactory"; - string? username = (); - string? password = (); + string username?; + string password?; map properties = {}; |}; - -isolated function createJmsConnection(string initialContextFactory, string providerUrl, - string connectionFactoryName, map otherPropeties) returns handle|error = @java:Method { - 'class: "io.ballerina.stdlib.java.jms.JmsConnectionUtils" -} external; - -isolated function startJmsConnection(handle jmsConnection) returns error? = @java:Method { - name: "start", - 'class: "javax.jms.Connection" -} external; - -isolated function stopJmsConnection(handle jmsConnection) returns error? = @java:Method { - name: "stop", - 'class: "javax.jms.Connection" -} external; diff --git a/ballerina/destination.bal b/ballerina/destination.bal index cb9c2d9d..10dd6783 100644 --- a/ballerina/destination.bal +++ b/ballerina/destination.bal @@ -14,34 +14,23 @@ // specific language governing permissions and limitations // under the License. -import ballerina/jballerina.java; +# Represent the JMS destination. +# +# + 'type - JMS destination types +# + name - Name of the destination +public type Destination readonly & record {| + DestinationType 'type; + string name?; +|}; -# Represent the JMS destination -public type Destination distinct object { - - isolated function getJmsDestination() returns handle; -}; - -function getDestination(handle jmsDestination) returns Destination|error { - handle jmsDestinationType = getDestinationType(jmsDestination); - string? destinationType = java:toString(jmsDestinationType); - match destinationType { - "queue" => { - return new Queue(jmsDestination); - } - "topic" => { - return new Topic(jmsDestination); - } - "temporaryQueue" => { - return new TemporaryQueue(jmsDestination); - } - "temporaryTopic" => { - return new TemporaryTopic(jmsDestination); - } - } - return error Error("Invalid destination type"); +# Defines the supported JMS destination types. +public enum DestinationType { + # Represents JMS Queue + QUEUE = "QUEUE", + # Represents JMS Temporary Queue + TEMPORARY_QUEUE = "TEMPORARY_QUEUE", + # Represents JMS Topic + TOPIC = "TOPIC", + # Represents JMS Temporary Topic + TEMPORARY_TOPIC = "TEMPORARY_TOPIC" } - -function getDestinationType(handle destination) returns handle = @java:Method { - 'class: "io.ballerina.stdlib.java.jms.JmsDestinationUtils" -} external; diff --git a/ballerina/jms_commons.bal b/ballerina/jms_commons.bal deleted file mode 100644 index 9b98cb6f..00000000 --- a/ballerina/jms_commons.bal +++ /dev/null @@ -1,19 +0,0 @@ -// Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. -// -// WSO2 LLC. licenses this file to you under the Apache License, -// Version 2.0 (the "License"); you may not use this file except -// in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -import ballerina/jballerina.java; - -final handle JAVA_NULL = java:createNull(); diff --git a/ballerina/message.bal b/ballerina/message.bal index ce2825d4..44de920e 100644 --- a/ballerina/message.bal +++ b/ballerina/message.bal @@ -34,14 +34,8 @@ public type Message record { string messageId?; int timestamp?; string correlationId?; - record {| - JmsDestinationType 'type; - string name?; - |} replyTo?; - record {| - JmsDestinationType 'type; - string name?; - |} destination?; + Destination replyTo?; + Destination destination?; int deliveryMode?; boolean redelivered?; string jmsType?; @@ -75,9 +69,14 @@ public type BytesMessage record {| byte[] content; |}; +isolated function externWriteText(handle message, handle value) returns error? = @java:Method { + name: "setText", + 'class: "javax.jms.TextMessage" +} external; + isolated function externWriteBytes(handle message, byte[] value) returns error? = @java:Method { name: "writeBytes", - 'class: "io.ballerina.stdlib.java.jms.JmsBytesMessageUtils" + 'class: "io.ballerina.stdlib.java.jms.JmsBytesMessage" } external; isolated function externSetBoolean(handle message, handle name, boolean value) returns error? = @java:Method { diff --git a/ballerina/message_consumer.bal b/ballerina/message_consumer.bal index 241f41c3..34b078d6 100644 --- a/ballerina/message_consumer.bal +++ b/ballerina/message_consumer.bal @@ -16,71 +16,74 @@ import ballerina/jballerina.java; +# Defines the supported JMS message consumer types. +public enum ConsumerType { + # Represents JMS durable subscriber + DURABLE = "DURABLE", + # Represents JMS shared consumer + SHARED = "SHARED", + # Represents JMS shared durable subscriber + SHARED_DURABLE = "SHARED_DURABLE", + # Represents JMS default consumer + DEFAULT = "DEFAULT" +} + +# Message consumer listener configurations. +# +# + type - Message consumer type +# + destination - Name of the JMS destination +# + messageSelector - only messages with properties matching the message selector expression are added to the durable subscription. +# An empty string indicates that there is no message selector for the durable subscription. +# + noLocal - if true then any messages published to the topic using this session's connection, or any other connection +# with the same client identifier, will not be added to the durable subscription. +# + subscriberName - the name used to identify the subscription +public type ConsumerOptions record {| + ConsumerType 'type = DEFAULT; + Destination destination; + string messageSelector = ""; + boolean noLocal = false; + string subscriberName?; +|}; + # JMS Message Consumer client object to receive messages from both queues and topics. public isolated client class MessageConsumer { - private final handle jmsConsumer; - # Initialize the Message Consumer client object. - # - # + jmsProducer - reference to java MessageConsumer object - isolated function init(handle jmsMessageConsumer) { - self.jmsConsumer = jmsMessageConsumer; + isolated function init(Session session, *ConsumerOptions consumerOptions) returns Error? { + return self.externInit(session, consumerOptions); } + isolated function externInit(Session session, ConsumerOptions consumerOptions) returns Error? = @java:Method { + name: "init", + 'class: "io.ballerina.stdlib.java.jms.JmsConsumer" + } external; + # Receives the next message that arrives within the specified timeout interval. # # + timeoutMillis - Message receive timeout # + return - `jms:JmsMessage` or `jsm:Error` if there is an error in the execution - isolated remote function receive(int timeoutMillis = 0) returns Message|Error? { - return externReceive(self.jmsConsumer, timeoutMillis); - }; + isolated remote function receive(int timeoutMillis = 0) returns Message|Error? = @java:Method { + 'class: "io.ballerina.stdlib.java.jms.JmsConsumer" + } external; # Receives the next message if one is immediately available. # # + return - `jms:JmsMessage` or `jsm:Error` if there is an error in the execution - isolated remote function receiveNoWait() returns Message|Error? { - return externReceiveNoWait(self.jmsConsumer); - } + isolated remote function receiveNoWait() returns Message|Error? = @java:Method { + 'class: "io.ballerina.stdlib.java.jms.JmsConsumer" + } external; # Mark a JMS message as received. # # + message - JMS message record # + return - `jms:Error` if there is an error in the execution or else nil - isolated remote function acknowledge(Message message) returns Error? { - return externConsumerAcknowledge(message); - } + isolated remote function acknowledge(Message message) returns Error? = @java:Method { + 'class: "io.ballerina.stdlib.java.jms.JmsConsumer" + } external; # Closes the message consumer. - # + # # + return - `jms:Error` if there is an error or else nil - isolated remote function close() returns Error? { - error? result = externClose(self.jmsConsumer); - if result is error { - return error Error(result.message()); - } - } - - isolated function getJmsConsumer() returns handle { - return self.jmsConsumer; - } + isolated remote function close() returns Error? = @java:Method { + 'class: "io.ballerina.stdlib.java.jms.JmsConsumer" + } external; } - -isolated function externReceive(handle jmsMessageConsumer, int timeout) returns Message|Error? = @java:Method { - name: "receive", - 'class: "io.ballerina.stdlib.java.jms.ConsumerUtils" -} external; - -isolated function externReceiveNoWait(handle jmsMessageConsumer) returns Message|Error? = @java:Method { - name: "receiveNoWait", - 'class: "io.ballerina.stdlib.java.jms.ConsumerUtils" -} external; - -isolated function externConsumerAcknowledge(Message message) returns Error? = @java:Method { - name: "acknowledge", - 'class: "io.ballerina.stdlib.java.jms.ConsumerUtils" -} external; - -isolated function externClose(handle jmsConsumer) returns error? = @java:Method { - name: "close", - 'class: "javax.jms.MessageConsumer" -} external; diff --git a/ballerina/message_listener.bal b/ballerina/message_listener.bal index f2863d78..7fd947fa 100644 --- a/ballerina/message_listener.bal +++ b/ballerina/message_listener.bal @@ -15,43 +15,21 @@ // under the License. import ballerina/jballerina.java; -import ballerina/log; # The JMS service type. public type Service distinct service object { // remote function onMessage(jms:Message message, jms:Caller caller) returns error?; }; -# Defines the supported JMS destinations. -public enum JmsDestinationType { - # Represents JMS Queue - QUEUE = "QUEUE", - # Represents JMS Temporary Queue - TEMPORARY_QUEUE = "TEMPORARY_QUEUE", - # Represents JMS Topic - TOPIC = "TOPIC", - # Represents JMS Temporary Topic - TEMPORARY_TOPIC = "TEMPORARY_TOPIC" -} - -# Message consumer configurations. +# Message listener configurations. # # + connectionConfig - Configurations related to the broker connection # + acknowledgementMode - Configuration indicating how messages received by the session will be acknowledged -# + destination - Name of the JMS destination -# + messageSelector - only messages with properties matching the message selector expression are added to the durable subscription. -# An empty string indicates that there is no message selector for the durable subscription. -# + noLocal - if true then any messages published to the topic using this session's connection, or any other connection -# with the same client identifier, will not be added to the durable subscription. -public type ConsumerConfiguration record {| +# + consumerOptions - Underlying JMS message consumer configurations +public type MessageListenerConfigurations record {| ConnectionConfiguration connectionConfig; AcknowledgementMode acknowledgementMode = AUTO_ACKNOWLEDGE; - record {| - JmsDestinationType 'type; - string name?; - |} destination; - string messageSelector = ""; - boolean noLocal = false; + ConsumerOptions consumerOptions; |}; # Represents a JMS consumer listener. @@ -61,13 +39,10 @@ public isolated class Listener { # Creates a new `jms:Listener`. # # + consumer - The relevant JMS consumer. - public isolated function init(*ConsumerConfiguration consumerConfig) returns error? { + public isolated function init(*MessageListenerConfigurations consumerConfig) returns error? { Connection connection = check new (consumerConfig.connectionConfig); Session session = check connection->createSession(consumerConfig.acknowledgementMode); - Destination destination = check createJmsDestination( - session, consumerConfig.destination.'type, consumerConfig.destination?.name); - self.consumer = check session.createConsumer( - destination, consumerConfig.messageSelector, consumerConfig.noLocal); + self.consumer = check new(session, consumerConfig.consumerOptions); } # Attaches a message consumer service to a listener. @@ -79,7 +54,7 @@ public isolated class Listener { # + name - Name of the service. # + return - Returns nil or an error upon failure to register the listener. public isolated function attach(Service 'service, string[]|string? name = ()) returns Error? { - return setMessageListener(self.consumer.getJmsConsumer(), 'service); + return setMessageListener(self.consumer, 'service); } # Detaches a message consumer service from the the listener. @@ -118,36 +93,6 @@ public isolated class Listener { } } -isolated function createJmsDestination(Session session, JmsDestinationType 'type, string? name) returns Destination|error { - if 'type is TEMPORARY_QUEUE || 'type is TEMPORARY_TOPIC { - if name is string { - log:printWarn("Temporary JSM destinations does not support naming"); - } - } - if 'type is QUEUE || 'type is TOPIC { - if name is () { - return error ("JMS destination name can not be empty"); - } - } - match 'type { - QUEUE => { - return session->createQueue( name); - } - TEMPORARY_QUEUE => { - return session->createTemporaryQueue(); - } - TOPIC => { - return session->createTopic( name); - } - TEMPORARY_TOPIC => { - return session->createTemporaryTopic(); - } - _ => { - return error (string `Unidentified JSM destination type: ${'type}`); - } - } -} - -isolated function setMessageListener(handle jmsConsumer, Service 'service) returns Error? = @java:Method { +isolated function setMessageListener(MessageConsumer consumer, Service 'service) returns Error? = @java:Method { 'class: "io.ballerina.stdlib.java.jms.JmsMessageListenerUtils" } external; diff --git a/ballerina/message_producer.bal b/ballerina/message_producer.bal index a5340515..712fa7ee 100644 --- a/ballerina/message_producer.bal +++ b/ballerina/message_producer.bal @@ -18,50 +18,73 @@ import ballerina/jballerina.java; # JMS Message Producer client object to send messages to both queues and topics. public isolated client class MessageProducer { - private final handle jmsProducer; - private final handle jmsSession; + private final Session session; - # Initialize the Message Producer client object - # - # + jmsProducer - reference to java MessageProducer object - isolated function init(handle jmsProducer, handle session) returns error? { - self.jmsProducer = jmsProducer; - self.jmsSession = session; + isolated function init(Session session, Destination? destination = ()) returns Error? { + self.session = session; + return self.externInit(session, destination); } - # Sends a message to the JMS provider + isolated function externInit(Session session, Destination? destination) returns Error? = @java:Method { + name: "init", + 'class: "io.ballerina.stdlib.java.jms.JmsProducer" + } external; + + # Sends a message to the JMS provider. # # + message - Message to be sent to the JMS provider # + return - Error if unable to send the message to the queue - isolated remote function send(Message message) returns error? { - handle jmsMessage = check getJmsMessage(self.jmsSession, message); - return externSend(self.jmsProducer, jmsMessage); + isolated remote function send(Message message) returns Error? { + handle jmsMessage = check getJmsMessage(self.session, message); + return self.externSend(jmsMessage); } - # Sends a message to a given destination of the JMS provider + isolated function externSend(handle message) returns Error? = @java:Method { + name: "send", + 'class: "io.ballerina.stdlib.java.jms.JmsProducer" + } external; + + # Sends a message to a given destination of the JMS provider. # # + destination - Destination used for the message sender # + message - Message to be sent to the JMS provider # + return - Error if sending to the given destination fails - isolated remote function sendTo(Destination destination, Message message) returns error? { - handle jmsMessage = check getJmsMessage(self.jmsSession, message); - return externSendTo(self.jmsProducer, destination.getJmsDestination(), jmsMessage); + isolated remote function sendTo(Destination destination, Message message) returns Error? { + handle jmsMessage = check getJmsMessage(self.session, message); + return self.externSendTo(self.session, destination, jmsMessage); } + + isolated function externSendTo(Session session, Destination destination, handle message) + returns Error? = @java:Method { + name: "sendTo", + 'class: "io.ballerina.stdlib.java.jms.JmsProducer" + } external; }; -isolated function getJmsMessage(handle session, Message message) returns handle|error { +isolated function getJmsMessage(Session session, Message message) returns handle|Error { if message is TextMessage { - return createJmsTextMessageWithText(session, java:fromString(message.content)); + handle jmsMessage = check session.createJmsMessage("TEXT"); + error? result = trap externWriteText(jmsMessage, java:fromString(message.content)); + if result is error { + return error Error(result.message()); + } + return jmsMessage; } else if message is BytesMessage { - handle jmsMessage = check createJmsBytesMessage(session); - check externWriteBytes(jmsMessage, message.content); + handle jmsMessage = check session.createJmsMessage("BYTES"); + error? result = trap externWriteBytes(jmsMessage, message.content); + if result is error { + return error Error(result.message()); + } return jmsMessage; } else if message is MapMessage { - handle jmsMessage = check createJmsMapMessage(session); - check populateMapMessage(jmsMessage, message.content); + handle jmsMessage = check session.createJmsMessage("MAP"); + error? result = trap populateMapMessage(jmsMessage, message.content); + if result is error { + return error Error(result.message()); + } return jmsMessage; } - return error ("Unidentified message type"); + return error Error("Unidentified message type"); } isolated function populateMapMessage(handle mapMessage, map keyValues) returns error? { @@ -78,15 +101,3 @@ isolated function populateMapMessage(handle mapMessage, map keyValues) } } } - -isolated function externSend(handle messageProducer, handle message) returns error? = @java:Method { - name: "send", - paramTypes: ["javax.jms.Message"], - 'class: "javax.jms.MessageProducer" -} external; - -isolated function externSendTo(handle messageProducer, handle destination, handle message) returns error? = @java:Method { - name: "send", - paramTypes: ["javax.jms.Destination", "javax.jms.Message"], - 'class: "javax.jms.MessageProducer" -} external; diff --git a/ballerina/queue.bal b/ballerina/queue.bal deleted file mode 100644 index 1f1266f0..00000000 --- a/ballerina/queue.bal +++ /dev/null @@ -1,50 +0,0 @@ -// Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. -// -// WSO2 LLC. licenses this file to you under the Apache License, -// Version 2.0 (the "License"); you may not use this file except -// in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -import ballerina/jballerina.java; - -# Represent the JMS queue. -public isolated class Queue { - *Destination; - - private final handle jmsDestination; - - # Initialized a `Queue` object. - # - # + handle - The java reference to the jms text message. - isolated function init(handle queue) { - self.jmsDestination = queue; - } - - # Get the JMS queue - # - # + return - Returns the java reference to the jms queue - isolated function getJmsDestination() returns handle { - return self.jmsDestination; - } - - # Gets the name of this queue. - # - # + return - Returns the string value or an error if it fails. - public isolated function getQueueName() returns string|error? { - handle queue = check getQueueName(self.jmsDestination); - return java:toString(queue); - } -} - -isolated function getQueueName(handle destination) returns handle|error = @java:Method { - 'class: "javax.jms.Queue" -} external; diff --git a/ballerina/session.bal b/ballerina/session.bal index b66c50fe..de38ab84 100644 --- a/ballerina/session.bal +++ b/ballerina/session.bal @@ -14,21 +14,20 @@ // specific language governing permissions and limitations // under the License. -import ballerina/log; import ballerina/jballerina.java; # Represents the JMS session. public isolated client class Session { - private final readonly & AcknowledgementMode acknowledgementMode; - private final handle jmsSession; -# The default constructor of the JMS session. - public isolated function init(handle jmsConnection, AcknowledgementMode acknowledgementMode) returns error? { - self.acknowledgementMode = acknowledgementMode; - handle ackModeJString = java:fromString(self.acknowledgementMode); - self.jmsSession = check createJmsSession(jmsConnection, ackModeJString); + isolated function init(Connection connection, AcknowledgementMode ackMode) returns error? { + return self.externInit(connection, ackMode); } + isolated function externInit(Connection connection, AcknowledgementMode ackMode) returns Error? = @java:Method { + name: "init", + 'class: "io.ballerina.stdlib.java.jms.JmsSession" + } external; + # Unsubscribe a durable subscription that has been created by a client. # It is erroneous for a client to delete a durable subscription while there is an active (not closed) consumer # for the subscription, or while a consumed message being part of a pending transaction or has not been @@ -36,168 +35,29 @@ public isolated client class Session { # # + subscriptionId - The name, which is used to identify the subscription. # + return - Cancels the subscription. - isolated remote function unsubscribe(string subscriptionId) returns error? { - return unsubscribeJmsSubscription(self.jmsSession, java:fromString(subscriptionId)); - } - - # Creates a JMS Queue, which can be used as temporary response destination. - # - # + return - Returns the JMS destination for a temporary queue or an error if it fails. - isolated remote function createTemporaryQueue() returns Destination|Error { - handle|error val = createTemporaryJmsQueue(self.jmsSession); - if (val is handle) { - return new TemporaryQueue(val); - } else { - return error Error("Error occurred while creating the JMS queue.", val); - } - } - - # Creates a JMS Topic, which can be used as a temporary response destination. - # - # + return - Returns the JMS destination for a temporary topic or an error if it fails. - isolated remote function createTemporaryTopic() returns Destination|Error { - handle|error val = createTemporaryJmsTopic(self.jmsSession); - if (val is handle) { - return new TemporaryTopic(val); - } else { - return error Error("Error occurred while creating the JMS topic.", val); - } - } - - # Creates a JMS Queue, which can be used with a message producer. - # - # + queueName - The name of the Queue. - # + return - Returns the JMS destination for a queue or an error if it fails. - isolated remote function createQueue(string queueName) returns Destination|error { - handle|error val = createJmsQueue(self.jmsSession, java:fromString(queueName)); - if (val is handle) { - return new Queue(val); - } else { - return val; - } - } - - # Creates a JMS Topic, which can be used with a message producer. - # - # + topicName - The name of the Topic. - # + return - Returns the JMS destination for a topic or an error if it fails. - isolated remote function createTopic(string topicName) returns Destination|error { - handle|error val = createJmsTopic(self.jmsSession, java:fromString(topicName)); - if (val is handle) { - return new Topic(val); - } else { - return val; - } - } - - # Get the reference to the java session object. - # - # + return - Returns jms session java reference. - function getJmsSession() returns handle { - return self.jmsSession; - } + isolated remote function unsubscribe(string subscriptionId) returns Error? = @java:Method { + 'class: "io.ballerina.stdlib.java.jms.JmsSession" + } external; # Creates a MessageProducer to send messages to the specified destination. # - # + destination - the Destination to send to, or nil if this is a producer which does not have a specified destination + # + destination - The Destination to send to, or nil if this is a producer which does not have a specified destination # + return - Returns jms:MessageProducer - public isolated function createProducer(Destination? destination = ()) returns MessageProducer|error { - handle jmsDestination = (destination is Destination) ? destination.getJmsDestination() : JAVA_NULL; - handle|error v = createJmsProducer(self.jmsSession, jmsDestination); - if (v is handle) { - return new MessageProducer(v, self.jmsSession); - } else { - log:printError("Error occurred while creating producer"); - return v; - } - } - - # Creates a MessageConsumer for the specified destination. Both Queue and Topic can be used in - # the destination parameter to create a MessageConsumer. - # - # + destination - the Destination to access - # + messageSelector - only messages with properties matching the message selector expression are delivered. - # An empty string indicates that there is no message selector for the message consumer. - # + noLocal - if true, and the destination is a topic, then the MessageConsumer will not receive messages published to the topic by its own connection. - # + return - Returns a jms:MessageConsumer - public isolated function createConsumer(Destination destination, - string messageSelector = "", boolean noLocal = false) returns MessageConsumer|error { - var val = createJmsConsumer( - self.jmsSession, destination.getJmsDestination(), - java:fromString(messageSelector), noLocal); - if (val is handle) { - MessageConsumer consumer = new (val); - return consumer; - } else { - return val; - } + public isolated function createProducer(Destination? destination = ()) returns MessageProducer|Error { + return new MessageProducer(self, destination); } - # Creates an unshared durable subscription on the specified topic (if one does not already exist), - # specifying a message selector and the noLocal parameter, and creates a consumer on that durable subscription. + # Creates a MessageConsumer for the specified destination. # - # + topic - the non-temporary Topic to subscribe to - # + subscriberName - the name used to identify this subscription - # + messageSelector - only messages with properties matching the message selector expression are added to the durable subscription. - # An empty string indicates that there is no message selector for the durable subscription. - # + noLocal - if true then any messages published to the topic using this session's connection, or any other connection - # with the same client identifier, will not be added to the durable subscription. + # + consumerOptions - The relevant consumer configurations # + return - Returns a jms:MessageConsumer - public isolated function createDurableSubscriber(Destination topic, string subscriberName, - string messageSelector = "", boolean noLocal = false) returns MessageConsumer|error { - var val = createJmsDurableSubscriber( - self.jmsSession, topic.getJmsDestination(), - java:fromString(subscriberName), java:fromString(messageSelector), noLocal); - if (val is handle) { - MessageConsumer consumer = new (val); - return consumer; - } else { - return val; - } + public isolated function createConsumer(*ConsumerOptions consumerOptions) returns MessageConsumer|Error { + return new MessageConsumer(self, consumerOptions); } - # Creates a shared non-durable subscription with the specified name on the specified topic - # (if one does not already exist) specifying a message selector, and creates a consumer on that subscription. - # - # + topic - the Topic to subscribe to - # + subscriberName - the name used to identify the shared non-durable subscription - # + messageSelector - only messages with properties matching the message selector expression are added to the shared - # non-durable subscription. A value of null or an empty string indicates that there is no message - # selector for the shared non-durable subscription. - # + return - Returns a jms:MessageConsumer - public isolated function createSharedConsumer(Destination topic, string subscriberName, - string messageSelector = "") returns MessageConsumer|error { - var val = createJmsSharedConsumer( - self.jmsSession, topic.getJmsDestination(), - java:fromString(subscriberName), java:fromString(messageSelector)); - if (val is handle) { - MessageConsumer consumer = new (val); - return consumer; - } else { - return val; - } - } - - # Creates a shared durable subscription on the specified topic (if one does not already exist), - # specifying a message selector, and creates a consumer on that durable subscription. - # - # + topic - the non-temporary Topic to subscribe to - # + subscriberName - the name used to identify this subscription - # + messageSelector - only messages with properties matching the message selector expression are added to the durable subscription. - # A value of null or an empty string indicates that there is no message selector for the durable subscription. - # + return - Returns a jms:MessageConsumer - public isolated function createSharedDurableConsumer(Destination topic, string subscriberName, - string messageSelector = "") returns MessageConsumer|error { - var val = createJmsSharedDurableConsumer( - self.jmsSession, topic.getJmsDestination(), - java:fromString(subscriberName), java:fromString(messageSelector)); - if (val is handle) { - MessageConsumer consumer = new (val); - return consumer; - } else { - return val; - } - } + isolated function createJmsMessage(string messageType) returns handle|Error = @java:Method { + 'class: "io.ballerina.stdlib.java.jms.JmsSession" + } external; } # Defines the JMS session acknowledgement modes. @@ -219,79 +79,3 @@ public enum AcknowledgementMode { # Use of this mode can reduce session overhead by minimizing the work the session does to prevent duplicates. DUPS_OK_ACKNOWLEDGE = "DUPS_OK_ACKNOWLEDGE" } - -isolated function createJmsTextMessageWithText(handle session, handle text) returns handle|error = @java:Method { - name: "createTextMessage", - paramTypes: ["java.lang.String"], - 'class: "javax.jms.Session" -} external; - -isolated function createJmsMapMessage(handle session) returns handle|error = @java:Method { - name: "createMapMessage", - 'class: "javax.jms.Session" -} external; - -isolated function createJmsBytesMessage(handle session) returns handle|error = @java:Method { - name: "createBytesMessage", - 'class: "javax.jms.Session" -} external; - -isolated function createJmsConsumer(handle jmsSession, handle jmsDestination, - handle selectorString, boolean noLocal) returns handle|error = @java:Method { - name: "createConsumer", - paramTypes: ["javax.jms.Destination", "java.lang.String", "boolean"], - 'class: "javax.jms.Session" -} external; - -isolated function createJmsSession(handle connection, handle acknowledgmentMode) returns handle|error = @java:Method { - 'class: "io.ballerina.stdlib.java.jms.JmsSessionUtils" -} external; - -isolated function unsubscribeJmsSubscription(handle session, handle subscriptionId) returns error? = @java:Method { - name: "unsubscribe", - 'class: "javax.jms.Session" -} external; - -isolated function createJmsProducer(handle session, handle jmsDestination) returns handle|error = @java:Method { - name: "createProducer", - 'class: "javax.jms.Session" -} external; - -isolated function createJmsDurableSubscriber(handle jmsSession, handle subscriberName, handle jmsDestination, - handle selectorString, boolean noLocal) returns handle|error = @java:Method { - name: "createDurableSubscriber", - paramTypes: ["javax.jms.Topic", "java.lang.String", "java.lang.String", "boolean"], - 'class: "javax.jms.Session" -} external; - -isolated function createJmsSharedConsumer(handle jmsSession, handle subscriberName, handle jmsDestination, - handle selectorString) returns handle|error = @java:Method { - name: "createSharedConsumer", - paramTypes: ["javax.jms.Topic", "java.lang.String", "java.lang.String"], - 'class: "javax.jms.Session" -} external; - -isolated function createJmsSharedDurableConsumer(handle jmsSession, handle subscriberName, handle jmsDestination, - handle selectorString) returns handle|error = @java:Method { - name: "createSharedDurableConsumer", - paramTypes: ["javax.jms.Topic", "java.lang.String", "java.lang.String"], - 'class: "javax.jms.Session" -} external; - -isolated function createJmsQueue(handle session, handle queueName) returns handle|error = @java:Method { - name: "createQueue", - 'class: "javax.jms.Session" -} external; - -isolated function createJmsTopic(handle session, handle topicName) returns handle|error = @java:Method { - name: "createTopic", - 'class: "javax.jms.Session" -} external; - -isolated function createTemporaryJmsQueue(handle session) returns handle|error = @java:Method { - 'class: "io.ballerina.stdlib.java.jms.JmsSessionUtils" -} external; - -isolated function createTemporaryJmsTopic(handle session) returns handle|error = @java:Method { - 'class: "io.ballerina.stdlib.java.jms.JmsSessionUtils" -} external; diff --git a/ballerina/temporary_queue.bal b/ballerina/temporary_queue.bal deleted file mode 100644 index 671a7b71..00000000 --- a/ballerina/temporary_queue.bal +++ /dev/null @@ -1,62 +0,0 @@ -// Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. -// -// WSO2 LLC. licenses this file to you under the Apache License, -// Version 2.0 (the "License"); you may not use this file except -// in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -import ballerina/jballerina.java; - -# Represent the JMS temporary queue. -public isolated class TemporaryQueue { - *Destination; - - private final handle jmsDestination; - - # Initialized a `TemporaryQueue` object. - # - # + handle - The java reference to the jms text message. - isolated function init(handle temporaryQueue) { - self.jmsDestination = temporaryQueue; - } - - # Get the JMS temporary queue - # - # + return - Returns the java reference to the jms temporary queue - isolated function getJmsDestination() returns handle { - return self.jmsDestination; - } - - # Gets the name of this temporary queue. - # - # + return - Returns the string value or an error if it fails. - public isolated function getQueueName() returns string | error? { - handle|error val = getQueueName(self.jmsDestination); - if (val is handle) { - return java:toString(val); - } else { - return val; - } - } - - # Deletes this temporary queue. - # - # + return - Returns an error if it fails. - public isolated function delete() returns error? { - return deleteTemporaryQueue(self.jmsDestination); - } -} - -isolated function deleteTemporaryQueue(handle destination) returns error? = @java:Method { - name: "delete", - 'class: "javax.jms.TemporaryQueue" -} external; diff --git a/ballerina/temporary_topic.bal b/ballerina/temporary_topic.bal deleted file mode 100644 index d96da2de..00000000 --- a/ballerina/temporary_topic.bal +++ /dev/null @@ -1,62 +0,0 @@ -// Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. -// -// WSO2 LLC. licenses this file to you under the Apache License, -// Version 2.0 (the "License"); you may not use this file except -// in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -import ballerina/jballerina.java; - -# Represent the JMS temporary topic. -public isolated class TemporaryTopic { - *Destination; - - private final handle jmsDestination; - - # Initialized a `TemporaryTopic` object. - # - # + handle - The java reference to the jms text message. - isolated function init(handle temporaryTopic) { - self.jmsDestination = temporaryTopic; - } - - # Get the JMS temporary topic - # - # + return - Returns the java reference to the jms temporary topic - isolated function getJmsDestination() returns handle { - return self.jmsDestination; - } - - # Gets the name of this temporary topic. - # - # + return - Returns the string value or an error if it fails. - public isolated function getTopicName() returns string|error? { - handle|error val = getTopicName(self.jmsDestination); - if (val is handle) { - return java:toString(val); - } else { - return val; - } - } - - # Deletes this temporary topic. - # - # + return - Returns an error if it fails. - public isolated function delete() returns error? { - return deleteTemporaryTopic(self.jmsDestination); - } -} - -isolated function deleteTemporaryTopic(handle destination) returns error? = @java:Method { - name: "delete", - 'class: "javax.jms.TemporaryTopic" -} external; diff --git a/ballerina/topic.bal b/ballerina/topic.bal deleted file mode 100644 index c03f2ea3..00000000 --- a/ballerina/topic.bal +++ /dev/null @@ -1,50 +0,0 @@ -// Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. -// -// WSO2 LLC. licenses this file to you under the Apache License, -// Version 2.0 (the "License"); you may not use this file except -// in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -import ballerina/jballerina.java; - -# Represent the JMS topic. -public isolated class Topic { - *Destination; - - private final handle jmsDestination; - - # Initialized a `Topic` object. - # - # + handle - The java reference to the jms text message. - isolated function init(handle topic) { - self.jmsDestination = topic; - } - - # Get the JMS topic - # - # + return - Returns the java reference to the jms topic - isolated function getJmsDestination() returns handle { - return self.jmsDestination; - } - - # Gets the name of this topic. - # - # + return - Returns the string value or an error if it fails. - public isolated function getTopicName() returns string|error? { - handle topic = check getTopicName(self.jmsDestination); - return java:toString(topic); - } -} - -isolated function getTopicName(handle destination) returns handle|error = @java:Method { - 'class: "javax.jms.Topic" -} external; diff --git a/native/src/main/java/io/ballerina/stdlib/java.jms/ConsumerUtils.java b/native/src/main/java/io/ballerina/stdlib/java.jms/CommonUtils.java similarity index 64% rename from native/src/main/java/io/ballerina/stdlib/java.jms/ConsumerUtils.java rename to native/src/main/java/io/ballerina/stdlib/java.jms/CommonUtils.java index 4e2bba74..cc465514 100644 --- a/native/src/main/java/io/ballerina/stdlib/java.jms/ConsumerUtils.java +++ b/native/src/main/java/io/ballerina/stdlib/java.jms/CommonUtils.java @@ -18,97 +18,78 @@ package io.ballerina.stdlib.java.jms; -import io.ballerina.runtime.api.creators.ErrorCreator; import io.ballerina.runtime.api.creators.ValueCreator; import io.ballerina.runtime.api.types.Type; import io.ballerina.runtime.api.utils.StringUtils; import io.ballerina.runtime.api.utils.TypeUtils; import io.ballerina.runtime.api.utils.ValueUtils; -import io.ballerina.runtime.api.values.BError; import io.ballerina.runtime.api.values.BMap; import io.ballerina.runtime.api.values.BString; import java.util.Enumeration; import java.util.Iterator; import java.util.Objects; +import java.util.Optional; import javax.jms.BytesMessage; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; -import javax.jms.MessageConsumer; import javax.jms.Queue; +import javax.jms.Session; import javax.jms.TemporaryQueue; import javax.jms.TemporaryTopic; import javax.jms.TextMessage; import javax.jms.Topic; /** - * Represents {@code javax.jms.MessageConsumer} related utility functions. + * {@code CommonUtils} contains the common utility functions for the Ballerina JMS connector. */ -public class ConsumerUtils { - public static Object receive(MessageConsumer consumer, long timeout) { - try { - Message message = consumer.receive(timeout); - if (Objects.isNull(message)) { - return null; - } - return getBallerinaMessage(message); - } catch (JMSException exception) { - BError cause = ErrorCreator.createError(exception); - return ErrorCreator.createError(ModuleUtils.getModule(), Constants.JMS_ERROR, - StringUtils.fromString("Error occurred while receiving messages"), cause, null); - } catch (BallerinaJmsException exception) { - BError cause = ErrorCreator.createError(exception); - return ErrorCreator.createError(ModuleUtils.getModule(), Constants.JMS_ERROR, - StringUtils.fromString("Error occurred while processing the received messages"), - cause, null); - } catch (Exception exception) { - BError cause = ErrorCreator.createError(exception); - return ErrorCreator.createError(ModuleUtils.getModule(), Constants.JMS_ERROR, - StringUtils.fromString("Unknown error occurred while processing the received messages"), - cause, null); +public class CommonUtils { + private static final BString DESTINATION_TYPE = StringUtils.fromString("type"); + private static final BString DESTINATION_NAME = StringUtils.fromString("name"); + private static final String QUEUE = "QUEUE"; + private static final String TEMPORARY_QUEUE = "TEMPORARY_QUEUE"; + private static final String TOPIC = "TOPIC"; + + public static Optional getOptionalStringProperty(BMap config, BString fieldName) { + if (config.containsKey(fieldName)) { + return Optional.of(config.getStringValue(fieldName).getValue()); } + return Optional.empty(); } - public static Object receiveNoWait(MessageConsumer consumer) { - try { - Message message = consumer.receiveNoWait(); - if (Objects.isNull(message)) { - return null; - } - return getBallerinaMessage(message); - } catch (JMSException exception) { - BError cause = ErrorCreator.createError(exception); - return ErrorCreator.createError(ModuleUtils.getModule(), Constants.JMS_ERROR, - StringUtils.fromString("Error occurred while receiving messages"), cause, null); - } catch (BallerinaJmsException exception) { - BError cause = ErrorCreator.createError(exception); - return ErrorCreator.createError(ModuleUtils.getModule(), Constants.JMS_ERROR, - StringUtils.fromString("Error occurred while processing the received messages"), - cause, null); - } catch (Exception exception) { - BError cause = ErrorCreator.createError(exception); - return ErrorCreator.createError(ModuleUtils.getModule(), Constants.JMS_ERROR, - StringUtils.fromString("Unknown error occurred while processing the received messages"), - cause, null); + @SuppressWarnings("unchecked") + public static Destination getDestinationOrNull(Session session, Object destination) + throws JMSException, BallerinaJmsException { + if (Objects.isNull(destination)) { + return null; } + + return getDestination(session, (BMap) destination); } - public static Object acknowledge(BMap message) { - try { - Object nativeMessage = message.getNativeData(Constants.NATIVE_MESSAGE); - if (Objects.nonNull(nativeMessage)) { - ((Message) nativeMessage).acknowledge(); + public static Destination getDestination(Session session, BMap destinationConfig) + throws BallerinaJmsException, JMSException { + String destinationType = destinationConfig.getStringValue(DESTINATION_TYPE).getValue(); + Optional destinationNameOpt = getOptionalStringProperty(destinationConfig, DESTINATION_NAME); + if (QUEUE.equals(destinationType) || TOPIC.equals(destinationType)) { + if (destinationNameOpt.isEmpty()) { + throw new BallerinaJmsException( + String.format("JMS destination name can not be empty for destination type: %s", destinationType) + ); } - } catch (JMSException exception) { - BError cause = ErrorCreator.createError(exception); - return ErrorCreator.createError(ModuleUtils.getModule(), Constants.JMS_ERROR, - StringUtils.fromString("Error occurred while sending acknowledgement for the message"), - cause, null); } - return null; + if (QUEUE.equals(destinationType)) { + return session.createQueue(destinationNameOpt.get()); + } else if (TEMPORARY_QUEUE.equals(destinationType)) { + return session.createTemporaryQueue(); + } else if (TOPIC.equals(destinationType)) { + return session.createTopic(destinationNameOpt.get()); + } else { + return session.createTemporaryTopic(); + } } public static BMap getBallerinaMessage(Message message) @@ -150,21 +131,21 @@ private static String getMessageType(Message message) { } private static BMap getJmsDestinationField(Destination destination) throws JMSException { - BMap destRecord = ValueCreator.createMapValue(); + BMap values = ValueCreator.createMapValue(); if (destination instanceof TemporaryQueue) { - destRecord.put(Constants.TYPE, Constants.TEMPORARY_QUEUE); + values.put(DESTINATION_TYPE, Constants.TEMPORARY_QUEUE); } else if (destination instanceof Queue) { String queueName = ((Queue) destination).getQueueName(); - destRecord.put(Constants.TYPE, Constants.QUEUE); - destRecord.put(Constants.NAME, StringUtils.fromString(queueName)); + values.put(DESTINATION_TYPE, Constants.QUEUE); + values.put(DESTINATION_NAME, StringUtils.fromString(queueName)); } else if (destination instanceof TemporaryTopic) { - destRecord.put(Constants.TYPE, Constants.TEMPORARY_TOPIC); + values.put(DESTINATION_TYPE, Constants.TEMPORARY_TOPIC); } else { String topicName = ((Topic) destination).getTopicName(); - destRecord.put(Constants.TYPE, Constants.TOPIC); - destRecord.put(Constants.NAME, StringUtils.fromString(topicName)); + values.put(DESTINATION_TYPE, Constants.TOPIC); + values.put(DESTINATION_NAME, StringUtils.fromString(topicName)); } - return destRecord; + return ValueCreator.createReadonlyRecordValue(ModuleUtils.getModule(), "Destination", values); } @SuppressWarnings("unchecked") diff --git a/native/src/main/java/io/ballerina/stdlib/java.jms/Constants.java b/native/src/main/java/io/ballerina/stdlib/java.jms/Constants.java index 666c1c58..88cbf6c7 100644 --- a/native/src/main/java/io/ballerina/stdlib/java.jms/Constants.java +++ b/native/src/main/java/io/ballerina/stdlib/java.jms/Constants.java @@ -33,20 +33,9 @@ * @since 0.8.0 */ public class Constants { - - static final String ORG = "ballerina"; - static final String PACKAGE_NAME = "java.jms"; - public static final String VERSION = "0.99.0"; - // Error names for JMS package public static final String JMS_ERROR = "Error"; - // Destination types - public static final String DESTINATION_TYPE_QUEUE = "queue"; - public static final String DESTINATION_TYPE_TOPIC = "topic"; - public static final String DESTINATION_TYPE_TEMP_QUEUE = "temporaryQueue"; - public static final String DESTINATION_TYPE_TEMP_TOPIC = "temporaryTopic"; - public static final String CONFIG_FILE_PATH = "configFilePath"; /** @@ -116,14 +105,15 @@ public class Constants { static final String DUPS_OK_ACKNOWLEDGE_MODE = "DUPS_OK_ACKNOWLEDGE"; static final String SESSION_TRANSACTED_MODE = "SESSION_TRANSACTED"; - static final String TEXT_MESSAGE_BAL_OBJECT_NAME = "TextMessage"; - static final String MESSAGE_BAL_OBJECT_NAME = "Message"; - static final String MAP_MESSAGE_BAL_OBJECT_NAME = "MapMessage"; - static final String BYTE_MESSAGE_BAL_OBJECT_NAME = "BytesMessage"; - static final String STREAM_MESSAGE_BAL_OBJECT_NAME = "StreamMessage"; - + // Native properties in respective ballerina objects + static final String NATIVE_CONNECTION = "connection"; + static final String NATIVE_SESSION = "session"; + static final String NATIVE_PRODUCER = "producer"; + static final String NATIVE_CONSUMER = "consumer"; static final String NATIVE_MESSAGE = "message"; + + // Ballerina JMS message types static final String MESSAGE_BAL_RECORD_NAME = "Message"; static final String TEXT_MESSAGE_BAL_RECORD_NAME = "TextMessage"; static final String MAP_MESSAGE_BAL_RECORD_NAME = "MapMessage"; @@ -133,8 +123,6 @@ public class Constants { static final BString MESSAGE_ID = StringUtils.fromString("messageId"); static final BString TIMESTAMP = StringUtils.fromString("timestamp"); static final BString CORRELATION_ID = StringUtils.fromString("correlationId"); - static final BString TYPE = StringUtils.fromString("'type"); - static final BString NAME = StringUtils.fromString("name"); static final BString REPLY_TO = StringUtils.fromString("replyTo"); static final BString DESTINATION = StringUtils.fromString("destination"); static final BString DELIVERY_MODE = StringUtils.fromString("deliveryMode"); diff --git a/native/src/main/java/io/ballerina/stdlib/java.jms/JmsBytesMessage.java b/native/src/main/java/io/ballerina/stdlib/java.jms/JmsBytesMessage.java new file mode 100644 index 00000000..ce3141bb --- /dev/null +++ b/native/src/main/java/io/ballerina/stdlib/java.jms/JmsBytesMessage.java @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.ballerina.stdlib.java.jms; + +import io.ballerina.runtime.api.creators.ErrorCreator; +import io.ballerina.runtime.api.utils.StringUtils; +import io.ballerina.runtime.api.values.BArray; +import io.ballerina.runtime.api.values.BError; + +import javax.jms.BytesMessage; +import javax.jms.JMSException; + +import static io.ballerina.stdlib.java.jms.Constants.JMS_ERROR; + +/** + * Representation of {@link javax.jms.BytesMessage} with utility methods to invoke as inter-op functions. + */ +public class JmsBytesMessage { + + /** + * Writes a byte array to the bytes message stream. + * + * @param message {@link javax.jms.BytesMessage} object + * @param value byte[] array as ballerina + */ + public static Object writeBytes(BytesMessage message, BArray value) { + try { + byte[] bytes = value.getBytes(); + message.writeBytes(bytes); + } catch (JMSException e) { + BError cause = ErrorCreator.createError(e); + return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR, + StringUtils.fromString("Error occurred while writing the bytes message."), + cause, null); + } + return null; + } +} diff --git a/native/src/main/java/io/ballerina/stdlib/java.jms/JmsBytesMessageUtils.java b/native/src/main/java/io/ballerina/stdlib/java.jms/JmsBytesMessageUtils.java deleted file mode 100644 index a931c3db..00000000 --- a/native/src/main/java/io/ballerina/stdlib/java.jms/JmsBytesMessageUtils.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. - * - * WSO2 LLC. licenses this file to you under the Apache License, - * Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.ballerina.stdlib.java.jms; - -import io.ballerina.runtime.api.creators.ErrorCreator; -import io.ballerina.runtime.api.creators.ValueCreator; -import io.ballerina.runtime.api.utils.StringUtils; -import io.ballerina.runtime.api.values.BArray; -import io.ballerina.runtime.api.values.BError; - -import javax.jms.BytesMessage; -import javax.jms.JMSException; - -import static io.ballerina.stdlib.java.jms.Constants.JMS_ERROR; - -/** - * Representation of {@link javax.jms.BytesMessage} with utility methods to invoke as inter-op functions. - */ -public class JmsBytesMessageUtils { - private JmsBytesMessageUtils() { - } - - /** - * Reads a byte array from the bytes message stream. - * - * @param message {@link javax.jms.BytesMessage} object - * @return Total number of bytes read into the buffer, or -1 if there is no more data - */ - public static Object readJavaBytes(BytesMessage message) { - try { - int bodyLength = (int) message.getBodyLength(); - byte[] bytes = new byte[bodyLength]; - message.readBytes(bytes); - return ValueCreator.createArrayValue(bytes); - } catch (JMSException e) { - BError cause = ErrorCreator.createError(e); - return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR, - StringUtils.fromString("Error occurred while reading bytes message."), cause, null); - } - } - - /** - * Reads a portion of the bytes message stream. - * - * @param message {@link javax.jms.BytesMessage} object - * @param length Number of bytes to read - * @return Total number of bytes read into the buffer, or -1 if there is no more data - */ - public static Object readPortionOfJavaBytes(BytesMessage message, int length) { - try { - long bodyLength = message.getBodyLength(); - if (length > bodyLength) { - return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR, - StringUtils.fromString("Length should be less than or equal to the message's body length."), - null, null); - } - byte[] bytes = new byte[length]; - message.readBytes(bytes, length); - return ValueCreator.createArrayValue(bytes); - } catch (JMSException e) { - BError cause = ErrorCreator.createError(e); - return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR, - StringUtils.fromString("Error occurred while reading portion of the bytes message."), - cause, null); - } - } - - /** - * Writes a byte array to the bytes message stream. - * - * @param message {@link javax.jms.BytesMessage} object - * @param value byte[] array as ballerina - */ - public static Object writeBytes(BytesMessage message, BArray value) { - try { - byte[] bytes = value.getBytes(); - message.writeBytes(bytes); - } catch (JMSException e) { - BError cause = ErrorCreator.createError(e); - return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR, - StringUtils.fromString("Error occurred while writing the bytes message."), - cause, null); - } - return null; - } - - /** - * Writes a portion of a byte array to the bytes message stream. - * - * @param message {@link javax.jms.BytesMessage} object - * @param value byte[] array as ballerina {@link BArray} - * @param offset Initial offset within the byte array - * @param length Number of bytes to use - */ - public static Object writePortionOfBytes(BytesMessage message, BArray value, int offset, int length) - throws BallerinaJmsException { - try { - byte[] bytes = value.getBytes(); - message.writeBytes(bytes, offset, length); - } catch (JMSException e) { - BError cause = ErrorCreator.createError(e); - return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR, - StringUtils.fromString("Error occurred while writing a portion of the bytes message."), - cause, null); - } - return null; - } -} diff --git a/native/src/main/java/io/ballerina/stdlib/java.jms/JmsConnection.java b/native/src/main/java/io/ballerina/stdlib/java.jms/JmsConnection.java new file mode 100644 index 00000000..d8ba96fa --- /dev/null +++ b/native/src/main/java/io/ballerina/stdlib/java.jms/JmsConnection.java @@ -0,0 +1,251 @@ +/* + * Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.ballerina.stdlib.java.jms; + +import io.ballerina.runtime.api.creators.ErrorCreator; +import io.ballerina.runtime.api.utils.StringUtils; +import io.ballerina.runtime.api.values.BError; +import io.ballerina.runtime.api.values.BMap; +import io.ballerina.runtime.api.values.BObject; +import io.ballerina.runtime.api.values.BString; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Properties; +import java.util.UUID; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.naming.InitialContext; +import javax.naming.NamingException; + +import static io.ballerina.stdlib.java.jms.CommonUtils.getOptionalStringProperty; +import static io.ballerina.stdlib.java.jms.Constants.JMS_ERROR; +import static io.ballerina.stdlib.java.jms.Constants.NATIVE_CONNECTION; + +/** + * Representation of {@link javax.jms.Connection} with utility methods to invoke as inter-op functions. + */ +public class JmsConnection { + private static final BString INITIAL_CONTEXT_FACTORY = StringUtils.fromString("initialContextFactory"); + private static final BString PROVIDER_URL = StringUtils.fromString("providerUrl"); + private static final BString CONNECTION_FACTORY_NAME = StringUtils.fromString("connectionFactoryName"); + private static final BString USERNAME = StringUtils.fromString("username"); + private static final BString PASSWORD = StringUtils.fromString("password"); + private static final BString PROPERTIES = StringUtils.fromString("properties"); + + /** + * Creates a JMS connection with the provided configurations. + * + * @param connection Ballerina connection object + * @param connectionConfig JMS configurations + * @return A Ballerina `jms:Error` if the JMS provider fails to create the connection due to some internal error + */ + public static Object init(BObject connection, BMap connectionConfig) { + try { + Connection jmsConnection = createJmsConnection(connectionConfig); + if (jmsConnection.getClientID() == null) { + jmsConnection.setClientID(UUID.randomUUID().toString()); + } + jmsConnection.setExceptionListener(new LoggingExceptionListener()); + jmsConnection.start(); + connection.addNativeData(NATIVE_CONNECTION, jmsConnection); + } catch (BallerinaJmsException e) { + BError cause = ErrorCreator.createError(e); + return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR, + StringUtils.fromString(e.getMessage()), cause, null); + } catch (JMSException e) { + BError cause = ErrorCreator.createError(e); + return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR, + StringUtils.fromString("Error occurred while initializing and starring connection"), + cause, null); + } + return null; + } + + private static Connection createJmsConnection(BMap connectionConfigs) + throws BallerinaJmsException { + String connectionFactoryName = connectionConfigs.getStringValue(CONNECTION_FACTORY_NAME).getValue(); + Properties properties = getConnectionProperties(connectionConfigs, connectionFactoryName); + try { + InitialContext initialContext = new InitialContext(properties); + ConnectionFactory connectionFactory = + (ConnectionFactory) initialContext.lookup(connectionFactoryName); + Optional userNameOpt = getOptionalStringProperty(connectionConfigs, USERNAME); + Optional passwordOpt = getOptionalStringProperty(connectionConfigs, PASSWORD); + if (userNameOpt.isPresent() && passwordOpt.isPresent()) { + String username = userNameOpt.get(); + String password = passwordOpt.get(); + if (!username.isBlank()) { + return connectionFactory.createConnection(username, password); + } + } + return connectionFactory.createConnection(); + } catch (NamingException | JMSException e) { + throw new BallerinaJmsException( + String.format("Error occurred while connecting to broker: %s", e.getMessage()), e); + } + } + + @SuppressWarnings("unchecked") + private static Properties getConnectionProperties(BMap connectionConfigs, + String connectionFactoryName) throws BallerinaJmsException { + Map configProperties = new HashMap<>(); + + String initialContextFactory = connectionConfigs.getStringValue(INITIAL_CONTEXT_FACTORY).getValue(); + configProperties.put(Constants.ALIAS_INITIAL_CONTEXT_FACTORY, initialContextFactory); + + String providerUrl = connectionConfigs.getStringValue(PROVIDER_URL).getValue(); + configProperties.put(Constants.ALIAS_PROVIDER_URL, providerUrl); + + configProperties.put(Constants.ALIAS_CONNECTION_FACTORY_NAME, connectionFactoryName); + + preProcessIfWso2MB(configProperties); + updateMappedParameters(configProperties); + + Properties properties = new Properties(); + properties.putAll(configProperties); + BMap additionalProperties = (BMap) connectionConfigs + .getMapValue(PROPERTIES); + additionalProperties.entrySet().forEach(e -> { + properties.setProperty(e.getKey().getValue(), e.getValue().getValue()); + }); + return properties; + } + + private static void preProcessIfWso2MB(Map configParams) throws BallerinaJmsException { + String initialConnectionFactoryName = configParams.get(Constants.ALIAS_INITIAL_CONTEXT_FACTORY); + if (Constants.BMB_ICF_ALIAS.equalsIgnoreCase(initialConnectionFactoryName) + || Constants.MB_ICF_ALIAS.equalsIgnoreCase(initialConnectionFactoryName)) { + + configParams.put(Constants.ALIAS_INITIAL_CONTEXT_FACTORY, Constants.MB_ICF_NAME); + String connectionFactoryName = configParams.get(Constants.ALIAS_CONNECTION_FACTORY_NAME); + if (configParams.get(Constants.ALIAS_PROVIDER_URL) != null) { + System.setProperty("qpid.dest_syntax", "BURL"); + if (Objects.nonNull(connectionFactoryName) && !connectionFactoryName.isBlank()) { + configParams.put(Constants.MB_CF_NAME_PREFIX + connectionFactoryName, + configParams.get(Constants.ALIAS_PROVIDER_URL)); + configParams.remove(Constants.ALIAS_PROVIDER_URL); + } else { + throw new BallerinaJmsException( + Constants.ALIAS_CONNECTION_FACTORY_NAME + " property should be set"); + } + } else if (configParams.get(Constants.CONFIG_FILE_PATH) != null) { + configParams.put(Constants.ALIAS_PROVIDER_URL, configParams.get(Constants.CONFIG_FILE_PATH)); + configParams.remove(Constants.CONFIG_FILE_PATH); + } + } + } + + private static void updateMappedParameters(Map configParams) { + Iterator> iterator = configParams.entrySet().iterator(); + Map tempMap = new HashMap<>(); + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + String mappedParam = Constants.MAPPING_PARAMETERS.get(entry.getKey()); + if (mappedParam != null) { + tempMap.put(mappedParam, entry.getValue()); + iterator.remove(); + } + } + configParams.putAll(tempMap); + } + + /** Starts (or restarts) a connection's delivery of incoming messages. + * A call to {@code start} on a connection that has already been started is ignored. + * + * @param connection Ballerina connection object + * @return A Ballerina `jms:Error` if the JMS provider fails to start message delivery due to some internal error + */ + public static Object start(BObject connection) { + Object nativeConnection = connection.getNativeData(NATIVE_CONNECTION); + if (Objects.nonNull(nativeConnection)) { + try { + ((Connection) nativeConnection).start(); + return null; + } catch (JMSException exception) { + BError cause = ErrorCreator.createError(exception); + return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR, + StringUtils.fromString("Error occurred while starting the connection"), cause, null); + } + } + return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR, + StringUtils.fromString("Could not find the native JMS connection"), null, null); + } + + /** + * Temporarily stops a connection's delivery of incoming messages. Delivery + * can be restarted using the connection's {@code start} method. When + * the connection is stopped, delivery to all the connection's message + * consumers is inhibited: synchronous receives block, and messages are not + * delivered to message listeners. + * + * @param connection Ballerina connection object + * @return A Ballerina `jms:Error` if the JMS provider fails to stop message delivery for one + * of the following reasons: + *
    + *
  • an internal error has occurred or + *
  • this method has been called in a Java EE web or EJB application (though it is not guaranteed + * that an exception is thrown in this case) + *
+ */ + public static Object stop(BObject connection) { + Object nativeConnection = connection.getNativeData(NATIVE_CONNECTION); + if (Objects.nonNull(nativeConnection)) { + try { + ((Connection) nativeConnection).stop(); + return null; + } catch (JMSException exception) { + BError cause = ErrorCreator.createError(exception); + return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR, + StringUtils.fromString("Error occurred while stopping the connection"), cause, null); + } + } + return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR, + StringUtils.fromString("Could not find the native JMS connection"), null, null); + } + + /** + * Closes the connection. + * + * @param connection Ballerina connection object + * @return A Ballerina `jms:Error` if the JMS provider fails to close the + * connection due to some internal error. For example, a failure to release resources or + * to close a socket connection can cause this exception to be thrown. + */ + public static Object close(BObject connection) { + Object nativeConnection = connection.getNativeData(NATIVE_CONNECTION); + if (Objects.nonNull(nativeConnection)) { + try { + ((Connection) nativeConnection).close(); + return null; + } catch (JMSException exception) { + BError cause = ErrorCreator.createError(exception); + return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR, + StringUtils.fromString("Error occurred while closing the connection"), cause, null); + } + } + return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR, + StringUtils.fromString("Could not find the native JMS connection"), null, null); + } +} diff --git a/native/src/main/java/io/ballerina/stdlib/java.jms/JmsConnectionUtils.java b/native/src/main/java/io/ballerina/stdlib/java.jms/JmsConnectionUtils.java deleted file mode 100644 index 8b716894..00000000 --- a/native/src/main/java/io/ballerina/stdlib/java.jms/JmsConnectionUtils.java +++ /dev/null @@ -1,162 +0,0 @@ -/* - * Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. - * - * WSO2 LLC. licenses this file to you under the Apache License, - * Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.ballerina.stdlib.java.jms; - -import io.ballerina.runtime.api.creators.ErrorCreator; -import io.ballerina.runtime.api.utils.StringUtils; -import io.ballerina.runtime.api.values.BError; -import io.ballerina.runtime.api.values.BMap; -import io.ballerina.runtime.api.values.BString; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.Properties; -import java.util.UUID; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.JMSException; -import javax.naming.InitialContext; -import javax.naming.NamingException; - -import static io.ballerina.stdlib.java.jms.Constants.JMS_ERROR; - -/** - * Representation of {@link javax.jms.Connection} with utility methods to invoke as inter-op functions. - */ -public class JmsConnectionUtils { - private static final Logger LOGGER = LoggerFactory.getLogger(JmsConnectionUtils.class); - - /** - * Creates a connection with the default user identity. - * - * @param initialContextFactory JNDI config that can be used to lookup JMS connection factory object - * @param providerUrl URL of the JNDI provider. - * @param connectionFactoryName Name of connection factory - * @param optionalConfigs Other JMS configs - * @return {@link javax.jms.Connection} object or else an error - */ - public static Object createJmsConnection(BString initialContextFactory, BString providerUrl, - BString connectionFactoryName, BMap optionalConfigs) { - try { - Connection connection = createConnection( - initialContextFactory, providerUrl, connectionFactoryName, optionalConfigs); - if (connection.getClientID() == null) { - connection.setClientID(UUID.randomUUID().toString()); - } - connection.setExceptionListener(new LoggingExceptionListener()); - connection.start(); - return connection; - } catch (BallerinaJmsException e) { - BError cause = ErrorCreator.createError(e); - return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR, - StringUtils.fromString(e.getMessage()), cause, null); - } catch (JMSException e) { - BError cause = ErrorCreator.createError(e); - return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR, - StringUtils.fromString("Error occurred while starting connection"), cause, null); - } - } - - private static Connection createConnection(BString initialContextFactory, BString providerUrl, - BString connectionFactoryName, BMap optionalConfigs) - throws BallerinaJmsException { - Map configParams = new HashMap<>(); - configParams.put(Constants.ALIAS_INITIAL_CONTEXT_FACTORY, initialContextFactory.getValue()); - configParams.put(Constants.ALIAS_PROVIDER_URL, providerUrl.getValue()); - configParams.put(Constants.ALIAS_CONNECTION_FACTORY_NAME, connectionFactoryName.getValue()); - - preProcessIfWso2MB(configParams); - updateMappedParameters(configParams); - - Properties properties = new Properties(); - properties.putAll(configParams); - optionalConfigs.entrySet().forEach(e -> { - properties.setProperty(e.getKey().getValue(), e.getValue().getValue()); - }); - - try { - String password = null; - String username = null; - InitialContext initialContext = new InitialContext(properties); - ConnectionFactory connectionFactory = - (ConnectionFactory) initialContext.lookup(connectionFactoryName.getValue()); - if (optionalConfigs.containsKey(Constants.ALIAS_USERNAME)) { - username = optionalConfigs.get(Constants.ALIAS_USERNAME).getValue(); - } - if (optionalConfigs.containsKey(Constants.ALIAS_PASSWORD)) { - password = optionalConfigs.get(Constants.ALIAS_PASSWORD).getValue(); - } - if (notNullOrEmptyAfterTrim(username) && password != null) { - return connectionFactory.createConnection(username, password); - } else { - return connectionFactory.createConnection(); - } - } catch (NamingException | JMSException e) { - String message = "Error while connecting to broker."; - LOGGER.error(message, e); - throw new BallerinaJmsException(message + " " + e.getMessage(), e); - } - } - - private static void preProcessIfWso2MB(Map configParams) throws BallerinaJmsException { - String initialConnectionFactoryName = configParams.get(Constants.ALIAS_INITIAL_CONTEXT_FACTORY); - if (Constants.BMB_ICF_ALIAS.equalsIgnoreCase(initialConnectionFactoryName) - || Constants.MB_ICF_ALIAS.equalsIgnoreCase(initialConnectionFactoryName)) { - - configParams.put(Constants.ALIAS_INITIAL_CONTEXT_FACTORY, Constants.MB_ICF_NAME); - String connectionFactoryName = configParams.get(Constants.ALIAS_CONNECTION_FACTORY_NAME); - if (configParams.get(Constants.ALIAS_PROVIDER_URL) != null) { - System.setProperty("qpid.dest_syntax", "BURL"); - if (notNullOrEmptyAfterTrim(connectionFactoryName)) { - configParams.put(Constants.MB_CF_NAME_PREFIX + connectionFactoryName, - configParams.get(Constants.ALIAS_PROVIDER_URL)); - configParams.remove(Constants.ALIAS_PROVIDER_URL); - } else { - throw new BallerinaJmsException( - Constants.ALIAS_CONNECTION_FACTORY_NAME + " property should be set"); - } - } else if (configParams.get(Constants.CONFIG_FILE_PATH) != null) { - configParams.put(Constants.ALIAS_PROVIDER_URL, configParams.get(Constants.CONFIG_FILE_PATH)); - configParams.remove(Constants.CONFIG_FILE_PATH); - } - } - } - - private static void updateMappedParameters(Map configParams) { - Iterator> iterator = configParams.entrySet().iterator(); - Map tempMap = new HashMap<>(); - while (iterator.hasNext()) { - Map.Entry entry = iterator.next(); - String mappedParam = Constants.MAPPING_PARAMETERS.get(entry.getKey()); - if (mappedParam != null) { - tempMap.put(mappedParam, entry.getValue()); - iterator.remove(); - } - } - configParams.putAll(tempMap); - } - - private static boolean notNullOrEmptyAfterTrim(String str) { - return !(str == null || str.trim().isEmpty()); - } -} diff --git a/native/src/main/java/io/ballerina/stdlib/java.jms/JmsConsumer.java b/native/src/main/java/io/ballerina/stdlib/java.jms/JmsConsumer.java new file mode 100644 index 00000000..e2401dc1 --- /dev/null +++ b/native/src/main/java/io/ballerina/stdlib/java.jms/JmsConsumer.java @@ -0,0 +1,237 @@ +/* + * Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.ballerina.stdlib.java.jms; + +import io.ballerina.runtime.api.creators.ErrorCreator; +import io.ballerina.runtime.api.utils.StringUtils; +import io.ballerina.runtime.api.values.BError; +import io.ballerina.runtime.api.values.BMap; +import io.ballerina.runtime.api.values.BObject; +import io.ballerina.runtime.api.values.BString; + +import java.util.Objects; +import java.util.Optional; + +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import javax.jms.Topic; + +import static io.ballerina.stdlib.java.jms.CommonUtils.getBallerinaMessage; +import static io.ballerina.stdlib.java.jms.CommonUtils.getDestination; +import static io.ballerina.stdlib.java.jms.CommonUtils.getOptionalStringProperty; +import static io.ballerina.stdlib.java.jms.Constants.DESTINATION; +import static io.ballerina.stdlib.java.jms.Constants.JMS_ERROR; +import static io.ballerina.stdlib.java.jms.Constants.NATIVE_CONSUMER; +import static io.ballerina.stdlib.java.jms.Constants.NATIVE_SESSION; + +/** + * Represents {@link javax.jms.MessageConsumer} related utility functions. + */ +public class JmsConsumer { + private static final BString CONSUMER_TYPE = StringUtils.fromString("type"); + private static final BString MESSAGE_SELECTOR = StringUtils.fromString("messageSelector"); + private static final BString NO_LOCAL = StringUtils.fromString("noLocal"); + private static final BString SUBSCRIBER_NAME = StringUtils.fromString("subscriberName"); + private static final String DURABLE = "DURABLE"; + private static final String SHARED = "SHARED"; + private static final String SHARED_DURABLE = "SHARED_DURABLE"; + private static final String DEFAULT = "DEFAULT"; + + /** + * Creates a {@link javax.jms.MessageConsumer} object with given {@link javax.jms.Session}. + * + * @param consumer Ballerina consumer object + * @param session Ballerina session object + * @param consumerOptions JMS MessageConsumer configurations + * @return A Ballerina `jms:Error` if the JMS provider fails to create the MessageConsumer due to some + * internal error + */ + public static Object init(BObject consumer, BObject session, BMap consumerOptions) { + Object nativeSession = session.getNativeData(NATIVE_SESSION); + if (Objects.isNull(nativeSession)) { + return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR, + StringUtils.fromString("Could not find the native JMS session"), null, null); + } + Session jmsSession = (Session) nativeSession; + try { + MessageConsumer jmsConsumer = createConsumer((Session) jmsSession, consumerOptions); + consumer.addNativeData(NATIVE_CONSUMER, jmsConsumer); + } catch (BallerinaJmsException exception) { + BError cause = ErrorCreator.createError(exception); + return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR, + StringUtils.fromString(exception.getMessage()), cause, null); + } catch (JMSException exception) { + BError cause = ErrorCreator.createError(exception); + return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR, + StringUtils.fromString("Error occurred while initializing the JMS MessageConsumer"), + cause, null); + } + return null; + } + + private static MessageConsumer createConsumer(Session session, BMap consumerOptions) + throws BallerinaJmsException, JMSException { + BMap destination = (BMap) consumerOptions.getMapValue(DESTINATION); + Destination jmsDestination = getDestination(session, destination); + String consumerType = consumerOptions.getStringValue(CONSUMER_TYPE).getValue(); + String messageSelector = consumerOptions.getStringValue(MESSAGE_SELECTOR).getValue(); + boolean noLocal = consumerOptions.getBooleanValue(NO_LOCAL); + Optional subscriberNameOpt = getOptionalStringProperty(consumerOptions, SUBSCRIBER_NAME); + if (!DEFAULT.equals(consumerType)) { + if (subscriberNameOpt.isEmpty()) { + throw new BallerinaJmsException( + String.format("Subscriber name cannot be empty for consumer type: %s", consumerType) + ); + } + } + + if (DEFAULT.equals(consumerType)) { + return session.createConsumer(jmsDestination, messageSelector, noLocal); + } else if (DURABLE.equals(consumerType)) { + return session.createDurableSubscriber( + (Topic) destination, subscriberNameOpt.get(), messageSelector, noLocal); + } else if (SHARED.equals(consumerType)) { + return session.createSharedConsumer((Topic) destination, subscriberNameOpt.get(), messageSelector); + } else { + return session.createSharedDurableConsumer((Topic) destination, subscriberNameOpt.get(), messageSelector); + } + } + + /** + * Receives the next message that arrives within the specified timeout interval. + * + * @param consumer Ballerina consumer object + * @param timeout The timeout value (in milliseconds) + * @return A Ballerina `jms:Error` if the JMS MessageConsumer fails to receive the message due to some error + * or else the next message produced for this message consumer, or null + */ + public static Object receive(BObject consumer, long timeout) { + Object nativeConsumer = consumer.getNativeData(NATIVE_CONSUMER); + if (Objects.isNull(nativeConsumer)) { + return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR, + StringUtils.fromString("Could not find the native JMS MessageConsumer"), + null, null); + } + try { + Message message = ((MessageConsumer) nativeConsumer).receive(timeout); + if (Objects.isNull(message)) { + return null; + } + return getBallerinaMessage(message); + } catch (JMSException exception) { + BError cause = ErrorCreator.createError(exception); + return ErrorCreator.createError(ModuleUtils.getModule(), Constants.JMS_ERROR, + StringUtils.fromString("Error occurred while receiving messages"), cause, null); + } catch (BallerinaJmsException exception) { + BError cause = ErrorCreator.createError(exception); + return ErrorCreator.createError(ModuleUtils.getModule(), Constants.JMS_ERROR, + StringUtils.fromString("Error occurred while processing the received messages"), + cause, null); + } catch (Exception exception) { + BError cause = ErrorCreator.createError(exception); + return ErrorCreator.createError(ModuleUtils.getModule(), Constants.JMS_ERROR, + StringUtils.fromString("Unknown error occurred while processing the received messages"), + cause, null); + } + } + + /** + * Receives the next message if one is immediately available. + * + * @param consumer Ballerina consumer object + * @return A Ballerina `jms:Error` if the JMS MessageConsumer fails to receive the message due to some error + * or else the next message produced for this message consumer, or null + */ + public static Object receiveNoWait(BObject consumer) { + Object nativeConsumer = consumer.getNativeData(NATIVE_CONSUMER); + if (Objects.isNull(nativeConsumer)) { + return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR, + StringUtils.fromString("Could not find the native JMS MessageConsumer"), + null, null); + } + try { + Message message = ((MessageConsumer) nativeConsumer).receiveNoWait(); + if (Objects.isNull(message)) { + return null; + } + return getBallerinaMessage(message); + } catch (JMSException exception) { + BError cause = ErrorCreator.createError(exception); + return ErrorCreator.createError(ModuleUtils.getModule(), Constants.JMS_ERROR, + StringUtils.fromString("Error occurred while receiving messages"), cause, null); + } catch (BallerinaJmsException exception) { + BError cause = ErrorCreator.createError(exception); + return ErrorCreator.createError(ModuleUtils.getModule(), Constants.JMS_ERROR, + StringUtils.fromString("Error occurred while processing the received messages"), + cause, null); + } catch (Exception exception) { + BError cause = ErrorCreator.createError(exception); + return ErrorCreator.createError(ModuleUtils.getModule(), Constants.JMS_ERROR, + StringUtils.fromString("Unknown error occurred while processing the received messages"), + cause, null); + } + } + + /** + * Closes the message consumer. + * + * @param consumer Ballerina consumer object + * @return A Ballerina `jms:Error` if the JMS provider fails to close the consumer due to some internal error + */ + public static Object close(BObject consumer) { + Object nativeConsumer = consumer.getNativeData(NATIVE_CONSUMER); + if (Objects.isNull(nativeConsumer)) { + return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR, + StringUtils.fromString("Could not find the native JMS MessageConsumer"), + null, null); + } + try { + ((MessageConsumer) nativeConsumer).close(); + } catch (JMSException exception) { + BError cause = ErrorCreator.createError(exception); + return ErrorCreator.createError(ModuleUtils.getModule(), Constants.JMS_ERROR, + StringUtils.fromString("Error occurred while closing the message consumer"), cause, null); + } + return null; + } + + /** + * Acknowledges all consumed messages of the session of this consumed message. + * + * @param message Ballerina JMS message + * @return A Ballerina `jms:Error` if the JMS provider fails to acknowledge the messages due to some internal error. + */ + public static Object acknowledge(BMap message) { + try { + Object nativeMessage = message.getNativeData(Constants.NATIVE_MESSAGE); + if (Objects.nonNull(nativeMessage)) { + ((Message) nativeMessage).acknowledge(); + } + } catch (JMSException exception) { + BError cause = ErrorCreator.createError(exception); + return ErrorCreator.createError(ModuleUtils.getModule(), Constants.JMS_ERROR, + StringUtils.fromString("Error occurred while sending acknowledgement for the message"), + cause, null); + } + return null; + } +} diff --git a/native/src/main/java/io/ballerina/stdlib/java.jms/JmsDestinationUtils.java b/native/src/main/java/io/ballerina/stdlib/java.jms/JmsDestinationUtils.java deleted file mode 100644 index 5439f23e..00000000 --- a/native/src/main/java/io/ballerina/stdlib/java.jms/JmsDestinationUtils.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. - * - * WSO2 LLC. licenses this file to you under the Apache License, - * Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.ballerina.stdlib.java.jms; - -import javax.jms.Destination; -import javax.jms.Queue; -import javax.jms.TemporaryQueue; -import javax.jms.TemporaryTopic; -import javax.jms.Topic; - -/** - * Representation of {@link javax.jms.Destination} with utility methods to invoke as inter-op functions. - */ -public class JmsDestinationUtils { - private JmsDestinationUtils() {} - - /** - * Get the {@link javax.jms.Destination} type. - * - * @param destination {@link javax.jms.Destination} object - * @return Ballerina Tuple represent {@link javax.jms.Destination} - */ - public static String getDestinationType(Destination destination) { - String destinationType = null; - if (destination instanceof TemporaryQueue) { - destinationType = Constants.DESTINATION_TYPE_TEMP_QUEUE; - } else if (destination instanceof TemporaryTopic) { - destinationType = Constants.DESTINATION_TYPE_TEMP_TOPIC; - } else if (destination instanceof Queue) { - destinationType = Constants.DESTINATION_TYPE_QUEUE; - } else if (destination instanceof Topic) { - destinationType = Constants.DESTINATION_TYPE_TOPIC; - } - return destinationType; - } -} diff --git a/native/src/main/java/io/ballerina/stdlib/java.jms/JmsListener.java b/native/src/main/java/io/ballerina/stdlib/java.jms/JmsListener.java index cb9a5ace..41c7abc6 100644 --- a/native/src/main/java/io/ballerina/stdlib/java.jms/JmsListener.java +++ b/native/src/main/java/io/ballerina/stdlib/java.jms/JmsListener.java @@ -42,7 +42,7 @@ import static io.ballerina.runtime.api.TypeTags.OBJECT_TYPE_TAG; import static io.ballerina.runtime.api.TypeTags.RECORD_TYPE_TAG; -import static io.ballerina.stdlib.java.jms.ConsumerUtils.getBallerinaMessage; +import static io.ballerina.stdlib.java.jms.CommonUtils.getBallerinaMessage; /** * A {@link javax.jms.MessageListener} implementation. diff --git a/native/src/main/java/io/ballerina/stdlib/java.jms/JmsMapMessageUtils.java b/native/src/main/java/io/ballerina/stdlib/java.jms/JmsMapMessageUtils.java deleted file mode 100644 index 3ba48581..00000000 --- a/native/src/main/java/io/ballerina/stdlib/java.jms/JmsMapMessageUtils.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. - * - * WSO2 LLC. licenses this file to you under the Apache License, - * Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.ballerina.stdlib.java.jms; - -import io.ballerina.runtime.api.creators.ErrorCreator; -import io.ballerina.runtime.api.creators.ValueCreator; -import io.ballerina.runtime.api.utils.StringUtils; -import io.ballerina.runtime.api.values.BError; -import io.ballerina.runtime.api.values.BString; - -import java.util.Collections; -import java.util.List; - -import javax.jms.JMSException; -import javax.jms.MapMessage; -import javax.jms.Message; - -import static io.ballerina.stdlib.java.jms.Constants.JMS_ERROR; - -/** - * Representation of {@link javax.jms.MapMessage} with utility methods to invoke as inter-op functions. - */ -public class JmsMapMessageUtils { - private JmsMapMessageUtils() {} - - /** - * Return all names in the {@link javax.jms.MapMessage} as Ballerina array. - * - * @param message {@link javax.jms.MapMessage} object - * @return Ballerina array consist of map names - */ - public static Object getJmsMapNames(MapMessage message) { - try { - List propertyNames = Collections.list(message.getMapNames()); - return ValueCreator.createArrayValue(propertyNames.toArray(new BString[0])); - } catch (JMSException e) { - BError cause = ErrorCreator.createError(e); - return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR, - StringUtils.fromString("Error occurred while getting property names"), cause, null); - } - } - - /** - * Returns the byte array value with the specified name. - * - * @param message {@link javax.jms.Message} object - * @param name Field name - * @return a copy of the byte array value with the specified name; if there is no item by this name, - * a null value is returned. - */ - public static Object getBytes(Message message, String name) { - try { - MapMessage m = (MapMessage) message; - byte[] bytearray = m.getBytes(name); - return ValueCreator.createArrayValue(bytearray); - } catch (JMSException e) { - BError cause = ErrorCreator.createError(e); - return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR, - StringUtils.fromString("Error occurred while getting property names."), cause, null); - } - } -} diff --git a/native/src/main/java/io/ballerina/stdlib/java.jms/JmsMessageListenerUtils.java b/native/src/main/java/io/ballerina/stdlib/java.jms/JmsMessageListenerUtils.java index 2c828390..cc7fc583 100644 --- a/native/src/main/java/io/ballerina/stdlib/java.jms/JmsMessageListenerUtils.java +++ b/native/src/main/java/io/ballerina/stdlib/java.jms/JmsMessageListenerUtils.java @@ -25,23 +25,30 @@ import io.ballerina.runtime.api.values.BError; import io.ballerina.runtime.api.values.BObject; +import java.util.Objects; + import javax.jms.JMSException; import javax.jms.MessageConsumer; import static io.ballerina.stdlib.java.jms.Constants.JMS_ERROR; +import static io.ballerina.stdlib.java.jms.Constants.NATIVE_CONSUMER; /** * Representation of {@link javax.jms.MessageListener} with utility methods to invoke as inter-op functions. */ public class JmsMessageListenerUtils { - private JmsMessageListenerUtils() { - } - public static Object setMessageListener(Environment environment, MessageConsumer consumer, + public static Object setMessageListener(Environment environment, BObject consumer, BObject serviceObject) { + Object nativeConsumer = consumer.getNativeData(NATIVE_CONSUMER); + if (Objects.isNull(nativeConsumer)) { + return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR, + StringUtils.fromString("Could not find the native JMS MessageConsumer"), + null, null); + } Runtime bRuntime = environment.getRuntime(); try { - consumer.setMessageListener(new JmsListener(serviceObject, bRuntime)); + ((MessageConsumer) nativeConsumer).setMessageListener(new JmsListener(serviceObject, bRuntime)); } catch (JMSException e) { BError cause = ErrorCreator.createError(e); return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR, diff --git a/native/src/main/java/io/ballerina/stdlib/java.jms/JmsMessageUtils.java b/native/src/main/java/io/ballerina/stdlib/java.jms/JmsMessageUtils.java deleted file mode 100644 index b6fd73bb..00000000 --- a/native/src/main/java/io/ballerina/stdlib/java.jms/JmsMessageUtils.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. - * - * WSO2 LLC. licenses this file to you under the Apache License, - * Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.ballerina.stdlib.java.jms; - -import io.ballerina.runtime.api.creators.ErrorCreator; -import io.ballerina.runtime.api.creators.ValueCreator; -import io.ballerina.runtime.api.utils.StringUtils; -import io.ballerina.runtime.api.values.BArray; -import io.ballerina.runtime.api.values.BError; -import io.ballerina.runtime.api.values.BString; - -import java.util.Collections; -import java.util.List; - -import javax.jms.BytesMessage; -import javax.jms.JMSException; -import javax.jms.MapMessage; -import javax.jms.Message; -import javax.jms.StreamMessage; -import javax.jms.TextMessage; - -import static io.ballerina.stdlib.java.jms.Constants.JMS_ERROR; - -/** - * Representation of {@link Message} with utility methods to invoke as inter-op functions. - */ -public class JmsMessageUtils { - - private JmsMessageUtils() {} - - /** - * Check whether {@link Message} is {@link TextMessage}. - * - * @param message {@link Message} object - * @return true/false based on the evaluation - */ - public static boolean isTextMessage(Message message) { - return message instanceof TextMessage; - } - - /** - * Check whether {@link Message} is {@link MapMessage}. - * - * @param message {@link Message} object - * @return true/false based on the evaluation - */ - public static boolean isMapMessage(Message message) { - return message instanceof MapMessage; - } - - /** - * Check whether {@link Message} is {@link BytesMessage}. - * - * @param message {@link Message} object - * @return true/false based on the evaluation - */ - public static boolean isBytesMessage(Message message) { - return message instanceof BytesMessage; - } - - /** - * Check whether {@link Message} is {@link StreamMessage}. - * - * @param message {@link Message} object - * @return true/false based on the evaluation - */ - public static boolean isStreamMessage(Message message) { - return message instanceof StreamMessage; - } - - /** - * Return all property names in the {@link MapMessage} as Ballerina array. - * - * @param message {@link Message} object - * @return Ballerina array consist of property names - */ - public static Object getJmsPropertyNames(Message message) { - try { - List propertyNames = Collections.list(message.getPropertyNames()); - return ValueCreator.createArrayValue(propertyNames.toArray(new BString[0])); - } catch (JMSException e) { - BError cause = ErrorCreator.createError(e); - return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR, - StringUtils.fromString("Error occurred while getting property names."), cause, null); - } - } - - public static Object getJMSCorrelationIDAsBytes(Message message) { - try { - MapMessage m = (MapMessage) message; - return ValueCreator.createArrayValue(m.getJMSCorrelationIDAsBytes()); - } catch (JMSException e) { - BError cause = ErrorCreator.createError(e); - return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR, - StringUtils.fromString("Error occurred while getting property names."), cause, null); - } - } - - /** - * Set the JMS correlation id value as an array of byte. - * - * @param message {@link Message} object - * @param value correlation id value as an array of byte - */ - public static Object setJMSCorrelationIDAsBytes(Message message, BArray value) { - try { - byte[] correlationId = value.getBytes(); - message.setJMSCorrelationIDAsBytes(correlationId); - } catch (JMSException e) { - BError cause = ErrorCreator.createError(e); - return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR, - StringUtils.fromString("Error occurred while setting correlationId value as an array of bytes."), - cause, null); - } - return null; - } -} diff --git a/native/src/main/java/io/ballerina/stdlib/java.jms/JmsProducer.java b/native/src/main/java/io/ballerina/stdlib/java.jms/JmsProducer.java new file mode 100644 index 00000000..0e4e59a4 --- /dev/null +++ b/native/src/main/java/io/ballerina/stdlib/java.jms/JmsProducer.java @@ -0,0 +1,143 @@ +/* + * Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.ballerina.stdlib.java.jms; + +import io.ballerina.runtime.api.creators.ErrorCreator; +import io.ballerina.runtime.api.utils.StringUtils; +import io.ballerina.runtime.api.values.BError; +import io.ballerina.runtime.api.values.BMap; +import io.ballerina.runtime.api.values.BObject; +import io.ballerina.runtime.api.values.BString; + +import java.util.Objects; + +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import static io.ballerina.stdlib.java.jms.CommonUtils.getDestination; +import static io.ballerina.stdlib.java.jms.CommonUtils.getDestinationOrNull; +import static io.ballerina.stdlib.java.jms.Constants.JMS_ERROR; +import static io.ballerina.stdlib.java.jms.Constants.NATIVE_PRODUCER; +import static io.ballerina.stdlib.java.jms.Constants.NATIVE_SESSION; + +/** + * Representation of {@link javax.jms.MessageProducer} with utility methods to invoke as inter-op functions. + */ +public class JmsProducer { + + /** + * Creates a {@link javax.jms.MessageProducer} object with given {@link javax.jms.Session}. + * + * @param producer Ballerina producer object + * @param session Ballerina session object + * @param destination Relevant JMS destination + * @return A Ballerina `jms:Error` if the JMS provider fails to create the MessageProducer due to some + * internal error + */ + public static Object init(BObject producer, BObject session, Object destination) { + Object nativeSession = session.getNativeData(NATIVE_SESSION); + if (Objects.isNull(nativeSession)) { + return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR, + StringUtils.fromString("Could not find the native JMS session"), null, null); + } + Session jmsSession = (Session) nativeSession; + try { + Destination jmsDestination = getDestinationOrNull(jmsSession, destination); + MessageProducer jmsProducer = jmsSession.createProducer(jmsDestination); + producer.addNativeData(NATIVE_PRODUCER, jmsProducer); + } catch (BallerinaJmsException exception) { + BError cause = ErrorCreator.createError(exception); + return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR, + StringUtils.fromString(exception.getMessage()), cause, null); + } catch (JMSException exception) { + BError cause = ErrorCreator.createError(exception); + return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR, + StringUtils.fromString("Error occurred while initializing the JMS MessageProducer"), + cause, null); + } + return null; + } + + /** + * Sends a message using the {@code MessageProducer}'s default delivery mode, priority, and time to live. + * + * @param producer Ballerina producer object + * @param message The JMS message + * @return A Ballerina `jms:Error` if the JMS MessageProducer fails to send the message due to some error + */ + public static Object send(BObject producer, Message message) { + Object nativeProducer = producer.getNativeData(NATIVE_PRODUCER); + if (Objects.isNull(nativeProducer)) { + return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR, + StringUtils.fromString("Could not find the native JMS MessageProducer"), + null, null); + } + try { + ((MessageProducer) nativeProducer).send(message); + } catch (JMSException exception) { + BError cause = ErrorCreator.createError(exception); + return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR, + StringUtils.fromString("Error occurred while sending a message to the JMS provider"), + cause, null); + } + return null; + } + + /** + * Sends a message to a destination for an unidentified message producer using the {@code MessageProducer}'s + * default delivery mode, priority, and time to live. + * + * @param producer Ballerina producer object + * @param session Ballerina session object + * @param destination Relevant JMS destination + * @param message The JMS message + * @return A Ballerina `jms:Error` if the JMS MessageProducer fails to send the message due to some error + */ + public static Object sendTo(BObject producer, BObject session, BMap destination, + Message message) { + Object nativeProducer = producer.getNativeData(NATIVE_PRODUCER); + if (Objects.isNull(nativeProducer)) { + return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR, + StringUtils.fromString("Could not find the native JMS MessageProducer"), + null, null); + } + Object nativeSession = session.getNativeData(NATIVE_SESSION); + if (Objects.isNull(nativeSession)) { + return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR, + StringUtils.fromString("Could not find the native JMS session"), null, null); + } + try { + Destination jmsDestination = getDestination((Session) nativeSession, destination); + ((MessageProducer) nativeProducer).send(jmsDestination, message); + } catch (BallerinaJmsException exception) { + BError cause = ErrorCreator.createError(exception); + return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR, + StringUtils.fromString(exception.getMessage()), cause, null); + } catch (JMSException exception) { + BError cause = ErrorCreator.createError(exception); + return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR, + StringUtils.fromString("Error occurred while initializing the JMS MessageProducer"), + cause, null); + } + return null; + } +} diff --git a/native/src/main/java/io/ballerina/stdlib/java.jms/JmsSession.java b/native/src/main/java/io/ballerina/stdlib/java.jms/JmsSession.java new file mode 100644 index 00000000..549ec791 --- /dev/null +++ b/native/src/main/java/io/ballerina/stdlib/java.jms/JmsSession.java @@ -0,0 +1,140 @@ +/* + * Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.ballerina.stdlib.java.jms; + +import io.ballerina.runtime.api.creators.ErrorCreator; +import io.ballerina.runtime.api.utils.StringUtils; +import io.ballerina.runtime.api.values.BError; +import io.ballerina.runtime.api.values.BObject; +import io.ballerina.runtime.api.values.BString; + +import java.util.Objects; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Session; + +import static io.ballerina.stdlib.java.jms.Constants.JMS_ERROR; +import static io.ballerina.stdlib.java.jms.Constants.NATIVE_CONNECTION; +import static io.ballerina.stdlib.java.jms.Constants.NATIVE_SESSION; + +/** + * Representation of {@link javax.jms.Session} with utility methods to invoke as inter-op functions. + */ +public class JmsSession { + private static final String TEXT = "TEXT"; + private static final String MAP = "MAP"; + + /** + * Creates a {@link javax.jms.Session} object with given {@link javax.jms.Connection}. + * + * @param session Ballerina session object + * @param connection Ballerina connection object + * @param ackMode Acknowledgment mode + * @return A Ballerina `jms:Error` if the JMS provider fails to create the connection due to some internal error + */ + public static Object init(BObject session, BObject connection, BString ackMode) { + int sessionAckMode = getSessionAckMode(ackMode.getValue()); + try { + Object nativeConnection = connection.getNativeData(NATIVE_CONNECTION); + if (Objects.isNull(nativeConnection)) { + return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR, + StringUtils.fromString("Could not find the native JMS connection"), null, null); + } + boolean transacted = Session.SESSION_TRANSACTED == sessionAckMode; + Session jmsSession = ((Connection) nativeConnection).createSession(transacted, sessionAckMode); + session.addNativeData(NATIVE_SESSION, jmsSession); + } catch (JMSException e) { + BError cause = ErrorCreator.createError(e); + return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR, + StringUtils.fromString(String.format("Error while creating session: %s", e.getMessage())), + cause, null); + } + return null; + } + + private static int getSessionAckMode(String ackMode) { + if (Constants.SESSION_TRANSACTED_MODE.equals(ackMode)) { + return Session.SESSION_TRANSACTED; + } else if (Constants.AUTO_ACKNOWLEDGE_MODE.equals(ackMode)) { + return Session.AUTO_ACKNOWLEDGE; + } else if (Constants.CLIENT_ACKNOWLEDGE_MODE.equals(ackMode)) { + return Session.CLIENT_ACKNOWLEDGE; + } else { + return Session.DUPS_OK_ACKNOWLEDGE; + } + } + + /** + * Unsubscribes a durable subscription that has been created by a client. + * + * @param session Ballerina session object + * @param subscriptionId Subscriber ID + * @return A Ballerina `jms:Error` if the session fails to unsubscribe to the durable subscription due to some + * internal error. + */ + public static Object unsubscribe(BObject session, BString subscriptionId) { + Object nativeSession = session.getNativeData(NATIVE_SESSION); + if (Objects.isNull(nativeSession)) { + return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR, + StringUtils.fromString("Could not find the native JMS session"), null, null); + } + try { + ((Session) nativeSession).unsubscribe(subscriptionId.getValue()); + } catch (JMSException exception) { + BError cause = ErrorCreator.createError(exception); + return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR, + StringUtils.fromString(String.format("Error while creating session: %s", exception.getMessage())), + cause, null); + } + return null; + } + + /** + * Creates a JMS message. + * + * @param session Ballerina session object + * @param messageType JMS message type + * @return {@link javax.jms.Message} or Ballerina `jms:Error` if the JMS provider fails to create this message due + * to some internal error. + */ + public static Object createJmsMessage(BObject session, BString messageType) { + Object nativeSession = session.getNativeData(NATIVE_SESSION); + if (Objects.isNull(nativeSession)) { + return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR, + StringUtils.fromString("Could not find the native JMS session"), null, null); + } + String jmsMessageType = messageType.getValue(); + try { + // currently ballerina JMS only support `Text`, `Map` and, `Bytes` message types + if (TEXT.equals(jmsMessageType)) { + return ((Session) nativeSession).createTextMessage(); + } else if (MAP.equals(jmsMessageType)) { + return ((Session) nativeSession).createMapMessage(); + } else { + return ((Session) nativeSession).createBytesMessage(); + } + } catch (JMSException exception) { + BError cause = ErrorCreator.createError(exception); + return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR, + StringUtils.fromString(String.format("Error while creating JMS message: %s", + exception.getMessage())), cause, null); + } + } +} diff --git a/native/src/main/java/io/ballerina/stdlib/java.jms/JmsSessionUtils.java b/native/src/main/java/io/ballerina/stdlib/java.jms/JmsSessionUtils.java deleted file mode 100644 index 6a0617d6..00000000 --- a/native/src/main/java/io/ballerina/stdlib/java.jms/JmsSessionUtils.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. - * - * WSO2 LLC. licenses this file to you under the Apache License, - * Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.ballerina.stdlib.java.jms; - -import io.ballerina.runtime.api.creators.ErrorCreator; -import io.ballerina.runtime.api.utils.StringUtils; -import io.ballerina.runtime.api.values.BError; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.jms.Connection; -import javax.jms.JMSException; -import javax.jms.Session; -import javax.jms.TemporaryQueue; -import javax.jms.TemporaryTopic; - -import static io.ballerina.stdlib.java.jms.Constants.JMS_ERROR; - -/** - * Representation of {@link javax.jms.Session} with utility methods to invoke as inter-op functions. - */ -public class JmsSessionUtils { - private static final Logger LOGGER = LoggerFactory.getLogger(JmsSessionUtils.class); - - private JmsSessionUtils() {} - - /** - * Creates a {@link javax.jms.Session} object with given {@link javax.jms.Connection}. - * - * @param connection {@link javax.jms.Connection} object - * @param ackModeString Acknowledgment mode - * @return {@link javax.jms.Session} object - * @throws BallerinaJmsException in an error situation - */ - public static Object createJmsSession(Connection connection, String ackModeString) throws BallerinaJmsException { - - int sessionAckMode; - boolean transactedSession = false; - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Session ack mode string: {}", ackModeString); - } - - switch (ackModeString) { - case Constants.CLIENT_ACKNOWLEDGE_MODE: - sessionAckMode = Session.CLIENT_ACKNOWLEDGE; - break; - case Constants.SESSION_TRANSACTED_MODE: - sessionAckMode = Session.SESSION_TRANSACTED; - transactedSession = true; - break; - case Constants.DUPS_OK_ACKNOWLEDGE_MODE: - sessionAckMode = Session.DUPS_OK_ACKNOWLEDGE; - break; - case Constants.AUTO_ACKNOWLEDGE_MODE: - sessionAckMode = Session.AUTO_ACKNOWLEDGE; - break; - default: - return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR, - StringUtils.fromString(String.format("Unknown acknowledgment mode: %s", ackModeString)), - null, null); - } - - try { - return connection.createSession(transactedSession, sessionAckMode); - } catch (JMSException e) { - BError cause = ErrorCreator.createError(e); - return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR, - StringUtils.fromString(String.format("Error while creating session: %s", e.getMessage())), - cause, null); - } - } - - /** - * Creates a {@link javax.jms.TemporaryQueue} object. - * - * @param session {@link javax.jms.Session} object - * @return return temporary queue name - */ - public static Object createTemporaryJmsQueue(Session session) { - try { - TemporaryQueue temporaryQueue = session.createTemporaryQueue(); - return temporaryQueue.getQueueName(); - } catch (JMSException e) { - BError cause = ErrorCreator.createError(e); - return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR, - StringUtils.fromString("Error creating temporary queue."), cause, null); - } - } - - /** - * Creates a {@link javax.jms.TemporaryTopic} object. - * - * @param session {@link javax.jms.Session} object - * @return return temporary topic name - */ - public static Object createTemporaryJmsTopic(Session session) { - try { - TemporaryTopic temporaryTopic = session.createTemporaryTopic(); - return temporaryTopic.getTopicName(); - } catch (JMSException e) { - BError cause = ErrorCreator.createError(e); - return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR, - StringUtils.fromString("Error creating temporary topic."), cause, null); - } - } -}