From 6330914e687f2334840cc6bb57967476f7463f21 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Hern=C3=A1n=20Carle?= Date: Thu, 22 Aug 2024 14:33:16 +0200 Subject: [PATCH 01/39] fix blocking websocket session opening MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pablo Hernán Carle --- .../ws/ServerNotYetAvailableException.java | 15 ++++ .../ws/WebSocketProxyServerHandler.java | 63 +++++++++++++-- .../gateway/ws/WebSocketRoutedSession.java | 77 ++++++++++++------- .../ws/WebSocketRoutedSessionFactoryImpl.java | 2 + .../ws/WebSocketProxyServerHandlerTest.java | 13 +++- .../ws/WebSocketRoutedSessionTest.java | 3 +- 6 files changed, 137 insertions(+), 36 deletions(-) create mode 100644 gateway-service/src/main/java/org/zowe/apiml/gateway/ws/ServerNotYetAvailableException.java diff --git a/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/ServerNotYetAvailableException.java b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/ServerNotYetAvailableException.java new file mode 100644 index 0000000000..3310f747ed --- /dev/null +++ b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/ServerNotYetAvailableException.java @@ -0,0 +1,15 @@ +/* + * This program and the accompanying materials are made available under the terms of the + * Eclipse Public License v2.0 which accompanies this distribution, and is available at + * https://www.eclipse.org/legal/epl-v20.html + * + * SPDX-License-Identifier: EPL-2.0 + * + * Copyright Contributors to the Zowe Project. + */ + +package org.zowe.apiml.gateway.ws; + +public class ServerNotYetAvailableException extends RuntimeException { + +} diff --git a/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandler.java b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandler.java index f175f83eef..92dba1e7af 100644 --- a/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandler.java +++ b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandler.java @@ -17,10 +17,12 @@ import org.springframework.cloud.client.loadbalancer.LoadBalancerClient; import org.springframework.context.ApplicationContext; import org.springframework.retry.annotation.Backoff; +import org.springframework.retry.annotation.Recover; import org.springframework.retry.annotation.Retryable; import org.springframework.stereotype.Component; import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.SubProtocolCapable; +import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketMessage; import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.handler.AbstractWebSocketHandler; @@ -30,12 +32,14 @@ import javax.annotation.PostConstruct; import javax.inject.Singleton; + import java.io.IOException; import java.net.URI; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; /** * Handle initialization and management of routed WebSocket sessions. Copies @@ -102,6 +106,7 @@ public Map getRoutedSessions() { return routedSessions; } + // This is the client (usually a browser) to API ML Gateway session @Override public void afterConnectionEstablished(WebSocketSession webSocketSession) throws IOException { String[] uriParts = getUriParts(webSocketSession); @@ -182,7 +187,17 @@ private void openWebSocketConnection(RoutedService service, ServiceInstance serv log.debug(String.format("Opening routed WebSocket session from %s to %s with %s by %s", uri.toString(), targetUrl, webSocketClientFactory, this)); WebSocketRoutedSession session = webSocketRoutedSessionFactory.session(webSocketSession, targetUrl, webSocketClientFactory); - routedSessions.put(webSocketSession.getId(), session); + session.getWebSocketClientSession().addCallback( + successSession -> routedSessions.put(webSocketSession.getId(), session), + failure -> { + log.debug("Failed opening client web socket session against {}. Server WebSocket session is {}", targetUrl, webSocketSession.getId(), failure); + try { + session.getClientHandler().handleMessage(webSocketSession, new TextMessage("Failed opening a session against " + targetUrl + ": " + failure.getMessage())); + webSocketSession.close(CloseStatus.SERVER_ERROR); + } catch (Exception e) { + log.debug("Failed seding / closing WebSocket session", e); + } + }); } @Override @@ -192,9 +207,13 @@ public void afterConnectionClosed(WebSocketSession session, CloseStatus status) .map(WebSocketRoutedSession::getWebSocketClientSession) .ifPresent(clientSession -> { try { - clientSession.close(status); + clientSession.get().close(status); } catch (IOException e) { - log.debug("Error closing WebSocket client connection {}: {}", clientSession.getId(), e.getMessage()); + log.debug("Error closing WebSocket client connection: {}", e.getMessage()); + } catch (InterruptedException e) { + // This will not happen, session will not be in the map unless future has already completed successfully + } catch (ExecutionException e) { + // This will not happen, session will not be in the map unless future has already completed successfully } }); routedSessions.remove(session.getId()); @@ -223,17 +242,33 @@ private void close(WebSocketSession session, CloseStatus status) { } } + @Recover + void recover(ServerNotYetAvailableException e, WebSocketSession webSocketSession, WebSocketMessage webSocketMessage) { + log.debug("Trying to close Server WebSocket session {} because of too many ServerNotYetAvailableException failures", webSocketSession.getId()); + try { + webSocketSession.close(CloseStatus.SESSION_NOT_RELIABLE); + } catch (IOException e1) { + log.debug("Failed closing Server WebSocket session {}", webSocketSession.getId()); + } + } + @Override + @Retryable(include = ServerNotYetAvailableException.class, backoff = @Backoff(value = 1000)) public void handleMessage(WebSocketSession webSocketSession, WebSocketMessage webSocketMessage) { - log.debug("handleMessage(session={},message={})", webSocketSession, webSocketMessage); + log.debug("handleMessage(session={}, message={})", webSocketSession, webSocketMessage); WebSocketRoutedSession session = getRoutedSession(webSocketSession); if (session == null) { - close(webSocketSession, CloseStatus.SESSION_NOT_RELIABLE); - return; + throw new ServerNotYetAvailableException(); } try { + if (!isClientConnectionReady(session)) { + log.debug("Client Session for Server session {} is available but not ready", webSocketSession.getId()); + throw new ServerNotYetAvailableException(); + } else if (isClientConnectionClosed(session)) { + recover(null, webSocketSession, webSocketMessage); + } session.sendMessageToServer(webSocketMessage); } catch (Exception ex) { log.debug("Error sending WebSocket message. Closing session due to exception:", ex); @@ -241,6 +276,22 @@ public void handleMessage(WebSocketSession webSocketSession, WebSocketMessage } } + private boolean isClientConnectionClosed(WebSocketRoutedSession session) { + try { + return session != null && session.getWebSocketClientSession().isDone() && !session.getWebSocketClientSession().get().isOpen(); + } catch (InterruptedException | ExecutionException e) { + return false; + } + } + + private boolean isClientConnectionReady(WebSocketRoutedSession session) { + try { + return session != null && session.getWebSocketClientSession().isDone() && session.getWebSocketClientSession().get().isOpen(); + } catch (InterruptedException | ExecutionException e) { + return false; + } + } + private WebSocketRoutedSession getRoutedSession(WebSocketSession webSocketSession) { return routedSessions.get(webSocketSession.getId()); } diff --git a/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSession.java b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSession.java index 9f0316d7b5..4513d7a207 100644 --- a/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSession.java +++ b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSession.java @@ -15,14 +15,16 @@ import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; import org.springframework.util.concurrent.ListenableFuture; -import org.springframework.web.socket.*; +import org.springframework.web.socket.CloseStatus; +import org.springframework.web.socket.WebSocketHttpHeaders; +import org.springframework.web.socket.WebSocketMessage; +import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.client.jetty.JettyWebSocketClient; import java.io.IOException; import java.net.InetSocketAddress; import java.net.URI; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; /** * Represents a connection in the proxying chain, establishes 'client' to @@ -32,19 +34,24 @@ */ @Slf4j public class WebSocketRoutedSession { - private static final int DEFAULT_TIMEOUT = 30000; - private final WebSocketSession webSocketClientSession; + private final ListenableFuture webSocketClientSession; private final WebSocketSession webSocketServerSession; + private final WebSocketProxyClientHandler clientHandler; + private final String targetUrl; public WebSocketRoutedSession(WebSocketSession webSocketServerSession, String targetUrl, WebSocketClientFactory webSocketClientFactory) { this.webSocketServerSession = webSocketServerSession; + this.targetUrl = targetUrl; + this.clientHandler = new WebSocketProxyClientHandler(webSocketServerSession); this.webSocketClientSession = createWebSocketClientSession(webSocketServerSession, targetUrl, webSocketClientFactory); } - public WebSocketRoutedSession(WebSocketSession webSocketServerSession, WebSocketSession webSocketClientSession) { + public WebSocketRoutedSession(WebSocketSession webSocketServerSession, ListenableFuture webSocketClientSession, WebSocketProxyClientHandler clientHandler, String targetUrl) { this.webSocketClientSession = webSocketClientSession; this.webSocketServerSession = webSocketServerSession; + this.clientHandler = clientHandler; + this.targetUrl = targetUrl; } private WebSocketHttpHeaders getWebSocketHttpHeaders(WebSocketSession webSocketServerSession) { @@ -58,7 +65,7 @@ private WebSocketHttpHeaders getWebSocketHttpHeaders(WebSocketSession webSocketS return headers; } - public WebSocketSession getWebSocketClientSession() { + public ListenableFuture getWebSocketClientSession() { return webSocketClientSession; } @@ -66,21 +73,18 @@ public WebSocketSession getWebSocketServerSession() { return webSocketServerSession; } - private WebSocketSession createWebSocketClientSession(WebSocketSession webSocketServerSession, String targetUrl, WebSocketClientFactory webSocketClientFactory) { + WebSocketProxyClientHandler getClientHandler() { + return clientHandler; + } + + private ListenableFuture createWebSocketClientSession(WebSocketSession webSocketServerSession, String targetUrl, WebSocketClientFactory webSocketClientFactory) { try { JettyWebSocketClient client = webSocketClientFactory.getClientInstance(); URI targetURI = new URI(targetUrl); WebSocketHttpHeaders headers = getWebSocketHttpHeaders(webSocketServerSession); - ListenableFuture futureSession = client - .doHandshake(new WebSocketProxyClientHandler(webSocketServerSession), headers, targetURI); - return futureSession.get(DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS); + return client.doHandshake(clientHandler, headers, targetURI); } catch (IllegalStateException e) { throw webSocketProxyException(targetUrl, e, webSocketServerSession, true); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw webSocketProxyException(targetUrl, e, webSocketServerSession, false); - } catch (ExecutionException e) { - throw handleExecutionException(targetUrl, e, webSocketServerSession, false); } catch (Exception e) { throw webSocketProxyException(targetUrl, e, webSocketServerSession, false); } @@ -112,13 +116,25 @@ private WebSocketProxyError webSocketProxyException(String targetUrl, Exception } public void sendMessageToServer(WebSocketMessage webSocketMessage) throws IOException { - log.debug("sendMessageToServer(session={},message={})", webSocketClientSession, webSocketMessage); - webSocketClientSession.sendMessage(webSocketMessage); + log.debug("sendMessageToServer(session={}, message={})", webSocketClientSession, webSocketMessage); + if (webSocketClientSession.isDone()) { + try { + webSocketClientSession.get().sendMessage(webSocketMessage); + } catch (IOException | InterruptedException | ExecutionException e) { + // TODO Log + } + } else { + // TODO Log + } } public void close(CloseStatus status) throws IOException { - if (webSocketClientSession.isOpen()) { - webSocketClientSession.close(status); + try { + if (webSocketClientSession.isDone() && webSocketClientSession.get().isOpen()) { + webSocketClientSession.get().close(status); + } + } catch (InterruptedException | ExecutionException | IOException e) { + // TODO Log } } @@ -141,15 +157,24 @@ public String getServerUri() { } public String getClientUri() { - URI uri = getWebSocketClientSession().getUri(); - if (uri != null) { - return uri.toString(); - } - - return null; + return targetUrl; } + /** + * Get the sessionId for this session's client session + * + * @return WebSocket client session id or null if not established (could be temporary) + * + * @throws ServerNotYetAvailableException If a client session is not established + */ public String getClientId() { - return getWebSocketClientSession().getId(); + if (!webSocketClientSession.isDone()) { + throw new ServerNotYetAvailableException(); + } + try { + return getWebSocketClientSession().get().getId(); + } catch (InterruptedException | ExecutionException e) { + return null; + } } } diff --git a/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSessionFactoryImpl.java b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSessionFactoryImpl.java index 7d32ae9dca..f4520a236d 100644 --- a/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSessionFactoryImpl.java +++ b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSessionFactoryImpl.java @@ -16,8 +16,10 @@ * Default implementation. Provides the WebSocketRoutedSession the same way as before. */ public class WebSocketRoutedSessionFactoryImpl implements WebSocketRoutedSessionFactory { + @Override public WebSocketRoutedSession session(WebSocketSession webSocketSession, String targetUrl, WebSocketClientFactory webSocketClientFactory) { return new WebSocketRoutedSession(webSocketSession, targetUrl, webSocketClientFactory); } + } diff --git a/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandlerTest.java b/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandlerTest.java index fbb10af42c..9ac41fcc60 100644 --- a/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandlerTest.java +++ b/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandlerTest.java @@ -16,6 +16,7 @@ import org.junit.jupiter.api.Test; import org.springframework.cloud.client.ServiceInstance; import org.springframework.cloud.client.loadbalancer.LoadBalancerClient; +import org.springframework.scheduling.annotation.AsyncResult; import org.springframework.test.util.ReflectionTestUtils; import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.WebSocketMessage; @@ -33,10 +34,16 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.*; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.not; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; class WebSocketProxyServerHandlerTest { private WebSocketProxyServerHandler underTest; @@ -223,7 +230,7 @@ void whenTheConnectionIsClosed_thenTheSessionIsClosedAndRemovedFromRepository() void whenTheConnectionIsClosed_thenClientSessionIsAlsoClosed() throws IOException { CloseStatus normalClose = CloseStatus.NORMAL; WebSocketSession clientSession = mock(WebSocketSession.class); - when(internallyStoredSession.getWebSocketClientSession()).thenReturn(clientSession); + when(internallyStoredSession.getWebSocketClientSession()).thenReturn(AsyncResult.forValue(clientSession)); underTest.afterConnectionClosed(establishedSession, normalClose); verify(clientSession, times(1)).close(normalClose); diff --git a/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSessionTest.java b/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSessionTest.java index 89269c5c8c..e7993b5dfe 100644 --- a/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSessionTest.java +++ b/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSessionTest.java @@ -42,7 +42,8 @@ void prepareSessionUnderTest() { clientSession = mock(WebSocketSession.class); serverSession = mock(WebSocketSession.class); - underTest = new WebSocketRoutedSession(serverSession, clientSession); + // underTest = new WebSocketRoutedSession(serverSession, clientSession, null, null); + underTest = null; } @Test From bdf16557ac75f1d3fe35d36340170bd0b0a3432d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Hern=C3=A1n=20Carle?= Date: Thu, 22 Aug 2024 16:50:23 +0200 Subject: [PATCH 02/39] jetty client per route MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pablo Hernán Carle --- .../gateway/ws/WebSocketClientFactory.java | 50 +++++++++++++++---- .../ws/WebSocketProxyServerHandler.java | 11 ++-- .../gateway/ws/WebSocketRoutedSession.java | 2 +- .../ws/WebSocketClientFactoryContextTest.java | 6 +-- .../ws/WebSocketClientFactoryTest.java | 10 ++-- .../ws/WebSocketRoutedSessionTest.java | 4 +- 6 files changed, 58 insertions(+), 25 deletions(-) diff --git a/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketClientFactory.java b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketClientFactory.java index 809b42aab4..5942a92d1f 100644 --- a/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketClientFactory.java +++ b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketClientFactory.java @@ -10,8 +10,6 @@ package org.zowe.apiml.gateway.ws; -import lombok.AccessLevel; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.util.ssl.SslContextFactory; @@ -23,17 +21,27 @@ import javax.annotation.PreDestroy; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + /** * Factory for provisioning web socket client *

* Manages the client lifecycle */ @Component -@RequiredArgsConstructor(access = AccessLevel.PACKAGE) // for testing purposes @Slf4j public class WebSocketClientFactory { - private final JettyWebSocketClient client; + private final SslContextFactory.Client jettyClientSslContextFactory; + private final int maxIdleWebSocketTimeout; + private final long connectTimeout; + private final long stopTimeout; + private final long asyncWriteTimeout; + private final int maxRequestBufferSize; + + private final ConcurrentMap clientsMap = new ConcurrentHashMap<>(); @Autowired public WebSocketClientFactory( @@ -46,6 +54,17 @@ public WebSocketClientFactory( ) { log.debug("Creating Jetty WebSocket client, with SslFactory: {}", jettyClientSslContextFactory); + this.jettyClientSslContextFactory = jettyClientSslContextFactory; + this.maxIdleWebSocketTimeout = maxIdleWebSocketTimeout; + this.connectTimeout = connectTimeout; + this.stopTimeout = stopTimeout; + this.asyncWriteTimeout = asyncWriteTimeout; + this.maxRequestBufferSize = maxRequestBufferSize; + } + + private JettyWebSocketClient createClient() { + log.debug("Creating Jetty WebSocket client, with SslFactory: {}", jettyClientSslContextFactory); + HttpClient httpClient = new HttpClient(jettyClientSslContextFactory); httpClient.setRequestBufferSize(maxRequestBufferSize); WebSocketClient wsClient = new WebSocketClient(httpClient); @@ -54,19 +73,28 @@ public WebSocketClientFactory( wsClient.setConnectTimeout(connectTimeout); wsClient.setStopTimeout(stopTimeout); wsClient.setAsyncWriteTimeout(asyncWriteTimeout); - client = new JettyWebSocketClient(wsClient); + JettyWebSocketClient client = new JettyWebSocketClient(wsClient); client.start(); + return client; } - JettyWebSocketClient getClientInstance() { - return client; + JettyWebSocketClient getClientInstance(String key) { + if (clientsMap.containsKey(key)) { + return clientsMap.get(key); + } + JettyWebSocketClient newClient = createClient(); + clientsMap.put(key, newClient); + return newClient; } @PreDestroy - void closeClient() { - if (client.isRunning()) { - log.debug("Closing Jetty WebSocket client"); - client.stop(); + void closeClients() { + for (Map.Entry entry : clientsMap.entrySet()) { + if (entry.getValue().isRunning()) { + log.debug("Closing Jetty WebSocket client"); + entry.getValue().stop(); + } + } } diff --git a/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandler.java b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandler.java index 92dba1e7af..eb2921e616 100644 --- a/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandler.java +++ b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandler.java @@ -243,7 +243,7 @@ private void close(WebSocketSession session, CloseStatus status) { } @Recover - void recover(ServerNotYetAvailableException e, WebSocketSession webSocketSession, WebSocketMessage webSocketMessage) { + void closeServerSession(ServerNotYetAvailableException e, WebSocketSession webSocketSession, WebSocketMessage webSocketMessage) { log.debug("Trying to close Server WebSocket session {} because of too many ServerNotYetAvailableException failures", webSocketSession.getId()); try { webSocketSession.close(CloseStatus.SESSION_NOT_RELIABLE); @@ -253,7 +253,12 @@ void recover(ServerNotYetAvailableException e, WebSocketSession webSocketSession } @Override - @Retryable(include = ServerNotYetAvailableException.class, backoff = @Backoff(value = 1000)) + @Retryable( + include = ServerNotYetAvailableException.class, + backoff = @Backoff(value = 6000), + recover = "closeServerSession", + maxAttempts = 5 + ) public void handleMessage(WebSocketSession webSocketSession, WebSocketMessage webSocketMessage) { log.debug("handleMessage(session={}, message={})", webSocketSession, webSocketMessage); WebSocketRoutedSession session = getRoutedSession(webSocketSession); @@ -267,7 +272,7 @@ public void handleMessage(WebSocketSession webSocketSession, WebSocketMessage log.debug("Client Session for Server session {} is available but not ready", webSocketSession.getId()); throw new ServerNotYetAvailableException(); } else if (isClientConnectionClosed(session)) { - recover(null, webSocketSession, webSocketMessage); + closeServerSession(null, webSocketSession, webSocketMessage); } session.sendMessageToServer(webSocketMessage); } catch (Exception ex) { diff --git a/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSession.java b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSession.java index 4513d7a207..8dd3a3a663 100644 --- a/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSession.java +++ b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSession.java @@ -79,7 +79,7 @@ WebSocketProxyClientHandler getClientHandler() { private ListenableFuture createWebSocketClientSession(WebSocketSession webSocketServerSession, String targetUrl, WebSocketClientFactory webSocketClientFactory) { try { - JettyWebSocketClient client = webSocketClientFactory.getClientInstance(); + JettyWebSocketClient client = webSocketClientFactory.getClientInstance(targetUrl); URI targetURI = new URI(targetUrl); WebSocketHttpHeaders headers = getWebSocketHttpHeaders(webSocketServerSession); return client.doHandshake(clientHandler, headers, targetURI); diff --git a/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketClientFactoryContextTest.java b/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketClientFactoryContextTest.java index d4fa30ed49..1f0f6c9bdf 100644 --- a/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketClientFactoryContextTest.java +++ b/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketClientFactoryContextTest.java @@ -70,18 +70,18 @@ void thenBeanIsInitialized() { class GivenFactory { private JettyWebSocketClient client = mock(JettyWebSocketClient.class); - private WebSocketClientFactory factory = new WebSocketClientFactory(client); + private WebSocketClientFactory factory = new WebSocketClientFactory(null, 0, 0, 0, 0, 0); @Test void whenIsRunning_thenStop() { doReturn(true).when(client).isRunning(); - factory.closeClient(); + factory.closeClients(); verify(client).stop(); } @Test void whenIsNotRunning_thenDontStop() { - factory.closeClient(); + factory.closeClients(); verify(client, never()).stop(); } diff --git a/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketClientFactoryTest.java b/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketClientFactoryTest.java index 4d5a7f9b18..6da5d94e01 100644 --- a/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketClientFactoryTest.java +++ b/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketClientFactoryTest.java @@ -34,25 +34,25 @@ class CreatedInstance { @BeforeEach void setUp() { this.client = mock(JettyWebSocketClient.class); - this.webSocketClientFactory = new WebSocketClientFactory(this.client); + this.webSocketClientFactory = new WebSocketClientFactory(null, 0, 0, 0, 0, 0); } @Test void givenRunningClient_whenClose_thenStopClient() { doReturn(true).when(client).isRunning(); - webSocketClientFactory.closeClient(); + webSocketClientFactory.closeClients(); verify(client).stop(); } @Test void givenStoppedClient_whenClose_thenDoNothing() { - webSocketClientFactory.closeClient(); + webSocketClientFactory.closeClients(); verify(client, never()).stop(); } @Test void whenGetClient_thenReturnInstance() { - assertSame(client, webSocketClientFactory.getClientInstance()); + assertSame(client, webSocketClientFactory.getClientInstance("key")); } } @@ -70,7 +70,7 @@ void setUp() { @Test void givenInitilizedClient_thenHasNonDefaultIdleConfig() { - WebSocketClient wsClient = (WebSocketClient) ReflectionTestUtils.getField(webSocketClientFactory.getClientInstance(), "client"); + WebSocketClient wsClient = (WebSocketClient) ReflectionTestUtils.getField(webSocketClientFactory.getClientInstance("key"), "client"); assertEquals(1234, wsClient.getMaxIdleTimeout()); } diff --git a/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSessionTest.java b/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSessionTest.java index e7993b5dfe..104749eb7a 100644 --- a/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSessionTest.java +++ b/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSessionTest.java @@ -82,7 +82,7 @@ class GivenFirstConstructor { void whenFailingToCreateSession_thenThrowException() { WebSocketClientFactory webSocketClientFactory = mock(WebSocketClientFactory.class); JettyWebSocketClient jettyWebSocketClient = mock(JettyWebSocketClient.class); - when(webSocketClientFactory.getClientInstance()).thenReturn(jettyWebSocketClient); + when(webSocketClientFactory.getClientInstance("key")).thenReturn(jettyWebSocketClient); HttpHeaders headers = new WebSocketHttpHeaders(); headers.add("header", "someHeader"); when(serverSession.getHandshakeHeaders()).thenReturn(headers); @@ -93,7 +93,7 @@ void whenFailingToCreateSession_thenThrowException() { @Test void whenFailingOnHandshake_thenThrowException() { WebSocketClientFactory webSocketClientFactory = mock(WebSocketClientFactory.class); - when(webSocketClientFactory.getClientInstance()).thenThrow(new IllegalStateException()); + when(webSocketClientFactory.getClientInstance("key")).thenThrow(new IllegalStateException()); assertThrows(WebSocketProxyError.class, () -> new WebSocketRoutedSession(serverSession, "", webSocketClientFactory)); } } From 458e6941301e010e42cf365d2d49beba2213f471 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Hern=C3=A1n=20Carle?= Date: Fri, 23 Aug 2024 10:00:47 +0200 Subject: [PATCH 03/39] address review, remove unused code MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pablo Hernán Carle --- .../ws/WebSocketProxyServerHandler.java | 2 +- .../gateway/ws/WebSocketRoutedSession.java | 25 +++---------------- 2 files changed, 4 insertions(+), 23 deletions(-) diff --git a/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandler.java b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandler.java index eb2921e616..8e8ea0b951 100644 --- a/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandler.java +++ b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandler.java @@ -195,7 +195,7 @@ private void openWebSocketConnection(RoutedService service, ServiceInstance serv session.getClientHandler().handleMessage(webSocketSession, new TextMessage("Failed opening a session against " + targetUrl + ": " + failure.getMessage())); webSocketSession.close(CloseStatus.SERVER_ERROR); } catch (Exception e) { - log.debug("Failed seding / closing WebSocket session", e); + log.debug("Failed sending / closing WebSocket session", e); } }); } diff --git a/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSession.java b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSession.java index 8dd3a3a663..e033e7beb0 100644 --- a/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSession.java +++ b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSession.java @@ -11,9 +11,7 @@ package org.zowe.apiml.gateway.ws; import lombok.extern.slf4j.Slf4j; -import org.eclipse.jetty.websocket.api.UpgradeException; import org.springframework.http.HttpHeaders; -import org.springframework.http.HttpStatus; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.WebSocketHttpHeaders; @@ -90,23 +88,6 @@ private ListenableFuture createWebSocketClientSession(WebSocke } } - private WebSocketProxyError handleExecutionException(String targetUrl, ExecutionException cause, WebSocketSession webSocketServerSession, boolean logError) { - if (cause.getCause() != null && cause.getCause().getCause() instanceof UpgradeException) { - UpgradeException upgradeException = (UpgradeException) cause.getCause().getCause(); - if (upgradeException.getResponseStatusCode() == HttpStatus.UNAUTHORIZED.value()) { - String message = "Invalid login credentials"; - if (logError) { - log.debug(message); - } - return new WebSocketProxyError(message, cause, webSocketServerSession); - } else { - return webSocketProxyException(targetUrl, cause, webSocketServerSession, logError); - } - } else { - return webSocketProxyException(targetUrl, cause, webSocketServerSession, logError); - } - } - private WebSocketProxyError webSocketProxyException(String targetUrl, Exception cause, WebSocketSession webSocketServerSession, boolean logError) { String message = String.format("Error opening session to WebSocket service at %s: %s", targetUrl, cause.getMessage()); if (logError) { @@ -121,10 +102,10 @@ public void sendMessageToServer(WebSocketMessage webSocketMessage) throws IOE try { webSocketClientSession.get().sendMessage(webSocketMessage); } catch (IOException | InterruptedException | ExecutionException e) { - // TODO Log + log.debug("Failed sending message: {}", webSocketMessage, e); } } else { - // TODO Log + log.debug("Tried to send a WebSocket message in a non-established client session."); } } @@ -134,7 +115,7 @@ public void close(CloseStatus status) throws IOException { webSocketClientSession.get().close(status); } } catch (InterruptedException | ExecutionException | IOException e) { - // TODO Log + log.debug("Failed to close WebSocket client session with status {}", status, e); } } From 7221a3f494354931d2413d2fe8bc961dc0b70331 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Hern=C3=A1n=20Carle?= Date: Fri, 23 Aug 2024 10:22:50 +0200 Subject: [PATCH 04/39] Fix WebSocketClientFactoryTest MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pablo Hernán Carle --- .../ws/WebSocketClientFactoryTest.java | 25 ++++++++++++++----- 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketClientFactoryTest.java b/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketClientFactoryTest.java index 6da5d94e01..a54205d150 100644 --- a/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketClientFactoryTest.java +++ b/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketClientFactoryTest.java @@ -10,31 +10,44 @@ package org.zowe.apiml.gateway.ws; +import org.eclipse.jetty.util.ssl.SslContextFactory; +import org.eclipse.jetty.websocket.client.WebSocketClient; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.test.util.ReflectionTestUtils; import org.springframework.web.socket.client.jetty.JettyWebSocketClient; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertSame; -import static org.mockito.Mockito.*; - -import org.eclipse.jetty.util.ssl.SslContextFactory; -import org.eclipse.jetty.websocket.client.WebSocketClient; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +@ExtendWith(MockitoExtension.class) class WebSocketClientFactoryTest { @Nested class CreatedInstance { - private WebSocketClientFactory webSocketClientFactory; + @Mock private JettyWebSocketClient client; + private WebSocketClientFactory webSocketClientFactory; + @BeforeEach void setUp() { - this.client = mock(JettyWebSocketClient.class); this.webSocketClientFactory = new WebSocketClientFactory(null, 0, 0, 0, 0, 0); + ConcurrentMap clients = new ConcurrentHashMap<>(); + clients.put("key", client); + ReflectionTestUtils.setField(webSocketClientFactory, "clientsMap", clients); } @Test From 09acdcf688c8f23a2b92001da165532533fcca2b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Hern=C3=A1n=20Carle?= Date: Fri, 23 Aug 2024 10:31:57 +0200 Subject: [PATCH 05/39] fix WebSocketClientFactoryContextTest MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pablo Hernán Carle --- .../ws/WebSocketClientFactoryContextTest.java | 23 +++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketClientFactoryContextTest.java b/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketClientFactoryContextTest.java index 1f0f6c9bdf..5b5e43da13 100644 --- a/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketClientFactoryContextTest.java +++ b/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketClientFactoryContextTest.java @@ -14,6 +14,7 @@ import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.websocket.api.WebSocketPolicy; import org.eclipse.jetty.websocket.client.WebSocketClient; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; @@ -23,9 +24,15 @@ import org.springframework.test.util.ReflectionTestUtils; import org.springframework.web.socket.client.jetty.JettyWebSocketClient; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; @SpringBootTest( properties = { @@ -44,6 +51,11 @@ public class WebSocketClientFactoryContextTest { @Autowired private WebSocketClientFactory webSocketClientFactory; + @BeforeEach + void setUp() { + webSocketClientFactory.getClientInstance("key"); + } + @Nested class GivenWebSocketClientParametrization { @@ -51,7 +63,7 @@ class GivenWebSocketClientParametrization { void thenBeanIsInitialized() { assertNotNull(webSocketClientFactory); - JettyWebSocketClient jettyWebSocketClient = (JettyWebSocketClient) ReflectionTestUtils.getField(webSocketClientFactory, "client"); + JettyWebSocketClient jettyWebSocketClient = webSocketClientFactory.getClientInstance("key"); WebSocketClient webSocketClient = (WebSocketClient) ReflectionTestUtils.getField(jettyWebSocketClient, "client"); WebSocketPolicy policy = webSocketClient.getPolicy(); @@ -72,6 +84,13 @@ class GivenFactory { private JettyWebSocketClient client = mock(JettyWebSocketClient.class); private WebSocketClientFactory factory = new WebSocketClientFactory(null, 0, 0, 0, 0, 0); + @BeforeEach + void setUp() { + ConcurrentMap clients = new ConcurrentHashMap<>(); + clients.put("key", client); + ReflectionTestUtils.setField(factory, "clientsMap", clients); + } + @Test void whenIsRunning_thenStop() { doReturn(true).when(client).isRunning(); From c58fc48879461055e92d960b22775ea260f6c1d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Hern=C3=A1n=20Carle?= Date: Fri, 23 Aug 2024 10:48:45 +0200 Subject: [PATCH 06/39] fix WebSocketRoutedSessionTest MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pablo Hernán Carle --- .../gateway/ws/WebSocketRoutedSession.java | 4 ++- .../ws/WebSocketRoutedSessionTest.java | 29 ++++++++++--------- 2 files changed, 19 insertions(+), 14 deletions(-) diff --git a/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSession.java b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSession.java index e033e7beb0..24b2668a55 100644 --- a/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSession.java +++ b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSession.java @@ -10,6 +10,7 @@ package org.zowe.apiml.gateway.ws; +import com.google.common.annotations.VisibleForTesting; import lombok.extern.slf4j.Slf4j; import org.springframework.http.HttpHeaders; import org.springframework.util.concurrent.ListenableFuture; @@ -45,7 +46,8 @@ public WebSocketRoutedSession(WebSocketSession webSocketServerSession, String ta this.webSocketClientSession = createWebSocketClientSession(webSocketServerSession, targetUrl, webSocketClientFactory); } - public WebSocketRoutedSession(WebSocketSession webSocketServerSession, ListenableFuture webSocketClientSession, WebSocketProxyClientHandler clientHandler, String targetUrl) { + @VisibleForTesting + WebSocketRoutedSession(WebSocketSession webSocketServerSession, ListenableFuture webSocketClientSession, WebSocketProxyClientHandler clientHandler, String targetUrl) { this.webSocketClientSession = webSocketClientSession; this.webSocketServerSession = webSocketServerSession; this.clientHandler = clientHandler; diff --git a/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSessionTest.java b/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSessionTest.java index 104749eb7a..6b422fa22e 100644 --- a/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSessionTest.java +++ b/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSessionTest.java @@ -13,7 +13,11 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.http.HttpHeaders; +import org.springframework.scheduling.annotation.AsyncResult; import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.WebSocketHttpHeaders; import org.springframework.web.socket.WebSocketMessage; @@ -29,21 +33,27 @@ import static org.hamcrest.Matchers.nullValue; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.mockito.Mockito.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +@ExtendWith(MockitoExtension.class) class WebSocketRoutedSessionTest { + + @Mock private WebSocketSession clientSession; + @Mock private WebSocketSession serverSession; + @Mock + private WebSocketProxyClientHandler clientHandler; private WebSocketRoutedSession underTest; @BeforeEach void prepareSessionUnderTest() { - clientSession = mock(WebSocketSession.class); - serverSession = mock(WebSocketSession.class); - - // underTest = new WebSocketRoutedSession(serverSession, clientSession, null, null); - underTest = null; + underTest = new WebSocketRoutedSession(serverSession, new AsyncResult<>(clientSession), clientHandler, "ws://localhost:8080/petstore"); } @Test @@ -53,7 +63,6 @@ void givenValidServerAndClientSession_whenTheDetailsAreRequested_thenTheDetailsA String serverUriPath = "ws://gateway:8080/petstore"; when(clientSession.getId()).thenReturn(sessionId); - when(clientSession.getUri()).thenReturn(new URI(clientUriPath)); when(serverSession.getRemoteAddress()).thenReturn(new InetSocketAddress("gateway", 8080)); when(serverSession.getUri()).thenReturn(new URI(serverUriPath)); @@ -70,12 +79,6 @@ void givenBrokenServerSession_whenUriIsRequested_NullIsReturned() { assertThat(underTest.getServerUri(), is(nullValue())); } - @Test - void givenBrokenClientSession_whenUriIsRequested_NullIsReturned() { - when(clientSession.getUri()).thenReturn(null); - assertThat(underTest.getClientUri(), is(nullValue())); - } - @Nested class GivenFirstConstructor { @Test From 6a550375eafdedcd92576a092a721b5108311cb6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Hern=C3=A1n=20Carle?= Date: Fri, 23 Aug 2024 13:08:31 +0200 Subject: [PATCH 07/39] wip WebSocketProxyServerHandlerTest MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pablo Hernán Carle --- .../ws/WebSocketProxyServerHandlerTest.java | 34 +++++++++++++++---- 1 file changed, 28 insertions(+), 6 deletions(-) diff --git a/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandlerTest.java b/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandlerTest.java index 9ac41fcc60..928902112d 100644 --- a/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandlerTest.java +++ b/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandlerTest.java @@ -14,6 +14,9 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.cloud.client.ServiceInstance; import org.springframework.cloud.client.loadbalancer.LoadBalancerClient; import org.springframework.scheduling.annotation.AsyncResult; @@ -30,6 +33,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.notNullValue; @@ -45,7 +49,9 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +@ExtendWith(MockitoExtension.class) class WebSocketProxyServerHandlerTest { + private WebSocketProxyServerHandler underTest; private WebSocketRoutedSessionFactory webSocketRoutedSessionFactory; private Map routedSessions; @@ -68,6 +74,7 @@ public void setup() { @Nested class WhenTheConnectionIsEstablished { + WebSocketSession establishedSession; @BeforeEach @@ -77,6 +84,7 @@ void prepareSessionMock() { @Nested class ThenTheValidSessionIsStoredInternally { + @BeforeEach void prepareRoutedService() { String serviceId = "valid-service"; @@ -99,15 +107,19 @@ void prepareRoutedService() { * Proper WebSocketSession is stored. */ @Test - void givenValidRoute() throws Exception { + void givenValidRoute() throws Exception { String path = "wss://gatewayHost:1443/valid-service/ws/v1/valid-path"; - when(webSocketRoutedSessionFactory.session(any(), any(), any())).thenReturn(mock(WebSocketRoutedSession.class)); + WebSocketRoutedSession routedSession = mock(WebSocketRoutedSession.class); + when(webSocketRoutedSessionFactory.session(any(), any(), any())).thenReturn(routedSession); ServiceInstance serviceInstance = mock(ServiceInstance.class); when(lbClient.choose(any())).thenReturn(serviceInstance); String establishedSessionId = "validAndUniqueId"; when(establishedSession.getId()).thenReturn(establishedSessionId); when(establishedSession.getUri()).thenReturn(new URI(path)); + CountDownLatch latch = new CountDownLatch(1); + when(routedSession.getWebSocketClientSession()).thenReturn(null); + underTest.afterConnectionEstablished(establishedSession); verify(webSocketRoutedSessionFactory).session(any(), any(), any()); @@ -119,6 +131,7 @@ void givenValidRoute() throws Exception { @Nested class ThenTheSocketIsClosed { + @BeforeEach void sessionIsOpen() { when(establishedSession.isOpen()).thenReturn(true); @@ -186,7 +199,6 @@ void givenNoInstanceOfTheServiceIsInTheRepository() throws Exception { @Test void givenNullService_thenCloseWebSocket() throws Exception { when(establishedSession.getUri()).thenReturn(new URI("wss://gatewayHost:1443/service-without-instance/ws/v1/valid-path")); - when(lbClient.choose(any())).thenReturn(null); RoutedServices routesForSpecificValidService = mock(RoutedServices.class); when(routesForSpecificValidService.findServiceByGatewayUrl("ws/v1")) .thenReturn(null); @@ -201,17 +213,18 @@ void givenNullService_thenCloseWebSocket() throws Exception { @Nested class GivenValidExistingSession { + + @Mock WebSocketSession establishedSession; + @Mock WebSocketRoutedSession internallyStoredSession; + @Mock WebSocketMessage passedMessage; @BeforeEach void prepareSessionMock() { - establishedSession = mock(WebSocketSession.class); String validSessionId = "123"; when(establishedSession.getId()).thenReturn(validSessionId); - passedMessage = mock(WebSocketMessage.class); - internallyStoredSession = mock(WebSocketRoutedSession.class); routedSessions.put(validSessionId, internallyStoredSession); } @@ -239,6 +252,10 @@ void whenTheConnectionIsClosed_thenClientSessionIsAlsoClosed() throws IOExceptio @Test void whenTheMessageIsReceived_thenTheMessageIsPassedToTheSession() throws Exception { + WebSocketSession clientSession = mock(WebSocketSession.class); + when(clientSession.isOpen()).thenReturn(true); + when(internallyStoredSession.getWebSocketClientSession()).thenReturn(AsyncResult.forValue(clientSession)); + underTest.handleMessage(establishedSession, passedMessage); verify(internallyStoredSession).sendMessageToServer(passedMessage); @@ -280,15 +297,18 @@ void whenClosingRoutedSessionThrowException_thenCatchIt() throws IOException { @Nested class WhenGettingRoutedSessions { + @Test void thenReturnThem() { Map expectedRoutedSessions = underTest.getRoutedSessions(); assertThat(expectedRoutedSessions, is(routedSessions)); } + } @Nested class WhenGettingSubProtocols { + @Test void thenReturnThem() { List protocol = new ArrayList<>(); @@ -297,5 +317,7 @@ void thenReturnThem() { List subProtocols = underTest.getSubProtocols(); assertThat(subProtocols, is(protocol)); } + } + } From 239a4d71bb8037ed77c3647bbbc78974b0784df6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Hern=C3=A1n=20Carle?= Date: Fri, 23 Aug 2024 15:37:49 +0200 Subject: [PATCH 08/39] fix WebSocketProxyServerHandlerTest MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pablo Hernán Carle --- .../ws/WebSocketProxyServerHandlerTest.java | 31 ++++++++----------- 1 file changed, 13 insertions(+), 18 deletions(-) diff --git a/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandlerTest.java b/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandlerTest.java index 928902112d..983ff012e5 100644 --- a/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandlerTest.java +++ b/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandlerTest.java @@ -33,7 +33,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.CountDownLatch; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.notNullValue; @@ -41,6 +40,7 @@ import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.not; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doThrow; @@ -75,13 +75,9 @@ public void setup() { @Nested class WhenTheConnectionIsEstablished { + @Mock WebSocketSession establishedSession; - @BeforeEach - void prepareSessionMock() { - establishedSession = mock(WebSocketSession.class); - } - @Nested class ThenTheValidSessionIsStoredInternally { @@ -117,8 +113,7 @@ void givenValidRoute() throws Exception { when(establishedSession.getId()).thenReturn(establishedSessionId); when(establishedSession.getUri()).thenReturn(new URI(path)); - CountDownLatch latch = new CountDownLatch(1); - when(routedSession.getWebSocketClientSession()).thenReturn(null); + when(routedSession.getWebSocketClientSession()).thenReturn(AsyncResult.forValue(establishedSession)); underTest.afterConnectionEstablished(establishedSession); @@ -264,32 +259,32 @@ void whenTheMessageIsReceived_thenTheMessageIsPassedToTheSession() throws Except @Test void whenExceptionIsThrown_thenRemoveRoutedSession() throws Exception { doThrow(new WebSocketException("error")).when(routedSessions.get("123")).sendMessageToServer(passedMessage); + when(internallyStoredSession.getWebSocketClientSession()).thenReturn(AsyncResult.forValue(establishedSession)); + when(establishedSession.isOpen()).thenReturn(true); + underTest.handleMessage(establishedSession, passedMessage); + assertTrue(routedSessions.isEmpty()); } @Test + /** + * This scenario is now handled by Spring Retry + */ void whenSessionIsNull_thenCloseAndReturn() throws IOException { routedSessions.replace("123", null); - underTest.handleMessage(establishedSession, passedMessage); - assertTrue(routedSessions.isEmpty()); - verify(establishedSession, times(1)).close(CloseStatus.SESSION_NOT_RELIABLE); + assertThrows(ServerNotYetAvailableException.class, () -> underTest.handleMessage(establishedSession, passedMessage)); } @Test void whenClosingSessionThrowException_thenCatchIt() throws IOException { CloseStatus status = CloseStatus.SESSION_NOT_RELIABLE; doThrow(new IOException()).when(establishedSession).close(status); - underTest.afterConnectionClosed(establishedSession, status); - assertTrue(routedSessions.isEmpty()); - } + when(internallyStoredSession.getWebSocketClientSession()).thenReturn(AsyncResult.forValue(establishedSession)); - @Test - void whenClosingRoutedSessionThrowException_thenCatchIt() throws IOException { - CloseStatus status = CloseStatus.SESSION_NOT_RELIABLE; - doThrow(new IOException()).when(routedSessions.get("123")).close(status); underTest.afterConnectionClosed(establishedSession, status); + assertTrue(routedSessions.isEmpty()); } From 7015276b863be69b09fae3588d4870d877899b22 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Hern=C3=A1n=20Carle?= Date: Fri, 23 Aug 2024 16:54:13 +0200 Subject: [PATCH 09/39] address sonar MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pablo Hernán Carle --- .../ws/WebSocketProxyServerHandler.java | 28 +++++-------- .../gateway/ws/WebSocketRoutedSession.java | 39 +++++++++++++------ .../ws/WebSocketProxyServerHandlerTest.java | 2 +- 3 files changed, 39 insertions(+), 30 deletions(-) diff --git a/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandler.java b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandler.java index 8e8ea0b951..45e6106119 100644 --- a/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandler.java +++ b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandler.java @@ -39,7 +39,6 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutionException; /** * Handle initialization and management of routed WebSocket sessions. Copies @@ -188,7 +187,10 @@ private void openWebSocketConnection(RoutedService service, ServiceInstance serv WebSocketRoutedSession session = webSocketRoutedSessionFactory.session(webSocketSession, targetUrl, webSocketClientFactory); session.getWebSocketClientSession().addCallback( - successSession -> routedSessions.put(webSocketSession.getId(), session), + successSession -> { + session.setClientSession(successSession); + routedSessions.put(webSocketSession.getId(), session); + }, failure -> { log.debug("Failed opening client web socket session against {}. Server WebSocket session is {}", targetUrl, webSocketSession.getId(), failure); try { @@ -204,16 +206,13 @@ private void openWebSocketConnection(RoutedService service, ServiceInstance serv public void afterConnectionClosed(WebSocketSession session, CloseStatus status) { // if the browser closes the session, close the GWs client one as well. Optional.ofNullable(routedSessions.get(session.getId())) - .map(WebSocketRoutedSession::getWebSocketClientSession) + .filter(routedSession -> routedSession.isClientConnected()) + .map(WebSocketRoutedSession::getClientSession) .ifPresent(clientSession -> { try { - clientSession.get().close(status); + clientSession.close(status); } catch (IOException e) { log.debug("Error closing WebSocket client connection: {}", e.getMessage()); - } catch (InterruptedException e) { - // This will not happen, session will not be in the map unless future has already completed successfully - } catch (ExecutionException e) { - // This will not happen, session will not be in the map unless future has already completed successfully } }); routedSessions.remove(session.getId()); @@ -282,19 +281,12 @@ public void handleMessage(WebSocketSession webSocketSession, WebSocketMessage } private boolean isClientConnectionClosed(WebSocketRoutedSession session) { - try { - return session != null && session.getWebSocketClientSession().isDone() && !session.getWebSocketClientSession().get().isOpen(); - } catch (InterruptedException | ExecutionException e) { - return false; - } + return session != null && session.isClientConnected(); } private boolean isClientConnectionReady(WebSocketRoutedSession session) { - try { - return session != null && session.getWebSocketClientSession().isDone() && session.getWebSocketClientSession().get().isOpen(); - } catch (InterruptedException | ExecutionException e) { - return false; - } + WebSocketSession clientSession = session.getClientSession(); + return session != null && session.getWebSocketClientSession().isDone() && clientSession != null && !clientSession.isOpen(); } private WebSocketRoutedSession getRoutedSession(WebSocketSession webSocketSession) { diff --git a/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSession.java b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSession.java index 24b2668a55..4ca6166443 100644 --- a/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSession.java +++ b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSession.java @@ -20,10 +20,11 @@ import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.client.jetty.JettyWebSocketClient; +import javax.annotation.Nullable; + import java.io.IOException; import java.net.InetSocketAddress; import java.net.URI; -import java.util.concurrent.ExecutionException; /** * Represents a connection in the proxying chain, establishes 'client' to @@ -39,6 +40,8 @@ public class WebSocketRoutedSession { private final WebSocketProxyClientHandler clientHandler; private final String targetUrl; + private WebSocketSession clientSession; + public WebSocketRoutedSession(WebSocketSession webSocketServerSession, String targetUrl, WebSocketClientFactory webSocketClientFactory) { this.webSocketServerSession = webSocketServerSession; this.targetUrl = targetUrl; @@ -77,6 +80,15 @@ WebSocketProxyClientHandler getClientHandler() { return clientHandler; } + void setClientSession(WebSocketSession clientSession) { + this.clientSession = clientSession; + } + + @Nullable + WebSocketSession getClientSession() { + return clientSession; + } + private ListenableFuture createWebSocketClientSession(WebSocketSession webSocketServerSession, String targetUrl, WebSocketClientFactory webSocketClientFactory) { try { JettyWebSocketClient client = webSocketClientFactory.getClientInstance(targetUrl); @@ -100,10 +112,10 @@ private WebSocketProxyError webSocketProxyException(String targetUrl, Exception public void sendMessageToServer(WebSocketMessage webSocketMessage) throws IOException { log.debug("sendMessageToServer(session={}, message={})", webSocketClientSession, webSocketMessage); - if (webSocketClientSession.isDone()) { + if (isClientConnected()) { try { - webSocketClientSession.get().sendMessage(webSocketMessage); - } catch (IOException | InterruptedException | ExecutionException e) { + clientSession.sendMessage(webSocketMessage); + } catch (IOException e) { log.debug("Failed sending message: {}", webSocketMessage, e); } } else { @@ -111,12 +123,16 @@ public void sendMessageToServer(WebSocketMessage webSocketMessage) throws IOE } } + public boolean isClientConnected() { + return webSocketClientSession.isDone() && clientSession != null && clientSession.isOpen(); + } + public void close(CloseStatus status) throws IOException { try { - if (webSocketClientSession.isDone() && webSocketClientSession.get().isOpen()) { - webSocketClientSession.get().close(status); + if (isClientConnected()) { + clientSession.close(status); } - } catch (InterruptedException | ExecutionException | IOException e) { + } catch (IOException e) { log.debug("Failed to close WebSocket client session with status {}", status, e); } } @@ -154,10 +170,11 @@ public String getClientId() { if (!webSocketClientSession.isDone()) { throw new ServerNotYetAvailableException(); } - try { - return getWebSocketClientSession().get().getId(); - } catch (InterruptedException | ExecutionException e) { - return null; + if (isClientConnected()) { + return clientSession.getId(); } + + return null; } + } diff --git a/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandlerTest.java b/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandlerTest.java index 983ff012e5..2decbc8821 100644 --- a/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandlerTest.java +++ b/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandlerTest.java @@ -271,7 +271,7 @@ void whenExceptionIsThrown_thenRemoveRoutedSession() throws Exception { /** * This scenario is now handled by Spring Retry */ - void whenSessionIsNull_thenCloseAndReturn() throws IOException { + void whenSessionIsNull_thenCloseAndReturn() { routedSessions.replace("123", null); assertThrows(ServerNotYetAvailableException.class, () -> underTest.handleMessage(establishedSession, passedMessage)); From cfb457de84ad745f0f1a2ca7e315013f930634d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Hern=C3=A1n=20Carle?= Date: Fri, 23 Aug 2024 17:09:07 +0200 Subject: [PATCH 10/39] fix condition and fix test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pablo Hernán Carle --- .../apiml/gateway/ws/WebSocketProxyServerHandler.java | 6 +++--- .../gateway/ws/WebSocketProxyServerHandlerTest.java | 9 +++++++-- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandler.java b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandler.java index 45e6106119..ed5a29bdd5 100644 --- a/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandler.java +++ b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandler.java @@ -281,12 +281,12 @@ public void handleMessage(WebSocketSession webSocketSession, WebSocketMessage } private boolean isClientConnectionClosed(WebSocketRoutedSession session) { - return session != null && session.isClientConnected(); + WebSocketSession clientSession = session.getClientSession(); + return session != null && session.getWebSocketClientSession().isDone() && clientSession != null && !clientSession.isOpen(); } private boolean isClientConnectionReady(WebSocketRoutedSession session) { - WebSocketSession clientSession = session.getClientSession(); - return session != null && session.getWebSocketClientSession().isDone() && clientSession != null && !clientSession.isOpen(); + return session != null && session.isClientConnected(); } private WebSocketRoutedSession getRoutedSession(WebSocketSession webSocketSession) { diff --git a/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandlerTest.java b/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandlerTest.java index 2decbc8821..f3fc7c76bb 100644 --- a/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandlerTest.java +++ b/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandlerTest.java @@ -238,9 +238,11 @@ void whenTheConnectionIsClosed_thenTheSessionIsClosedAndRemovedFromRepository() void whenTheConnectionIsClosed_thenClientSessionIsAlsoClosed() throws IOException { CloseStatus normalClose = CloseStatus.NORMAL; WebSocketSession clientSession = mock(WebSocketSession.class); - when(internallyStoredSession.getWebSocketClientSession()).thenReturn(AsyncResult.forValue(clientSession)); + when(internallyStoredSession.isClientConnected()).thenReturn(true); + when(internallyStoredSession.getClientSession()).thenReturn(clientSession); underTest.afterConnectionClosed(establishedSession, normalClose); + verify(clientSession, times(1)).close(normalClose); assertThat(routedSessions.entrySet(), hasSize(0)); } @@ -249,7 +251,9 @@ void whenTheConnectionIsClosed_thenClientSessionIsAlsoClosed() throws IOExceptio void whenTheMessageIsReceived_thenTheMessageIsPassedToTheSession() throws Exception { WebSocketSession clientSession = mock(WebSocketSession.class); when(clientSession.isOpen()).thenReturn(true); + when(internallyStoredSession.isClientConnected()).thenReturn(true); when(internallyStoredSession.getWebSocketClientSession()).thenReturn(AsyncResult.forValue(clientSession)); + when(internallyStoredSession.getClientSession()).thenReturn(clientSession); underTest.handleMessage(establishedSession, passedMessage); @@ -281,7 +285,8 @@ void whenSessionIsNull_thenCloseAndReturn() { void whenClosingSessionThrowException_thenCatchIt() throws IOException { CloseStatus status = CloseStatus.SESSION_NOT_RELIABLE; doThrow(new IOException()).when(establishedSession).close(status); - when(internallyStoredSession.getWebSocketClientSession()).thenReturn(AsyncResult.forValue(establishedSession)); + when(internallyStoredSession.isClientConnected()).thenReturn(true); + when(internallyStoredSession.getClientSession()).thenReturn(establishedSession); underTest.afterConnectionClosed(establishedSession, status); From 32cfce473d930b33891995ff89c38ba6a0ee0980 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Hern=C3=A1n=20Carle?= Date: Fri, 23 Aug 2024 17:19:10 +0200 Subject: [PATCH 11/39] fix tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pablo Hernán Carle --- .../zowe/apiml/gateway/ws/WebSocketRoutedSessionTest.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSessionTest.java b/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSessionTest.java index 6b422fa22e..1691f1945e 100644 --- a/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSessionTest.java +++ b/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSessionTest.java @@ -54,6 +54,7 @@ class WebSocketRoutedSessionTest { @BeforeEach void prepareSessionUnderTest() { underTest = new WebSocketRoutedSession(serverSession, new AsyncResult<>(clientSession), clientHandler, "ws://localhost:8080/petstore"); + underTest.setClientSession(clientSession); } @Test @@ -62,6 +63,7 @@ void givenValidServerAndClientSession_whenTheDetailsAreRequested_thenTheDetailsA String clientUriPath = "ws://localhost:8080/petstore"; String serverUriPath = "ws://gateway:8080/petstore"; + when(clientSession.isOpen()).thenReturn(true); when(clientSession.getId()).thenReturn(sessionId); when(serverSession.getRemoteAddress()).thenReturn(new InetSocketAddress("gateway", 8080)); when(serverSession.getUri()).thenReturn(new URI(serverUriPath)); @@ -105,7 +107,10 @@ void whenFailingOnHandshake_thenThrowException() { class GivenWSMessage { @Test void whenAddressNotNull_thenSendMessage() throws IOException { + when(clientSession.isOpen()).thenReturn(true); + underTest.sendMessageToServer(mock(WebSocketMessage.class)); + verify(clientSession, times(1)).sendMessage(any()); } } From da8a1932a2c02c7302251eadacae89ae7724bd76 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Hern=C3=A1n=20Carle?= Date: Fri, 23 Aug 2024 17:28:19 +0200 Subject: [PATCH 12/39] fix test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pablo Hernán Carle --- .../zowe/apiml/gateway/ws/WebSocketProxyServerHandlerTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandlerTest.java b/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandlerTest.java index f3fc7c76bb..d5e713db4c 100644 --- a/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandlerTest.java +++ b/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandlerTest.java @@ -264,6 +264,8 @@ void whenTheMessageIsReceived_thenTheMessageIsPassedToTheSession() throws Except void whenExceptionIsThrown_thenRemoveRoutedSession() throws Exception { doThrow(new WebSocketException("error")).when(routedSessions.get("123")).sendMessageToServer(passedMessage); when(internallyStoredSession.getWebSocketClientSession()).thenReturn(AsyncResult.forValue(establishedSession)); + when(internallyStoredSession.getClientSession()).thenReturn(establishedSession); + when(internallyStoredSession.isClientConnected()).thenReturn(true); when(establishedSession.isOpen()).thenReturn(true); underTest.handleMessage(establishedSession, passedMessage); From 36fb7648c83b29653450df31714ea32c7434448a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Hern=C3=A1n=20Carle?= Date: Mon, 26 Aug 2024 10:43:33 +0200 Subject: [PATCH 13/39] use callback MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pablo Hernán Carle --- .../ws/ClientSessionFailureCallback.java | 19 ++++++ .../ws/ClientSessionSuccessCallback.java | 19 ++++++ .../ws/WebSocketProxyServerHandler.java | 36 +++++----- .../gateway/ws/WebSocketRoutedSession.java | 65 +++++++++++-------- .../ws/WebSocketRoutedSessionFactory.java | 10 ++- .../ws/WebSocketRoutedSessionFactoryImpl.java | 9 ++- .../ws/WebSocketProxyServerHandlerTest.java | 12 ++-- .../ws/WebSocketRoutedSessionTest.java | 20 ++++-- 8 files changed, 131 insertions(+), 59 deletions(-) create mode 100644 gateway-service/src/main/java/org/zowe/apiml/gateway/ws/ClientSessionFailureCallback.java create mode 100644 gateway-service/src/main/java/org/zowe/apiml/gateway/ws/ClientSessionSuccessCallback.java diff --git a/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/ClientSessionFailureCallback.java b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/ClientSessionFailureCallback.java new file mode 100644 index 0000000000..33025e15d1 --- /dev/null +++ b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/ClientSessionFailureCallback.java @@ -0,0 +1,19 @@ +/* + * This program and the accompanying materials are made available under the terms of the + * Eclipse Public License v2.0 which accompanies this distribution, and is available at + * https://www.eclipse.org/legal/epl-v20.html + * + * SPDX-License-Identifier: EPL-2.0 + * + * Copyright Contributors to the Zowe Project. + */ + +package org.zowe.apiml.gateway.ws; + +import org.springframework.web.socket.WebSocketSession; + +public interface ClientSessionFailureCallback { + + void onClientSessionFailure(WebSocketRoutedSession webSocketRoutedSession, WebSocketSession serverSession, Throwable throwable); + +} diff --git a/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/ClientSessionSuccessCallback.java b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/ClientSessionSuccessCallback.java new file mode 100644 index 0000000000..f753e7647f --- /dev/null +++ b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/ClientSessionSuccessCallback.java @@ -0,0 +1,19 @@ +/* + * This program and the accompanying materials are made available under the terms of the + * Eclipse Public License v2.0 which accompanies this distribution, and is available at + * https://www.eclipse.org/legal/epl-v20.html + * + * SPDX-License-Identifier: EPL-2.0 + * + * Copyright Contributors to the Zowe Project. + */ + +package org.zowe.apiml.gateway.ws; + +import org.springframework.web.socket.WebSocketSession; + +public interface ClientSessionSuccessCallback { + + void onClientSessionSuccess(WebSocketRoutedSession webSocketRoutedSession, WebSocketSession serverSession, WebSocketSession clientSession); + +} diff --git a/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandler.java b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandler.java index ed5a29bdd5..1aecea7ae0 100644 --- a/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandler.java +++ b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandler.java @@ -48,7 +48,7 @@ @Component @Singleton @Slf4j -public class WebSocketProxyServerHandler extends AbstractWebSocketHandler implements RoutedServicesUser, SubProtocolCapable { +public class WebSocketProxyServerHandler extends AbstractWebSocketHandler implements RoutedServicesUser, SubProtocolCapable, ClientSessionFailureCallback, ClientSessionSuccessCallback { @Value("${server.webSocket.supportedProtocols:-}") private List subProtocols; @@ -185,21 +185,7 @@ private void openWebSocketConnection(RoutedService service, ServiceInstance serv log.debug(String.format("Opening routed WebSocket session from %s to %s with %s by %s", uri.toString(), targetUrl, webSocketClientFactory, this)); - WebSocketRoutedSession session = webSocketRoutedSessionFactory.session(webSocketSession, targetUrl, webSocketClientFactory); - session.getWebSocketClientSession().addCallback( - successSession -> { - session.setClientSession(successSession); - routedSessions.put(webSocketSession.getId(), session); - }, - failure -> { - log.debug("Failed opening client web socket session against {}. Server WebSocket session is {}", targetUrl, webSocketSession.getId(), failure); - try { - session.getClientHandler().handleMessage(webSocketSession, new TextMessage("Failed opening a session against " + targetUrl + ": " + failure.getMessage())); - webSocketSession.close(CloseStatus.SERVER_ERROR); - } catch (Exception e) { - log.debug("Failed sending / closing WebSocket session", e); - } - }); + webSocketRoutedSessionFactory.session(webSocketSession, targetUrl, webSocketClientFactory, this, this); } @Override @@ -282,7 +268,7 @@ public void handleMessage(WebSocketSession webSocketSession, WebSocketMessage private boolean isClientConnectionClosed(WebSocketRoutedSession session) { WebSocketSession clientSession = session.getClientSession(); - return session != null && session.getWebSocketClientSession().isDone() && clientSession != null && !clientSession.isOpen(); + return session != null && clientSession != null && !clientSession.isOpen(); } private boolean isClientConnectionReady(WebSocketRoutedSession session) { @@ -292,4 +278,20 @@ private boolean isClientConnectionReady(WebSocketRoutedSession session) { private WebSocketRoutedSession getRoutedSession(WebSocketSession webSocketSession) { return routedSessions.get(webSocketSession.getId()); } + + @Override + public void onClientSessionSuccess(WebSocketRoutedSession routedSession, WebSocketSession serverSession, WebSocketSession clientSession) { + routedSessions.put(serverSession.getId(), routedSession); + } + + @Override + public void onClientSessionFailure(WebSocketRoutedSession routedSession, WebSocketSession serverSession, Throwable throwable) { + log.debug("Failed opening client web socket session against {}. Server WebSocket session is {}", routedSession.getTargetUrl(), serverSession.getId(), throwable); + try { + routedSession.getClientHandler().handleMessage(serverSession, new TextMessage("Failed opening a session against " + routedSession.getTargetUrl() + ": " + throwable.getMessage())); + serverSession.close(CloseStatus.SERVER_ERROR); + } catch (Exception e) { + log.debug("Failed sending / closing WebSocket session", e); + } + } } diff --git a/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSession.java b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSession.java index 4ca6166443..b96268e074 100644 --- a/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSession.java +++ b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSession.java @@ -13,18 +13,17 @@ import com.google.common.annotations.VisibleForTesting; import lombok.extern.slf4j.Slf4j; import org.springframework.http.HttpHeaders; -import org.springframework.util.concurrent.ListenableFuture; import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.WebSocketHttpHeaders; import org.springframework.web.socket.WebSocketMessage; import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.client.jetty.JettyWebSocketClient; -import javax.annotation.Nullable; - import java.io.IOException; import java.net.InetSocketAddress; import java.net.URI; +import java.util.ArrayList; +import java.util.List; /** * Represents a connection in the proxying chain, establishes 'client' to @@ -35,23 +34,33 @@ @Slf4j public class WebSocketRoutedSession { - private final ListenableFuture webSocketClientSession; + private volatile WebSocketSession clientSession; private final WebSocketSession webSocketServerSession; private final WebSocketProxyClientHandler clientHandler; private final String targetUrl; - private WebSocketSession clientSession; + private final List successCallbacks = new ArrayList<>(); + private final List failureCallbacks = new ArrayList<>(); - public WebSocketRoutedSession(WebSocketSession webSocketServerSession, String targetUrl, WebSocketClientFactory webSocketClientFactory) { - this.webSocketServerSession = webSocketServerSession; - this.targetUrl = targetUrl; - this.clientHandler = new WebSocketProxyClientHandler(webSocketServerSession); - this.webSocketClientSession = createWebSocketClientSession(webSocketServerSession, targetUrl, webSocketClientFactory); + + public WebSocketRoutedSession( + WebSocketSession webSocketServerSession, + String targetUrl, + WebSocketClientFactory webSocketClientFactory, + ClientSessionSuccessCallback successCallback, + ClientSessionFailureCallback failureCallback) { + this.webSocketServerSession = webSocketServerSession; + this.targetUrl = targetUrl; + this.clientHandler = new WebSocketProxyClientHandler(webSocketServerSession); + this.successCallbacks.add(successCallback); + this.failureCallbacks.add(failureCallback); + + createWebSocketClientSession(webSocketServerSession, targetUrl, webSocketClientFactory); } @VisibleForTesting - WebSocketRoutedSession(WebSocketSession webSocketServerSession, ListenableFuture webSocketClientSession, WebSocketProxyClientHandler clientHandler, String targetUrl) { - this.webSocketClientSession = webSocketClientSession; + WebSocketRoutedSession(WebSocketSession webSocketServerSession, WebSocketSession webSocketClientSession, WebSocketProxyClientHandler clientHandler, String targetUrl) { + this.clientSession = webSocketClientSession; this.webSocketServerSession = webSocketServerSession; this.clientHandler = clientHandler; this.targetUrl = targetUrl; @@ -68,10 +77,6 @@ private WebSocketHttpHeaders getWebSocketHttpHeaders(WebSocketSession webSocketS return headers; } - public ListenableFuture getWebSocketClientSession() { - return webSocketClientSession; - } - public WebSocketSession getWebSocketServerSession() { return webSocketServerSession; } @@ -80,21 +85,29 @@ WebSocketProxyClientHandler getClientHandler() { return clientHandler; } - void setClientSession(WebSocketSession clientSession) { - this.clientSession = clientSession; - } - - @Nullable WebSocketSession getClientSession() { return clientSession; } - private ListenableFuture createWebSocketClientSession(WebSocketSession webSocketServerSession, String targetUrl, WebSocketClientFactory webSocketClientFactory) { + String getTargetUrl() { + return targetUrl; + } + + private void onSuccess(WebSocketSession serverSession, WebSocketSession clientSession) { + this.successCallbacks.forEach(callback -> callback.onClientSessionSuccess(this, serverSession, clientSession)); + } + + private void onFailure(WebSocketSession serverSession, Throwable throwable) { + this.failureCallbacks.forEach(callback -> callback.onClientSessionFailure(this, serverSession, throwable)); + } + + private void createWebSocketClientSession(WebSocketSession webSocketServerSession, String targetUrl, WebSocketClientFactory webSocketClientFactory) { try { JettyWebSocketClient client = webSocketClientFactory.getClientInstance(targetUrl); URI targetURI = new URI(targetUrl); WebSocketHttpHeaders headers = getWebSocketHttpHeaders(webSocketServerSession); - return client.doHandshake(clientHandler, headers, targetURI); + client.doHandshake(clientHandler, headers, targetURI) + .addCallback(clientSession -> this.onSuccess(webSocketServerSession, clientSession), e -> this.onFailure(webSocketServerSession, e)); } catch (IllegalStateException e) { throw webSocketProxyException(targetUrl, e, webSocketServerSession, true); } catch (Exception e) { @@ -111,7 +124,7 @@ private WebSocketProxyError webSocketProxyException(String targetUrl, Exception } public void sendMessageToServer(WebSocketMessage webSocketMessage) throws IOException { - log.debug("sendMessageToServer(session={}, message={})", webSocketClientSession, webSocketMessage); + log.debug("sendMessageToServer(session={}, message={})", clientSession, webSocketMessage); if (isClientConnected()) { try { clientSession.sendMessage(webSocketMessage); @@ -124,7 +137,7 @@ public void sendMessageToServer(WebSocketMessage webSocketMessage) throws IOE } public boolean isClientConnected() { - return webSocketClientSession.isDone() && clientSession != null && clientSession.isOpen(); + return clientSession != null && clientSession.isOpen(); } public void close(CloseStatus status) throws IOException { @@ -167,7 +180,7 @@ public String getClientUri() { * @throws ServerNotYetAvailableException If a client session is not established */ public String getClientId() { - if (!webSocketClientSession.isDone()) { + if (clientSession == null) { throw new ServerNotYetAvailableException(); } if (isClientConnected()) { diff --git a/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSessionFactory.java b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSessionFactory.java index 442b8d3290..fd99220e2b 100644 --- a/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSessionFactory.java +++ b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSessionFactory.java @@ -18,7 +18,13 @@ public interface WebSocketRoutedSessionFactory { * @param webSocketSession Valid Server side WebSocket Session. * @param targetUrl Full websocket URL towards the server * @param webSocketClientFactory Factory producing the current SSL Context. - * @return Valid routed session handling the client session + * @param successCallback callback to be called if the session is established successfully + * @param failureCallback callback to be called if the session fails to be established */ - WebSocketRoutedSession session(WebSocketSession webSocketSession, String targetUrl, WebSocketClientFactory webSocketClientFactory); + WebSocketRoutedSession session( + WebSocketSession webSocketSession, + String targetUrl, + WebSocketClientFactory webSocketClientFactory, + ClientSessionSuccessCallback successCallback, + ClientSessionFailureCallback failureCallback); } diff --git a/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSessionFactoryImpl.java b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSessionFactoryImpl.java index f4520a236d..8b6482afcb 100644 --- a/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSessionFactoryImpl.java +++ b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSessionFactoryImpl.java @@ -18,8 +18,13 @@ public class WebSocketRoutedSessionFactoryImpl implements WebSocketRoutedSessionFactory { @Override - public WebSocketRoutedSession session(WebSocketSession webSocketSession, String targetUrl, WebSocketClientFactory webSocketClientFactory) { - return new WebSocketRoutedSession(webSocketSession, targetUrl, webSocketClientFactory); + public WebSocketRoutedSession session( + WebSocketSession webSocketSession, + String targetUrl, + WebSocketClientFactory webSocketClientFactory, + ClientSessionSuccessCallback successCallback, + ClientSessionFailureCallback failureCallback) { + return new WebSocketRoutedSession(webSocketSession, targetUrl, webSocketClientFactory, successCallback, failureCallback); } } diff --git a/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandlerTest.java b/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandlerTest.java index d5e713db4c..26897bbc8a 100644 --- a/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandlerTest.java +++ b/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandlerTest.java @@ -19,7 +19,6 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.cloud.client.ServiceInstance; import org.springframework.cloud.client.loadbalancer.LoadBalancerClient; -import org.springframework.scheduling.annotation.AsyncResult; import org.springframework.test.util.ReflectionTestUtils; import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.WebSocketMessage; @@ -106,18 +105,17 @@ void prepareRoutedService() { void givenValidRoute() throws Exception { String path = "wss://gatewayHost:1443/valid-service/ws/v1/valid-path"; WebSocketRoutedSession routedSession = mock(WebSocketRoutedSession.class); - when(webSocketRoutedSessionFactory.session(any(), any(), any())).thenReturn(routedSession); + when(webSocketRoutedSessionFactory.session(any(), any(), any(), any(), any())).thenReturn(routedSession); ServiceInstance serviceInstance = mock(ServiceInstance.class); when(lbClient.choose(any())).thenReturn(serviceInstance); String establishedSessionId = "validAndUniqueId"; when(establishedSession.getId()).thenReturn(establishedSessionId); when(establishedSession.getUri()).thenReturn(new URI(path)); - when(routedSession.getWebSocketClientSession()).thenReturn(AsyncResult.forValue(establishedSession)); - + underTest.onClientSessionSuccess(routedSession, establishedSession, establishedSession); underTest.afterConnectionEstablished(establishedSession); - verify(webSocketRoutedSessionFactory).session(any(), any(), any()); + verify(webSocketRoutedSessionFactory).session(any(), any(), any(), any(), any()); WebSocketRoutedSession preparedSession = routedSessions.get(establishedSessionId); assertThat(preparedSession, is(notNullValue())); } @@ -252,7 +250,7 @@ void whenTheMessageIsReceived_thenTheMessageIsPassedToTheSession() throws Except WebSocketSession clientSession = mock(WebSocketSession.class); when(clientSession.isOpen()).thenReturn(true); when(internallyStoredSession.isClientConnected()).thenReturn(true); - when(internallyStoredSession.getWebSocketClientSession()).thenReturn(AsyncResult.forValue(clientSession)); + when(internallyStoredSession.getClientSession()).thenReturn(clientSession); when(internallyStoredSession.getClientSession()).thenReturn(clientSession); underTest.handleMessage(establishedSession, passedMessage); @@ -263,7 +261,7 @@ void whenTheMessageIsReceived_thenTheMessageIsPassedToTheSession() throws Except @Test void whenExceptionIsThrown_thenRemoveRoutedSession() throws Exception { doThrow(new WebSocketException("error")).when(routedSessions.get("123")).sendMessageToServer(passedMessage); - when(internallyStoredSession.getWebSocketClientSession()).thenReturn(AsyncResult.forValue(establishedSession)); + when(internallyStoredSession.getClientSession()).thenReturn(establishedSession); when(internallyStoredSession.getClientSession()).thenReturn(establishedSession); when(internallyStoredSession.isClientConnected()).thenReturn(true); when(establishedSession.isOpen()).thenReturn(true); diff --git a/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSessionTest.java b/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSessionTest.java index 1691f1945e..c18599f749 100644 --- a/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSessionTest.java +++ b/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSessionTest.java @@ -17,7 +17,6 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.http.HttpHeaders; -import org.springframework.scheduling.annotation.AsyncResult; import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.WebSocketHttpHeaders; import org.springframework.web.socket.WebSocketMessage; @@ -48,13 +47,16 @@ class WebSocketRoutedSessionTest { private WebSocketSession serverSession; @Mock private WebSocketProxyClientHandler clientHandler; + @Mock + private ClientSessionSuccessCallback successCallback; + @Mock + private ClientSessionFailureCallback failureCallback; private WebSocketRoutedSession underTest; @BeforeEach void prepareSessionUnderTest() { - underTest = new WebSocketRoutedSession(serverSession, new AsyncResult<>(clientSession), clientHandler, "ws://localhost:8080/petstore"); - underTest.setClientSession(clientSession); + underTest = new WebSocketRoutedSession(serverSession, clientSession, clientHandler, "ws://localhost:8080/petstore"); } @Test @@ -83,6 +85,7 @@ void givenBrokenServerSession_whenUriIsRequested_NullIsReturned() { @Nested class GivenFirstConstructor { + @Test void whenFailingToCreateSession_thenThrowException() { WebSocketClientFactory webSocketClientFactory = mock(WebSocketClientFactory.class); @@ -91,7 +94,7 @@ void whenFailingToCreateSession_thenThrowException() { HttpHeaders headers = new WebSocketHttpHeaders(); headers.add("header", "someHeader"); when(serverSession.getHandshakeHeaders()).thenReturn(headers); - assertThrows(WebSocketProxyError.class, () -> new WebSocketRoutedSession(serverSession, "", webSocketClientFactory)); + assertThrows(WebSocketProxyError.class, () -> new WebSocketRoutedSession(serverSession, "", webSocketClientFactory, successCallback, failureCallback)); } @@ -99,12 +102,13 @@ void whenFailingToCreateSession_thenThrowException() { void whenFailingOnHandshake_thenThrowException() { WebSocketClientFactory webSocketClientFactory = mock(WebSocketClientFactory.class); when(webSocketClientFactory.getClientInstance("key")).thenThrow(new IllegalStateException()); - assertThrows(WebSocketProxyError.class, () -> new WebSocketRoutedSession(serverSession, "", webSocketClientFactory)); + assertThrows(WebSocketProxyError.class, () -> new WebSocketRoutedSession(serverSession, "", webSocketClientFactory, successCallback, failureCallback)); } } @Nested class GivenWSMessage { + @Test void whenAddressNotNull_thenSendMessage() throws IOException { when(clientSession.isOpen()).thenReturn(true); @@ -113,10 +117,12 @@ void whenAddressNotNull_thenSendMessage() throws IOException { verify(clientSession, times(1)).sendMessage(any()); } + } @Nested class GivenServerRemoteAddress { + @Test void whenAddressNotNull_thenReturnIt() { when(serverSession.getRemoteAddress()).thenReturn(new InetSocketAddress("gateway", 8080)); @@ -129,10 +135,12 @@ void whenAddressNull_thenReturnNull() { when(serverSession.getRemoteAddress()).thenReturn(null); assertNull((underTest.getServerRemoteAddress())); } + } @Nested class GivenCloseCall { + @Test void whenClosingSession_thenClose() throws IOException { CloseStatus closeStatus = new CloseStatus(CloseStatus.NOT_ACCEPTABLE.getCode()); @@ -140,5 +148,7 @@ void whenClosingSession_thenClose() throws IOException { underTest.close(closeStatus); verify(clientSession, times(1)).close(closeStatus); } + } + } From b6b3c1524d62a2180f5b878360b2402e4258ac21 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Hern=C3=A1n=20Carle?= Date: Mon, 26 Aug 2024 10:58:43 +0200 Subject: [PATCH 14/39] add assignment MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pablo Hernán Carle --- .../java/org/zowe/apiml/gateway/ws/WebSocketRoutedSession.java | 1 + 1 file changed, 1 insertion(+) diff --git a/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSession.java b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSession.java index b96268e074..26d681d436 100644 --- a/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSession.java +++ b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSession.java @@ -94,6 +94,7 @@ String getTargetUrl() { } private void onSuccess(WebSocketSession serverSession, WebSocketSession clientSession) { + this.clientSession = clientSession; this.successCallbacks.forEach(callback -> callback.onClientSessionSuccess(this, serverSession, clientSession)); } From ca46f76896bf4b423e46bfd594b3fff3eb5c138f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Hern=C3=A1n=20Carle?= Date: Mon, 26 Aug 2024 10:59:52 +0200 Subject: [PATCH 15/39] use atomic reference MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pablo Hernán Carle --- .../apiml/gateway/ws/WebSocketRoutedSession.java | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSession.java b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSession.java index 26d681d436..e6efcca0c5 100644 --- a/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSession.java +++ b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSession.java @@ -24,6 +24,7 @@ import java.net.URI; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.atomic.AtomicReference; /** * Represents a connection in the proxying chain, establishes 'client' to @@ -34,7 +35,7 @@ @Slf4j public class WebSocketRoutedSession { - private volatile WebSocketSession clientSession; + private AtomicReference clientSession; private final WebSocketSession webSocketServerSession; private final WebSocketProxyClientHandler clientHandler; private final String targetUrl; @@ -86,7 +87,7 @@ WebSocketProxyClientHandler getClientHandler() { } WebSocketSession getClientSession() { - return clientSession; + return clientSession.get(); } String getTargetUrl() { @@ -94,7 +95,7 @@ String getTargetUrl() { } private void onSuccess(WebSocketSession serverSession, WebSocketSession clientSession) { - this.clientSession = clientSession; + this.clientSession.set(clientSession); this.successCallbacks.forEach(callback -> callback.onClientSessionSuccess(this, serverSession, clientSession)); } @@ -128,7 +129,7 @@ public void sendMessageToServer(WebSocketMessage webSocketMessage) throws IOE log.debug("sendMessageToServer(session={}, message={})", clientSession, webSocketMessage); if (isClientConnected()) { try { - clientSession.sendMessage(webSocketMessage); + clientSession.get().sendMessage(webSocketMessage); } catch (IOException e) { log.debug("Failed sending message: {}", webSocketMessage, e); } @@ -138,13 +139,13 @@ public void sendMessageToServer(WebSocketMessage webSocketMessage) throws IOE } public boolean isClientConnected() { - return clientSession != null && clientSession.isOpen(); + return clientSession != null && clientSession.get().isOpen(); } public void close(CloseStatus status) throws IOException { try { if (isClientConnected()) { - clientSession.close(status); + clientSession.get().close(status); } } catch (IOException e) { log.debug("Failed to close WebSocket client session with status {}", status, e); @@ -185,7 +186,7 @@ public String getClientId() { throw new ServerNotYetAvailableException(); } if (isClientConnected()) { - return clientSession.getId(); + return clientSession.get().getId(); } return null; From e9360ab66f44f5e2a64c1f02055614e51cf09bc9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Hern=C3=A1n=20Carle?= Date: Mon, 26 Aug 2024 11:25:07 +0200 Subject: [PATCH 16/39] fix constructor MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pablo Hernán Carle --- .../java/org/zowe/apiml/gateway/ws/WebSocketRoutedSession.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSession.java b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSession.java index e6efcca0c5..6c55b19b02 100644 --- a/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSession.java +++ b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSession.java @@ -61,7 +61,7 @@ public WebSocketRoutedSession( @VisibleForTesting WebSocketRoutedSession(WebSocketSession webSocketServerSession, WebSocketSession webSocketClientSession, WebSocketProxyClientHandler clientHandler, String targetUrl) { - this.clientSession = webSocketClientSession; + this.clientSession.set(webSocketClientSession); this.webSocketServerSession = webSocketServerSession; this.clientHandler = clientHandler; this.targetUrl = targetUrl; From b8ec6480b19689cc783924ed012f557a60722d8a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Hern=C3=A1n=20Carle?= Date: Mon, 26 Aug 2024 12:23:38 +0200 Subject: [PATCH 17/39] fix initialization MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pablo Hernán Carle --- .../org/zowe/apiml/gateway/ws/WebSocketRoutedSession.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSession.java b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSession.java index 6c55b19b02..a2ebe6f797 100644 --- a/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSession.java +++ b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSession.java @@ -35,7 +35,7 @@ @Slf4j public class WebSocketRoutedSession { - private AtomicReference clientSession; + private AtomicReference clientSession = new AtomicReference<>(); private final WebSocketSession webSocketServerSession; private final WebSocketProxyClientHandler clientHandler; private final String targetUrl; @@ -94,7 +94,7 @@ String getTargetUrl() { return targetUrl; } - private void onSuccess(WebSocketSession serverSession, WebSocketSession clientSession) { + private synchronized void onSuccess(WebSocketSession serverSession, WebSocketSession clientSession) { this.clientSession.set(clientSession); this.successCallbacks.forEach(callback -> callback.onClientSessionSuccess(this, serverSession, clientSession)); } From 58c8de19de65f2bcfc60edc6251e4f3283617072 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Hern=C3=A1n=20Carle?= Date: Mon, 26 Aug 2024 16:53:04 +0200 Subject: [PATCH 18/39] try fix integration test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pablo Hernán Carle --- .../integration/ha/WebSocketChaoticTest.java | 37 +++++++++++++++---- 1 file changed, 29 insertions(+), 8 deletions(-) diff --git a/integration-tests/src/test/java/org/zowe/apiml/integration/ha/WebSocketChaoticTest.java b/integration-tests/src/test/java/org/zowe/apiml/integration/ha/WebSocketChaoticTest.java index 6b0ef739dd..f4532e5d80 100644 --- a/integration-tests/src/test/java/org/zowe/apiml/integration/ha/WebSocketChaoticTest.java +++ b/integration-tests/src/test/java/org/zowe/apiml/integration/ha/WebSocketChaoticTest.java @@ -24,6 +24,7 @@ import org.zowe.apiml.util.requests.ha.HADiscoverableClientRequests; import org.zowe.apiml.util.requests.ha.HAGatewayRequests; +import java.io.IOException; import java.net.URI; import java.util.Base64; import java.util.concurrent.TimeUnit; @@ -53,6 +54,7 @@ static void setup() { } private TextWebSocketHandler appendResponseHandler(StringBuilder target, int countToNotify) { + final AtomicInteger counter = new AtomicInteger(countToNotify); return new TextWebSocketHandler() { @Override @@ -86,17 +88,34 @@ private WebSocketSession appendingWebSocketSession(URI uri, WebSocketHttpHeaders @Nested class WhenRoutingSessionGivenHA { + + private WebSocketSession session; + + @BeforeEach + void setUp() { + + } + + @AfterEach + void tearDown() throws IOException { + if (session != null) { + session.close(); + } + } + @Nested class OpeningASession { + @Nested @TestMethodOrder(MethodOrderer.OrderAnnotation.class) class WhenAnInstanceIsOff { + @Test @Order(1) void propagateTheSessionToTheAliveInstance() throws Exception { final StringBuilder response = new StringBuilder(); - WebSocketSession session = appendingWebSocketSession(gatewaysWsRequests.getGatewayUrl( 0, DISCOVERABLE_WS_UPPERCASE), VALID_AUTH_HEADERS, response, 1); + session = appendingWebSocketSession(gatewaysWsRequests.getGatewayUrl( 0, DISCOVERABLE_WS_UPPERCASE), VALID_AUTH_HEADERS, response, 1); // shutdown one instance of DC to check whether the message can reach out the other instance haDiscoverableClientRequests.shutdown(0); @@ -107,7 +126,6 @@ void propagateTheSessionToTheAliveInstance() throws Exception { } assertEquals("HELLO WORLD!", response.toString()); - session.close(); } @Test @@ -116,7 +134,7 @@ void newSessionCanBeCreated() throws Exception { final StringBuilder response = new StringBuilder(); // create websocket session using the second instance of Gateway - WebSocketSession session = appendingWebSocketSession(gatewaysWsRequests.getGatewayUrl( 1, DISCOVERABLE_WS_UPPERCASE), VALID_AUTH_HEADERS, response, 1); + session = appendingWebSocketSession(gatewaysWsRequests.getGatewayUrl( 1, DISCOVERABLE_WS_UPPERCASE), VALID_AUTH_HEADERS, response, 1); session.sendMessage(new TextMessage("hello world 2!")); synchronized (response) { @@ -124,24 +142,24 @@ void newSessionCanBeCreated() throws Exception { } assertEquals("HELLO WORLD 2!", response.toString()); - session.close(); } /** - * The issue here proves that we don't actually correctly route. + * The issue here proves that we don't actually correctly route. * TODO: Introduce HA capabailities for the WebSocket connections. */ @Nested class WhenAGatewayInstanceIsOff { + @Test void newSessionCanBeCreated() throws Exception { final StringBuilder response = new StringBuilder(); HAGatewayRequests haGatewayRequests = new HAGatewayRequests("https"); // take off an instance of Gateway haGatewayRequests.shutdown(0); - + // create websocket session using the second alive instance of Gateway - WebSocketSession session = appendingWebSocketSession(gatewaysWsRequests.getGatewayUrl( 1, DISCOVERABLE_WS_UPPERCASE), VALID_AUTH_HEADERS, response, 1); + session = appendingWebSocketSession(gatewaysWsRequests.getGatewayUrl( 1, DISCOVERABLE_WS_UPPERCASE), VALID_AUTH_HEADERS, response, 1); session.sendMessage(new TextMessage("hello world 2!")); synchronized (response) { @@ -149,11 +167,14 @@ void newSessionCanBeCreated() throws Exception { } assertEquals("HELLO WORLD 2!", response.toString()); - session.close(); } + } + } + } + } } From 28957ffe439d79032b36f1cb419f78d0db473785 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Hern=C3=A1n=20Carle?= Date: Tue, 27 Aug 2024 10:34:56 +0200 Subject: [PATCH 19/39] add test for factory MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pablo Hernán Carle --- ...WebSocketRoutedSessionFactoryImplTest.java | 58 +++++++++++++++++++ 1 file changed, 58 insertions(+) create mode 100644 gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSessionFactoryImplTest.java diff --git a/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSessionFactoryImplTest.java b/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSessionFactoryImplTest.java new file mode 100644 index 0000000000..88d93f6283 --- /dev/null +++ b/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSessionFactoryImplTest.java @@ -0,0 +1,58 @@ +/* + * This program and the accompanying materials are made available under the terms of the + * Eclipse Public License v2.0 which accompanies this distribution, and is available at + * https://www.eclipse.org/legal/epl-v20.html + * + * SPDX-License-Identifier: EPL-2.0 + * + * Copyright Contributors to the Zowe Project. + */ + +package org.zowe.apiml.gateway.ws; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.http.HttpHeaders; +import org.springframework.scheduling.annotation.AsyncResult; +import org.springframework.web.socket.WebSocketHttpHeaders; +import org.springframework.web.socket.WebSocketSession; +import org.springframework.web.socket.client.jetty.JettyWebSocketClient; + +import java.net.URI; + +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class WebSocketRoutedSessionFactoryImplTest { + + private static final String WSS_URI = "wss://service"; + + @Mock + private WebSocketClientFactory factory; + @Mock + private WebSocketSession webSocketSession; + @Mock + private JettyWebSocketClient jettyWebSocketClient; + + private WebSocketRoutedSessionFactoryImpl underTest = new WebSocketRoutedSessionFactoryImpl(); + + @BeforeEach + void setUp() { + when(webSocketSession.getHandshakeHeaders()).thenReturn(HttpHeaders.EMPTY); + } + + @Test + void testSession() { + when(factory.getClientInstance(WSS_URI)).thenReturn(jettyWebSocketClient); + when(jettyWebSocketClient.doHandshake(any(), any(WebSocketHttpHeaders.class), any(URI.class))).thenReturn(AsyncResult.forValue(null)); + + WebSocketRoutedSession generated = underTest.session(webSocketSession, WSS_URI, factory, null, null); + + assertNotNull(generated); + } +} From 2041c9d8a8053192ae820cc65f66c51207a70c8f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Hern=C3=A1n=20Carle?= Date: Tue, 27 Aug 2024 10:37:14 +0200 Subject: [PATCH 20/39] try fix it MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pablo Hernán Carle --- .../org/zowe/apiml/integration/ha/WebSocketChaoticTest.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/integration-tests/src/test/java/org/zowe/apiml/integration/ha/WebSocketChaoticTest.java b/integration-tests/src/test/java/org/zowe/apiml/integration/ha/WebSocketChaoticTest.java index f4532e5d80..d3c8251616 100644 --- a/integration-tests/src/test/java/org/zowe/apiml/integration/ha/WebSocketChaoticTest.java +++ b/integration-tests/src/test/java/org/zowe/apiml/integration/ha/WebSocketChaoticTest.java @@ -91,11 +91,6 @@ class WhenRoutingSessionGivenHA { private WebSocketSession session; - @BeforeEach - void setUp() { - - } - @AfterEach void tearDown() throws IOException { if (session != null) { @@ -149,6 +144,7 @@ void newSessionCanBeCreated() throws Exception { * TODO: Introduce HA capabailities for the WebSocket connections. */ @Nested + @Order(3) class WhenAGatewayInstanceIsOff { @Test From a185f3cc274063512442e87202743cd525af0848 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Hern=C3=A1n=20Carle?= Date: Tue, 27 Aug 2024 10:49:26 +0200 Subject: [PATCH 21/39] wait for shutdown in IT MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pablo Hernán Carle --- .../org/zowe/apiml/integration/ha/WebSocketChaoticTest.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/integration-tests/src/test/java/org/zowe/apiml/integration/ha/WebSocketChaoticTest.java b/integration-tests/src/test/java/org/zowe/apiml/integration/ha/WebSocketChaoticTest.java index d3c8251616..f30bc7fea3 100644 --- a/integration-tests/src/test/java/org/zowe/apiml/integration/ha/WebSocketChaoticTest.java +++ b/integration-tests/src/test/java/org/zowe/apiml/integration/ha/WebSocketChaoticTest.java @@ -148,12 +148,15 @@ void newSessionCanBeCreated() throws Exception { class WhenAGatewayInstanceIsOff { @Test + @Timeout(value = 30, unit = TimeUnit.SECONDS) void newSessionCanBeCreated() throws Exception { final StringBuilder response = new StringBuilder(); HAGatewayRequests haGatewayRequests = new HAGatewayRequests("https"); // take off an instance of Gateway haGatewayRequests.shutdown(0); + Thread.sleep(10 * 1000); + // create websocket session using the second alive instance of Gateway session = appendingWebSocketSession(gatewaysWsRequests.getGatewayUrl( 1, DISCOVERABLE_WS_UPPERCASE), VALID_AUTH_HEADERS, response, 1); From 501a223f4158325742625f2d5b533ccc2d04b50f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Hern=C3=A1n=20Carle?= Date: Tue, 27 Aug 2024 10:53:59 +0200 Subject: [PATCH 22/39] fix null check MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pablo Hernán Carle --- .../org/zowe/apiml/gateway/ws/WebSocketRoutedSession.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSession.java b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSession.java index a2ebe6f797..1a05774213 100644 --- a/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSession.java +++ b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSession.java @@ -139,7 +139,7 @@ public void sendMessageToServer(WebSocketMessage webSocketMessage) throws IOE } public boolean isClientConnected() { - return clientSession != null && clientSession.get().isOpen(); + return clientSession.get() != null && clientSession.get().isOpen(); } public void close(CloseStatus status) throws IOException { @@ -182,7 +182,7 @@ public String getClientUri() { * @throws ServerNotYetAvailableException If a client session is not established */ public String getClientId() { - if (clientSession == null) { + if (clientSession.get() == null) { throw new ServerNotYetAvailableException(); } if (isClientConnected()) { From c4df8ecac447495ebdfe0265ae9a0e17259662d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Hern=C3=A1n=20Carle?= Date: Tue, 27 Aug 2024 11:39:37 +0200 Subject: [PATCH 23/39] try without pause MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pablo Hernán Carle --- .../org/zowe/apiml/integration/ha/WebSocketChaoticTest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/integration-tests/src/test/java/org/zowe/apiml/integration/ha/WebSocketChaoticTest.java b/integration-tests/src/test/java/org/zowe/apiml/integration/ha/WebSocketChaoticTest.java index f30bc7fea3..4dd2323818 100644 --- a/integration-tests/src/test/java/org/zowe/apiml/integration/ha/WebSocketChaoticTest.java +++ b/integration-tests/src/test/java/org/zowe/apiml/integration/ha/WebSocketChaoticTest.java @@ -155,8 +155,6 @@ void newSessionCanBeCreated() throws Exception { // take off an instance of Gateway haGatewayRequests.shutdown(0); - Thread.sleep(10 * 1000); - // create websocket session using the second alive instance of Gateway session = appendingWebSocketSession(gatewaysWsRequests.getGatewayUrl( 1, DISCOVERABLE_WS_UPPERCASE), VALID_AUTH_HEADERS, response, 1); From 5b5318a21826cb2e182eac21a3b75d34f60508df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Hern=C3=A1n=20Carle?= Date: Tue, 27 Aug 2024 11:49:29 +0200 Subject: [PATCH 24/39] method reference MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pablo Hernán Carle --- .../org/zowe/apiml/gateway/ws/WebSocketProxyServerHandler.java | 2 +- .../org/zowe/apiml/integration/ha/WebSocketChaoticTest.java | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandler.java b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandler.java index 1aecea7ae0..b79c7b225d 100644 --- a/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandler.java +++ b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandler.java @@ -192,7 +192,7 @@ private void openWebSocketConnection(RoutedService service, ServiceInstance serv public void afterConnectionClosed(WebSocketSession session, CloseStatus status) { // if the browser closes the session, close the GWs client one as well. Optional.ofNullable(routedSessions.get(session.getId())) - .filter(routedSession -> routedSession.isClientConnected()) + .filter(WebSocketRoutedSession::isClientConnected) .map(WebSocketRoutedSession::getClientSession) .ifPresent(clientSession -> { try { diff --git a/integration-tests/src/test/java/org/zowe/apiml/integration/ha/WebSocketChaoticTest.java b/integration-tests/src/test/java/org/zowe/apiml/integration/ha/WebSocketChaoticTest.java index 4dd2323818..f30bc7fea3 100644 --- a/integration-tests/src/test/java/org/zowe/apiml/integration/ha/WebSocketChaoticTest.java +++ b/integration-tests/src/test/java/org/zowe/apiml/integration/ha/WebSocketChaoticTest.java @@ -155,6 +155,8 @@ void newSessionCanBeCreated() throws Exception { // take off an instance of Gateway haGatewayRequests.shutdown(0); + Thread.sleep(10 * 1000); + // create websocket session using the second alive instance of Gateway session = appendingWebSocketSession(gatewaysWsRequests.getGatewayUrl( 1, DISCOVERABLE_WS_UPPERCASE), VALID_AUTH_HEADERS, response, 1); From 3abd1707ab7dd725823574152988fa716ace7341 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Hern=C3=A1n=20Carle?= Date: Tue, 27 Aug 2024 13:36:43 +0200 Subject: [PATCH 25/39] add tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pablo Hernán Carle --- .../ws/WebSocketProxyServerHandler.java | 9 ++- .../ws/WebSocketProxyServerHandlerTest.java | 65 +++++++++++++++++++ 2 files changed, 72 insertions(+), 2 deletions(-) diff --git a/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandler.java b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandler.java index b79c7b225d..d20a0d8c6d 100644 --- a/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandler.java +++ b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandler.java @@ -70,7 +70,7 @@ public List getSubProtocols() { @Autowired public WebSocketProxyServerHandler(WebSocketClientFactory webSocketClientFactory, LoadBalancerClient lbCLient, ApplicationContext context) { this.webSocketClientFactory = webSocketClientFactory; - this.routedSessions = new ConcurrentHashMap<>(); // Default + this.routedSessions = new ConcurrentHashMap<>(); this.webSocketRoutedSessionFactory = new WebSocketRoutedSessionFactoryImpl(); this.lbCLient = lbCLient; this.context = context; @@ -149,7 +149,11 @@ private void routeToService(WebSocketSession webSocketSession, String serviceId, } } - @Retryable(value = WebSocketProxyError.class, backoff = @Backoff(value = 1000)) + @Retryable( + value = WebSocketProxyError.class, + backoff = @Backoff(value = 1000, multiplier = 2), + maxAttempts = 3 + ) void openConn(String serviceId, RoutedService service, WebSocketSession webSocketSession, String path) throws IOException { ServiceInstance serviceInstance = this.lbCLient.choose(serviceId); if (serviceInstance != null) { @@ -290,6 +294,7 @@ public void onClientSessionFailure(WebSocketRoutedSession routedSession, WebSock try { routedSession.getClientHandler().handleMessage(serverSession, new TextMessage("Failed opening a session against " + routedSession.getTargetUrl() + ": " + throwable.getMessage())); serverSession.close(CloseStatus.SERVER_ERROR); + routedSessions.remove(serverSession.getId()); } catch (Exception e) { log.debug("Failed sending / closing WebSocket session", e); } diff --git a/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandlerTest.java b/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandlerTest.java index 26897bbc8a..b8196795f9 100644 --- a/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandlerTest.java +++ b/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandlerTest.java @@ -21,6 +21,7 @@ import org.springframework.cloud.client.loadbalancer.LoadBalancerClient; import org.springframework.test.util.ReflectionTestUtils; import org.springframework.web.socket.CloseStatus; +import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketMessage; import org.springframework.web.socket.WebSocketSession; import org.zowe.apiml.product.routing.RoutedService; @@ -39,13 +40,19 @@ import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.not; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assumptions.assumeTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) @@ -202,6 +209,64 @@ void givenNullService_thenCloseWebSocket() throws Exception { verify(establishedSession).close(new CloseStatus(CloseStatus.NOT_ACCEPTABLE.getCode(), "Requested ws/v1 url is not known by the gateway")); } } + + @Test + void testOnClientSessionSuccess() { + assumeTrue(underTest.getRoutedSessions().isEmpty()); + when(establishedSession.getId()).thenReturn("mockId"); + + underTest.onClientSessionSuccess(mock(WebSocketRoutedSession.class), establishedSession, establishedSession); + + verifyNoMoreInteractions(establishedSession); + assertNotNull(underTest.getRoutedSessions().get("mockId")); + } + + } + + @Nested + class GivenInvalidClientSession { + + @Mock + private WebSocketSession session; + @Mock + private WebSocketRoutedSession routedSession; + + @Test + void whenHandleMessage_thenThrowNotAvailable() { + assumeTrue(underTest.getRoutedSessions().isEmpty()); + assertThrows(ServerNotYetAvailableException.class, () -> underTest.handleMessage(session, new TextMessage("null"))); + } + + @Test + void whenRecover_thenCloseServerSession() throws IOException { + doNothing().when(session).close(CloseStatus.SESSION_NOT_RELIABLE); + when(session.getId()).thenReturn("mockId"); + underTest.closeServerSession(null, session, null); + + verifyNoMoreInteractions(session); + } + + @Test + void whenFailedToObtain_thenCloseServerSession() throws Exception { + assumeTrue(underTest.getRoutedSessions().isEmpty()); + + underTest.getRoutedSessions().put("mockId", routedSession); + + WebSocketProxyClientHandler clientHandler = mock(WebSocketProxyClientHandler.class); + when(routedSession.getClientHandler()).thenReturn(clientHandler); + when(routedSession.getTargetUrl()).thenReturn("targetUrl"); + doNothing().when(clientHandler).handleMessage(eq(session), argThat(tm -> tm.getPayload() instanceof String && String.valueOf(tm.getPayload()).contains("a message"))); + doNothing().when(session).close(CloseStatus.SERVER_ERROR); + when(session.getId()).thenReturn("mockId"); + + underTest.onClientSessionFailure(routedSession, session, new RuntimeException("a message")); + + verifyNoMoreInteractions(session); + verifyNoMoreInteractions(routedSession); + + assertTrue(underTest.getRoutedSessions().isEmpty()); + } + } @Nested From d1816e8d8746fd5606587fadfbdb19828abc4cf9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Hern=C3=A1n=20Carle?= Date: Tue, 27 Aug 2024 17:44:10 +0200 Subject: [PATCH 26/39] add tests for coverage MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pablo Hernán Carle --- .../gateway/ws/WebSocketRoutedSession.java | 2 ++ .../ws/WebSocketRoutedSessionTest.java | 32 +++++++++++++++++++ 2 files changed, 34 insertions(+) diff --git a/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSession.java b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSession.java index 1a05774213..6bf1349fa6 100644 --- a/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSession.java +++ b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSession.java @@ -132,9 +132,11 @@ public void sendMessageToServer(WebSocketMessage webSocketMessage) throws IOE clientSession.get().sendMessage(webSocketMessage); } catch (IOException e) { log.debug("Failed sending message: {}", webSocketMessage, e); + throw e; } } else { log.debug("Tried to send a WebSocket message in a non-established client session."); + throw new ServerNotYetAvailableException(); } } diff --git a/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSessionTest.java b/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSessionTest.java index c18599f749..125cfee6a2 100644 --- a/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSessionTest.java +++ b/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSessionTest.java @@ -17,6 +17,7 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.http.HttpHeaders; +import org.springframework.test.util.ReflectionTestUtils; import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.WebSocketHttpHeaders; import org.springframework.web.socket.WebSocketMessage; @@ -26,13 +27,16 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.net.URI; +import java.util.concurrent.atomic.AtomicReference; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertThrowsExactly; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -77,6 +81,18 @@ void givenValidServerAndClientSession_whenTheDetailsAreRequested_thenTheDetailsA assertThat(underTest.getServerUri(), is(serverUriPath)); } + @Test + void givenNoClientSession_whenGetClientId_thenThrowException() { + ReflectionTestUtils.setField(underTest, "clientSession", new AtomicReference<>()); + assertThrows(ServerNotYetAvailableException.class, () -> underTest.getClientId()); + } + + @Test + void givenClientSessionNotConnected_whenGetClientId_thenNull() { + when(clientSession.isOpen()).thenReturn(false); + assertNull(underTest.getClientId()); + } + @Test void givenBrokenServerSession_whenUriIsRequested_NullIsReturned() { when(serverSession.getUri()).thenReturn(null); @@ -118,6 +134,22 @@ void whenAddressNotNull_thenSendMessage() throws IOException { verify(clientSession, times(1)).sendMessage(any()); } + @Test + void whenClientNotConnected_throwNotAvailable() throws IOException { + when(clientSession.isOpen()).thenReturn(false); + + assertThrows(ServerNotYetAvailableException.class, () -> underTest.sendMessageToServer(mock(WebSocketMessage.class))); + } + + @Test + void whenClientConnectedAndException_thenThrowIt() throws IOException { + when(clientSession.isOpen()).thenReturn(true); + Exception e = new IOException("message"); + doThrow(e).when(clientSession).sendMessage(any()); + + assertThrowsExactly(IOException.class, () -> underTest.sendMessageToServer(mock(WebSocketMessage.class)), "message"); + } + } @Nested From a51cb7dbbc2fabbc656efca3636774f36ae813cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Hern=C3=A1n=20Carle?= Date: Tue, 27 Aug 2024 18:30:46 +0200 Subject: [PATCH 27/39] add test for coverage MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pablo Hernán Carle --- .../ws/WebSocketProxyServerHandler.java | 2 ++ .../ws/WebSocketProxyServerHandlerTest.java | 19 ++++++++++++++++++- 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandler.java b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandler.java index d20a0d8c6d..74d3296f69 100644 --- a/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandler.java +++ b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandler.java @@ -264,6 +264,8 @@ public void handleMessage(WebSocketSession webSocketSession, WebSocketMessage closeServerSession(null, webSocketSession, webSocketMessage); } session.sendMessageToServer(webSocketMessage); + } catch (ServerNotYetAvailableException e) { + throw e; } catch (Exception ex) { log.debug("Error sending WebSocket message. Closing session due to exception:", ex); close(webSocketSession, CloseStatus.SESSION_NOT_RELIABLE); diff --git a/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandlerTest.java b/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandlerTest.java index b8196795f9..697017b4cf 100644 --- a/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandlerTest.java +++ b/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandlerTest.java @@ -316,13 +316,30 @@ void whenTheMessageIsReceived_thenTheMessageIsPassedToTheSession() throws Except when(clientSession.isOpen()).thenReturn(true); when(internallyStoredSession.isClientConnected()).thenReturn(true); when(internallyStoredSession.getClientSession()).thenReturn(clientSession); - when(internallyStoredSession.getClientSession()).thenReturn(clientSession); underTest.handleMessage(establishedSession, passedMessage); verify(internallyStoredSession).sendMessageToServer(passedMessage); } + @Test + void givenClientNotConnected_whenHandleMessage_thenThrowException() { when(internallyStoredSession.isClientConnected()).thenReturn(false); + assertThrows(ServerNotYetAvailableException.class, () -> underTest.handleMessage(establishedSession, passedMessage)); + } + + @Test + void givenClientConnectionClosed_whenHandleMessage_thenCloseServerSession() throws IOException { + WebSocketSession clientSession = mock(WebSocketSession.class); + + when(clientSession.isOpen()).thenReturn(false); + when(internallyStoredSession.isClientConnected()).thenReturn(true); + when(internallyStoredSession.getClientSession()).thenReturn(clientSession); + + underTest.handleMessage(establishedSession, passedMessage); + + verify(establishedSession, times(1)).close(CloseStatus.SESSION_NOT_RELIABLE); + } + @Test void whenExceptionIsThrown_thenRemoveRoutedSession() throws Exception { doThrow(new WebSocketException("error")).when(routedSessions.get("123")).sendMessageToServer(passedMessage); From 6528e135b2a64beee3509bc76a6a07c2372e61a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Hern=C3=A1n=20Carle?= Date: Tue, 27 Aug 2024 22:54:32 +0200 Subject: [PATCH 28/39] address sonar issues MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pablo Hernán Carle --- .../org/zowe/apiml/gateway/ws/WebSocketProxyServerHandler.java | 2 +- .../java/org/zowe/apiml/gateway/ws/WebSocketRoutedSession.java | 2 +- .../zowe/apiml/gateway/ws/WebSocketProxyServerHandlerTest.java | 3 ++- .../org/zowe/apiml/gateway/ws/WebSocketRoutedSessionTest.java | 2 +- .../org/zowe/apiml/integration/ha/WebSocketChaoticTest.java | 2 -- 5 files changed, 5 insertions(+), 6 deletions(-) diff --git a/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandler.java b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandler.java index 74d3296f69..2d95d25c2e 100644 --- a/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandler.java +++ b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandler.java @@ -274,7 +274,7 @@ public void handleMessage(WebSocketSession webSocketSession, WebSocketMessage private boolean isClientConnectionClosed(WebSocketRoutedSession session) { WebSocketSession clientSession = session.getClientSession(); - return session != null && clientSession != null && !clientSession.isOpen(); + return clientSession != null && !clientSession.isOpen(); } private boolean isClientConnectionReady(WebSocketRoutedSession session) { diff --git a/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSession.java b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSession.java index 6bf1349fa6..60907b4219 100644 --- a/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSession.java +++ b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSession.java @@ -109,7 +109,7 @@ private void createWebSocketClientSession(WebSocketSession webSocketServerSessio URI targetURI = new URI(targetUrl); WebSocketHttpHeaders headers = getWebSocketHttpHeaders(webSocketServerSession); client.doHandshake(clientHandler, headers, targetURI) - .addCallback(clientSession -> this.onSuccess(webSocketServerSession, clientSession), e -> this.onFailure(webSocketServerSession, e)); + .addCallback(obtainedSession -> this.onSuccess(webSocketServerSession, obtainedSession), e -> this.onFailure(webSocketServerSession, e)); } catch (IllegalStateException e) { throw webSocketProxyException(targetUrl, e, webSocketServerSession, true); } catch (Exception e) { diff --git a/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandlerTest.java b/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandlerTest.java index 697017b4cf..8056d879cf 100644 --- a/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandlerTest.java +++ b/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandlerTest.java @@ -234,7 +234,8 @@ class GivenInvalidClientSession { @Test void whenHandleMessage_thenThrowNotAvailable() { assumeTrue(underTest.getRoutedSessions().isEmpty()); - assertThrows(ServerNotYetAvailableException.class, () -> underTest.handleMessage(session, new TextMessage("null"))); + WebSocketMessage message = new TextMessage("null"); + assertThrows(ServerNotYetAvailableException.class, () -> underTest.handleMessage(session, message)); } @Test diff --git a/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSessionTest.java b/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSessionTest.java index 125cfee6a2..cd5766ce5b 100644 --- a/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSessionTest.java +++ b/gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketRoutedSessionTest.java @@ -135,7 +135,7 @@ void whenAddressNotNull_thenSendMessage() throws IOException { } @Test - void whenClientNotConnected_throwNotAvailable() throws IOException { + void whenClientNotConnected_throwNotAvailable() { when(clientSession.isOpen()).thenReturn(false); assertThrows(ServerNotYetAvailableException.class, () -> underTest.sendMessageToServer(mock(WebSocketMessage.class))); diff --git a/integration-tests/src/test/java/org/zowe/apiml/integration/ha/WebSocketChaoticTest.java b/integration-tests/src/test/java/org/zowe/apiml/integration/ha/WebSocketChaoticTest.java index f30bc7fea3..4dd2323818 100644 --- a/integration-tests/src/test/java/org/zowe/apiml/integration/ha/WebSocketChaoticTest.java +++ b/integration-tests/src/test/java/org/zowe/apiml/integration/ha/WebSocketChaoticTest.java @@ -155,8 +155,6 @@ void newSessionCanBeCreated() throws Exception { // take off an instance of Gateway haGatewayRequests.shutdown(0); - Thread.sleep(10 * 1000); - // create websocket session using the second alive instance of Gateway session = appendingWebSocketSession(gatewaysWsRequests.getGatewayUrl( 1, DISCOVERABLE_WS_UPPERCASE), VALID_AUTH_HEADERS, response, 1); From f1b05990ccdafb96bbec3d888694c3ce03eaa438 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Hern=C3=A1n=20Carle?= Date: Tue, 27 Aug 2024 23:17:15 +0200 Subject: [PATCH 29/39] await MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pablo Hernán Carle --- .../org/zowe/apiml/integration/ha/WebSocketChaoticTest.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/integration-tests/src/test/java/org/zowe/apiml/integration/ha/WebSocketChaoticTest.java b/integration-tests/src/test/java/org/zowe/apiml/integration/ha/WebSocketChaoticTest.java index 4dd2323818..81466694ba 100644 --- a/integration-tests/src/test/java/org/zowe/apiml/integration/ha/WebSocketChaoticTest.java +++ b/integration-tests/src/test/java/org/zowe/apiml/integration/ha/WebSocketChaoticTest.java @@ -26,11 +26,13 @@ import java.io.IOException; import java.net.URI; +import java.time.Duration; import java.util.Base64; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import static org.apache.tomcat.websocket.Constants.SSL_CONTEXT_PROPERTY; +import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.zowe.apiml.util.requests.Endpoints.*; @@ -155,6 +157,8 @@ void newSessionCanBeCreated() throws Exception { // take off an instance of Gateway haGatewayRequests.shutdown(0); + await().during(Duration.ofSeconds(5)); + // create websocket session using the second alive instance of Gateway session = appendingWebSocketSession(gatewaysWsRequests.getGatewayUrl( 1, DISCOVERABLE_WS_UPPERCASE), VALID_AUTH_HEADERS, response, 1); From 053718e35bfda646988067764c59034ea179ad68 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Hern=C3=A1n=20Carle?= Date: Wed, 28 Aug 2024 09:21:06 +0200 Subject: [PATCH 30/39] use await MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pablo Hernán Carle --- .../integration/ha/WebSocketChaoticTest.java | 31 ++++++++++++------- 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/integration-tests/src/test/java/org/zowe/apiml/integration/ha/WebSocketChaoticTest.java b/integration-tests/src/test/java/org/zowe/apiml/integration/ha/WebSocketChaoticTest.java index 81466694ba..0a67a0e650 100644 --- a/integration-tests/src/test/java/org/zowe/apiml/integration/ha/WebSocketChaoticTest.java +++ b/integration-tests/src/test/java/org/zowe/apiml/integration/ha/WebSocketChaoticTest.java @@ -10,7 +10,13 @@ package org.zowe.apiml.integration.ha; -import org.junit.jupiter.api.*; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestMethodOrder; +import org.junit.jupiter.api.Timeout; import org.springframework.core.annotation.Order; import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.TextMessage; @@ -26,7 +32,6 @@ import java.io.IOException; import java.net.URI; -import java.time.Duration; import java.util.Base64; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -34,7 +39,7 @@ import static org.apache.tomcat.websocket.Constants.SSL_CONTEXT_PROPERTY; import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.zowe.apiml.util.requests.Endpoints.*; +import static org.zowe.apiml.util.requests.Endpoints.DISCOVERABLE_WS_UPPERCASE; /** * Verify behaviour of the Websocket under HA and chaotic testing @@ -157,17 +162,19 @@ void newSessionCanBeCreated() throws Exception { // take off an instance of Gateway haGatewayRequests.shutdown(0); - await().during(Duration.ofSeconds(5)); + await("Gateway Shutdown") + .atMost(20, TimeUnit.SECONDS) + .untilAsserted(() -> { + // create websocket session using the second alive instance of Gateway + session = appendingWebSocketSession(gatewaysWsRequests.getGatewayUrl( 1, DISCOVERABLE_WS_UPPERCASE), VALID_AUTH_HEADERS, response, 1); - // create websocket session using the second alive instance of Gateway - session = appendingWebSocketSession(gatewaysWsRequests.getGatewayUrl( 1, DISCOVERABLE_WS_UPPERCASE), VALID_AUTH_HEADERS, response, 1); + session.sendMessage(new TextMessage("hello world 2!")); + synchronized (response) { + response.wait(WAIT_TIMEOUT_MS); + } - session.sendMessage(new TextMessage("hello world 2!")); - synchronized (response) { - response.wait(WAIT_TIMEOUT_MS); - } - - assertEquals("HELLO WORLD 2!", response.toString()); + assertEquals("HELLO WORLD 2!", response.toString()); + }); } } From 68d6a0593abbbd7835837251eab1fe10d05017fc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Hern=C3=A1n=20Carle?= Date: Wed, 28 Aug 2024 09:52:38 +0200 Subject: [PATCH 31/39] wip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pablo Hernán Carle --- .../org/zowe/apiml/integration/ha/WebSocketChaoticTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/integration-tests/src/test/java/org/zowe/apiml/integration/ha/WebSocketChaoticTest.java b/integration-tests/src/test/java/org/zowe/apiml/integration/ha/WebSocketChaoticTest.java index 0a67a0e650..fffd40eba0 100644 --- a/integration-tests/src/test/java/org/zowe/apiml/integration/ha/WebSocketChaoticTest.java +++ b/integration-tests/src/test/java/org/zowe/apiml/integration/ha/WebSocketChaoticTest.java @@ -156,7 +156,7 @@ class WhenAGatewayInstanceIsOff { @Test @Timeout(value = 30, unit = TimeUnit.SECONDS) - void newSessionCanBeCreated() throws Exception { + void newSessionCanBeCreated() { final StringBuilder response = new StringBuilder(); HAGatewayRequests haGatewayRequests = new HAGatewayRequests("https"); // take off an instance of Gateway @@ -164,7 +164,7 @@ void newSessionCanBeCreated() throws Exception { await("Gateway Shutdown") .atMost(20, TimeUnit.SECONDS) - .untilAsserted(() -> { + .until(() -> { // create websocket session using the second alive instance of Gateway session = appendingWebSocketSession(gatewaysWsRequests.getGatewayUrl( 1, DISCOVERABLE_WS_UPPERCASE), VALID_AUTH_HEADERS, response, 1); @@ -173,7 +173,7 @@ void newSessionCanBeCreated() throws Exception { response.wait(WAIT_TIMEOUT_MS); } - assertEquals("HELLO WORLD 2!", response.toString()); + return response.toString().equals("HELLO WORLD 2!"); }); } From a5f5d76f5e66351cd61299d1137f53105dd06317 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Hern=C3=A1n=20Carle?= Date: Wed, 28 Aug 2024 10:05:33 +0200 Subject: [PATCH 32/39] close temp sessions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pablo Hernán Carle --- .../zowe/apiml/integration/ha/WebSocketChaoticTest.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/integration-tests/src/test/java/org/zowe/apiml/integration/ha/WebSocketChaoticTest.java b/integration-tests/src/test/java/org/zowe/apiml/integration/ha/WebSocketChaoticTest.java index fffd40eba0..b58881df49 100644 --- a/integration-tests/src/test/java/org/zowe/apiml/integration/ha/WebSocketChaoticTest.java +++ b/integration-tests/src/test/java/org/zowe/apiml/integration/ha/WebSocketChaoticTest.java @@ -173,7 +173,12 @@ void newSessionCanBeCreated() { response.wait(WAIT_TIMEOUT_MS); } - return response.toString().equals("HELLO WORLD 2!"); + if (response.toString().equals("HELLO WORLD 2!")) { + return true; + } else { + session.close(); + return false; + } }); } From 5a9128724f043137cc2558af5808423f8b580c5d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Hern=C3=A1n=20Carle?= Date: Wed, 28 Aug 2024 10:22:30 +0200 Subject: [PATCH 33/39] logs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pablo Hernán Carle --- .../apiml/gateway/ws/WebSocketProxyServerHandler.java | 1 + .../apiml/integration/ha/WebSocketChaoticTest.java | 10 +++++++++- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandler.java b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandler.java index 2d95d25c2e..351e4394e0 100644 --- a/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandler.java +++ b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandler.java @@ -195,6 +195,7 @@ private void openWebSocketConnection(RoutedService service, ServiceInstance serv @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus status) { // if the browser closes the session, close the GWs client one as well. + log.debug("Server session closed from client with status {}", status); Optional.ofNullable(routedSessions.get(session.getId())) .filter(WebSocketRoutedSession::isClientConnected) .map(WebSocketRoutedSession::getClientSession) diff --git a/integration-tests/src/test/java/org/zowe/apiml/integration/ha/WebSocketChaoticTest.java b/integration-tests/src/test/java/org/zowe/apiml/integration/ha/WebSocketChaoticTest.java index b58881df49..3fe4e42bbe 100644 --- a/integration-tests/src/test/java/org/zowe/apiml/integration/ha/WebSocketChaoticTest.java +++ b/integration-tests/src/test/java/org/zowe/apiml/integration/ha/WebSocketChaoticTest.java @@ -10,6 +10,7 @@ package org.zowe.apiml.integration.ha; +import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.MethodOrderer; @@ -46,6 +47,7 @@ */ @ChaoticHATest @TestMethodOrder(MethodOrderer.OrderAnnotation.class) +@Slf4j class WebSocketChaoticTest implements TestWithStartedInstances { private final HAGatewayRequests gatewaysWsRequests = new HAGatewayRequests("wss"); private final HADiscoverableClientRequests haDiscoverableClientRequests = new HADiscoverableClientRequests(); @@ -166,13 +168,19 @@ void newSessionCanBeCreated() { .atMost(20, TimeUnit.SECONDS) .until(() -> { // create websocket session using the second alive instance of Gateway - session = appendingWebSocketSession(gatewaysWsRequests.getGatewayUrl( 1, DISCOVERABLE_WS_UPPERCASE), VALID_AUTH_HEADERS, response, 1); + URI gatewayUrl = gatewaysWsRequests.getGatewayUrl( 1, DISCOVERABLE_WS_UPPERCASE); + + log.error("trying with gatewayUrl: {}", gatewayUrl); + + session = appendingWebSocketSession(gatewayUrl, VALID_AUTH_HEADERS, response, 1); session.sendMessage(new TextMessage("hello world 2!")); synchronized (response) { response.wait(WAIT_TIMEOUT_MS); } + log.error("Obtained response from {}: {}", gatewayUrl, response.toString()); + if (response.toString().equals("HELLO WORLD 2!")) { return true; } else { From faa83878a9725c82cbcff0fa80256bf5eb9a2c9b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Hern=C3=A1n=20Carle?= Date: Wed, 28 Aug 2024 10:33:15 +0200 Subject: [PATCH 34/39] websocket debug logs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pablo Hernán Carle --- gateway-service/src/main/resources/application.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gateway-service/src/main/resources/application.yml b/gateway-service/src/main/resources/application.yml index 4543b4e5bd..5407cb517d 100644 --- a/gateway-service/src/main/resources/application.yml +++ b/gateway-service/src/main/resources/application.yml @@ -10,7 +10,7 @@ logging: com.netflix.discovery.DiscoveryClient: OFF org.springframework.boot.web.embedded.tomcat.TomcatWebServer: INFO org.springframework.web.socket: WARN - org.zowe.apiml.gateway.ws: INFO + org.zowe.apiml.gateway.ws: DEBUG org.zowe.apiml.gateway.error: INFO org.eclipse.jetty: WARN org.springframework.web.servlet.PageNotFound: ERROR From 672ddbf75f4a0570dfbaa4ae167534c24e5ff89a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Hern=C3=A1n=20Carle?= Date: Wed, 28 Aug 2024 10:45:39 +0200 Subject: [PATCH 35/39] debug websocket MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pablo Hernán Carle --- gateway-service/src/main/resources/application.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/gateway-service/src/main/resources/application.yml b/gateway-service/src/main/resources/application.yml index 5407cb517d..4946f8992a 100644 --- a/gateway-service/src/main/resources/application.yml +++ b/gateway-service/src/main/resources/application.yml @@ -9,7 +9,7 @@ logging: com.netflix.discovery.shared.transport.decorator.RedirectingEurekaHttpClient: OFF com.netflix.discovery.DiscoveryClient: OFF org.springframework.boot.web.embedded.tomcat.TomcatWebServer: INFO - org.springframework.web.socket: WARN + org.springframework.web.socket: DEBUG org.zowe.apiml.gateway.ws: DEBUG org.zowe.apiml.gateway.error: INFO org.eclipse.jetty: WARN @@ -307,6 +307,7 @@ logging: org.ehcache: INFO org.springframework.cloud.netflix.zuul.filters.post.SendErrorFilter: INFO com.netflix.discovery.shared.transport.decorator: DEBUG + org.springframework.web.socket: DEBUG org.zowe.apiml.gateway.ws: DEBUG --- From 706f2f3b3baef58d523f375c2a66bcfedde0389a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Hern=C3=A1n=20Carle?= Date: Wed, 28 Aug 2024 10:48:26 +0200 Subject: [PATCH 36/39] add poll interval MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pablo Hernán Carle --- .../org/zowe/apiml/integration/ha/WebSocketChaoticTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/integration-tests/src/test/java/org/zowe/apiml/integration/ha/WebSocketChaoticTest.java b/integration-tests/src/test/java/org/zowe/apiml/integration/ha/WebSocketChaoticTest.java index 3fe4e42bbe..0a7f8dfa71 100644 --- a/integration-tests/src/test/java/org/zowe/apiml/integration/ha/WebSocketChaoticTest.java +++ b/integration-tests/src/test/java/org/zowe/apiml/integration/ha/WebSocketChaoticTest.java @@ -33,6 +33,7 @@ import java.io.IOException; import java.net.URI; +import java.time.Duration; import java.util.Base64; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -166,6 +167,7 @@ void newSessionCanBeCreated() { await("Gateway Shutdown") .atMost(20, TimeUnit.SECONDS) + .pollInterval(Duration.ofSeconds(2)) .until(() -> { // create websocket session using the second alive instance of Gateway URI gatewayUrl = gatewaysWsRequests.getGatewayUrl( 1, DISCOVERABLE_WS_UPPERCASE); From b57f619c87fb84348434d58abcb57fee08a20fcd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Hern=C3=A1n=20Carle?= Date: Wed, 28 Aug 2024 10:52:31 +0200 Subject: [PATCH 37/39] debug MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pablo Hernán Carle --- gateway-service/src/main/resources/application.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gateway-service/src/main/resources/application.yml b/gateway-service/src/main/resources/application.yml index 4946f8992a..2a9070a58a 100644 --- a/gateway-service/src/main/resources/application.yml +++ b/gateway-service/src/main/resources/application.yml @@ -1,6 +1,6 @@ logging: level: - ROOT: INFO + ROOT: DEBUG org.zowe.apiml: INFO org.springframework: WARN com.netflix: WARN From 892a1bbee4d70ade1e4337cff9e2a4327c872506 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Hern=C3=A1n=20Carle?= Date: Wed, 28 Aug 2024 11:30:40 +0200 Subject: [PATCH 38/39] polldelay MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pablo Hernán Carle --- gateway-service/src/main/resources/application.yml | 6 +++--- .../zowe/apiml/integration/ha/WebSocketChaoticTest.java | 7 +------ 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/gateway-service/src/main/resources/application.yml b/gateway-service/src/main/resources/application.yml index 2a9070a58a..9539a4535f 100644 --- a/gateway-service/src/main/resources/application.yml +++ b/gateway-service/src/main/resources/application.yml @@ -1,6 +1,6 @@ logging: level: - ROOT: DEBUG + ROOT: INFO org.zowe.apiml: INFO org.springframework: WARN com.netflix: WARN @@ -9,8 +9,8 @@ logging: com.netflix.discovery.shared.transport.decorator.RedirectingEurekaHttpClient: OFF com.netflix.discovery.DiscoveryClient: OFF org.springframework.boot.web.embedded.tomcat.TomcatWebServer: INFO - org.springframework.web.socket: DEBUG - org.zowe.apiml.gateway.ws: DEBUG + org.springframework.web.socket: WARN + org.zowe.apiml.gateway.ws: INFO org.zowe.apiml.gateway.error: INFO org.eclipse.jetty: WARN org.springframework.web.servlet.PageNotFound: ERROR diff --git a/integration-tests/src/test/java/org/zowe/apiml/integration/ha/WebSocketChaoticTest.java b/integration-tests/src/test/java/org/zowe/apiml/integration/ha/WebSocketChaoticTest.java index 0a7f8dfa71..2cc353d59e 100644 --- a/integration-tests/src/test/java/org/zowe/apiml/integration/ha/WebSocketChaoticTest.java +++ b/integration-tests/src/test/java/org/zowe/apiml/integration/ha/WebSocketChaoticTest.java @@ -10,7 +10,6 @@ package org.zowe.apiml.integration.ha; -import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.MethodOrderer; @@ -48,7 +47,6 @@ */ @ChaoticHATest @TestMethodOrder(MethodOrderer.OrderAnnotation.class) -@Slf4j class WebSocketChaoticTest implements TestWithStartedInstances { private final HAGatewayRequests gatewaysWsRequests = new HAGatewayRequests("wss"); private final HADiscoverableClientRequests haDiscoverableClientRequests = new HADiscoverableClientRequests(); @@ -167,13 +165,12 @@ void newSessionCanBeCreated() { await("Gateway Shutdown") .atMost(20, TimeUnit.SECONDS) + .pollDelay(Duration.ofSeconds(5)) .pollInterval(Duration.ofSeconds(2)) .until(() -> { // create websocket session using the second alive instance of Gateway URI gatewayUrl = gatewaysWsRequests.getGatewayUrl( 1, DISCOVERABLE_WS_UPPERCASE); - log.error("trying with gatewayUrl: {}", gatewayUrl); - session = appendingWebSocketSession(gatewayUrl, VALID_AUTH_HEADERS, response, 1); session.sendMessage(new TextMessage("hello world 2!")); @@ -181,8 +178,6 @@ void newSessionCanBeCreated() { response.wait(WAIT_TIMEOUT_MS); } - log.error("Obtained response from {}: {}", gatewayUrl, response.toString()); - if (response.toString().equals("HELLO WORLD 2!")) { return true; } else { From 56b3c5508f2b875cfe117480ac274603833f994b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Hern=C3=A1n=20Carle?= Date: Wed, 28 Aug 2024 14:25:24 +0200 Subject: [PATCH 39/39] address review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pablo Hernán Carle --- .../apiml/gateway/ws/WebSocketProxyServerHandler.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandler.java b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandler.java index 351e4394e0..3ccced375e 100644 --- a/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandler.java +++ b/gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandler.java @@ -296,10 +296,15 @@ public void onClientSessionFailure(WebSocketRoutedSession routedSession, WebSock log.debug("Failed opening client web socket session against {}. Server WebSocket session is {}", routedSession.getTargetUrl(), serverSession.getId(), throwable); try { routedSession.getClientHandler().handleMessage(serverSession, new TextMessage("Failed opening a session against " + routedSession.getTargetUrl() + ": " + throwable.getMessage())); - serverSession.close(CloseStatus.SERVER_ERROR); routedSessions.remove(serverSession.getId()); } catch (Exception e) { - log.debug("Failed sending / closing WebSocket session", e); + log.debug("Failed sending server error WebSocket message to client: {}", e.getMessage()); + } finally { + try { + serverSession.close(CloseStatus.SERVER_ERROR); + } catch (IOException e) { + log.debug("Failed closing server WebSocket session: {}", e.getMessage()); + } } } }