Skip to content

Commit

Permalink
Update docs
Browse files Browse the repository at this point in the history
  • Loading branch information
aashikam committed Apr 3, 2024
1 parent f3a225d commit 3c26176
Show file tree
Hide file tree
Showing 13 changed files with 74 additions and 64 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ nats:Error? result =
2. Publish as a request that expects a reply:
```ballerina
string message = "hello world";
nats:Message|nats:Error reqReply =
nats:AnydataMessage|nats:Error reqReply =
natsClient->requestMessage({ content: message.toBytes(), subject: "demo.nats.basic"}, 5);
```

Expand All @@ -76,7 +76,7 @@ nats:Error? result = natsClient->publish({ content: message.toBytes(), subject:
}
service nats:Service on new nats:Listener(nats:DEFAULT_URL) {
remote function onMessage(nats:Message message) {
remote function onMessage(nats:AnydataMessage message) {
}
}
```
Expand All @@ -90,7 +90,7 @@ service nats:Service on new nats:Listener(nats:DEFAULT_URL) {
service nats:Service on new nats:Listener(nats:DEFAULT_URL) {
// The returned message will be published to the replyTo subject of the consumed message
remote function onRequest(nats:Message message) returns string? {
remote function onRequest(nats:AnydataMessage message) returns string? {
return "Reply Message";
}
}
Expand Down
2 changes: 1 addition & 1 deletion ballerina-tests/tests/data_binding_publish_tests.bal
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ service nats:Service on new nats:Listener(DATA_BINDING_URL) {
remote function onMessage(XmlMessage msg) {
}

remote function onError(nats:Message message, nats:Error err) {
remote function onError(nats:AnydataMessage message, nats:Error err) {
log:printInfo("Error Received: " + err.message());
setOnErrorReceived(true);
setOnErrorMessage(err.message());
Expand Down
6 changes: 3 additions & 3 deletions ballerina/Module.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ nats:Error? result =
2. Publish as a request that expects a reply:
```ballerina
string message = "hello world";
nats:Message|nats:Error reqReply =
nats:AnydataMessage|nats:Error reqReply =
natsClient->requestMessage({ content: message.toBytes(), subject: "demo.nats.basic"}, 5);
```

Expand All @@ -69,7 +69,7 @@ nats:Error? result = natsClient->publish({ content: message.toBytes(), subject:
}
service nats:Service on new nats:Listener(nats:DEFAULT_URL) {
remote function onMessage(nats:Message message) {
remote function onMessage(nats:AnydataMessage message) {
}
}
```
Expand All @@ -83,7 +83,7 @@ service nats:Service on new nats:Listener(nats:DEFAULT_URL) {
service nats:Service on new nats:Listener(nats:DEFAULT_URL) {
// The returned message will be published to the replyTo subject of the consumed message
remote function onRequest(nats:Message message) returns string? {
remote function onRequest(nats:AnydataMessage message) returns string? {
return "Reply Message";
}
}
Expand Down
6 changes: 3 additions & 3 deletions ballerina/Package.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ nats:Error? result =
2. Publish as a request that expects a reply:
```ballerina
string message = "hello world";
nats:Message|nats:Error reqReply =
nats:AnydataMessage|nats:Error reqReply =
natsClient->requestMessage({ content: message.toBytes(), subject: "demo.nats.basic"}, 5);
```

Expand All @@ -69,7 +69,7 @@ nats:Error? result = natsClient->publish({ content: message.toBytes(), subject:
}
service nats:Service on new nats:Listener(nats:DEFAULT_URL) {
remote function onMessage(nats:Message message) {
remote function onMessage(nats:AnydataMessage message) {
}
}
```
Expand All @@ -83,7 +83,7 @@ service nats:Service on new nats:Listener(nats:DEFAULT_URL) {
service nats:Service on new nats:Listener(nats:DEFAULT_URL) {
// The returned message will be published to the replyTo subject of the consumed message
remote function onRequest(nats:Message message) returns string? {
remote function onRequest(nats:AnydataMessage message) returns string? {
return "Reply Message";
}
}
Expand Down
4 changes: 2 additions & 2 deletions docs/proposals/data-binding.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ When receiving the same message,
```ballerina
service "demo" on new nats:Listener(nats:DEFAULT_URL) {
remote function onMessage(nats:Message message) returns nats:Error? {
remote function onMessage(nats:AnydataMessage message) returns nats:Error? {
string messageContent = check string:fromBytes(message.content);
Person person = check value:fromJsonStringWithType(messageContent);
}
Expand All @@ -56,7 +56,7 @@ Receiving the message as a request,
```ballerina
service "demo" on new nats:Listener(nats:DEFAULT_URL) {
remote function onRequest(nats:Message message) returns anydata|nats:Error? {
remote function onRequest(nats:AnydataMessage message) returns anydata|nats:Error? {
string messageContent = check string:fromBytes(message.content);
Person person = check value:fromJsonStringWithType(messageContent);
return "New person received";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ configurable string LISTENING_SUBJECT = ?;
service "notificationService" on new nats:Listener(nats:DEFAULT_URL) {

// Listens to NATS subject for any successful orders
remote function onMessage(nats:Message message) returns error? {
remote function onMessage(nats:BytesMessage message) returns error? {

// Convert the byte values in the NATS Message to type Order
string messageContent = check string:fromBytes(message.content);
Expand Down
4 changes: 2 additions & 2 deletions examples/order-manager/order-processor/order_processor.bal
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ service "orderProcessorService" on new nats:Listener(nats:DEFAULT_URL) {
nats:Client natsClient = checkpanic new(nats:DEFAULT_URL);

// Listens to NATS subject for any new orders and process them
remote function onMessage(nats:Message message) returns error? {
remote function onMessage(nats:BytesMessage message) returns error? {

// Uses Ballerina query expressions to filter out the successful orders and publish to NATS subject
check from types:Order 'order in check getOrdersFromMessage(message)
Expand All @@ -48,7 +48,7 @@ service "orderProcessorService" on new nats:Listener(nats:DEFAULT_URL) {
}

// Convert the byte values in NATS message to type Order[]
function getOrdersFromMessage(nats:Message message) returns types:Order[]|error {
function getOrdersFromMessage(nats:BytesMessage message) returns types:Order[]|error {
types:Order[] receivedOrders = [];
string messageContent = check string:fromBytes(message.content);
json jsonContent = check value:fromJsonString(messageContent);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ nats:Service consumerService =
subject: PUBLISH_SUBJECT
}
service object {
remote function onMessage(nats:Message msg) {
remote function onMessage(nats:BytesMessage msg) {
byte[] messageContent = <@untainted> msg.content;

string|error message = string:fromBytes(messageContent);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ nats:Service consumerService =
subject: SUBJECT
}
service object {
remote function onMessage(nats:Message msg) {
remote function onMessage(nats:BytesMessage msg) {
byte[] messageContent = <@untainted> msg.content;

string|error message = strings:fromBytes(messageContent);
Expand Down
2 changes: 1 addition & 1 deletion load-tests/simple_producer_consumer/src/main.bal
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ nats:Service natsService =
subject: SUBJECT
}
service object {
remote function onMessage(nats:Message message) returns error? {
remote function onMessage(nats:BytesMessage message) returns error? {
string|error messageContent = 'string:fromBytes(message.content);
if messageContent is error {
lock {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public class Constants {
public static final String SERVICE_LIST = "service_list";

// Represents the message which will be consumed from NATS.
public static final String NATS_MESSAGE_OBJ_NAME = "Message";
public static final String NATS_MESSAGE_OBJ_NAME = "AnydataMessage";
public static final String MESSAGE_CONTENT = "content";
public static final String MESSAGE_SUBJECT = "subject";
public static final String MESSAGE_REPLY_TO = "replyTo";
Expand Down
7 changes: 0 additions & 7 deletions native/src/main/java/io/ballerina/stdlib/nats/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -130,13 +130,6 @@ public static RecordType getRecordType(BTypedesc bTypedesc) {
return recordType;
}

public static RecordType getRecordType(Type type) {
if (type.getTag() == TypeTags.INTERSECTION_TAG) {
return (RecordType) TypeUtils.getReferredType(((IntersectionType) (type)).getConstituentTypes().get(0));
}
return (RecordType) type;
}

public static Object getValueWithIntendedType(Type type, byte[] value) throws BError {
String strValue = new String(value, StandardCharsets.UTF_8);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import io.ballerina.runtime.api.async.Callback;
import io.ballerina.runtime.api.async.StrandMetadata;
import io.ballerina.runtime.api.creators.ValueCreator;
import io.ballerina.runtime.api.types.Field;
import io.ballerina.runtime.api.types.IntersectionType;
import io.ballerina.runtime.api.types.MethodType;
import io.ballerina.runtime.api.types.ObjectType;
Expand All @@ -48,6 +47,7 @@
import io.nats.client.Connection;
import io.nats.client.Message;
import io.nats.client.MessageHandler;
import org.ballerinalang.langlib.value.CloneReadOnly;

import java.util.HashMap;
import java.util.Map;
Expand All @@ -59,14 +59,15 @@
import static io.ballerina.runtime.api.utils.TypeUtils.getReferredType;
import static io.ballerina.stdlib.nats.Constants.CONSTRAINT_VALIDATION;
import static io.ballerina.stdlib.nats.Constants.IS_ANYDATA_MESSAGE;
import static io.ballerina.stdlib.nats.Constants.MESSAGE_CONTENT;
import static io.ballerina.stdlib.nats.Constants.NATS;
import static io.ballerina.stdlib.nats.Constants.ORG_NAME;
import static io.ballerina.stdlib.nats.Constants.PARAM_ANNOTATION_PREFIX;
import static io.ballerina.stdlib.nats.Constants.PARAM_PAYLOAD_ANNOTATION_NAME;
import static io.ballerina.stdlib.nats.Constants.TYPE_CHECKER_OBJECT_NAME;
import static io.ballerina.stdlib.nats.Utils.createPayloadBindingError;
import static io.ballerina.stdlib.nats.Utils.getElementTypeDescFromArrayTypeDesc;
import static io.ballerina.stdlib.nats.Utils.getModule;
import static io.ballerina.stdlib.nats.Utils.getRecordType;
import static io.ballerina.stdlib.nats.Utils.validateConstraints;

/**
Expand Down Expand Up @@ -153,31 +154,6 @@ private void dispatchOnMessage(String subject, String replyTo, byte[] data) thro
}
}

private static Object createAndPopulateMessageRecord(byte[] message, String replyTo, String subject,
Parameter parameter) {
BMap<BString, Object> msgObj;
BArray msgData = ValueCreator.createArrayValue(message);
Map<String, Object> valueMap = new HashMap<>();
valueMap.put(Constants.MESSAGE_CONTENT, msgData);
valueMap.put(Constants.MESSAGE_SUBJECT, StringUtils.fromString(subject));
if (replyTo != null) {
valueMap.put(Constants.MESSAGE_REPLY_TO, StringUtils.fromString(replyTo));
}
Type referredType = getReferredType(parameter.type);
if (referredType.getTag() == TypeTags.INTERSECTION_TAG) {
msgObj = ValueCreator.createReadonlyRecordValue(getModule(),
Constants.NATS_MESSAGE_OBJ_NAME, valueMap);
} else {
BMap<BString, Object> msgRecord = ValueCreator.createRecordValue((RecordType) referredType);
Map<String, Field> fieldMap = ((RecordType) referredType).getFields();
Type contentType = getReferredType(fieldMap.get(Constants.MESSAGE_CONTENT).getFieldType());
Object msg = Utils.getValueWithIntendedType(contentType, message);
msgObj = ValueCreator.createRecordValue(msgRecord, msg, StringUtils.fromString(subject),
StringUtils.fromString(replyTo));
}
return msgObj;
}

private static MethodType getAttachedFunctionType(BObject serviceObject, String functionName) {
MethodType function = null;
ObjectType objectType = (ObjectType) TypeUtils.getReferredType(TypeUtils.getType(serviceObject));
Expand All @@ -196,7 +172,7 @@ private void executeOnRequestResource(CountDownLatch countDownLatch, String subj
StrandMetadata metadata = new StrandMetadata(getModule().getOrg(), getModule().getName(),
getModule().getVersion(), Constants.ON_REQUEST_RESOURCE);
executeResource(Constants.ON_REQUEST_RESOURCE, new ResponseCallback(countDownLatch, subject,
natsMetricsReporter, replyTo, this.natsConnection), metadata, PredefinedTypes.TYPE_ANYDATA,
natsMetricsReporter, replyTo, this.natsConnection), metadata, PredefinedTypes.TYPE_ANYDATA,
subject, args);
}

Expand Down Expand Up @@ -224,7 +200,7 @@ private void executeOnMessageResource(CountDownLatch countDownLatch, String subj
StrandMetadata metadata = new StrandMetadata(getModule().getOrg(), getModule().getName(),
getModule().getVersion(), Constants.ON_MESSAGE_RESOURCE);
executeResource(Constants.ON_MESSAGE_RESOURCE, new ResponseCallback(countDownLatch, subject,
natsMetricsReporter), metadata, PredefinedTypes.TYPE_NULL,
natsMetricsReporter), metadata, PredefinedTypes.TYPE_NULL,
replyTo, args);
}

Expand Down Expand Up @@ -273,10 +249,10 @@ private Object[] getResourceArguments(byte[] message, String replyTo, String sub
throw Utils.createNatsError("Invalid remote function signature");
}
messageExists = true;
Object record = createAndPopulateMessageRecord(message, replyTo, subject, parameter);
arguments[index++] = validateConstraints(record, getElementTypeDescFromArrayTypeDesc(
ValueCreator.createTypedescValue(referredType)), constraintValidation);
arguments[index++] = true;
Object record = createAndPopulateMessageRecord(message, replyTo, subject, referredType);
validateConstraints(record, getElementTypeDescFromArrayTypeDesc(ValueCreator
.createTypedescValue(parameter.type)), constraintValidation);
arguments[index++] = record;
break;
}
/*-fallthrough*/
Expand All @@ -285,12 +261,13 @@ private Object[] getResourceArguments(byte[] message, String replyTo, String sub
throw Utils.createNatsError("Invalid remote function signature");
}
payloadExists = true;
Object value = Utils.getValueWithIntendedType(getPayloadType(referredType), message);
arguments[index++] = validateConstraints(value, getElementTypeDescFromArrayTypeDesc(
ValueCreator.createTypedescValue(parameter.type)), constraintValidation);
arguments[index++] = true;
Object value = createPayload(message, referredType);
validateConstraints(value, getElementTypeDescFromArrayTypeDesc(ValueCreator
.createTypedescValue(parameter.type)), constraintValidation);
arguments[index++] = value;
break;
}
arguments[index++] = true;
}
return arguments;
}
Expand Down Expand Up @@ -323,9 +300,49 @@ private boolean invokeIsAnydataMessageTypeMethod(Type paramType) {
return messageTypeCheckCallback.getIsMessageType();
}

private static BMap<BString, Object> createAndPopulateMessageRecord(byte[] message, String replyTo, String subject,
Type messageType) {
RecordType recordType = getRecordType(messageType);
Type intendedType = TypeUtils.getReferredType(recordType.getFields().get(MESSAGE_CONTENT).getFieldType());
BMap<BString, Object> messageRecord = ValueCreator.createRecordValue(recordType);
Object messageContent = Utils.getValueWithIntendedType(intendedType, message);
if (messageContent instanceof BError) {
throw createPayloadBindingError(String.format("Data binding failed: %s", ((BError) messageContent)
.getMessage()), (BError) messageContent);
}
messageRecord.put(StringUtils.fromString(MESSAGE_CONTENT), messageContent);
messageRecord.put(StringUtils.fromString(Constants.MESSAGE_SUBJECT), StringUtils.fromString(subject));
if (replyTo != null) {
messageRecord.put(StringUtils.fromString(Constants.MESSAGE_REPLY_TO), StringUtils.fromString(replyTo));
}
if (messageType.getTag() == TypeTags.INTERSECTION_TAG) {
messageRecord.freezeDirect();
}
return messageRecord;
}

private static Object createPayload(byte[] message, Type payloadType) {
Object messageContent = Utils.getValueWithIntendedType(getPayloadType(payloadType), message);
if (messageContent instanceof BError) {
throw createPayloadBindingError(String.format("Data binding failed: %s", ((BError) messageContent)
.getMessage()), (BError) messageContent);
}
if (payloadType.isReadOnly()) {
return CloneReadOnly.cloneReadOnly(messageContent);
}
return messageContent;
}

private static RecordType getRecordType(Type type) {
if (type.getTag() == TypeTags.INTERSECTION_TAG) {
return (RecordType) TypeUtils.getReferredType(((IntersectionType) (type)).getConstituentTypes().get(0));
}
return (RecordType) type;
}

private static Type getPayloadType(Type definedType) {
if (definedType.getTag() == INTERSECTION_TAG) {
return ((IntersectionType) definedType).getConstituentTypes().get(0);
return ((IntersectionType) definedType).getConstituentTypes().get(0);
}
return definedType;
}
Expand Down

0 comments on commit 3c26176

Please sign in to comment.