-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.bal
136 lines (123 loc) · 5.55 KB
/
main.bal
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
import ballerina/http;
import ballerina/log;
import ballerina/websocket;
// Values for these configurations are provided when running the service.
// See the Config.toml file in the root directory.
configurable map<string> backends = ?;
listener websocket:Listener gwListener = new (9090,
secureSocket = {
key: {
certFile: "../resources/public.crt",
keyFile: "../resources/private.key"
}
}
);
# This service accepts the WebSocket upgrade request.
@websocket:ServiceConfig {
auth: [
{
jwtValidatorConfig: {
issuer: "wso2",
audience: "ballerina",
signatureConfig: {
certFile: "../resources/public.crt"
},
scopeKey: "scp"
},
scopes: ["admin"]
}
]
}
isolated service /gateway on gwListener {
# Description.
#
# + id - parameter description
# + return - return value description
isolated resource function get .(@http:Header {name: "ID"} string id) returns websocket:Service|websocket:UpgradeError {
log:printInfo("WebSocket handshake happened! ", backendId = id);
if !backends.hasKey(id) {
return <websocket:UpgradeError>error("Invalid service identifier");
}
string backendUrl = backends.get(id);
websocket:Service|error router = new Router(id, backendUrl);
if router is error {
return <websocket:UpgradeError>error(router.message());
} else {
return router;
}
}
}
isolated service class Router {
*websocket:Service;
private final websocket:Client wsBackend;
private final string backendId;
# Router service initializer that creates the WebSocket client to the backend.
#
# + backendId - backend identifier
# + backendUrl - backend URL
# + return - return value description
isolated function init(string backendId, string backendUrl) returns error? {
self.backendId = backendId;
self.wsBackend = check new websocket:Client(backendUrl, readTimeout = 30, writeTimeout = 30);
}
# As soon as the WebSocket handshake is completed and the connection is established,
# the onOpen remote method is dispatched.
#
# + caller - represents the client who initiated the connection
# + return - returns and error if the connection to the backend fails
isolated remote function onOpen(websocket:Caller caller) returns error? {
log:printInfo("Connection established with the backend", backendId = self.backendId, connectionId = caller.getConnectionId());
_ = start self.routeMessagesFromBackendToClient(self.wsBackend, caller);
}
# This remote method is dispatched when a close frame with a statusCode and a reason is received
#
# + caller - represents the client who initiated the connection
# + statusCode - statusCode
# + reason - reason
isolated remote function onClose(websocket:Caller caller, int statusCode, string reason) {
log:printInfo("Connection closed by the client", 'statusCode = statusCode, 'reason = reason, backendId = self.backendId, connectionId = caller.getConnectionId());
websocket:Error? status = self.wsBackend->close(reason = "Client closed the connection");
if status is websocket:Error {
self.logError("Error closing the backend connection", status, caller);
}
}
# This `remote function` is triggered when a new message is received from a client. It accepts `anydata` as the function argument. The received data will be converted to the data type stated as the function argument..
#
# + caller - represents the client who initiated the message
# + message - message received from the client
# + return - return an error if the message cannot be written to the backend
isolated remote function onMessage(websocket:Caller caller, anydata message) returns error? {
websocket:Error? status = self.wsBackend->writeMessage(message);
if status is websocket:ConnectionError {
self.logError("Backend connection is closed", status, caller);
websocket:Error? closeErr = caller->close(statusCode = 1000, reason = "Backend connection is closed");
if closeErr is websocket:Error {
self.logError("Error closing the client connection", closeErr, caller);
}
} else if status is websocket:Error {
self.logError("Error writing message to the backend", status, caller);
}
}
# Routes messages received from the backend to the client.
#
# + wsBackend - websocket client to the backend
# + wsCaller - represents the client who initiated the connection
isolated function routeMessagesFromBackendToClient(websocket:Client wsBackend, websocket:Caller wsCaller) {
while (true) {
anydata|websocket:Error msg = self.wsBackend->readMessage();
if msg is websocket:Error {
self.logError("Error reading message from the backend", msg, wsCaller);
return;
} else {
websocket:Error? err = wsCaller->writeMessage(msg);
if err is websocket:Error {
self.logError("Error writing message to client", err, wsCaller);
return;
}
}
}
}
isolated function logError(string msg, websocket:Error err, websocket:Caller caller) {
log:printError(msg, 'error = err, backendId = self.backendId, connectionId = caller.getConnectionId());
}
}