Skip to content

Commit

Permalink
8229822: ThrowingPushPromises tests sometimes fail due to EOF
Browse files Browse the repository at this point in the history
SocketTube is fixed to cater for errors caused by pausing/resuming events on an asynchronously closed connection, from within the selector's manager thread. Http2Connection and Stream are fixed to prevent sending a DataFrame on a stream after Reset has been sent.

Backport-of: 77c46ea9112b0c2632b4af1d899d59a132878da3
  • Loading branch information
MBaesken committed Aug 6, 2024
1 parent 5cedb3d commit bf9ba35
Show file tree
Hide file tree
Showing 10 changed files with 169 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,11 @@ private Http2Connection(HttpConnection connection,
Log.logTrace("Connection send window size {0} ", windowController.connectionWindowSize());

Stream<?> initialStream = createStream(exchange);
initialStream.registerStream(1);
boolean opened = initialStream.registerStream(1, true);
if (debug.on() && !opened) {
debug.log("Initial stream was cancelled - but connection is maintained: " +
"reset frame will need to be sent later");
}
windowController.registerStream(1, getInitialSendWindowSize());
initialStream.requestSent();
// Upgrading:
Expand All @@ -338,6 +342,11 @@ private Http2Connection(HttpConnection connection,
this.initial = initial;
connectFlows(connection);
sendConnectionPreface();
if (!opened) {
debug.log("ensure reset frame is sent to cancel initial stream");
initialStream.sendCancelStreamFrame();
}

}

// Used when upgrading an HTTP/1.1 connection to HTTP/2 after receiving
Expand Down Expand Up @@ -847,7 +856,7 @@ private <T> void handlePushPromise(Stream<T> parent, PushPromiseFrame pp)
Exchange<T> pushExch = new Exchange<>(pushReq, parent.exchange.multi);
Stream.PushedStream<T> pushStream = createPushStream(parent, pushExch);
pushExch.exchImpl = pushStream;
pushStream.registerStream(promisedStreamid);
pushStream.registerStream(promisedStreamid, true);
parent.incoming_pushPromise(pushReq, pushStream);
}

Expand All @@ -872,14 +881,15 @@ private void handleConnectionFrame(Http2Frame frame)
}
}

void resetStream(int streamid, int code) throws IOException {
void resetStream(int streamid, int code) {
try {
if (connection.channel().isOpen()) {
// no need to try & send a reset frame if the
// connection channel is already closed.
Log.logError(
"Resetting stream {0,number,integer} with error code {1,number,integer}",
streamid, code);
markStream(streamid, code);
ResetFrame frame = new ResetFrame(streamid, code);
sendFrame(frame);
} else if (debug.on()) {
Expand All @@ -892,6 +902,11 @@ void resetStream(int streamid, int code) throws IOException {
}
}

private void markStream(int streamid, int code) {
Stream<?> s = streams.get(streamid);
if (s != null) s.markStream(code);
}

// reduce count of streams by 1 if stream still exists
synchronized void decrementStreamsCount(int streamid) {
Stream<?> s = streams.get(streamid);
Expand Down Expand Up @@ -1193,12 +1208,19 @@ private Stream<?> registerNewStream(OutgoingHeaders<Stream<?>> oh) {
Stream<?> stream = oh.getAttachment();
assert stream.streamid == 0;
int streamid = nextstreamid;
nextstreamid += 2;
stream.registerStream(streamid);
// set outgoing window here. This allows thread sending
// body to proceed.
windowController.registerStream(streamid, getInitialSendWindowSize());
return stream;
if (stream.registerStream(streamid, false)) {
// set outgoing window here. This allows thread sending
// body to proceed.
nextstreamid += 2;
windowController.registerStream(streamid, getInitialSendWindowSize());
return stream;
} else {
stream.cancelImpl(new IOException("Request cancelled"));
if (finalStream() && streams.isEmpty()) {
close();
}
return null;
}
}

private final Object sendlock = new Object();
Expand All @@ -1212,7 +1234,9 @@ void sendFrame(Http2Frame frame) {
OutgoingHeaders<Stream<?>> oh = (OutgoingHeaders<Stream<?>>) frame;
Stream<?> stream = registerNewStream(oh);
// provide protection from inserting unordered frames between Headers and Continuation
publisher.enqueue(encodeHeaders(oh, stream));
if (stream != null) {
publisher.enqueue(encodeHeaders(oh, stream));
}
} else {
publisher.enqueue(encodeFrame(frame));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2020, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
Expand Down Expand Up @@ -246,7 +246,7 @@ public final void handle() {
}
@Override
public final void abort(IOException error) {
debug().log(() -> "abort: " + error);
debug().log(() -> this.getClass().getSimpleName() + " abort: " + error);
pause(); // pause, then signal
signalError(error); // should not be resumed after abort (not checked)
}
Expand Down Expand Up @@ -724,10 +724,12 @@ public final void request(long n) {
@Override
public final void cancel() {
pauseReadEvent();
if (debug.on()) debug.log("Read subscription cancelled");
if (Log.channel()) {
Log.logChannel("Read subscription cancelled for channel {0}",
channelDescr());
}
if (debug.on()) debug.log("Stopping read scheduler");
readScheduler.stop();
}

Expand All @@ -748,6 +750,7 @@ final void handleError() {
}

final void signalError(Throwable error) {
if (debug.on()) debug.log("signal read error: " + error);
if (!errorRef.compareAndSet(null, error)) {
return;
}
Expand Down Expand Up @@ -808,6 +811,7 @@ final void read() {
}
current.errorRef.compareAndSet(null, error);
current.signalCompletion();
if (debug.on()) debug.log("Stopping read scheduler");
readScheduler.stop();
debugState("leaving read() loop with error: ");
return;
Expand All @@ -831,6 +835,7 @@ final void read() {
// anyway.
pauseReadEvent();
current.signalCompletion();
if (debug.on()) debug.log("Stopping read scheduler");
readScheduler.stop();
}
debugState("leaving read() loop after EOF: ");
Expand All @@ -850,6 +855,7 @@ final void read() {
// waiting for this event to terminate.
// So resume the read event and return now...
resumeReadEvent();
if (errorRef.get() != null) continue;
debugState("leaving read() loop after onNext: ");
return;
} else {
Expand All @@ -861,6 +867,7 @@ final void read() {
// readable again.
demand.increase(1);
resumeReadEvent();
if (errorRef.get() != null) continue;
debugState("leaving read() loop with no bytes");
return;
}
Expand All @@ -879,6 +886,7 @@ final void read() {
// Trying to pause the event here would actually
// introduce a race condition between this loop and
// request(n).
if (errorRef.get() != null) continue;
debugState("leaving read() loop with no demand");
break;
}
Expand Down Expand Up @@ -946,6 +954,7 @@ protected final void signalEvent() {

@Override
protected final void signalError(Throwable error) {
if (debug.on()) debug.log("signalError to %s (%s)", sub, error);
sub.signalError(error);
}

Expand Down
Loading

0 comments on commit bf9ba35

Please sign in to comment.