Skip to content

Commit

Permalink
Merge pull request #4599 from aashikam/jetstream-bbe
Browse files Browse the repository at this point in the history
Add BBEs for NATS JetStream
  • Loading branch information
aashikam committed Jun 21, 2023
2 parents 8b85979 + 7c9f3c6 commit 943a19f
Show file tree
Hide file tree
Showing 10 changed files with 133 additions and 0 deletions.
18 changes: 18 additions & 0 deletions examples/index.json
Original file line number Diff line number Diff line change
Expand Up @@ -3159,6 +3159,15 @@
"disableVerificationReason": "Includes ballerinax components",
"disablePlayground": true,
"isLearnByExample": false
},
{
"name": "Consume JetStream message",
"url": "nats-jestream-sub",
"verifyBuild": false,
"verifyOutput": false,
"disableVerificationReason": "Includes ballerinax components",
"disablePlayground": true,
"isLearnByExample": false
}
]
},
Expand All @@ -3184,6 +3193,15 @@
"disableVerificationReason": "Includes ballerinax components",
"disablePlayground": true,
"isLearnByExample": false
},
{
"name": "Publish message",
"url": "nats-jestream-pub",
"verifyBuild": false,
"verifyOutput": false,
"disableVerificationReason": "Includes ballerinax components",
"disablePlayground": true,
"isLearnByExample": false
}
]
},
Expand Down
38 changes: 38 additions & 0 deletions examples/nats-jetstream-pub/nats_jetstream_pub.bal
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import ballerina/http;
import ballerinax/nats;

type Order readonly & record {
int orderId;
string productName;
decimal price;
boolean isValid;
};

service / on new http:Listener(9092) {
private final string SUBJECT_NAME = "orders";
private final nats:JetStreamClient orderClient;

function init() returns error? {
// Initiate a NATS client passing the URL of the NATS broker.
nats:Client natsClient = check new (nats:DEFAULT_URL);

// Initiate the NATS `JetStreamClient` at the start of the service. This will be used
// throughout the lifetime of the service.
self.orderClient = check new (natsClient);
nats:StreamConfiguration config = {
name: "demo",
subjects: [self.SUBJECT_NAME],
storageType: nats:MEMORY
};
_ = check self.orderClient->addStream(config);
}

resource function post orders(Order newOrder) returns http:Accepted|error {
// Produce a message to the specified subject.
check self.orderClient->publishMessage({
subject: self.SUBJECT_NAME,
content: newOrder.toString().toBytes()
});
return http:ACCEPTED;
}
}
20 changes: 20 additions & 0 deletions examples/nats-jetstream-pub/nats_jetstream_pub.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# NATS JetStream client - Publish message

The `nats:JetStreamClient` allows you to publish messages to a specific subject. To create a `nats:JetStreamClient`, you need to provide a valid instance of the `nats:Client`. Before publishing messages, you should call the `addStream` function to configure the stream. This function requires a `nats:JetStreamConfiguration` object with the `name`, `subjects`, and `storageType` values. To publish messages, you can use the `publishMessage` method, which takes the message content and subject as arguments. This method allows you to send messages that can be received by one or more subscribers.

::: code nats_jetstream_pub.bal :::

## Prerequisites
- Start an instance of the [NATS JetStream server](https://docs.nats.io/running-a-nats-service/configuration/resource_management).
- Run the NATS JetStream service given in the [NATS JetStream service - Consume message](/learn/by-example/nats-jetstream-sub/) example.

Run the client program by executing the following command.

::: out nats_jetstream_pub.server.out :::

Invoke the service by executing the following cURL command in a new terminal.

::: out nats_jetstream_pub.client.out :::

## Related links
- [`nats:JetStreamClient` client object - API documentation](https://lib.ballerina.io/ballerinax/nats/latest/clients/JetStreamClient)
2 changes: 2 additions & 0 deletions examples/nats-jetstream-pub/nats_jetstream_pub.metatags
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
description: BBE on producing and consuming a message from a subject in the NATS JetStream server using Ballerina.
keywords: ballerina, ballerina by example, bbe, nats, jetstream, server, publish, subscribe
1 change: 1 addition & 0 deletions examples/nats-jetstream-pub/nats_jetstream_pub.out
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
$ curl http://localhost:9092/orders -H "Content-type:application/json" -d "{\"orderId\": 1, \"productName\": \"Sport shoe\", \"price\": 27.5, \"isValid\": true}"
1 change: 1 addition & 0 deletions examples/nats-jetstream-pub/nats_jetstream_pub.server.out
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
$ bal run nats_jetstream_pub.bal
32 changes: 32 additions & 0 deletions examples/nats-jetstream-sub/nats_jestream_sub.bal
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import ballerina/log;
import ballerinax/nats;

type Order readonly & record {
int orderId;
string productName;
decimal price;
boolean isValid;
};

// Initiate a NATS client passing the URL of the NATS broker.
nats:Client natsClient = check new (nats:DEFAULT_URL);

// Initialize a NATS JetStream listener.
listener nats:JetStreamListener subscription = new (natsClient);
const string SUBJECT_NAME = "orders";

@nats:StreamServiceConfig {
subject: SUBJECT_NAME,
autoAck: false
}
// Bind the consumer to listen to the messages published to the 'orders' subject.
service nats:JetStreamService on subscription {
remote function onMessage(nats:JetStreamMessage message) returns error? {
string stringContent = check string:fromBytes(message.content);
json jsonContent = check stringContent.fromJsonString();
Order 'order = check jsonContent.cloneWithType();
if 'order.isValid {
log:printInfo(string `Received valid order for ${'order.productName}`);
}
}
}
17 changes: 17 additions & 0 deletions examples/nats-jetstream-sub/nats_jestream_sub.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# NATS JetStream service - Consume message

The `nats:JetStreamService` listens to a specified subject for incoming messages. Whenever a publisher sends a message to that subject, any active service listening to it will receive the message. You need to provide an instance of the `nats:Client` to create a `nats:JetStreamListener`. Once you have a `nats:JetStreamListener`, you can attach a `nats:JetStreamService` to it in order to listen to a specific subject and consume incoming messages. The subject to listen to can be either specified as the service path or provided in the `subject` field of the `nats:StreamServiceConfig`. This setup allows you to effectively listen to messages sent to a particular subject.

::: code nats_jestream_sub.bal :::

## Prerequisites
- Start an instance of the [NATS JetStream server](https://docs.nats.io/running-a-nats-service/configuration/resource_management).

Run the service by executing the following command.

::: out nats_jestream_sub.out :::

>**Tip:** You can invoke the above service via the [NATS JetStream client](/learn/by-example/nats-jetstream-pub/).
## Related links
- [`nats` package - API documentation](https://lib.ballerina.io/ballerinax/nats/latest)
2 changes: 2 additions & 0 deletions examples/nats-jetstream-sub/nats_jestream_sub.metatags
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
description: BBE on producing and consuming a message from a subject in the NATS JetStream server using Ballerina.
keywords: ballerina, ballerina by example, bbe, nats, jetstream, server, publish, subscribe
2 changes: 2 additions & 0 deletions examples/nats-jetstream-sub/nats_jestream_sub.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
$ bal run nats_jetstream_sub.bal
time = 2023-06-20T20:17:28.026+05:30 level = INFO module = "" message = "Received valid order for Sport shoe"

0 comments on commit 943a19f

Please sign in to comment.