Skip to content

Commit

Permalink
Merge pull request #101 from ayeshLK/local_development
Browse files Browse the repository at this point in the history
Add support to async-execute network actions
  • Loading branch information
ayeshLK authored Aug 14, 2023
2 parents 2fabd4f + 67743fc commit 2eb0adf
Show file tree
Hide file tree
Showing 15 changed files with 245 additions and 95 deletions.
2 changes: 1 addition & 1 deletion ballerina/caller.bal
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,6 @@ public isolated client class Caller {
# + message - JMS message record
# + return - `jms:Error` if there is an error in the execution or else nil
isolated remote function acknowledge(Message message) returns Error? = @java:Method {
'class: "io.ballerina.stdlib.java.jms.JmsConsumer"
'class: "io.ballerina.stdlib.java.jms.consumer.Actions"
} external;
}
10 changes: 5 additions & 5 deletions ballerina/message_consumer.bal
Original file line number Diff line number Diff line change
Expand Up @@ -70,36 +70,36 @@ public isolated client class MessageConsumer {

isolated function externInit(Session session, ConsumerOptions consumerOptions) returns Error? = @java:Method {
name: "init",
'class: "io.ballerina.stdlib.java.jms.JmsConsumer"
'class: "io.ballerina.stdlib.java.jms.consumer.Actions"
} external;

# Receives the next message that arrives within the specified timeout interval.
#
# + timeoutMillis - Message receive timeout
# + return - `jms:JmsMessage` or `jsm:Error` if there is an error in the execution
isolated remote function receive(int timeoutMillis = 0) returns Message|Error? = @java:Method {
'class: "io.ballerina.stdlib.java.jms.JmsConsumer"
'class: "io.ballerina.stdlib.java.jms.consumer.Actions"
} external;

# Receives the next message if one is immediately available.
#
# + return - `jms:JmsMessage` or `jsm:Error` if there is an error in the execution
isolated remote function receiveNoWait() returns Message|Error? = @java:Method {
'class: "io.ballerina.stdlib.java.jms.JmsConsumer"
'class: "io.ballerina.stdlib.java.jms.consumer.Actions"
} external;

# Mark a JMS message as received.
#
# + message - JMS message record
# + return - `jms:Error` if there is an error in the execution or else nil
isolated remote function acknowledge(Message message) returns Error? = @java:Method {
'class: "io.ballerina.stdlib.java.jms.JmsConsumer"
'class: "io.ballerina.stdlib.java.jms.consumer.Actions"
} external;

# Closes the message consumer.
#
# + return - `jms:Error` if there is an error or else nil
isolated remote function close() returns Error? = @java:Method {
'class: "io.ballerina.stdlib.java.jms.JmsConsumer"
'class: "io.ballerina.stdlib.java.jms.consumer.Actions"
} external;
}
4 changes: 2 additions & 2 deletions ballerina/message_listener.bal
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public isolated class Listener {
# ```
#
# + 'service - The service to be detached
# + return - A `kafka:Error` if an error is encountered while detaching a service or else `()`
# + return - A `jms:Error` if an error is encountered while detaching a service or else `()`
public isolated function detach(Service 'service) returns Error? {}

# Starts the endpoint.
Expand Down Expand Up @@ -94,5 +94,5 @@ public isolated class Listener {
}

isolated function setMessageListener(MessageConsumer consumer, Service 'service) returns Error? = @java:Method {
'class: "io.ballerina.stdlib.java.jms.JmsMessageListenerUtils"
'class: "io.ballerina.stdlib.java.jms.listener.Utils"
} external;
8 changes: 4 additions & 4 deletions ballerina/message_producer.bal
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public isolated client class MessageProducer {

isolated function externInit(Session session, Destination? destination) returns Error? = @java:Method {
name: "init",
'class: "io.ballerina.stdlib.java.jms.JmsProducer"
'class: "io.ballerina.stdlib.java.jms.producer.Actions"
} external;

# Sends a message to the JMS provider.
Expand All @@ -41,7 +41,7 @@ public isolated client class MessageProducer {

isolated function externSend(handle message) returns Error? = @java:Method {
name: "send",
'class: "io.ballerina.stdlib.java.jms.JmsProducer"
'class: "io.ballerina.stdlib.java.jms.producer.Actions"
} external;

# Sends a message to a given destination of the JMS provider.
Expand All @@ -57,14 +57,14 @@ public isolated client class MessageProducer {
isolated function externSendTo(Session session, Destination destination, handle message)
returns Error? = @java:Method {
name: "sendTo",
'class: "io.ballerina.stdlib.java.jms.JmsProducer"
'class: "io.ballerina.stdlib.java.jms.producer.Actions"
} external;

# Closes the message producer.
#
# + return - `jms:Error` if there is an error or else nil
isolated remote function close() returns Error? = @java:Method {
'class: "io.ballerina.stdlib.java.jms.JmsProducer"
'class: "io.ballerina.stdlib.java.jms.producer.Actions"
} external;
};

Expand Down
26 changes: 26 additions & 0 deletions ballerina/tests/jms_message_listener_tests.bal
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,32 @@ isolated function testMessageListenerImmediateStop() returns error? {
check msgListener.immediateStop();
}

@test:Config {
groups: ["messageListener"]
}
isolated function testMessageListenerAttachWithoutSvcPath() returns error? {
Listener msgListener = check new (
connectionConfig = {
initialContextFactory: "org.apache.activemq.jndi.ActiveMQInitialContextFactory",
providerUrl: "tcp://localhost:61616"
},
acknowledgementMode = CLIENT_ACKNOWLEDGE,
consumerOptions = {
destination: {
'type: QUEUE,
name: "test-caller"
}
}
);
Service consumerSvc = service object {
remote function onMessage(Message message, Caller caller) returns error? {}
};
check msgListener.attach(consumerSvc);
check msgListener.'start();
runtime:sleep(2);
check msgListener.immediateStop();
}

@test:AfterGroups {
value: ["messageListener"]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
*/
public class BallerinaJmsException extends Exception {

BallerinaJmsException(String message) {
public BallerinaJmsException(String message) {
super(message);
}

Expand Down
16 changes: 8 additions & 8 deletions native/src/main/java/io/ballerina/stdlib/java.jms/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,12 @@ public class Constants {
static final String SESSION_TRANSACTED_MODE = "SESSION_TRANSACTED";

// Native properties in respective ballerina objects
static final String NATIVE_CONNECTION = "connection";
static final String NATIVE_SESSION = "session";
static final String NATIVE_PRODUCER = "producer";
static final String NATIVE_CONSUMER = "consumer";
public static final String NATIVE_CONNECTION = "connection";
public static final String NATIVE_SESSION = "session";
public static final String NATIVE_PRODUCER = "producer";
public static final String NATIVE_CONSUMER = "consumer";

static final String NATIVE_MESSAGE = "message";
public static final String NATIVE_MESSAGE = "message";

// Ballerina JMS message types
static final String MESSAGE_BAL_RECORD_NAME = "Message";
Expand All @@ -124,7 +124,7 @@ public class Constants {
static final BString TIMESTAMP = StringUtils.fromString("timestamp");
static final BString CORRELATION_ID = StringUtils.fromString("correlationId");
static final BString REPLY_TO = StringUtils.fromString("replyTo");
static final BString DESTINATION = StringUtils.fromString("destination");
public static final BString DESTINATION = StringUtils.fromString("destination");
static final BString DELIVERY_MODE = StringUtils.fromString("deliveryMode");
static final BString REDELIVERED = StringUtils.fromString("redelivered");
static final BString JMS_TYPE = StringUtils.fromString("jmsType");
Expand All @@ -139,14 +139,14 @@ public class Constants {
static final BString TOPIC = StringUtils.fromString("TOPIC");
static final BString TEMPORARY_TOPIC = StringUtils.fromString("TEMPORARY_TOPIC");

static final String SERVICE_RESOURCE_ON_MESSAGE = "onMessage";
public static final String SERVICE_RESOURCE_ON_MESSAGE = "onMessage";
static final String SERVICE_RESOURCE_ON_TEXT_MESSAGE = "onTextMessage";
static final String SERVICE_RESOURCE_ON_MAP_MESSAGE = "onMapMessage";
static final String SERVICE_RESOURCE_ON_BYTES_MESSAGE = "onBytesMessage";
static final String SERVICE_RESOURCE_ON_STREAM_MESSAGE = "onStreamMessage";
static final String SERVICE_RESOURCE_ON_OTHER_MESSAGE = "onOtherMessage";

static final String CALLER = "Caller";
public static final String CALLER = "Caller";

private Constants() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,21 @@
* under the License.
*/

package io.ballerina.stdlib.java.jms;
package io.ballerina.stdlib.java.jms.consumer;

import io.ballerina.runtime.api.Environment;
import io.ballerina.runtime.api.Future;
import io.ballerina.runtime.api.utils.StringUtils;
import io.ballerina.runtime.api.values.BError;
import io.ballerina.runtime.api.values.BMap;
import io.ballerina.runtime.api.values.BObject;
import io.ballerina.runtime.api.values.BString;
import io.ballerina.stdlib.java.jms.BallerinaJmsException;

import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import javax.jms.Destination;
import javax.jms.JMSException;
Expand All @@ -40,12 +46,15 @@
import static io.ballerina.stdlib.java.jms.Constants.DESTINATION;
import static io.ballerina.stdlib.java.jms.Constants.JMS_ERROR;
import static io.ballerina.stdlib.java.jms.Constants.NATIVE_CONSUMER;
import static io.ballerina.stdlib.java.jms.Constants.NATIVE_MESSAGE;
import static io.ballerina.stdlib.java.jms.Constants.NATIVE_SESSION;

/**
* Represents {@link javax.jms.MessageConsumer} related utility functions.
*/
public class JmsConsumer {
public class Actions {
private static final ExecutorService executorService = Executors.newCachedThreadPool(new ConsumerThreadFactory());

private static final BString CONSUMER_TYPE = StringUtils.fromString("type");
private static final BString MESSAGE_SELECTOR = StringUtils.fromString("messageSelector");
private static final BString NO_LOCAL = StringUtils.fromString("noLocal");
Expand Down Expand Up @@ -111,56 +120,76 @@ private static MessageConsumer createConsumer(Session session, BMap<BString, Obj
/**
* Receives the next message that arrives within the specified timeout interval.
*
* @param env Ballerina runtime environment
* @param consumer Ballerina consumer object
* @param timeout The timeout value (in milliseconds)
* @return A Ballerina `jms:Error` if the JMS MessageConsumer fails to receive the message due to some error
* or else the next message produced for this message consumer, or null
*/
public static Object receive(BObject consumer, long timeout) {
public static Object receive(Environment env, BObject consumer, long timeout) {
MessageConsumer nativeConsumer = (MessageConsumer) consumer.getNativeData(NATIVE_CONSUMER);
try {
Message message = nativeConsumer.receive(timeout);
if (Objects.isNull(message)) {
return null;
Future balFuture = env.markAsync();
executorService.execute(() -> {
try {
Message message = nativeConsumer.receive(timeout);
if (Objects.isNull(message)) {
balFuture.complete(null);
} else {
BMap<BString, Object> ballerinaMsg = getBallerinaMessage(message);
balFuture.complete(ballerinaMsg);
}
} catch (JMSException exception) {
BError bError = createError(JMS_ERROR,
String.format("Error occurred while receiving messages: %s", exception.getMessage()),
exception);
balFuture.complete(bError);
} catch (BallerinaJmsException exception) {
balFuture.complete(createError(JMS_ERROR, exception.getMessage(), exception));
} catch (Exception exception) {
BError bError = createError(JMS_ERROR,
String.format("Unknown error occurred while processing the received messages: %s",
exception.getMessage()), exception);
balFuture.complete(bError);
}
return getBallerinaMessage(message);
} catch (JMSException exception) {
return createError(JMS_ERROR,
String.format("Error occurred while receiving messages: %s", exception.getMessage()), exception);
} catch (BallerinaJmsException exception) {
return createError(JMS_ERROR, exception.getMessage(), exception);
} catch (Exception exception) {
return createError(JMS_ERROR,
String.format("Unknown error occurred while processing the received messages: %s",
exception.getMessage()), exception);
}
});
return null;
}

/**
* Receives the next message if one is immediately available.
*
* @param env Ballerina runtime environment
* @param consumer Ballerina consumer object
* @return A Ballerina `jms:Error` if the JMS MessageConsumer fails to receive the message due to some error
* or else the next message produced for this message consumer, or null
*/
public static Object receiveNoWait(BObject consumer) {
public static Object receiveNoWait(Environment env, BObject consumer) {
MessageConsumer nativeConsumer = (MessageConsumer) consumer.getNativeData(NATIVE_CONSUMER);
try {
Message message = nativeConsumer.receiveNoWait();
if (Objects.isNull(message)) {
return null;
Future balFuture = env.markAsync();
executorService.execute(() -> {
try {
Message message = nativeConsumer.receiveNoWait();
if (Objects.isNull(message)) {
balFuture.complete(null);
} else {
BMap<BString, Object> ballerinaMsg = getBallerinaMessage(message);
balFuture.complete(ballerinaMsg);
}
} catch (JMSException exception) {
BError bError = createError(JMS_ERROR,
String.format("Error occurred while receiving messages: %s", exception.getMessage()),
exception);
balFuture.complete(bError);
} catch (BallerinaJmsException exception) {
balFuture.complete(createError(JMS_ERROR, exception.getMessage(), exception));
} catch (Exception exception) {
BError bError = createError(JMS_ERROR,
String.format("Unknown error occurred while processing the received messages: %s",
exception.getMessage()), exception);
balFuture.complete(bError);
}
return getBallerinaMessage(message);
} catch (JMSException exception) {
return createError(JMS_ERROR,
String.format("Error occurred while receiving messages: %s", exception.getMessage()), exception);
} catch (BallerinaJmsException exception) {
return createError(JMS_ERROR, exception.getMessage(), exception);
} catch (Exception exception) {
return createError(JMS_ERROR,
String.format("Unknown error occurred while processing the received messages: %s",
exception.getMessage()), exception);
}
});
return null;
}

/**
Expand Down Expand Up @@ -189,7 +218,7 @@ public static Object close(BObject consumer) {
*/
public static Object acknowledge(BMap<BString, Object> message) {
try {
Object nativeMessage = message.getNativeData(Constants.NATIVE_MESSAGE);
Object nativeMessage = message.getNativeData(NATIVE_MESSAGE);
if (Objects.nonNull(nativeMessage)) {
((Message) nativeMessage).acknowledge();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 LLC. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.ballerina.stdlib.java.jms.consumer;

import java.util.concurrent.ThreadFactory;

/**
* A {@link ThreadFactory} object that creates new threads on demand for JMS consumer network actions.
*/
class ConsumerThreadFactory implements ThreadFactory {

@Override
public Thread newThread(Runnable runnable) {
Thread jmsConsumerThread = new Thread(runnable);
jmsConsumerThread.setName("balx-jms-consumer-network-thread");
return jmsConsumerThread;
}
}
Loading

0 comments on commit 2eb0adf

Please sign in to comment.