Skip to content

Commit

Permalink
Merge pull request #95 from ayeshLK/local_development
Browse files Browse the repository at this point in the history
Refactor `java.jms` codebase
  • Loading branch information
ayeshLK authored Aug 9, 2023
2 parents bd23d12 + f1a2a44 commit de8e522
Show file tree
Hide file tree
Showing 9 changed files with 159 additions and 266 deletions.
7 changes: 6 additions & 1 deletion ballerina/message.bal
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ isolated function externWriteText(handle message, handle value) returns error? =

isolated function externWriteBytes(handle message, byte[] value) returns error? = @java:Method {
name: "writeBytes",
'class: "io.ballerina.stdlib.java.jms.JmsBytesMessage"
'class: "io.ballerina.stdlib.java.jms.JmsMessageUtils"
} external;

isolated function externSetBoolean(handle message, handle name, boolean value) returns error? = @java:Method {
Expand All @@ -98,3 +98,8 @@ isolated function externSetString(handle message, handle name, handle value) ret
name: "setString",
'class: "javax.jms.MapMessage"
} external;

isolated function externSetBytes(handle message, handle name, byte[] value) returns error? = @java:Method {
name: "writeBytesField",
'class: "io.ballerina.stdlib.java.jms.JmsMessageUtils"
} external;
2 changes: 2 additions & 0 deletions ballerina/message_producer.bal
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ isolated function populateMapMessage(handle mapMessage, map<anydata> keyValues)
check externSetBoolean(mapMessage, java:fromString('key), value);
} else if value is string {
check externSetString(mapMessage, java:fromString('key), java:fromString(value));
} else if value is byte[] {
check externSetBytes(mapMessage, java:fromString('key), value);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@

package io.ballerina.stdlib.java.jms;

import io.ballerina.runtime.api.creators.ErrorCreator;
import io.ballerina.runtime.api.creators.ValueCreator;
import io.ballerina.runtime.api.types.Type;
import io.ballerina.runtime.api.utils.StringUtils;
import io.ballerina.runtime.api.utils.TypeUtils;
import io.ballerina.runtime.api.utils.ValueUtils;
import io.ballerina.runtime.api.values.BError;
import io.ballerina.runtime.api.values.BMap;
import io.ballerina.runtime.api.values.BString;

Expand Down Expand Up @@ -53,6 +55,12 @@ public class CommonUtils {
private static final String TEMPORARY_QUEUE = "TEMPORARY_QUEUE";
private static final String TOPIC = "TOPIC";

public static BError createError(String errorType, String message, Throwable throwable) {
BError cause = ErrorCreator.createError(throwable);
return ErrorCreator.createError(
ModuleUtils.getModule(), errorType, StringUtils.fromString(message), cause, null);
}

public static Optional<String> getOptionalStringProperty(BMap<BString, Object> config, BString fieldName) {
if (config.containsKey(fieldName)) {
return Optional.of(config.getStringValue(fieldName).getValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@

package io.ballerina.stdlib.java.jms;

import io.ballerina.runtime.api.creators.ErrorCreator;
import io.ballerina.runtime.api.utils.StringUtils;
import io.ballerina.runtime.api.values.BError;
import io.ballerina.runtime.api.values.BMap;
import io.ballerina.runtime.api.values.BObject;
import io.ballerina.runtime.api.values.BString;
Expand All @@ -39,6 +37,7 @@
import javax.naming.InitialContext;
import javax.naming.NamingException;

import static io.ballerina.stdlib.java.jms.CommonUtils.createError;
import static io.ballerina.stdlib.java.jms.CommonUtils.getOptionalStringProperty;
import static io.ballerina.stdlib.java.jms.Constants.JMS_ERROR;
import static io.ballerina.stdlib.java.jms.Constants.NATIVE_CONNECTION;
Expand Down Expand Up @@ -71,14 +70,10 @@ public static Object init(BObject connection, BMap<BString, Object> connectionCo
jmsConnection.start();
connection.addNativeData(NATIVE_CONNECTION, jmsConnection);
} catch (BallerinaJmsException e) {
BError cause = ErrorCreator.createError(e);
return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR,
StringUtils.fromString(e.getMessage()), cause, null);
return createError(JMS_ERROR, e.getMessage(), e);
} catch (JMSException e) {
BError cause = ErrorCreator.createError(e);
return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR,
StringUtils.fromString("Error occurred while initializing and starring connection"),
cause, null);
return createError(JMS_ERROR,
String.format("Error occurred while initializing and starring connection: %s", e.getMessage()), e);
}
return null;
}
Expand Down Expand Up @@ -178,19 +173,15 @@ private static void updateMappedParameters(Map<String, String> configParams) {
* @return A Ballerina `jms:Error` if the JMS provider fails to start message delivery due to some internal error
*/
public static Object start(BObject connection) {
Object nativeConnection = connection.getNativeData(NATIVE_CONNECTION);
if (Objects.nonNull(nativeConnection)) {
try {
((Connection) nativeConnection).start();
return null;
} catch (JMSException exception) {
BError cause = ErrorCreator.createError(exception);
return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR,
StringUtils.fromString("Error occurred while starting the connection"), cause, null);
}
Connection nativeConnection = (Connection) connection.getNativeData(NATIVE_CONNECTION);
try {
nativeConnection.start();
} catch (JMSException exception) {
return createError(JMS_ERROR,
String.format("Error occurred while starting the connection: %s", exception.getMessage()),
exception);
}
return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR,
StringUtils.fromString("Could not find the native JMS connection"), null, null);
return null;
}

/**
Expand All @@ -210,19 +201,15 @@ public static Object start(BObject connection) {
* </ul>
*/
public static Object stop(BObject connection) {
Object nativeConnection = connection.getNativeData(NATIVE_CONNECTION);
if (Objects.nonNull(nativeConnection)) {
try {
((Connection) nativeConnection).stop();
return null;
} catch (JMSException exception) {
BError cause = ErrorCreator.createError(exception);
return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR,
StringUtils.fromString("Error occurred while stopping the connection"), cause, null);
}
Connection nativeConnection = (Connection) connection.getNativeData(NATIVE_CONNECTION);
try {
nativeConnection.stop();
} catch (JMSException exception) {
return createError(JMS_ERROR,
String.format("Error occurred while stopping the connection: %s", exception.getMessage()),
exception);
}
return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR,
StringUtils.fromString("Could not find the native JMS connection"), null, null);
return null;
}

/**
Expand All @@ -234,18 +221,14 @@ public static Object stop(BObject connection) {
* to close a socket connection can cause this exception to be thrown.
*/
public static Object close(BObject connection) {
Object nativeConnection = connection.getNativeData(NATIVE_CONNECTION);
if (Objects.nonNull(nativeConnection)) {
try {
((Connection) nativeConnection).close();
return null;
} catch (JMSException exception) {
BError cause = ErrorCreator.createError(exception);
return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR,
StringUtils.fromString("Error occurred while closing the connection"), cause, null);
}
Connection nativeConnection = (Connection) connection.getNativeData(NATIVE_CONNECTION);
try {
nativeConnection.close();
} catch (JMSException exception) {
return createError(JMS_ERROR,
String.format("Error occurred while closing the connection: %s", exception.getMessage()),
exception);
}
return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR,
StringUtils.fromString("Could not find the native JMS connection"), null, null);
return null;
}
}
97 changes: 31 additions & 66 deletions native/src/main/java/io/ballerina/stdlib/java.jms/JmsConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@

package io.ballerina.stdlib.java.jms;

import io.ballerina.runtime.api.creators.ErrorCreator;
import io.ballerina.runtime.api.utils.StringUtils;
import io.ballerina.runtime.api.values.BError;
import io.ballerina.runtime.api.values.BMap;
import io.ballerina.runtime.api.values.BObject;
import io.ballerina.runtime.api.values.BString;
Expand All @@ -35,6 +33,7 @@
import javax.jms.Session;
import javax.jms.Topic;

import static io.ballerina.stdlib.java.jms.CommonUtils.createError;
import static io.ballerina.stdlib.java.jms.CommonUtils.getBallerinaMessage;
import static io.ballerina.stdlib.java.jms.CommonUtils.getDestination;
import static io.ballerina.stdlib.java.jms.CommonUtils.getOptionalStringProperty;
Expand Down Expand Up @@ -66,24 +65,16 @@ public class JmsConsumer {
* internal error
*/
public static Object init(BObject consumer, BObject session, BMap<BString, Object> consumerOptions) {
Object nativeSession = session.getNativeData(NATIVE_SESSION);
if (Objects.isNull(nativeSession)) {
return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR,
StringUtils.fromString("Could not find the native JMS session"), null, null);
}
Session jmsSession = (Session) nativeSession;
Session nativeSession = (Session) session.getNativeData(NATIVE_SESSION);
try {
MessageConsumer jmsConsumer = createConsumer((Session) jmsSession, consumerOptions);
MessageConsumer jmsConsumer = createConsumer(nativeSession, consumerOptions);
consumer.addNativeData(NATIVE_CONSUMER, jmsConsumer);
} catch (BallerinaJmsException exception) {
BError cause = ErrorCreator.createError(exception);
return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR,
StringUtils.fromString(exception.getMessage()), cause, null);
return createError(JMS_ERROR, exception.getMessage(), exception);
} catch (JMSException exception) {
BError cause = ErrorCreator.createError(exception);
return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR,
StringUtils.fromString("Error occurred while initializing the JMS MessageConsumer"),
cause, null);
return createError(JMS_ERROR,
String.format("Error occurred while initializing the JMS MessageConsumer: %s",
exception.getMessage()), exception);
}
return null;
}
Expand Down Expand Up @@ -125,32 +116,22 @@ private static MessageConsumer createConsumer(Session session, BMap<BString, Obj
* or else the next message produced for this message consumer, or null
*/
public static Object receive(BObject consumer, long timeout) {
Object nativeConsumer = consumer.getNativeData(NATIVE_CONSUMER);
if (Objects.isNull(nativeConsumer)) {
return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR,
StringUtils.fromString("Could not find the native JMS MessageConsumer"),
null, null);
}
MessageConsumer nativeConsumer = (MessageConsumer) consumer.getNativeData(NATIVE_CONSUMER);
try {
Message message = ((MessageConsumer) nativeConsumer).receive(timeout);
Message message = nativeConsumer.receive(timeout);
if (Objects.isNull(message)) {
return null;
}
return getBallerinaMessage(message);
} catch (JMSException exception) {
BError cause = ErrorCreator.createError(exception);
return ErrorCreator.createError(ModuleUtils.getModule(), Constants.JMS_ERROR,
StringUtils.fromString("Error occurred while receiving messages"), cause, null);
return createError(JMS_ERROR,
String.format("Error occurred while receiving messages: %s", exception.getMessage()), exception);
} catch (BallerinaJmsException exception) {
BError cause = ErrorCreator.createError(exception);
return ErrorCreator.createError(ModuleUtils.getModule(), Constants.JMS_ERROR,
StringUtils.fromString("Error occurred while processing the received messages"),
cause, null);
return createError(JMS_ERROR, exception.getMessage(), exception);
} catch (Exception exception) {
BError cause = ErrorCreator.createError(exception);
return ErrorCreator.createError(ModuleUtils.getModule(), Constants.JMS_ERROR,
StringUtils.fromString("Unknown error occurred while processing the received messages"),
cause, null);
return createError(JMS_ERROR,
String.format("Unknown error occurred while processing the received messages: %s",
exception.getMessage()), exception);
}
}

Expand All @@ -162,32 +143,22 @@ public static Object receive(BObject consumer, long timeout) {
* or else the next message produced for this message consumer, or null
*/
public static Object receiveNoWait(BObject consumer) {
Object nativeConsumer = consumer.getNativeData(NATIVE_CONSUMER);
if (Objects.isNull(nativeConsumer)) {
return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR,
StringUtils.fromString("Could not find the native JMS MessageConsumer"),
null, null);
}
MessageConsumer nativeConsumer = (MessageConsumer) consumer.getNativeData(NATIVE_CONSUMER);
try {
Message message = ((MessageConsumer) nativeConsumer).receiveNoWait();
Message message = nativeConsumer.receiveNoWait();
if (Objects.isNull(message)) {
return null;
}
return getBallerinaMessage(message);
} catch (JMSException exception) {
BError cause = ErrorCreator.createError(exception);
return ErrorCreator.createError(ModuleUtils.getModule(), Constants.JMS_ERROR,
StringUtils.fromString("Error occurred while receiving messages"), cause, null);
return createError(JMS_ERROR,
String.format("Error occurred while receiving messages: %s", exception.getMessage()), exception);
} catch (BallerinaJmsException exception) {
BError cause = ErrorCreator.createError(exception);
return ErrorCreator.createError(ModuleUtils.getModule(), Constants.JMS_ERROR,
StringUtils.fromString("Error occurred while processing the received messages"),
cause, null);
return createError(JMS_ERROR, exception.getMessage(), exception);
} catch (Exception exception) {
BError cause = ErrorCreator.createError(exception);
return ErrorCreator.createError(ModuleUtils.getModule(), Constants.JMS_ERROR,
StringUtils.fromString("Unknown error occurred while processing the received messages"),
cause, null);
return createError(JMS_ERROR,
String.format("Unknown error occurred while processing the received messages: %s",
exception.getMessage()), exception);
}
}

Expand All @@ -198,18 +169,13 @@ public static Object receiveNoWait(BObject consumer) {
* @return A Ballerina `jms:Error` if the JMS provider fails to close the consumer due to some internal error
*/
public static Object close(BObject consumer) {
Object nativeConsumer = consumer.getNativeData(NATIVE_CONSUMER);
if (Objects.isNull(nativeConsumer)) {
return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR,
StringUtils.fromString("Could not find the native JMS MessageConsumer"),
null, null);
}
MessageConsumer nativeConsumer = (MessageConsumer) consumer.getNativeData(NATIVE_CONSUMER);
try {
((MessageConsumer) nativeConsumer).close();
nativeConsumer.close();
} catch (JMSException exception) {
BError cause = ErrorCreator.createError(exception);
return ErrorCreator.createError(ModuleUtils.getModule(), Constants.JMS_ERROR,
StringUtils.fromString("Error occurred while closing the message consumer"), cause, null);
return createError(JMS_ERROR,
String.format("Error occurred while closing the message consumer: %s", exception.getMessage()),
exception);
}
return null;
}
Expand All @@ -227,10 +193,9 @@ public static Object acknowledge(BMap<BString, Object> message) {
((Message) nativeMessage).acknowledge();
}
} catch (JMSException exception) {
BError cause = ErrorCreator.createError(exception);
return ErrorCreator.createError(ModuleUtils.getModule(), Constants.JMS_ERROR,
StringUtils.fromString("Error occurred while sending acknowledgement for the message"),
cause, null);
return createError(JMS_ERROR,
String.format("Error occurred while sending acknowledgement for the message: %s",
exception.getMessage()), exception);
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,12 @@

import io.ballerina.runtime.api.Environment;
import io.ballerina.runtime.api.Runtime;
import io.ballerina.runtime.api.creators.ErrorCreator;
import io.ballerina.runtime.api.utils.StringUtils;
import io.ballerina.runtime.api.values.BError;
import io.ballerina.runtime.api.values.BObject;

import java.util.Objects;

import javax.jms.JMSException;
import javax.jms.MessageConsumer;

import static io.ballerina.stdlib.java.jms.CommonUtils.createError;
import static io.ballerina.stdlib.java.jms.Constants.JMS_ERROR;
import static io.ballerina.stdlib.java.jms.Constants.NATIVE_CONSUMER;

Expand All @@ -40,19 +36,13 @@ public class JmsMessageListenerUtils {

public static Object setMessageListener(Environment environment, BObject consumer,
BObject serviceObject) {
Object nativeConsumer = consumer.getNativeData(NATIVE_CONSUMER);
if (Objects.isNull(nativeConsumer)) {
return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR,
StringUtils.fromString("Could not find the native JMS MessageConsumer"),
null, null);
}
MessageConsumer nativeConsumer = (MessageConsumer) consumer.getNativeData(NATIVE_CONSUMER);
Runtime bRuntime = environment.getRuntime();
try {
((MessageConsumer) nativeConsumer).setMessageListener(new JmsListener(serviceObject, bRuntime));
nativeConsumer.setMessageListener(new JmsListener(serviceObject, bRuntime));
} catch (JMSException e) {
BError cause = ErrorCreator.createError(e);
return ErrorCreator.createError(ModuleUtils.getModule(), JMS_ERROR,
StringUtils.fromString("Error occurred while setting the message listener"), cause, null);
return createError(JMS_ERROR,
String.format("Error occurred while setting the message listener: %s", e.getMessage()), e);
}
return null;
}
Expand Down
Loading

0 comments on commit de8e522

Please sign in to comment.