Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: WebSocket non-blocking session opening #3695

Merged
merged 42 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from 41 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
6330914
fix blocking websocket session opening
Aug 22, 2024
bdf1655
jetty client per route
Aug 22, 2024
0708c58
Merge branch 'v2.x.x' into reboot/fix/ws-concurrent
pablocarle Aug 22, 2024
458e694
address review, remove unused code
Aug 23, 2024
7221a3f
Fix WebSocketClientFactoryTest
Aug 23, 2024
09acdcf
fix WebSocketClientFactoryContextTest
Aug 23, 2024
c58fc48
fix WebSocketRoutedSessionTest
Aug 23, 2024
6a55037
wip WebSocketProxyServerHandlerTest
Aug 23, 2024
239a4d7
fix WebSocketProxyServerHandlerTest
Aug 23, 2024
7015276
address sonar
Aug 23, 2024
cfb457d
fix condition and fix test
Aug 23, 2024
32cfce4
fix tests
Aug 23, 2024
da8a193
fix test
Aug 23, 2024
36fb764
use callback
Aug 26, 2024
b6b3c15
add assignment
Aug 26, 2024
ca46f76
use atomic reference
Aug 26, 2024
e9360ab
fix constructor
Aug 26, 2024
b8ec648
fix initialization
Aug 26, 2024
58c8de1
try fix integration test
Aug 26, 2024
d5d3b22
Merge branch 'v2.x.x' into reboot/fix/ws-concurrent
pablocarle Aug 27, 2024
28957ff
add test for factory
Aug 27, 2024
6e14842
Merge branch 'reboot/fix/ws-concurrent' of https://github.com/zowe/ap…
Aug 27, 2024
2041c9d
try fix it
Aug 27, 2024
a185f3c
wait for shutdown in IT
Aug 27, 2024
501a223
fix null check
Aug 27, 2024
c4df8ec
try without pause
Aug 27, 2024
5b5318a
method reference
Aug 27, 2024
3abd170
add tests
Aug 27, 2024
d1816e8
add tests for coverage
Aug 27, 2024
a51cb7d
add test for coverage
Aug 27, 2024
6528e13
address sonar issues
Aug 27, 2024
f1b0599
await
Aug 27, 2024
053718e
use await
Aug 28, 2024
68d6a05
wip
Aug 28, 2024
a5f5d76
close temp sessions
Aug 28, 2024
5a91287
logs
Aug 28, 2024
faa8387
websocket debug logs
Aug 28, 2024
672ddbf
debug websocket
Aug 28, 2024
706f2f3
add poll interval
Aug 28, 2024
b57f619
debug
Aug 28, 2024
892a1bb
polldelay
Aug 28, 2024
56b3c55
address review
Aug 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* This program and the accompanying materials are made available under the terms of the
* Eclipse Public License v2.0 which accompanies this distribution, and is available at
* https://www.eclipse.org/legal/epl-v20.html
*
* SPDX-License-Identifier: EPL-2.0
*
* Copyright Contributors to the Zowe Project.
*/

package org.zowe.apiml.gateway.ws;

import org.springframework.web.socket.WebSocketSession;

