Skip to content

Commit

Permalink
Merge pull request #1854 from dilanSachi/goaway-error-log
Browse files Browse the repository at this point in the history
Fix connection getting closed by stale eviction task after it has been closed by the server
  • Loading branch information
dilanSachi committed Feb 14, 2024
2 parents 9b1616e + 52d283d commit 54dc275
Show file tree
Hide file tree
Showing 12 changed files with 312 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,8 @@ function testHttp2ValidHeaderLength() returns error? {
}

//Tests the behaviour when header size is greater than the configured threshold
@test:Config {}
// TODO: Enable after fixing this issue : https://github.com/ballerina-platform/ballerina-library/issues/4461
@test:Config {enable: false}
function testHttp2InvalidHeaderLength() returns error? {
http:Client limitClient = check new ("http://localhost:" + http2RequestLimitsTestPort3.toString(),
http2Settings = {http2PriorKnowledge: true});
Expand Down
6 changes: 3 additions & 3 deletions ballerina/Ballerina.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
org = "ballerina"
name = "http"
version = "2.10.6"
version = "2.10.7"
authors = ["Ballerina"]
keywords = ["http", "network", "service", "listener", "client"]
repository = "https://github.com/ballerina-platform/module-ballerina-http"
Expand All @@ -16,8 +16,8 @@ graalvmCompatible = true
[[platform.java17.dependency]]
groupId = "io.ballerina.stdlib"
artifactId = "http-native"
version = "2.10.6"
path = "../native/build/libs/http-native-2.10.6.jar"
version = "2.10.7"
path = "../native/build/libs/http-native-2.10.7-SNAPSHOT.jar"

[[platform.java17.dependency]]
groupId = "io.ballerina.stdlib"
Expand Down
2 changes: 1 addition & 1 deletion ballerina/CompilerPlugin.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ id = "http-compiler-plugin"
class = "io.ballerina.stdlib.http.compiler.HttpCompilerPlugin"

[[dependency]]
path = "../compiler-plugin/build/libs/http-compiler-plugin-2.10.6.jar"
path = "../compiler-plugin/build/libs/http-compiler-plugin-2.10.7-SNAPSHOT.jar"
6 changes: 3 additions & 3 deletions ballerina/Dependencies.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ modules = [
[[package]]
org = "ballerina"
name = "cache"
version = "3.7.0"
version = "3.7.1"
dependencies = [
{org = "ballerina", name = "constraint"},
{org = "ballerina", name = "jballerina.java"},
Expand Down Expand Up @@ -76,7 +76,7 @@ modules = [
[[package]]
org = "ballerina"
name = "http"
version = "2.10.6"
version = "2.10.7"
dependencies = [
{org = "ballerina", name = "auth"},
{org = "ballerina", name = "cache"},
Expand Down Expand Up @@ -283,7 +283,7 @@ modules = [
[[package]]
org = "ballerina"
name = "observe"
version = "1.2.0"
version = "1.2.2"
dependencies = [
{org = "ballerina", name = "jballerina.java"}
]
Expand Down
5 changes: 5 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ This file contains all the notable changes done to the Ballerina HTTP package th
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to
[Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unreleased

### Fixed
- [Fix connection getting closed by stale eviction task after it has been closed by the server](https://github.com/ballerina-platform/ballerina-library/issues/6050)

## [2.10.6] - 2024-02-01

### Added
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,9 @@ public void onStreamClosed(Http2Stream stream) {

@Override
public void onGoAwayReceived(int lastStreamId, long errorCode, ByteBuf debugData) {
if (isStale.get()) {
return;
}
markAsStale();
http2ClientChannel.inFlightMessages.forEach((streamId, outboundMsgHolder) -> {
if (streamId > lastStreamId) {
Expand Down Expand Up @@ -331,6 +334,10 @@ void markAsStale() {
}
}

void removeClosedChannelFromStalePool() {
http2ConnectionManager.removeClosedChannelFromStalePool(this);
}

boolean hasInFlightMessages() {
return !inFlightMessages.isEmpty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import io.ballerina.stdlib.http.transport.contractimpl.common.HttpRoute;
import io.ballerina.stdlib.http.transport.contractimpl.common.states.Http2MessageStateContext;
import io.ballerina.stdlib.http.transport.contractimpl.sender.channel.pool.PoolConfiguration;
import io.netty.channel.DefaultEventLoop;
import io.netty.util.concurrent.DefaultPromise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -36,6 +34,8 @@
*/
public class Http2ConnectionManager {

Logger logger = LoggerFactory.getLogger(this.getClass());

private final Http2ChannelPool http2ChannelPool = new Http2ChannelPool();
private final BlockingQueue<Http2ClientChannel> http2StaleClientChannels = new LinkedBlockingQueue<>();
private final PoolConfiguration poolConfiguration;
Expand Down Expand Up @@ -165,10 +165,15 @@ void markClientChannelAsStale(HttpRoute httpRoute, Http2ClientChannel http2Clien
http2StaleClientChannels.add(http2ClientChannel);
}

void removeClosedChannelFromStalePool(Http2ClientChannel http2ClientChannel) {
if (!http2StaleClientChannels.remove(http2ClientChannel)) {
logger.warn("Specified channel does not exist in the stale list.");
}
}

private void initiateConnectionEvictionTask() {
Timer timer = new Timer(true);
TimerTask timerTask = new TimerTask() {
Logger logger = LoggerFactory.getLogger(this.getClass());
@Override
public void run() {
http2StaleClientChannels.forEach(http2ClientChannel -> {
Expand All @@ -191,12 +196,8 @@ public void run() {
}

public void closeChannelAndEvict(Http2ClientChannel http2ClientChannel) {
boolean result = http2StaleClientChannels.remove(http2ClientChannel);
if (!result) {
logger.warn("Specified channel does not exist in the stale list.");
}
http2ClientChannel.getConnection()
.close(new DefaultPromise(new DefaultEventLoop()));
removeClosedChannelFromStalePool(http2ClientChannel);
http2ClientChannel.getConnection().close(http2ClientChannel.getChannel().newPromise());
}
};
timer.schedule(timerTask, poolConfiguration.getTimeBetweenStaleEviction(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ public void channelInactive(ChannelHandlerContext ctx) {
LOG.debug("Channel is inactive");
}
http2ClientChannel.destroy();
http2ClientChannel.removeClosedChannelFromStalePool();
}

private boolean isUnexpected100ContinueResponse(Http2Headers http2Headers, HttpCarbonMessage inboundReq) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Copyright (c) 2024, WSO2 LLC. (http://www.wso2.com).
*
* WSO2 LLC. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.ballerina.stdlib.http.transport.http2.frameleveltests;

import io.ballerina.stdlib.http.transport.contract.Constants;
import io.ballerina.stdlib.http.transport.contract.HttpClientConnector;
import io.ballerina.stdlib.http.transport.contract.HttpWsConnectorFactory;
import io.ballerina.stdlib.http.transport.contract.config.SenderConfiguration;
import io.ballerina.stdlib.http.transport.contract.config.TransportsConfiguration;
import io.ballerina.stdlib.http.transport.contractimpl.DefaultHttpWsConnectorFactory;
import io.ballerina.stdlib.http.transport.contractimpl.sender.channel.pool.PoolConfiguration;
import io.ballerina.stdlib.http.transport.message.HttpConnectorUtil;
import io.ballerina.stdlib.http.transport.util.DefaultHttpConnectorListener;
import io.ballerina.stdlib.http.transport.util.TestUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;

import java.io.IOException;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static io.ballerina.stdlib.http.transport.contract.Constants.REMOTE_SERVER_CLOSED_WHILE_READING_INBOUND_RESPONSE_BODY;
import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.GO_AWAY_FRAME_MAX_STREAM_05;
import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.HEADER_FRAME_STREAM_03;
import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.SETTINGS_FRAME;
import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.SETTINGS_FRAME_WITH_ACK;
import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.SLEEP_TIME;
import static io.ballerina.stdlib.http.transport.util.TestUtil.getDecoderErrorMessage;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;

/**
* This contains a test case where the tcp server sends a GoAway and the connection gets closed from the
* server side after an eviction occurs. The successfulness of this test case cannot be confirmed by the assert
* completely. We have to look at the logs in order to check whether there are any internal netty exceptions
*/
public class Http2ChannelCloseAfterConnectionEvictionAfterTcpServerGoAwayScenarioTest {

private static final Logger LOGGER =
LoggerFactory.getLogger(Http2ChannelCloseAfterConnectionEvictionAfterTcpServerGoAwayScenarioTest.class);
private HttpClientConnector h2ClientWithPriorKnowledge;
private ServerSocket serverSocket;

public HttpClientConnector setupHttp2PriorKnowledgeClient(long minIdleTimeInStaleState,
long timeBetweenStaleEviction) {
HttpWsConnectorFactory connectorFactory = new DefaultHttpWsConnectorFactory();
PoolConfiguration poolConfiguration = new PoolConfiguration();
poolConfiguration.setMinIdleTimeInStaleState(minIdleTimeInStaleState);
poolConfiguration.setTimeBetweenStaleEviction(timeBetweenStaleEviction);
TransportsConfiguration transportsConfiguration = new TransportsConfiguration();
SenderConfiguration senderConfiguration = new SenderConfiguration();
senderConfiguration.setPoolConfiguration(poolConfiguration);
senderConfiguration.setScheme(Constants.HTTP_SCHEME);
senderConfiguration.setHttpVersion(Constants.HTTP_2_0);
senderConfiguration.setForceHttp2(true);
return connectorFactory.createHttpClientConnector(
HttpConnectorUtil.getTransportProperties(transportsConfiguration), senderConfiguration);
}

@Test
private void testChannelCloseAfterConnectionEvictionScenario() {
try {
runTcpServer(TestUtil.HTTP_SERVER_PORT);
h2ClientWithPriorKnowledge = setupHttp2PriorKnowledgeClient(3000, 1000);
CountDownLatch latch1 = new CountDownLatch(1);
DefaultHttpConnectorListener msgListener1 = TestUtil.sendRequestAsync(latch1, h2ClientWithPriorKnowledge);
latch1.await(TestUtil.HTTP2_RESPONSE_TIME_OUT, TimeUnit.SECONDS);
// Waiting more than the minIdleTimeInStaleState to trigger a connectionEviction try and a channelInactive
Thread.sleep(8000);

String errorMsg1 = getDecoderErrorMessage(msgListener1);
assertEquals(errorMsg1, REMOTE_SERVER_CLOSED_WHILE_READING_INBOUND_RESPONSE_BODY);
} catch (InterruptedException | IOException e) {
LOGGER.error("Exception occurred");
fail();
}
}

private void runTcpServer(int port) throws IOException {
if (serverSocket != null) {
serverSocket.close();
}
new Thread(() -> {
try {
serverSocket = new ServerSocket(port);
LOGGER.info("HTTP/2 TCP Server listening on port " + port);
Socket clientSocket = serverSocket.accept();
LOGGER.info("Accepted connection from: " + clientSocket.getInetAddress());
try (OutputStream outputStream = clientSocket.getOutputStream()) {
sendGoAwayForASingleStream(outputStream);
} catch (Exception e) {
LOGGER.error(e.getMessage());
}
} catch (IOException e) {
LOGGER.error(e.getMessage());
}
}).start();
}

private void sendGoAwayForASingleStream(OutputStream outputStream) throws IOException, InterruptedException {
outputStream.write(SETTINGS_FRAME);
outputStream.write(SETTINGS_FRAME_WITH_ACK);
Thread.sleep(SLEEP_TIME);
outputStream.write(HEADER_FRAME_STREAM_03);
Thread.sleep(SLEEP_TIME);
// This will move the connection to the stale connections list
outputStream.write(GO_AWAY_FRAME_MAX_STREAM_05);
// Waiting more than the time of `minIdleTimeInStaleState` and exit the socket write to trigger a
// `channelInactive` after minIdleTime exceeds.
Thread.sleep(5000);
}

@AfterClass
public void cleanUp() throws IOException {
h2ClientWithPriorKnowledge.close();
serverSocket.close();
}
}
Loading

0 comments on commit 54dc275

Please sign in to comment.