diff --git a/ballerina/Ballerina.toml b/ballerina/Ballerina.toml index 4ad991c9..6221b20a 100644 --- a/ballerina/Ballerina.toml +++ b/ballerina/Ballerina.toml @@ -26,3 +26,15 @@ groupId = "javax.jms" artifactId = "javax.jms-api" version = "2.0.1" path = "./lib/javax.jms-api-2.0.1.jar" + +[[platform.java11.dependency]] +path = "./lib/activemq-client-5.18.2.jar" +scope = "testOnly" + +[[platform.java11.dependency]] +path = "./lib/geronimo-j2ee-management_1.1_spec-1.0.1.jar" +scope = "testOnly" + +[[platform.java11.dependency]] +path = "./lib/hawtbuf-1.11.jar" +scope = "testOnly" diff --git a/ballerina/Dependencies.toml b/ballerina/Dependencies.toml index 594d611f..2811df4a 100644 --- a/ballerina/Dependencies.toml +++ b/ballerina/Dependencies.toml @@ -15,12 +15,48 @@ modules = [ {org = "ballerina", packageName = "jballerina.java", moduleName = "jballerina.java"} ] +[[package]] +org = "ballerina" +name = "lang.error" +version = "0.0.0" +scope = "testOnly" +dependencies = [ + {org = "ballerina", name = "jballerina.java"} +] + +[[package]] +org = "ballerina" +name = "lang.runtime" +version = "0.0.0" +scope = "testOnly" +dependencies = [ + {org = "ballerina", name = "jballerina.java"} +] +modules = [ + {org = "ballerina", packageName = "lang.runtime", moduleName = "lang.runtime"} +] + +[[package]] +org = "ballerina" +name = "test" +version = "0.0.0" +scope = "testOnly" +dependencies = [ + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "lang.error"} +] +modules = [ + {org = "ballerina", packageName = "test", moduleName = "test"} +] + [[package]] org = "ballerinax" name = "java.jms" version = "0.1.3" dependencies = [ - {org = "ballerina", name = "jballerina.java"} + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "lang.runtime"}, + {org = "ballerina", name = "test"} ] modules = [ {org = "ballerinax", packageName = "java.jms", moduleName = "java.jms"} diff --git a/ballerina/build.gradle b/ballerina/build.gradle index 2c404fa4..f208eca7 100644 --- a/ballerina/build.gradle +++ b/ballerina/build.gradle @@ -59,7 +59,7 @@ ballerina { packageOrganization = packageOrg module = packageName langVersion = ballerinaLangVersion - testCoverageParam = "--code-coverage --coverage-format=xml --includes=io.ballerina.stdlib.*:stdlib.java.jms*" + testCoverageParam = "--code-coverage --coverage-format=xml --includes=*" } configurations { @@ -75,6 +75,19 @@ dependencies { externalJars(group: 'javax.jms', name: 'javax.jms-api', version: "${javaxJmsVersion}") { transitive = false } + + /** + Test related dependencies + */ + externalJars(group: 'org.apache.activemq', name: 'activemq-client', version: "${activemqClientVersion}") { + transitive = false + } + externalJars(group: 'org.apache.geronimo.specs', name: 'geronimo-j2ee-management_1.1_spec', version: "${geronimoJ2eeMng11SpecVersion}") { + transitive = false + } + externalJars(group: 'org.fusesource.hawtbuf', name: 'hawtbuf', version: "${hawtbufVersion}") { + transitive = false + } } task updateTomlFiles { @@ -85,6 +98,9 @@ task updateTomlFiles { newConfig = newConfig.replace("@toml.version@", tomlVersion) newConfig = newConfig.replace("@slf4j.version@", stdlibDependentSlf4jVersion) newConfig = newConfig.replace("@javax.jms.version@", stdlibDependentJavaxJmsVersion) + newConfig = newConfig.replace('@activemq.client.version@', project.activemqClientVersion) + newConfig = newConfig.replace('@geronimoj2ee.spec.version@', project.geronimoJ2eeMng11SpecVersion) + newConfig = newConfig.replace('@hawtbuf.version@', project.hawtbufVersion) ballerinaTomlFile.text = newConfig } } @@ -102,51 +118,51 @@ task commitTomlFiles { } } -//task startMqttServer() { -// doLast { -// if (!Os.isFamily(Os.FAMILY_WINDOWS)) { -// def stdOut = new ByteArrayOutputStream() -// exec { -// commandLine 'sh', '-c', "docker ps --filter name=mqtt-test" -// standardOutput = stdOut -// } -// if (!stdOut.toString().contains("mqtt-test")) { -// println "Starting Mqtt server." -// exec { -// commandLine 'sh', '-c', "docker-compose -f tests/resources/docker-compose.yaml up -d" -// standardOutput = stdOut -// } -// println stdOut.toString() -// sleep(5 * 1000) -// } else { -// println "Mqtt server is already running." -// } -// } -// } -//} -// -//task stopMqttServer() { -// doLast { -// if (!Os.isFamily(Os.FAMILY_WINDOWS)) { -// def stdOut = new ByteArrayOutputStream() -// exec { -// commandLine 'sh', '-c', "docker ps --filter name=mqtt-test" -// standardOutput = stdOut -// } -// if (stdOut.toString().contains("mqtt-test")) { -// println "Stopping Mqtt server." -// exec { -// commandLine 'sh', '-c', "docker-compose -f tests/resources/docker-compose.yaml rm -svf" -// standardOutput = stdOut -// } -// println stdOut.toString() -// sleep(5 * 1000) -// } else { -// println "Mqtt server is not started." -// } -// } -// } -//} +task startActiveMQServer() { + doLast { + if (!Os.isFamily(Os.FAMILY_WINDOWS)) { + def stdOut = new ByteArrayOutputStream() + exec { + commandLine 'sh', '-c', "docker ps --filter name=activemq-test" + standardOutput = stdOut + } + if (!stdOut.toString().contains("activemq-test")) { + println "Starting ActiveMQ server." + exec { + commandLine 'sh', '-c', "docker-compose -f tests/resources/docker-compose.yaml up -d" + standardOutput = stdOut + } + println stdOut.toString() + sleep(5 * 1000) + } else { + println "ActiveMQ server is already running." + } + } + } +} + +task stopActiveMQServer() { + doLast { + if (!Os.isFamily(Os.FAMILY_WINDOWS)) { + def stdOut = new ByteArrayOutputStream() + exec { + commandLine 'sh', '-c', "docker ps --filter name=activemq-test" + standardOutput = stdOut + } + if (stdOut.toString().contains("activemq-test")) { + println "Stopping ActiveMQ server." + exec { + commandLine 'sh', '-c', "docker-compose -f tests/resources/docker-compose.yaml rm -svf" + standardOutput = stdOut + } + println stdOut.toString() + sleep(5 * 1000) + } else { + println "ActiveMQ server is not started." + } + } + } +} publishing { publications { @@ -168,8 +184,8 @@ publishing { updateTomlFiles.dependsOn copyStdlibs -//test.dependsOn startMqttServer -//build.finalizedBy stopMqttServer +test.dependsOn startActiveMQServer +build.finalizedBy stopActiveMQServer build.dependsOn ":java.jms-native:build" build.dependsOn "generatePomFileForMavenPublication" diff --git a/ballerina/message_producer.bal b/ballerina/message_producer.bal index d2bcd650..70c18b34 100644 --- a/ballerina/message_producer.bal +++ b/ballerina/message_producer.bal @@ -75,23 +75,27 @@ isolated function getJmsMessage(Session session, Message message) returns handle return jmsMessage; } +const string TEXT = "TEXT"; +const string BYTES = "BYTES"; +const string MAP = "MAP"; + isolated function constructJmsMessage(Session session, Message message) returns handle|Error { if message is TextMessage { - handle jmsMessage = check session.createJmsMessage("TEXT"); + handle jmsMessage = check session.createJmsMessage(TEXT); error? result = trap externWriteText(jmsMessage, java:fromString(message.content)); if result is error { return error Error(result.message()); } return jmsMessage; } else if message is BytesMessage { - handle jmsMessage = check session.createJmsMessage("BYTES"); + handle jmsMessage = check session.createJmsMessage(BYTES); error? result = trap externWriteBytes(jmsMessage, message.content); if result is error { return error Error(result.message()); } return jmsMessage; } else if message is MapMessage { - handle jmsMessage = check session.createJmsMessage("MAP"); + handle jmsMessage = check session.createJmsMessage(MAP); error? result = trap populateMapMessage(jmsMessage, message.content); if result is error { return error Error(result.message()); @@ -101,7 +105,8 @@ isolated function constructJmsMessage(Session session, Message message) returns return error Error("Unidentified message type"); } -isolated function updateReplyToMessageField(Session session, handle jmsMessage, Destination? replyTo = ()) returns Error? { +isolated function updateReplyToMessageField(Session session, handle jmsMessage, + Destination? replyTo = ()) returns Error? { if replyTo is () { return; } diff --git a/ballerina/resources/.keep b/ballerina/resources/.keep deleted file mode 100644 index e69de29b..00000000 diff --git a/ballerina/tests/jms_connection_tests.bal b/ballerina/tests/jms_connection_tests.bal new file mode 100644 index 00000000..c2853f8f --- /dev/null +++ b/ballerina/tests/jms_connection_tests.bal @@ -0,0 +1,168 @@ +// Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. +// +// WSO2 LLC. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import ballerina/lang.runtime; +import ballerina/test; + +@test:Config { + groups: ["connection"] +} +isolated function testCreateConnectionSuccess() returns error? { + Connection connection = check new ( + initialContextFactory = "org.apache.activemq.jndi.ActiveMQInitialContextFactory", + providerUrl = "tcp://localhost:61616" + ); + check connection->close(); +} + +@test:Config { + groups: ["connection"] +} +isolated function testCreateConnectionInvalidInitialContextFactory() returns error? { + Connection|Error connection = new ( + initialContextFactory = "io.sample.SampleMQInitialContextFactory", + providerUrl = "tcp://localhost:61616" + ); + test:assertTrue(connection is Error, + "Connection created with invalid initial context factory"); + if connection is Error { + test:assertEquals(connection.message(), + "Error occurred while connecting to broker: Cannot instantiate class: io.sample.SampleMQInitialContextFactory", + "Invalid connection init error message"); + } +} + +@test:Config { + groups: ["connection"] +} +isolated function testCreateConnectionInvalidProviderUrl() returns error? { + Connection|Error connection = new ( + initialContextFactory = "org.apache.activemq.jndi.ActiveMQInitialContextFactory", + providerUrl = "tcp://localhost:61615" + ); + test:assertTrue(connection is Error, + "Connection created with invalid provider URL"); + if connection is Error { + test:assertEquals(connection.message(), + "Error occurred while connecting to broker: Could not connect to broker URL: tcp://localhost:61615. Reason: java.net.ConnectException: Connection refused (Connection refused)", + "Invalid connection init error message"); + } +} + +// @test:Config { +// groups: ["connection"] +// } +isolated function testCreateConnectionInvalidCredentials() returns error? { + Connection|Error connection = new ( + initialContextFactory = "org.apache.activemq.jndi.ActiveMQInitialContextFactory", + providerUrl = "tcp://localhost:61616", + username = "testuser", + password = "testpassword" + ); + test:assertTrue(connection is Error, + "Connection created with invalid credentials"); + if connection is Error { + test:assertEquals(connection.message(), + "Error occurred while connecting to broker: Could not connect to broker URL: tcp://localhost:61615. Reason: java.net.ConnectException: Connection refused (Connection refused)", + "Invalid connection init error message"); + } +} + +@test:Config { + groups: ["connection"] +} +isolated function testConnectionRestart() returns error? { + Connection connection = check new ( + initialContextFactory = "org.apache.activemq.jndi.ActiveMQInitialContextFactory", + providerUrl = "tcp://localhost:61616" + ); + check connection->stop(); + runtime:sleep(2); + check connection->'start(); + runtime:sleep(2); + check connection->close(); +} + +@test:Config { + groups: ["connection"] +} +isolated function testConnectionRestartAfterClosing() returns error? { + Connection connection = check new ( + initialContextFactory = "org.apache.activemq.jndi.ActiveMQInitialContextFactory", + providerUrl = "tcp://localhost:61616" + ); + check connection->close(); + runtime:sleep(2); + Error? result = connection->'start(); + test:assertTrue(result is Error, + "Connection restarted successfully after closing"); + if result is Error { + test:assertEquals(result.message(), + "Error occurred while starting the connection: The connection is already closed", + "Invalid connection restart error message"); + } +} + +@test:Config { + groups: ["connection"] +} +isolated function testConnectionStopAfterClosing() returns error? { + Connection connection = check new ( + initialContextFactory = "org.apache.activemq.jndi.ActiveMQInitialContextFactory", + providerUrl = "tcp://localhost:61616" + ); + check connection->close(); + runtime:sleep(2); + Error? result = connection->stop(); + test:assertTrue(result is Error, + "Connection stopped successfully after closing"); + if result is Error { + test:assertEquals(result.message(), + "Error occurred while stopping the connection: The connection is already closed", + "Invalid connection restart error message"); + } +} + +@test:Config { + groups: ["connection"] +} +isolated function testCreateSession() returns error? { + Connection connection = check new ( + initialContextFactory = "org.apache.activemq.jndi.ActiveMQInitialContextFactory", + providerUrl = "tcp://localhost:61616" + ); + Session session = check connection->createSession(); + check session->close(); + check connection->close(); +} + +@test:Config { + groups: ["connection"] +} +isolated function testCreateSessionAfterConnectionClose() returns error? { + Connection connection = check new ( + initialContextFactory = "org.apache.activemq.jndi.ActiveMQInitialContextFactory", + providerUrl = "tcp://localhost:61616" + ); + check connection->close(); + Session|Error session = connection->createSession(); + test:assertTrue(session is Error, "Created session after connection closed"); + if session is Error { + test:assertEquals(session.message(), + "Error while creating session: The connection is already closed", + "Invalid session creation failure message"); + } +} diff --git a/ballerina/tests/jms_consumer_tests.bal b/ballerina/tests/jms_consumer_tests.bal new file mode 100644 index 00000000..ed35d9f5 --- /dev/null +++ b/ballerina/tests/jms_consumer_tests.bal @@ -0,0 +1,169 @@ +// Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. +// +// WSO2 LLC. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import ballerina/lang.runtime; +import ballerina/test; + +final MessageProducer queue7Producer = check createProducer(AUTO_ACK_SESSION, { + 'type: QUEUE, + name: "test-queue-7" +}); +final MessageConsumer queue7Consumer = check createConsumer(AUTO_ACK_SESSION, destination = { + 'type: QUEUE, + name: "test-queue-7" +}); + +@test:Config { + groups: ["consumer"] +} +isolated function testReceiveNoWaitWithQueue() returns error? { + Message? response = check queue7Consumer->receiveNoWait(); + test:assertTrue(response is (), "Received a message for non-existing scenario"); + + TextMessage message = { + content: "This is a sample message" + }; + check queue7Producer->send(message); + response = check queue7Consumer->receiveNoWait(); + test:assertTrue(response is TextMessage, "Received a invalid message type"); + if response is TextMessage { + test:assertEquals(response.content, "This is a sample message", "Invalid content received"); + } +} + +@test:Config { + groups: ["consumer"] +} +isolated function testRequestReplyWithQueue() returns error? { + TextMessage requestMessage = { + content: "This is a request message", + correlationId: "cid-123", + replyTo: { + 'type: QUEUE, + name: "reply-queue" + } + }; + check queue7Producer->send(requestMessage); + + Message? request = check queue7Consumer->receive(5000); + test:assertTrue(request is TextMessage, "Invalid message received"); + if request is TextMessage { + MessageProducer replyProducer = check createProducer(AUTO_ACK_SESSION, { + 'type: QUEUE, + name: "reply-queue" + }); + test:assertTrue(request.correlationId is string, "Could not find the correlation Id"); + TextMessage replyMessage = { + content: "This is a reply message" + }; + replyMessage.correlationId = check request.correlationId.ensureType(); + check replyProducer->send(replyMessage); + check replyProducer->close(); + } +} + +@test:Config { + groups: ["consumer"] +} +isolated function testRequestReplyWithTempQueue() returns error? { + TextMessage requestMessage = { + content: "This is a request message", + correlationId: "cid-123", + replyTo: { + 'type: TEMPORARY_QUEUE, + name: "temp-reply-queue" + } + }; + check queue7Producer->send(requestMessage); + + Message? request = check queue7Consumer->receive(5000); + test:assertTrue(request is TextMessage, "Invalid message received"); + if request is TextMessage { + test:assertTrue(request.replyTo is Destination, "Could not find the replyTo destination in a request-message"); + Destination replyTo = check request.replyTo.ensureType(); + MessageProducer replyProducer = check createProducer(AUTO_ACK_SESSION, replyTo); + test:assertTrue(request.correlationId is string, "Could not find the correlation Id"); + TextMessage replyMessage = { + content: "This is a reply message" + }; + replyMessage.correlationId = check request.correlationId.ensureType(); + check replyProducer->send(replyMessage); + check replyProducer->close(); + } +} + +@test:Config { + groups: ["consumer"] +} +isolated function testReceiveMapMessageWithMultipleTypes() returns error? { + map content = { + intPayload: 1, + floatPayload: 1.0, + strPayload: "This is a sample message", + bytePayload: "This is a sample message".toBytes(), + boolPayload: true, + decimalField: 12.22, + byteField: 1 + }; + MapMessage message = { + content: content + }; + check queue7Producer->send(message); + runtime:sleep(2); + Message? response = check queue7Consumer->receiveNoWait(); + test:assertTrue(response is MapMessage, "Received a invalid message type"); + if response is MapMessage { + test:assertEquals(response.content, content, "Invalid content received"); + } +} + +final MessageProducer topic7Producer = check createProducer(AUTO_ACK_SESSION, { + 'type: TOPIC, + name: "test-topic-7" +}); +final MessageConsumer topic7Consumer = check createConsumer(AUTO_ACK_SESSION, destination = { + 'type: TOPIC, + name: "test-topic-7" +}); + +@test:Config { + groups: ["consumer"] +} +isolated function testReceiveNoWaitWithTopic() returns error? { + Message? response = check topic7Consumer->receiveNoWait(); + test:assertTrue(response is (), "Received a message for non-existing scenario"); + + TextMessage message = { + content: "This is a sample message" + }; + check topic7Producer->send(message); + runtime:sleep(2); + response = check topic7Consumer->receiveNoWait(); + test:assertTrue(response is TextMessage, "Received a invalid message type"); + if response is TextMessage { + test:assertEquals(response.content, "This is a sample message", "Invalid content received"); + } +} + +@test:AfterGroups { + value: ["consumer"] +} +isolated function afterConsumerTests() returns error? { + check queue7Producer->close(); + check queue7Consumer->close(); + check topic7Producer->close(); + check topic7Consumer->close(); +} diff --git a/ballerina/tests/jms_message_listener_tests.bal b/ballerina/tests/jms_message_listener_tests.bal new file mode 100644 index 00000000..43d31c18 --- /dev/null +++ b/ballerina/tests/jms_message_listener_tests.bal @@ -0,0 +1,442 @@ +// Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. +// +// WSO2 LLC. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import ballerina/lang.runtime; +import ballerina/test; + +final MessageProducer queue3Producer = check createProducer(AUTO_ACK_SESSION, { + 'type: QUEUE, + name: "test-queue-3" +}); +final Listener queue3Listener = check new ( + connectionConfig = { + initialContextFactory: "org.apache.activemq.jndi.ActiveMQInitialContextFactory", + providerUrl: "tcp://localhost:61616" + }, + consumerOptions = { + destination: { + 'type: QUEUE, + name: "test-queue-3" + } + } +); +isolated int queue3ServiceReceivedMessageCount = 0; +isolated boolean queue3ServiceReceivedTextMsg = false; +isolated boolean queue3ServiceReceivedMapMsg = false; +isolated boolean queue3ServiceReceivedBytesMsg = false; + +final MessageProducer topic3Producer = check createProducer(AUTO_ACK_SESSION, { + 'type: TOPIC, + name: "test-topic-3" +}); +final Listener topic3Listener = check new ( + connectionConfig = { + initialContextFactory: "org.apache.activemq.jndi.ActiveMQInitialContextFactory", + providerUrl: "tcp://localhost:61616" + }, + consumerOptions = { + destination: { + 'type: TOPIC, + name: "test-topic-3" + } + } +); +isolated int topic3ServiceReceivedMessageCount = 0; +isolated boolean topic3ServiceReceivedTextMsg = false; +isolated boolean topic3ServiceReceivedMapMsg = false; +isolated boolean topic3ServiceReceivedBytesMsg = false; + +@test:BeforeGroups { + value: ["messageListener"] +} +isolated function beforeMessageListenerTests() returns error? { + Service queue3Service = service object { + remote function onMessage(Message message) returns error? { + if message is TextMessage { + lock { + queue3ServiceReceivedTextMsg = true; + } + } + if message is MapMessage { + lock { + queue3ServiceReceivedMapMsg = true; + } + } + if message is BytesMessage { + lock { + queue3ServiceReceivedBytesMsg = true; + } + } + lock { + queue3ServiceReceivedMessageCount += 1; + } + } + }; + check queue3Listener.attach(queue3Service, "test-queue-3-service"); + check queue3Listener.'start(); + + Service topic3Service = service object { + remote function onMessage(Message message) returns error? { + if message is TextMessage { + lock { + topic3ServiceReceivedTextMsg = true; + } + } + if message is MapMessage { + lock { + topic3ServiceReceivedMapMsg = true; + } + } + if message is BytesMessage { + lock { + topic3ServiceReceivedBytesMsg = true; + } + } + lock { + topic3ServiceReceivedMessageCount += 1; + } + } + }; + check topic3Listener.attach(topic3Service, "test-topic-3-service"); + check topic3Listener.'start(); +} + +@test:Config { + groups: ["messageListener"] +} +isolated function testQueueMessageListener() returns error? { + TextMessage textMsg = { + content: "This is a sample message" + }; + check queue3Producer->send(textMsg); + runtime:sleep(2); + lock { + test:assertTrue(queue3ServiceReceivedTextMsg, + "Queue message listener did not received the text message"); + } + + MapMessage mapMessage = { + content: { + user: "John Doe", + message: "This is a sample message" + } + }; + check queue3Producer->send(mapMessage); + runtime:sleep(2); + lock { + test:assertTrue(queue3ServiceReceivedMapMsg, + "Queue message listener did not received the map message"); + } + + BytesMessage bytesMessage = { + content: "This is a sample message".toBytes() + }; + check queue3Producer->send(bytesMessage); + runtime:sleep(2); + lock { + test:assertTrue(queue3ServiceReceivedBytesMsg, + "Queue message listener did not received the bytes message"); + } + + lock { + test:assertEquals(queue3ServiceReceivedMessageCount, 3, + "Queue message listener did not received the expected number of messages"); + } +} + +@test:Config { + groups: ["messageListener"] +} +isolated function testTopicMessageListener() returns error? { + TextMessage textMsg = { + content: "This is a sample message" + }; + check topic3Producer->send(textMsg); + runtime:sleep(2); + lock { + test:assertTrue(topic3ServiceReceivedTextMsg, + "Topic message listener did not received the text message"); + } + + MapMessage mapMessage = { + content: { + user: "John Doe", + message: "This is a sample message" + } + }; + check topic3Producer->send(mapMessage); + runtime:sleep(2); + lock { + test:assertTrue(topic3ServiceReceivedMapMsg, + "Topic message listener did not received the map message"); + } + + BytesMessage bytesMessage = { + content: "This is a sample message".toBytes() + }; + check topic3Producer->send(bytesMessage); + runtime:sleep(2); + lock { + test:assertTrue(topic3ServiceReceivedBytesMsg, + "Topic message listener did not received the bytes message"); + } + + lock { + test:assertEquals(topic3ServiceReceivedMessageCount, 3, + "Topic message listener did not received the expected number of messages"); + } +} + +boolean textMsgReceived = false; +boolean mapMsgReceived = false; +boolean bytesMsgReceived = false; +int receivedMsgCount = 0; + +@test:Config { + groups: ["messageListener"] +} +function testNonIsolatedMessageListener() returns error? { + Listener nonIsolatedMsgListener = check new ( + connectionConfig = { + initialContextFactory: "org.apache.activemq.jndi.ActiveMQInitialContextFactory", + providerUrl: "tcp://localhost:61616" + }, + consumerOptions = { + destination: { + 'type: QUEUE, + name: "test-isolation" + } + } + ); + Service nonIsolatedSvc = service object { + remote function onMessage(Message message) returns error? { + if message is TextMessage { + textMsgReceived = true; + } + if message is MapMessage { + mapMsgReceived = true; + } + if message is BytesMessage { + bytesMsgReceived = true; + } + receivedMsgCount += 1; + } + }; + check nonIsolatedMsgListener.attach(nonIsolatedSvc, "non-isolated-service"); + check nonIsolatedMsgListener.'start(); + + MessageProducer producer = check createProducer(AUTO_ACK_SESSION, { + 'type: QUEUE, + name: "test-isolation" + }); + TextMessage textMsg = { + content: "This is a sample message" + }; + check producer->send(textMsg); + runtime:sleep(2); + test:assertTrue(textMsgReceived, + "Queue message listener did not received the text message"); + + MapMessage mapMessage = { + content: { + user: "John Doe", + message: "This is a sample message" + } + }; + check producer->send(mapMessage); + runtime:sleep(2); + test:assertTrue(mapMsgReceived, + "Queue message listener did not received the map message"); + + BytesMessage bytesMessage = { + content: "This is a sample message".toBytes() + }; + check producer->send(bytesMessage); + runtime:sleep(2); + test:assertTrue(bytesMsgReceived, + "Queue message listener did not received the bytes message"); + + test:assertEquals(receivedMsgCount, 3, + "Queue message listener did not received the expected number of messages"); + check producer->close(); + check nonIsolatedMsgListener.gracefulStop(); +} + +isolated boolean msgListenerWithCallerTextMsgReceived = false; +isolated boolean msgListenerWithCallerMapMsgReceived = false; +isolated boolean msgListenerWithCallerBytesMsgReceived = false; +isolated int msgListenerWithCallerReceivedMsgCount = 0; + +@test:Config { + groups: ["messageListener"] +} +isolated function testMessageListenerWithCaller() returns error? { + Listener msgListener = check new ( + connectionConfig = { + initialContextFactory: "org.apache.activemq.jndi.ActiveMQInitialContextFactory", + providerUrl: "tcp://localhost:61616" + }, + acknowledgementMode = CLIENT_ACKNOWLEDGE, + consumerOptions = { + destination: { + 'type: QUEUE, + name: "test-caller" + } + } + ); + Service consumerSvc = service object { + remote function onMessage(Message message, Caller caller) returns error? { + if message is TextMessage { + lock { + msgListenerWithCallerTextMsgReceived = true; + } + } + if message is MapMessage { + lock { + msgListenerWithCallerMapMsgReceived = true; + } + } + if message is BytesMessage { + lock { + msgListenerWithCallerBytesMsgReceived = true; + } + } + lock { + msgListenerWithCallerReceivedMsgCount += 1; + } + check caller->acknowledge(message); + } + }; + check msgListener.attach(consumerSvc, "test-caller-service"); + check msgListener.'start(); + + MessageProducer producer = check createProducer(AUTO_ACK_SESSION, { + 'type: QUEUE, + name: "test-caller" + }); + TextMessage textMsg = { + content: "This is a sample message" + }; + check producer->send(textMsg); + runtime:sleep(2); + lock { + test:assertTrue(msgListenerWithCallerTextMsgReceived, + "Queue message listener did not received the text message"); + } + + MapMessage mapMessage = { + content: { + user: "John Doe", + message: "This is a sample message" + } + }; + check producer->send(mapMessage); + runtime:sleep(2); + lock { + test:assertTrue(msgListenerWithCallerMapMsgReceived, + "Queue message listener did not received the map message"); + } + + BytesMessage bytesMessage = { + content: "This is a sample message".toBytes() + }; + check producer->send(bytesMessage); + runtime:sleep(2); + lock { + test:assertTrue(msgListenerWithCallerBytesMsgReceived, + "Queue message listener did not received the bytes message"); + } + + lock { + test:assertEquals(msgListenerWithCallerReceivedMsgCount, 3, + "Queue message listener did not received the expected number of messages"); + } + check producer->close(); + check msgListener.gracefulStop(); +} + +@test:Config { + groups: ["messageListener"] +} +isolated function testMessageListenerReturningError() returns error? { + Listener msgListener = check new ( + connectionConfig = { + initialContextFactory: "org.apache.activemq.jndi.ActiveMQInitialContextFactory", + providerUrl: "tcp://localhost:61616" + }, + acknowledgementMode = CLIENT_ACKNOWLEDGE, + consumerOptions = { + destination: { + 'type: QUEUE, + name: "test-onMessage-error" + } + } + ); + Service consumerSvc = service object { + remote function onMessage(Message message) returns error? { + return error("Error occurred while processing the message"); + } + }; + check msgListener.attach(consumerSvc, "test-onMessage-error-service"); + check msgListener.'start(); + + MessageProducer producer = check createProducer(AUTO_ACK_SESSION, { + 'type: QUEUE, + name: "test-onMessage-error" + }); + TextMessage textMsg = { + content: "This is a sample message" + }; + check producer->send(textMsg); + runtime:sleep(2); +} + +@test:Config { + groups: ["messageListener"] +} +isolated function testMessageListenerImmediateStop() returns error? { + Listener msgListener = check new ( + connectionConfig = { + initialContextFactory: "org.apache.activemq.jndi.ActiveMQInitialContextFactory", + providerUrl: "tcp://localhost:61616" + }, + acknowledgementMode = CLIENT_ACKNOWLEDGE, + consumerOptions = { + destination: { + 'type: QUEUE, + name: "test-caller" + } + } + ); + Service consumerSvc = service object { + remote function onMessage(Message message, Caller caller) returns error? {} + }; + check msgListener.attach(consumerSvc, "test-caller-service"); + check msgListener.'start(); + runtime:sleep(2); + check msgListener.immediateStop(); +} + +@test:AfterGroups { + value: ["messageListener"] +} +isolated function afterMessageListenerTests() returns error? { + check queue3Producer->close(); + check queue3Listener.gracefulStop(); + + check topic3Producer->close(); + check topic3Listener.gracefulStop(); +} diff --git a/ballerina/tests/jms_producer_tests.bal b/ballerina/tests/jms_producer_tests.bal new file mode 100644 index 00000000..15d45909 --- /dev/null +++ b/ballerina/tests/jms_producer_tests.bal @@ -0,0 +1,97 @@ +// Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. +// +// WSO2 LLC. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import ballerina/test; + +@test:Config { + groups: ["producer"] +} +isolated function testCreateProducerWithoutQueueName() returns error? { + MessageProducer|Error producer = AUTO_ACK_SESSION.createProducer({ + 'type: QUEUE + }); + test:assertTrue(producer is Error, "Allowing creating a queue-producer without queue-name"); + if producer is Error { + test:assertEquals(producer.message(), + "JMS destination name can not be empty for destination type: QUEUE", + "Invalid error message for producer init error"); + } +} + +@test:Config { + groups: ["producer"] +} +isolated function testProducerSendToWithoutQueueName() returns error? { + MessageProducer producer = check AUTO_ACK_SESSION.createProducer(); + TextMessage message = { + content: "This is a sample message" + }; + Error? result = producer->sendTo({ 'type: QUEUE }, message); + test:assertTrue(result is Error, "Allowing sending messages to a queue without queue-name"); + if result is Error { + test:assertEquals(result.message(), + "JMS destination name can not be empty for destination type: QUEUE", + "Invalid error message for producer init error"); + } +} + +@test:Config { + groups: ["producer"] +} +isolated function testReplyToErrorForQueue() returns error? { + MessageProducer producer = check AUTO_ACK_SESSION.createProducer(); + TextMessage message = { + content: "This is a request message", + correlationId: "cid-123", + replyTo: { + 'type: QUEUE + } + }; + Error? result = producer->sendTo({ + 'type: QUEUE, + name: "reply-to-error-queue" + }, message); + test:assertTrue(result is Error, "Sent message with errorneous replyTo field"); + if result is Error { + test:assertEquals(result.message(), + "JMS destination name can not be empty for destination type: QUEUE", + "Invalid error message for invalid-destination error"); + } +} + +@test:Config { + groups: ["producer"] +} +isolated function testReplyToErrorForTopic() returns error? { + MessageProducer producer = check AUTO_ACK_SESSION.createProducer(); + TextMessage message = { + content: "This is a request message", + correlationId: "cid-123", + replyTo: { + 'type: TOPIC + } + }; + Error? result = producer->sendTo({ + 'type: TOPIC, + name: "reply-to-error-topic" + }, message); + test:assertTrue(result is Error, "Sent message with errorneous replyTo field"); + if result is Error { + test:assertEquals(result.message(), + "JMS destination name can not be empty for destination type: TOPIC", + "Invalid error message for invalid-destination error"); + } +} diff --git a/ballerina/tests/jms_queue_tests.bal b/ballerina/tests/jms_queue_tests.bal new file mode 100644 index 00000000..c05a89a1 --- /dev/null +++ b/ballerina/tests/jms_queue_tests.bal @@ -0,0 +1,209 @@ +// Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. +// +// WSO2 LLC. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import ballerina/test; + +final MessageProducer queue1Producer = check createProducer(AUTO_ACK_SESSION, { + 'type: QUEUE, + name: "test-queue-1" +}); +final MessageConsumer queue1Consumer = check createConsumer(AUTO_ACK_SESSION, destination = { + 'type: QUEUE, + name: "test-queue-1" +}); + +@test:Config { + groups: ["queue"] +} +isolated function testQueueWithTextMessage() returns error? { + string content = "This is a sample message"; + TextMessage message = { + content: content + }; + check queue1Producer->send(message); + Message? response = check queue1Consumer->receive(5000); + test:assertTrue(response is TextMessage, "Invalid message type received"); + if response is TextMessage { + test:assertEquals(response.content, content, "Invalid payload"); + } +} + +@test:Config { + groups: ["queue"] +} +isolated function testQueueWithMapMessage() returns error? { + map content = { + user: "John Doe", + message: "This is a sample message" + }; + MapMessage message = { + content: content + }; + check queue1Producer->send(message); + Message? response = check queue1Consumer->receive(5000); + test:assertTrue(response is MapMessage, "Invalid message type received"); + if response is MapMessage { + test:assertEquals(response.content, content, "Invalid payload"); + } +} + +@test:Config { + groups: ["queue"] +} +isolated function testQueueWithBytesMessage() returns error? { + byte[] content = "This is a sample message".toBytes(); + BytesMessage message = { + content: content + }; + check queue1Producer->send(message); + Message? response = check queue1Consumer->receive(5000); + test:assertTrue(response is BytesMessage, "Invalid message type received"); + if response is BytesMessage { + test:assertEquals(response.content, content, "Invalid payload"); + } +} + +@test:Config { + groups: ["queue"] +} +isolated function testTempQueue() returns error? { + MessageProducer tempQueueProducer = check createProducer(AUTO_ACK_SESSION, { + 'type: TEMPORARY_QUEUE + }); + string content = "This is a sample message"; + TextMessage message = { + content: content + }; + check tempQueueProducer->send(message); + check tempQueueProducer->close(); +} + +@test:Config { + groups: ["queue"] +} +isolated function testQueueProducerSendToError() returns error? { + TextMessage message = { + content: "This is a sample message" + }; + Error? result = queue1Producer->sendTo({ + 'type: QUEUE, + name: "unsupported-queue" + }, message); + test:assertTrue(result is Error, + "Allowing to send messages to other destination rather than the configured destination"); +} + +final MessageProducer queueProducerWithoutDestination = check createProducerWithoutDestination(AUTO_ACK_SESSION); +final MessageConsumer queue2Consumer = check createConsumer(AUTO_ACK_SESSION, destination = { + 'type: QUEUE, + name: "test-queue-2" +}); + +@test:Config { + groups: ["queue"] +} +isolated function testQueueWithTextMessageUsingSendTo() returns error? { + string content = "This is a sample message"; + TextMessage message = { + content: content + }; + check queueProducerWithoutDestination->sendTo({ + 'type: QUEUE, + name: "test-queue-2" + }, message); + Message? response = check queue2Consumer->receive(5000); + test:assertTrue(response is TextMessage, "Invalid message type received"); + if response is TextMessage { + test:assertEquals(response.content, content, "Invalid payload"); + } +} + +@test:Config { + groups: ["queue"] +} +isolated function testQueueWithMapMessageUsingSendTo() returns error? { + map content = { + user: "John Doe", + message: "This is a sample message" + }; + MapMessage message = { + content: content + }; + check queueProducerWithoutDestination->sendTo({ + 'type: QUEUE, + name: "test-queue-2" + }, message); + Message? response = check queue2Consumer->receive(5000); + test:assertTrue(response is MapMessage, "Invalid message type received"); + if response is MapMessage { + test:assertEquals(response.content, content, "Invalid payload"); + } +} + +@test:Config { + groups: ["queue"] +} +isolated function testQueueWithBytesMessageUsingSendTo() returns error? { + byte[] content = "This is a sample message".toBytes(); + BytesMessage message = { + content: content + }; + check queueProducerWithoutDestination->sendTo({ + 'type: QUEUE, + name: "test-queue-2" + }, message); + Message? response = check queue2Consumer->receive(5000); + test:assertTrue(response is BytesMessage, "Invalid message type received"); + if response is BytesMessage { + test:assertEquals(response.content, content, "Invalid payload"); + } +} + +@test:Config { + groups: ["queue"] +} +isolated function testTempQueueUsingSendTo() returns error? { + MessageProducer tempQueueProducer = check createProducerWithoutDestination(AUTO_ACK_SESSION); + string content = "This is a sample message"; + TextMessage message = { + content: content + }; + check tempQueueProducer->sendTo({ + 'type: TEMPORARY_QUEUE + }, message); + check tempQueueProducer->close(); +} + +@test:Config { + groups: ["queue"] +} +isolated function testQueueProducerSendError() returns error? { + TextMessage message = { + content: "This is a sample message" + }; + Error? result = queueProducerWithoutDestination->send(message); + test:assertTrue(result is Error, "Allowing to send messages without providing a destination"); +} + +@test:AfterGroups { + value: ["queue"] +} +isolated function afterQueueTests() returns error? { + check queue1Producer->close(); + check queue1Consumer->close(); + check queueProducerWithoutDestination->close(); + check queue2Consumer->close(); +} diff --git a/ballerina/tests/jms_session_client_ack_tests.bal b/ballerina/tests/jms_session_client_ack_tests.bal new file mode 100644 index 00000000..2a69483b --- /dev/null +++ b/ballerina/tests/jms_session_client_ack_tests.bal @@ -0,0 +1,115 @@ +// Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. +// +// WSO2 LLC. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import ballerina/test; + +final Session clientAckSession = check createSession(CLIENT_ACKNOWLEDGE); + +final MessageProducer queue4Producer = check createProducer(clientAckSession, { + 'type: QUEUE, + name: "test-queue-4" +}); +final MessageConsumer queue4Consumer = check createConsumer(clientAckSession, destination = { + 'type: QUEUE, + name: "test-queue-4" +}); + +@test:Config { + groups: ["sessionClientAck"] +} +isolated function testClientAckWithQueue() returns error? { + string content = "This is a sample message"; + TextMessage message = { + content: content + }; + check queue4Producer->send(message); + Message? response = check queue4Consumer->receive(5000); + test:assertTrue(response is TextMessage, "Invalid message type received"); + if response is TextMessage { + test:assertEquals(response.content, content, "Invalid payload"); + check queue4Consumer->acknowledge(response); + } +} + +final MessageProducer topic4Producer = check createProducer(clientAckSession, { + 'type: TOPIC, + name: "test-topic-4" +}); +final MessageConsumer topic4Consumer = check createConsumer(clientAckSession, destination = { + 'type: TOPIC, + name: "test-topic-4" +}); + +@test:Config { + groups: ["sessionClientAck"] +} +isolated function testClientAckWithTopic() returns error? { + string content = "This is a sample message"; + TextMessage message = { + content: content + }; + check topic4Producer->send(message); + Message? response = check topic4Consumer->receive(5000); + test:assertTrue(response is TextMessage, "Invalid message type received"); + if response is TextMessage { + test:assertEquals(response.content, content, "Invalid payload"); + check topic4Consumer->acknowledge(response); + } +} + +@test:Config { + groups: ["sessionClientAck"] +} +isolated function testInvalidClientAck() returns error? { + Session session = check createSession(CLIENT_ACKNOWLEDGE); + MessageProducer producer = check createProducer(session, { + 'type: QUEUE, + name: "session-ack-queue" + }); + MessageConsumer consumer = check createConsumer(session, destination = { + 'type: QUEUE, + name: "session-ack-queue" + }); + string content = "This is a sample message"; + TextMessage message = { + content: content + }; + check producer->send(message); + Message? response = check consumer->receive(5000); + test:assertTrue(response is TextMessage, "Invalid message type received"); + if response is TextMessage { + test:assertEquals(response.content, content, "Invalid payload"); + check session->close(); + Error? result = consumer->acknowledge(response); + test:assertTrue(result is Error, "Successfully acknowledged messages in a closed session"); + if result is Error { + test:assertEquals(result.message(), + "Error occurred while sending acknowledgement for the message: The Consumer is closed", + "Invalid client ack error message recieved"); + } + } +} + +@test:AfterGroups { + value: ["sessionClientAck"] +} +isolated function afterSessionClientAckTests() returns error? { + check queue4Producer->close(); + check queue4Consumer->close(); + check topic4Producer->close(); + check topic4Consumer->close(); + check clientAckSession->close(); +} diff --git a/ballerina/tests/jms_session_dups_ok_ack_tests.bal b/ballerina/tests/jms_session_dups_ok_ack_tests.bal new file mode 100644 index 00000000..f88cf01b --- /dev/null +++ b/ballerina/tests/jms_session_dups_ok_ack_tests.bal @@ -0,0 +1,144 @@ +// Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. +// +// WSO2 LLC. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import ballerina/test; + +final Session dupsOkAckSession = check createSession(DUPS_OK_ACKNOWLEDGE); + +final MessageProducer queue6Producer = check createProducer(dupsOkAckSession, { + 'type: QUEUE, + name: "test-queue-6" +}); +final MessageConsumer queue6Consumer = check createConsumer(dupsOkAckSession, destination = { + 'type: QUEUE, + name: "test-queue-6" +}); + +@test:Config { + groups: ["sessionDupsOkAck"] +} +isolated function testDupsOkAckWithQueue() returns error? { + MapMessage msg1 = { + content: { + messageId: 1, + payload: "This is the first message" + } + }; + MapMessage msg2 = { + content: { + messageId: 2, + payload: "This is the second message" + } + }; + MapMessage msg3 = { + content: { + messageId: 3, + payload: "This is the third message" + } + }; + MapMessage msg4 = { + content: { + messageId: 4, + payload: "This is the fourth message" + } + }; + check queue6Producer->send(msg1); + check queue6Producer->send(msg2); + check queue6Producer->send(msg3); + check queue6Producer->send(msg4); + + int[] messageIds = []; + while true { + Message? response = check queue6Consumer->receive(5000); + if response is MapMessage { + int messageId = check response.content["messageId"].ensureType(); + if messageIds.indexOf(messageId) is () { + messageIds.push(messageId); + } + } else { + break; + } + } + test:assertEquals(messageIds, [1, 2, 3, 4], "Invalid set of message Ids found"); +} + +final MessageProducer topic6Producer = check createProducer(dupsOkAckSession, { + 'type: TOPIC, + name: "test-topic-6" +}); +final MessageConsumer topic6Consumer = check createConsumer(dupsOkAckSession, destination = { + 'type: TOPIC, + name: "test-topic-6" +}); + +@test:Config { + groups: ["sessionDupsOkAck"] +} +isolated function testDupsOkAckWithTopic() returns error? { + MapMessage msg1 = { + content: { + messageId: 1, + payload: "This is the first message" + } + }; + MapMessage msg2 = { + content: { + messageId: 2, + payload: "This is the second message" + } + }; + MapMessage msg3 = { + content: { + messageId: 3, + payload: "This is the third message" + } + }; + MapMessage msg4 = { + content: { + messageId: 4, + payload: "This is the fourth message" + } + }; + check topic6Producer->send(msg1); + check topic6Producer->send(msg2); + check topic6Producer->send(msg3); + check topic6Producer->send(msg4); + + int[] messageIds = []; + while true { + Message? response = check topic6Consumer->receive(5000); + if response is MapMessage { + int messageId = check response.content["messageId"].ensureType(); + if messageIds.indexOf(messageId) is () { + messageIds.push(messageId); + } + } else { + break; + } + } + test:assertEquals(messageIds, [1, 2, 3, 4], "Invalid set of message Ids found"); +} + +@test:AfterGroups { + value: ["sessionDupsOkAck"] +} +isolated function afterSessionDupsOkAckTests() returns error? { + check queue6Producer->close(); + check queue6Consumer->close(); + check topic6Producer->close(); + check topic6Consumer->close(); + check dupsOkAckSession->close(); +} diff --git a/ballerina/tests/jms_session_tests.bal b/ballerina/tests/jms_session_tests.bal new file mode 100644 index 00000000..e3d6f851 --- /dev/null +++ b/ballerina/tests/jms_session_tests.bal @@ -0,0 +1,205 @@ +// Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. +// +// WSO2 LLC. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import ballerina/test; + +final Session autoAckSession = check createSession(AUTO_ACKNOWLEDGE); + +@test:Config { + groups: ["session"] +} +isolated function testCreateQueueProducer() returns error? { + MessageProducer queueProducer = check autoAckSession.createProducer({ + 'type: QUEUE, + name: "producer-create" + }); + check queueProducer->close(); +} + +@test:Config { + groups: ["session"] +} +isolated function testCreateTempQueueProducer() returns error? { + MessageProducer tempQueueProducer = check autoAckSession.createProducer({ + 'type: TEMPORARY_QUEUE + }); + check tempQueueProducer->close(); +} + +@test:Config { + groups: ["session"] +} +isolated function testCreateTopicProducer() returns error? { + MessageProducer topicProducer = check autoAckSession.createProducer({ + 'type: TOPIC, + name: "producer-create" + }); + check topicProducer->close(); +} + +@test:Config { + groups: ["session"] +} +isolated function testCreateTempTopicProducer() returns error? { + MessageProducer tempTopicProducer = check autoAckSession.createProducer({ + 'type: TEMPORARY_TOPIC + }); + check tempTopicProducer->close(); +} + +@test:Config { + groups: ["session"] +} +isolated function testCreateProducerWithoutDestination() returns error? { + MessageProducer producer = check autoAckSession.createProducer(); + check producer->close(); +} + +@test:Config { + groups: ["session"] +} +isolated function testCreateDefaultQueueConsumer() returns error? { + MessageConsumer queueConsumer = check autoAckSession.createConsumer(destination = { + 'type: QUEUE, + name: "consumer-create" + }); + check queueConsumer->close(); +} + +@test:Config { + groups: ["session"] +} +isolated function testCreateDefaultTopicConsumer() returns error? { + MessageConsumer topicConsumer = check autoAckSession.createConsumer(destination = { + 'type: TOPIC, + name: "consumer-create" + }); + check topicConsumer->close(); +} + +@test:Config { + groups: ["session"] +} +isolated function testCreateDurableConsumer() returns error? { + MessageConsumer durableSubscriber = check autoAckSession.createConsumer( + 'type = DURABLE, + destination = { + 'type: TOPIC, + name: "consumer-create" + }, + subscriberName = "durable-subscriber" + ); + check durableSubscriber->close(); + check autoAckSession->unsubscribe("durable-subscriber"); +} + +@test:Config { + groups: ["session"] +} +isolated function testCreateDurableConsumerForQueueError() returns error? { + MessageConsumer|Error durableSubscriber = autoAckSession.createConsumer( + 'type = DURABLE, + destination = { + 'type: QUEUE, + name: "consumer-create" + }, + subscriberName = "durable-subscriber" + ); + test:assertTrue(durableSubscriber is Error, "Durable subscription created for a queue"); + if durableSubscriber is Error { + test:assertEquals(durableSubscriber.message(), + "Invalid destination type: QUEUE provided for a DURABLE consumer", + "Invalid error message for consumer-creation"); + } +} + +@test:Config { + groups: ["session"] +} +isolated function testCreateDurableConsumerWithoutNameError() returns error? { + MessageConsumer|Error durableSubscriber = autoAckSession.createConsumer( + 'type = DURABLE, + destination = { + 'type: TOPIC, + name: "consumer-create" + } + ); + test:assertTrue(durableSubscriber is Error, "Durable subscription created withou a subscriber name"); + if durableSubscriber is Error { + test:assertEquals(durableSubscriber.message(), + "Subscriber name cannot be empty for consumer type DURABLE", + "Invalid error message for consumer-creation"); + } +} + +@test:Config { + groups: ["session"] +} +isolated function tesUnsubscribeFromInvalidSubscription() returns error? { + Session session = check createSession(AUTO_ACKNOWLEDGE); + Error? result = session->unsubscribe("invalidSubscriber"); + test:assertTrue(result is Error, "Invalid subscription removal allowed"); + if result is Error { + string errorMsg = "Error while unsubscribing from the subscription session"; + test:assertTrue(result.message().startsWith(errorMsg), + "Invalid error message for ubsubscription from invalid-subscriber"); + } +} + +// @test:Config { +// groups: ["session"] +// } +isolated function testCreateSharedConsumer() returns error? { + MessageConsumer sharedSubscriber = check autoAckSession.createConsumer( + 'type = SHARED, + destination = { + 'type: TOPIC, + name: "consumer-create" + }, + subscriberName = "shared-subscriber" + ); + check sharedSubscriber->close(); +} + +// @test:Config { +// groups: ["session"] +// } +isolated function testCreateSharedDurableConsumer() returns error? { + MessageConsumer sharedDurableSubscriber = check autoAckSession.createConsumer( + 'type = SHARED_DURABLE, + destination = { + 'type: TOPIC, + name: "consumer-create" + }, + subscriberName = "shared-subscriber" + ); + check sharedDurableSubscriber->close(); +} + +@test:Config { + groups: ["session"] +} +isolated function testCloseSession() returns error? { + Session session = check createSession(AUTO_ACKNOWLEDGE); + check session->close(); +} + +@test:AfterGroups { + value: ["session"] +} +isolated function afterSessionTests() returns error? { + check autoAckSession->close(); +} diff --git a/ballerina/tests/jms_session_transacted_tests.bal b/ballerina/tests/jms_session_transacted_tests.bal new file mode 100644 index 00000000..294bc9d5 --- /dev/null +++ b/ballerina/tests/jms_session_transacted_tests.bal @@ -0,0 +1,440 @@ +// Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. +// +// WSO2 LLC. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import ballerina/test; + +final Session transactedProducerSession = check createSession(SESSION_TRANSACTED); +final Session transactedConsumerSession = check createSession(SESSION_TRANSACTED); + +final MessageProducer queue5Producer = check createProducer(transactedProducerSession, { + 'type: QUEUE, + name: "test-queue-5" +}); +final MessageConsumer queue5Consumer = check createConsumer(transactedConsumerSession, destination = { + 'type: QUEUE, + name: "test-queue-5" +}); + +@test:Config { + groups: ["sessionTransacted"] +} +isolated function testTransactionsCommitWithQueue() returns error? { + TextMessage msg1 = { + content: "This is the first message" + }; + TextMessage msg2 = { + content: "This is the second message" + }; + TextMessage msg3 = { + content: "This is the third message" + }; + TextMessage msg4 = { + content: "End of messages" + }; + check queue5Producer->send(msg1); + check queue5Producer->send(msg2); + check queue5Producer->send(msg3); + check queue5Producer->send(msg4); + check transactedProducerSession->'commit(); + + int receivedMessages = 0; + while true { + Message? response = check queue5Consumer->receive(5000); + if response is TextMessage { + receivedMessages += 1; + if response.content == "End of messages" { + check transactedConsumerSession->'commit(); + break; + } + } + } + test:assertEquals(receivedMessages, 4, "Invalid number of received messages"); +} + +@test:Config { + groups: ["sessionTransacted"] +} +isolated function testTransactionsProducerRollbackWithQueue() returns error? { + TextMessage msg1 = { + content: "This is the first message" + }; + TextMessage msg2 = { + content: "This is the second message" + }; + TextMessage msg3 = { + content: "This is the third message" + }; + TextMessage msg4 = { + content: "End of messages" + }; + check queue5Producer->send(msg1); + check queue5Producer->send(msg2); + check queue5Producer->send(msg3); + check queue5Producer->send(msg4); + check transactedProducerSession->'rollback(); +} + +@test:Config { + groups: ["sessionTransacted"] +} +isolated function testTransactionsConsumerRollbackWithQueue() returns error? { + TextMessage msg1 = { + content: "This is the first message" + }; + TextMessage msg2 = { + content: "This is the second message" + }; + TextMessage msg3 = { + content: "This is the third message" + }; + TextMessage msg4 = { + content: "End of messages" + }; + check queue5Producer->send(msg1); + check queue5Producer->send(msg2); + check queue5Producer->send(msg3); + check queue5Producer->send(msg4); + check transactedProducerSession->'commit(); + + int receivedMessages = 0; + while true { + Message? response = check queue5Consumer->receive(5000); + if response is TextMessage { + receivedMessages += 1; + if response.content == "This is the third message" { + check transactedConsumerSession->'rollback(); + break; + } + } + } + test:assertEquals(receivedMessages, 3, "Invalid number of received messages"); +} + +@test:Config { + groups: ["sessionTransacted"] +} +isolated function testProducerRollbackConsumerCommitWithQueue() returns error? { + TextMessage msg1 = { + content: "This is the first message" + }; + TextMessage msg2 = { + content: "This is the second message" + }; + TextMessage msg3 = { + content: "This is the third message" + }; + TextMessage msg4 = { + content: "End of messages" + }; + check queue5Producer->send(msg1); + check queue5Producer->send(msg2); + check queue5Producer->send(msg3); + check queue5Producer->send(msg4); + check transactedProducerSession->'rollback(); + + int receivedMessages = 0; + while true { + Message? response = check queue5Consumer->receive(5000); + if response is TextMessage { + receivedMessages += 1; + if response.content == "End of messages" { + check transactedConsumerSession->'commit(); + break; + } + } else if response is () { + check transactedConsumerSession->'commit(); + break; + } + } + test:assertEquals(receivedMessages, 0, "Invalid number of received messages"); +} + +@test:Config { + groups: ["sessionTransacted"] +} +isolated function testProducerCommitConsumerRollbackWithQueue() returns error? { + TextMessage msg1 = { + content: "This is the first message" + }; + TextMessage msg2 = { + content: "This is the second message" + }; + TextMessage msg3 = { + content: "This is the third message" + }; + TextMessage msg4 = { + content: "End of messages" + }; + check queue5Producer->send(msg1); + check queue5Producer->send(msg2); + check queue5Producer->send(msg3); + check queue5Producer->send(msg4); + check transactedProducerSession->'commit(); + + int receivedMessages = 0; + int numberOfAttempts = 0; + while true { + Message? response = check queue5Consumer->receive(5000); + if response is TextMessage { + receivedMessages += 1; + if response.content == "End of messages" { + if numberOfAttempts == 0 { + check transactedConsumerSession->'rollback(); + numberOfAttempts += 1; + } else { + check transactedConsumerSession->'commit(); + break; + + } + } + } + } + test:assertEquals(receivedMessages, 8, "Invalid number of received messages"); +} + +final MessageProducer topic5Producer = check createProducer(transactedProducerSession, { + 'type: TOPIC, + name: "test-topic-5" +}); +final MessageConsumer topic5Consumer = check createConsumer(transactedConsumerSession, destination = { + 'type: TOPIC, + name: "test-topic-5" +}); + +@test:Config { + groups: ["sessionTransacted"] +} +isolated function testTransactionsCommitWithTopic() returns error? { + TextMessage msg1 = { + content: "This is the first message" + }; + TextMessage msg2 = { + content: "This is the second message" + }; + TextMessage msg3 = { + content: "This is the third message" + }; + TextMessage msg4 = { + content: "End of messages" + }; + check topic5Producer->send(msg1); + check topic5Producer->send(msg2); + check topic5Producer->send(msg3); + check topic5Producer->send(msg4); + check transactedProducerSession->'commit(); + + int receivedMessages = 0; + while true { + Message? response = check topic5Consumer->receive(5000); + if response is TextMessage { + receivedMessages += 1; + if response.content == "End of messages" { + check transactedConsumerSession->'commit(); + break; + } + } + } + test:assertEquals(receivedMessages, 4, "Invalid number of received messages"); +} + +@test:Config { + groups: ["sessionTransacted"] +} +isolated function testTransactionsProducerRollbackWithTopic() returns error? { + TextMessage msg1 = { + content: "This is the first message" + }; + TextMessage msg2 = { + content: "This is the second message" + }; + TextMessage msg3 = { + content: "This is the third message" + }; + TextMessage msg4 = { + content: "End of messages" + }; + check topic5Producer->send(msg1); + check topic5Producer->send(msg2); + check topic5Producer->send(msg3); + check topic5Producer->send(msg4); + check transactedProducerSession->'rollback(); +} + +@test:Config { + groups: ["sessionTransacted"] +} +isolated function testTransactionsConsumerRollbackWithTopic() returns error? { + TextMessage msg1 = { + content: "This is the first message" + }; + TextMessage msg2 = { + content: "This is the second message" + }; + TextMessage msg3 = { + content: "This is the third message" + }; + TextMessage msg4 = { + content: "End of messages" + }; + check topic5Producer->send(msg1); + check topic5Producer->send(msg2); + check topic5Producer->send(msg3); + check topic5Producer->send(msg4); + check transactedProducerSession->'commit(); + + int receivedMessages = 0; + while true { + Message? response = check topic5Consumer->receive(5000); + if response is TextMessage { + receivedMessages += 1; + if response.content == "This is the third message" { + check transactedConsumerSession->'rollback(); + break; + } + } + } + test:assertEquals(receivedMessages, 3, "Invalid number of received messages"); +} + +@test:Config { + groups: ["sessionTransacted"] +} +isolated function testTransactionCommitWithoutTransactedSession() returns error? { + Session producerSession = check createSession(AUTO_ACKNOWLEDGE); + MessageProducer producer = check createProducer(producerSession, { + 'type: TOPIC, + name: "test-transaction-topic" + }); + Session consumerSession = check createSession(AUTO_ACKNOWLEDGE); + MessageConsumer consumer = check createConsumer(consumerSession, destination = { + 'type: TOPIC, + name: "test-transaction-topic" + }); + TextMessage msg1 = { + content: "This is the first message" + }; + TextMessage msg2 = { + content: "This is the second message" + }; + TextMessage msg3 = { + content: "This is the third message" + }; + TextMessage msg4 = { + content: "End of messages" + }; + check producer->send(msg1); + check producer->send(msg2); + check producer->send(msg3); + check producer->send(msg4); + Error? producerCommit = producerSession->'commit(); + test:assertTrue(producerCommit is Error, "Commit enabled for non-transacted session"); + if producerCommit is Error { + test:assertEquals(producerCommit.message(), + "Error while committing the JMS transaction: Not a transacted session", + "Invalid error message for non-transacted session commit"); + } + + while true { + Message? response = check consumer->receive(5000); + if response is TextMessage { + if response.content == "End of messages" { + Error? consumerCommit = consumerSession->'commit(); + test:assertTrue(consumerCommit is Error, "Commit enabled for non-transacted session"); + if consumerCommit is Error { + test:assertEquals(consumerCommit.message(), + "Error while committing the JMS transaction: Not a transacted session", + "Invalid error message for non-transacted session commit"); + } + break; + } + } + } + check producer->close(); + check producerSession->close(); + check consumer->close(); + check consumerSession->close(); +} + +@test:Config { + groups: ["sessionTransacted"] +} +isolated function testTransactionRollbackWithoutTransactedSession() returns error? { + Session producerSession = check createSession(AUTO_ACKNOWLEDGE); + MessageProducer producer = check createProducer(producerSession, { + 'type: TOPIC, + name: "test-transaction-topic" + }); + Session consumerSession = check createSession(AUTO_ACKNOWLEDGE); + MessageConsumer consumer = check createConsumer(consumerSession, destination = { + 'type: TOPIC, + name: "test-transaction-topic" + }); + TextMessage msg1 = { + content: "This is the first message" + }; + TextMessage msg2 = { + content: "This is the second message" + }; + TextMessage msg3 = { + content: "This is the third message" + }; + TextMessage msg4 = { + content: "End of messages" + }; + check producer->send(msg1); + check producer->send(msg2); + check producer->send(msg3); + check producer->send(msg4); + Error? producerRollback = producerSession->'rollback(); + test:assertTrue(producerRollback is Error, "Rollback enabled for non-transacted session"); + if producerRollback is Error { + test:assertEquals(producerRollback.message(), + "Error while rolling back the JMS transaction: Not a transacted session", + "Invalid error message for non-transacted session rollback"); + } + + while true { + Message? response = check consumer->receive(5000); + if response is TextMessage { + if response.content == "End of messages" { + Error? consumerRollback = consumerSession->'rollback(); + test:assertTrue(consumerRollback is Error, "Commit enabled for non-transacted session"); + if consumerRollback is Error { + test:assertEquals(consumerRollback.message(), + "Error while rolling back the JMS transaction: Not a transacted session", + "Invalid error message for non-transacted session rollback"); + } + break; + } + } + } + check producer->close(); + check producerSession->close(); + check consumer->close(); + check consumerSession->close(); +} + +@test:AfterGroups { + value: ["sessionTransacted"] +} +isolated function afterSessionTransactedTests() returns error? { + check queue5Producer->close(); + check queue5Consumer->close(); + check topic5Producer->close(); + check topic5Consumer->close(); + check transactedProducerSession->close(); + check transactedConsumerSession->close(); +} diff --git a/ballerina/tests/jms_topic_tests.bal b/ballerina/tests/jms_topic_tests.bal new file mode 100644 index 00000000..e7c1d771 --- /dev/null +++ b/ballerina/tests/jms_topic_tests.bal @@ -0,0 +1,183 @@ +// Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. +// +// WSO2 LLC. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import ballerina/test; + +final MessageProducer topic1Producer = check createProducer(AUTO_ACK_SESSION, { + 'type: TOPIC, + name: "test-topic-1" +}); +final MessageConsumer topic1Consumer = check createConsumer(AUTO_ACK_SESSION, destination = { + 'type: TOPIC, + name: "test-topic-1" +}); + +@test:Config { + groups: ["topic"] +} +isolated function testTopicWithTextMessage() returns error? { + string content = "This is a sample message"; + TextMessage message = { + content: content + }; + check topic1Producer->send(message); + Message? response = check topic1Consumer->receive(5000); + test:assertTrue(response is TextMessage, "Invalid message type received"); + if response is TextMessage { + test:assertEquals(response.content, content, "Invalid payload"); + } +} + +@test:Config { + groups: ["topic"] +} +isolated function testTopicWithMapMessage() returns error? { + map content = { + user: "John Doe", + message: "This is a sample message" + }; + MapMessage message = { + content: content + }; + check topic1Producer->send(message); + Message? response = check topic1Consumer->receive(5000); + test:assertTrue(response is MapMessage, "Invalid message type received"); + if response is MapMessage { + test:assertEquals(response.content, content, "Invalid payload"); + } +} + +@test:Config { + groups: ["topic"] +} +isolated function testTopicWithBytesMessage() returns error? { + byte[] content = "This is a sample message".toBytes(); + BytesMessage message = { + content: content + }; + check topic1Producer->send(message); + Message? response = check topic1Consumer->receive(5000); + test:assertTrue(response is BytesMessage, "Invalid message type received"); + if response is BytesMessage { + test:assertEquals(response.content, content, "Invalid payload"); + } +} + +@test:Config { + groups: ["topic"] +} +isolated function testTempTopic() returns error? { + MessageProducer tempTopicProducer = check createProducer(AUTO_ACK_SESSION, { + 'type: TEMPORARY_TOPIC + }); + string content = "This is a sample message"; + TextMessage message = { + content: content + }; + check tempTopicProducer->send(message); + check tempTopicProducer->close(); +} + +final MessageProducer topicProducerWithoutDestination = check createProducerWithoutDestination(AUTO_ACK_SESSION); +final MessageConsumer topic2Consumer = check createConsumer(AUTO_ACK_SESSION, destination = { + 'type: TOPIC, + name: "test-topic-2" +}); + +@test:Config { + groups: ["topic"] +} +isolated function testTopicWithTextMessageUsingSendTo() returns error? { + string content = "This is a sample message"; + TextMessage message = { + content: content + }; + check topicProducerWithoutDestination->sendTo({ + 'type: TOPIC, + name: "test-topic-2" + }, message); + Message? response = check topic2Consumer->receive(5000); + test:assertTrue(response is TextMessage, "Invalid message type received"); + if response is TextMessage { + test:assertEquals(response.content, content, "Invalid payload"); + } +} + +@test:Config { + groups: ["topic"] +} +isolated function testTopicWithMapMessageUsingSendTo() returns error? { + map content = { + user: "John Doe", + message: "This is a sample message" + }; + MapMessage message = { + content: content + }; + check topicProducerWithoutDestination->sendTo({ + 'type: TOPIC, + name: "test-topic-2" + }, message); + Message? response = check topic2Consumer->receive(5000); + test:assertTrue(response is MapMessage, "Invalid message type received"); + if response is MapMessage { + test:assertEquals(response.content, content, "Invalid payload"); + } +} + +@test:Config { + groups: ["topic"] +} +isolated function testTopicProducerSendToBytesMessage() returns error? { + byte[] content = "This is a sample message".toBytes(); + BytesMessage message = { + content: content + }; + check topicProducerWithoutDestination->sendTo({ + 'type: TOPIC, + name: "test-topic-2" + }, message); + Message? response = check topic2Consumer->receive(5000); + test:assertTrue(response is BytesMessage, "Invalid message type received"); + if response is BytesMessage { + test:assertEquals(response.content, content, "Invalid payload"); + } +} + +@test:Config { + groups: ["queue"] +} +isolated function testTempTopicUsingSendTo() returns error? { + MessageProducer tempTopicProducer = check createProducerWithoutDestination(AUTO_ACK_SESSION); + string content = "This is a sample message"; + TextMessage message = { + content: content + }; + check tempTopicProducer->sendTo({ + 'type: TEMPORARY_TOPIC + }, message); + check tempTopicProducer->close(); +} + +@test:AfterGroups { + value: ["topic"] +} +isolated function afterTopicTests() returns error? { + check topic1Producer->close(); + check topic1Consumer->close(); + check topicProducerWithoutDestination->close(); + check topic2Consumer->close(); +} diff --git a/ballerina/tests/resources/.keep b/ballerina/tests/resources/.keep deleted file mode 100644 index e69de29b..00000000 diff --git a/ballerina/tests/resources/docker-compose.yaml b/ballerina/tests/resources/docker-compose.yaml new file mode 100644 index 00000000..a6e75568 --- /dev/null +++ b/ballerina/tests/resources/docker-compose.yaml @@ -0,0 +1,8 @@ +version: '2' + +services: + activemq: + image: 'apache/activemq-classic:latest' + container_name: activemq-test-server + ports: + - '61616:61616' diff --git a/ballerina/tests/test_commons.bal b/ballerina/tests/test_commons.bal new file mode 100644 index 00000000..caf592a1 --- /dev/null +++ b/ballerina/tests/test_commons.bal @@ -0,0 +1,48 @@ +// Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. +// +// WSO2 LLC. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import ballerina/test; + +final Connection TEST_CONNECTION = check new ( + initialContextFactory = "org.apache.activemq.jndi.ActiveMQInitialContextFactory", + providerUrl = "tcp://localhost:61616" +); + +final Session AUTO_ACK_SESSION = check createSession(AUTO_ACKNOWLEDGE); + +isolated function createSession(AcknowledgementMode acknowledgementMode) returns Session|error { + return TEST_CONNECTION->createSession(acknowledgementMode); +} + +isolated function createProducer(Session session, Destination destination) returns MessageProducer|error { + return session.createProducer(destination); +} + +isolated function createProducerWithoutDestination(Session session) returns MessageProducer|error { + return session.createProducer(); +} + +isolated function createConsumer(Session session, *ConsumerOptions options) returns MessageConsumer|error { + return session.createConsumer(options); +} + +@test:AfterSuite { + alwaysRun: true +} +isolated function afterSuite() returns error? { + check AUTO_ACK_SESSION->close(); + check TEST_CONNECTION->close(); +} diff --git a/build-config/resources/Ballerina.toml b/build-config/resources/Ballerina.toml index afe784e0..5f2c528a 100644 --- a/build-config/resources/Ballerina.toml +++ b/build-config/resources/Ballerina.toml @@ -26,3 +26,15 @@ groupId = "javax.jms" artifactId = "javax.jms-api" version = "@javax.jms.version@" path = "./lib/javax.jms-api-@javax.jms.version@.jar" + +[[platform.java11.dependency]] +path = "./lib/activemq-client-@activemq.client.version@.jar" +scope = "testOnly" + +[[platform.java11.dependency]] +path = "./lib/geronimo-j2ee-management_1.1_spec-@geronimoj2ee.spec.version@.jar" +scope = "testOnly" + +[[platform.java11.dependency]] +path = "./lib/hawtbuf-@hawtbuf.version@.jar" +scope = "testOnly" diff --git a/gradle.properties b/gradle.properties index 0a029ab4..e9890c26 100644 --- a/gradle.properties +++ b/gradle.properties @@ -10,10 +10,15 @@ downloadPluginVersion=4.0.4 releasePluginVersion=2.6.0 ballerinaGradlePluginVersion=1.0.0 -# Azure dependencies +# JMS dependencies javaxJmsVersion=2.0.1 slf4jVersion=2.0.7 +# Test dependenices +activemqClientVersion=5.18.2 +geronimoJ2eeMng11SpecVersion=1.0.1 +hawtbufVersion=1.11 + #stdlib dependencies # Level 01 diff --git a/native/src/main/java/io/ballerina/stdlib/java.jms/JmsConsumer.java b/native/src/main/java/io/ballerina/stdlib/java.jms/JmsConsumer.java index 2d739f79..7e70b817 100644 --- a/native/src/main/java/io/ballerina/stdlib/java.jms/JmsConsumer.java +++ b/native/src/main/java/io/ballerina/stdlib/java.jms/JmsConsumer.java @@ -58,8 +58,8 @@ public class JmsConsumer { /** * Creates a {@link javax.jms.MessageConsumer} object with given {@link javax.jms.Session}. * - * @param consumer Ballerina consumer object - * @param session Ballerina session object + * @param consumer Ballerina consumer object + * @param session Ballerina session object * @param consumerOptions JMS MessageConsumer configurations * @return A Ballerina `jms:Error` if the JMS provider fails to create the MessageConsumer due to some * internal error @@ -99,11 +99,12 @@ private static MessageConsumer createConsumer(Session session, BMap destination, diff --git a/native/src/main/java/io/ballerina/stdlib/java.jms/JmsSession.java b/native/src/main/java/io/ballerina/stdlib/java.jms/JmsSession.java index a152e4cd..62452138 100644 --- a/native/src/main/java/io/ballerina/stdlib/java.jms/JmsSession.java +++ b/native/src/main/java/io/ballerina/stdlib/java.jms/JmsSession.java @@ -84,7 +84,8 @@ public static Object unsubscribe(BObject session, BString subscriptionId) { nativeSession.unsubscribe(subscriptionId.getValue()); } catch (JMSException exception) { return createError(JMS_ERROR, - String.format("Error while creating session: %s", exception.getMessage()), exception); + String.format("Error while unsubscribing from the subscription session: %s", + exception.getMessage()), exception); } return null; }