public interface ClientSessionFailureCallback {

void onClientSessionFailure(WebSocketRoutedSession webSocketRoutedSession, WebSocketSession serverSession, Throwable throwable);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* This program and the accompanying materials are made available under the terms of the
* Eclipse Public License v2.0 which accompanies this distribution, and is available at
* https://www.eclipse.org/legal/epl-v20.html
*
* SPDX-License-Identifier: EPL-2.0
*
* Copyright Contributors to the Zowe Project.
*/

package org.zowe.apiml.gateway.ws;

import org.springframework.web.socket.WebSocketSession;

public interface ClientSessionSuccessCallback {

void onClientSessionSuccess(WebSocketRoutedSession webSocketRoutedSession, WebSocketSession serverSession, WebSocketSession clientSession);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* This program and the accompanying materials are made available under the terms of the
* Eclipse Public License v2.0 which accompanies this distribution, and is available at
* https://www.eclipse.org/legal/epl-v20.html
*
* SPDX-License-Identifier: EPL-2.0
*
* Copyright Contributors to the Zowe Project.
*/

package org.zowe.apiml.gateway.ws;

public class ServerNotYetAvailableException extends RuntimeException {

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@

package org.zowe.apiml.gateway.ws;

import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.util.ssl.SslContextFactory;
Expand All @@ -23,17 +21,27 @@

import javax.annotation.PreDestroy;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
* Factory for provisioning web socket client
* <p>
* Manages the client lifecycle
*/
@Component
@RequiredArgsConstructor(access = AccessLevel.PACKAGE) // for testing purposes
@Slf4j
public class WebSocketClientFactory {

private final JettyWebSocketClient client;
private final SslContextFactory.Client jettyClientSslContextFactory;
private final int maxIdleWebSocketTimeout;
private final long connectTimeout;
private final long stopTimeout;
private final long asyncWriteTimeout;
private final int maxRequestBufferSize;

private final ConcurrentMap<String, JettyWebSocketClient> clientsMap = new ConcurrentHashMap<>();

@Autowired
public WebSocketClientFactory(
Expand All @@ -46,6 +54,17 @@ public WebSocketClientFactory(
) {
log.debug("Creating Jetty WebSocket client, with SslFactory: {}", jettyClientSslContextFactory);

this.jettyClientSslContextFactory = jettyClientSslContextFactory;
this.maxIdleWebSocketTimeout = maxIdleWebSocketTimeout;
this.connectTimeout = connectTimeout;
this.stopTimeout = stopTimeout;
this.asyncWriteTimeout = asyncWriteTimeout;
this.maxRequestBufferSize = maxRequestBufferSize;
}

private JettyWebSocketClient createClient() {
log.debug("Creating Jetty WebSocket client, with SslFactory: {}", jettyClientSslContextFactory);

HttpClient httpClient = new HttpClient(jettyClientSslContextFactory);
httpClient.setRequestBufferSize(maxRequestBufferSize);
WebSocketClient wsClient = new WebSocketClient(httpClient);
Expand All @@ -54,19 +73,28 @@ public WebSocketClientFactory(
wsClient.setConnectTimeout(connectTimeout);
wsClient.setStopTimeout(stopTimeout);
wsClient.setAsyncWriteTimeout(asyncWriteTimeout);
client = new JettyWebSocketClient(wsClient);
JettyWebSocketClient client = new JettyWebSocketClient(wsClient);
client.start();
return client;
}

JettyWebSocketClient getClientInstance() {
return client;
JettyWebSocketClient getClientInstance(String key) {
if (clientsMap.containsKey(key)) {
return clientsMap.get(key);
}
JettyWebSocketClient newClient = createClient();
clientsMap.put(key, newClient);
return newClient;
}

@PreDestroy
void closeClient() {
if (client.isRunning()) {
log.debug("Closing Jetty WebSocket client");
client.stop();
void closeClients() {
for (Map.Entry<String, JettyWebSocketClient> entry : clientsMap.entrySet()) {
if (entry.getValue().isRunning()) {
log.debug("Closing Jetty WebSocket client");
entry.getValue().stop();
}

}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
import org.springframework.cloud.client.loadbalancer.LoadBalancerClient;
import org.springframework.context.ApplicationContext;
import org.springframework.retry.annotation.Backoff;
import org.springframework.retry.annotation.Recover;
import org.springframework.retry.annotation.Retryable;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.SubProtocolCapable;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.AbstractWebSocketHandler;
Expand All @@ -30,6 +32,7 @@

import javax.annotation.PostConstruct;
import javax.inject.Singleton;

import java.io.IOException;
import java.net.URI;
import java.util.List;
Expand All @@ -45,7 +48,7 @@
@Component
@Singleton
@Slf4j
public class WebSocketProxyServerHandler extends AbstractWebSocketHandler implements RoutedServicesUser, SubProtocolCapable {
public class WebSocketProxyServerHandler extends AbstractWebSocketHandler implements RoutedServicesUser, SubProtocolCapable, ClientSessionFailureCallback, ClientSessionSuccessCallback {

@Value("${server.webSocket.supportedProtocols:-}")
private List<String> subProtocols;
Expand All @@ -67,7 +70,7 @@ public List<String> getSubProtocols() {
@Autowired
public WebSocketProxyServerHandler(WebSocketClientFactory webSocketClientFactory, LoadBalancerClient lbCLient, ApplicationContext context) {
this.webSocketClientFactory = webSocketClientFactory;
this.routedSessions = new ConcurrentHashMap<>(); // Default
this.routedSessions = new ConcurrentHashMap<>();
this.webSocketRoutedSessionFactory = new WebSocketRoutedSessionFactoryImpl();
this.lbCLient = lbCLient;
this.context = context;
Expand Down Expand Up @@ -102,6 +105,7 @@ public Map<String, WebSocketRoutedSession> getRoutedSessions() {
return routedSessions;
}

// This is the client (usually a browser) to API ML Gateway session
@Override
public void afterConnectionEstablished(WebSocketSession webSocketSession) throws IOException {
String[] uriParts = getUriParts(webSocketSession);
Expand Down Expand Up @@ -145,7 +149,11 @@ private void routeToService(WebSocketSession webSocketSession, String serviceId,
}
}

@Retryable(value = WebSocketProxyError.class, backoff = @Backoff(value = 1000))
@Retryable(
value = WebSocketProxyError.class,
backoff = @Backoff(value = 1000, multiplier = 2),
maxAttempts = 3
)
void openConn(String serviceId, RoutedService service, WebSocketSession webSocketSession, String path) throws IOException {
ServiceInstance serviceInstance = this.lbCLient.choose(serviceId);
if (serviceInstance != null) {
Expand Down Expand Up @@ -181,20 +189,21 @@ private void openWebSocketConnection(RoutedService service, ServiceInstance serv

log.debug(String.format("Opening routed WebSocket session from %s to %s with %s by %s", uri.toString(), targetUrl, webSocketClientFactory, this));

WebSocketRoutedSession session = webSocketRoutedSessionFactory.session(webSocketSession, targetUrl, webSocketClientFactory);
routedSessions.put(webSocketSession.getId(), session);
webSocketRoutedSessionFactory.session(webSocketSession, targetUrl, webSocketClientFactory, this, this);
}

@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
// if the browser closes the session, close the GWs client one as well.
log.debug("Server session closed from client with status {}", status);
Optional.ofNullable(routedSessions.get(session.getId()))
.map(WebSocketRoutedSession::getWebSocketClientSession)
.filter(WebSocketRoutedSession::isClientConnected)
.map(WebSocketRoutedSession::getClientSession)
.ifPresent(clientSession -> {
try {
clientSession.close(status);
} catch (IOException e) {
log.debug("Error closing WebSocket client connection {}: {}", clientSession.getId(), e.getMessage());
log.debug("Error closing WebSocket client connection: {}", e.getMessage());
}
});
routedSessions.remove(session.getId());
Expand Down Expand Up @@ -223,25 +232,74 @@ private void close(WebSocketSession session, CloseStatus status) {
}
}

@Recover
void closeServerSession(ServerNotYetAvailableException e, WebSocketSession webSocketSession, WebSocketMessage<?> webSocketMessage) {
pablocarle marked this conversation as resolved.
Show resolved Hide resolved
log.debug("Trying to close Server WebSocket session {} because of too many ServerNotYetAvailableException failures", webSocketSession.getId());
try {
webSocketSession.close(CloseStatus.SESSION_NOT_RELIABLE);
} catch (IOException e1) {
log.debug("Failed closing Server WebSocket session {}", webSocketSession.getId());
}
}

@Override
@Retryable(
include = ServerNotYetAvailableException.class,
backoff = @Backoff(value = 6000),
recover = "closeServerSession",
maxAttempts = 5
)
public void handleMessage(WebSocketSession webSocketSession, WebSocketMessage<?> webSocketMessage) {
log.debug("handleMessage(session={},message={})", webSocketSession, webSocketMessage);
log.debug("handleMessage(session={}, message={})", webSocketSession, webSocketMessage);
WebSocketRoutedSession session = getRoutedSession(webSocketSession);

if (session == null) {
close(webSocketSession, CloseStatus.SESSION_NOT_RELIABLE);
return;
throw new ServerNotYetAvailableException();
}

try {
if (!isClientConnectionReady(session)) {
log.debug("Client Session for Server session {} is available but not ready", webSocketSession.getId());
throw new ServerNotYetAvailableException();
} else if (isClientConnectionClosed(session)) {
closeServerSession(null, webSocketSession, webSocketMessage);
}
session.sendMessageToServer(webSocketMessage);
} catch (ServerNotYetAvailableException e) {
throw e;
} catch (Exception ex) {
log.debug("Error sending WebSocket message. Closing session due to exception:", ex);
close(webSocketSession, CloseStatus.SESSION_NOT_RELIABLE);
}
}

private boolean isClientConnectionClosed(WebSocketRoutedSession session) {
WebSocketSession clientSession = session.getClientSession();
return clientSession != null && !clientSession.isOpen();
}

private boolean isClientConnectionReady(WebSocketRoutedSession session) {
return session != null && session.isClientConnected();
}

private WebSocketRoutedSession getRoutedSession(WebSocketSession webSocketSession) {
return routedSessions.get(webSocketSession.getId());
}

@Override
public void onClientSessionSuccess(WebSocketRoutedSession routedSession, WebSocketSession serverSession, WebSocketSession clientSession) {
routedSessions.put(serverSession.getId(), routedSession);
}

@Override
public void onClientSessionFailure(WebSocketRoutedSession routedSession, WebSocketSession serverSession, Throwable throwable) {
log.debug("Failed opening client web socket session against {}. Server WebSocket session is {}", routedSession.getTargetUrl(), serverSession.getId(), throwable);
try {
routedSession.getClientHandler().handleMessage(serverSession, new TextMessage("Failed opening a session against " + routedSession.getTargetUrl() + ": " + throwable.getMessage()));
serverSession.close(CloseStatus.SERVER_ERROR);
routedSessions.remove(serverSession.getId());
} catch (Exception e) {
log.debug("Failed sending / closing WebSocket session", e);
}
}
}
Loading
Loading