Skip to content

Commit

Permalink
Better handling for uncommon/exception gRPC cases (#4016)
Browse files Browse the repository at this point in the history
Fixes three logged errors. Two are handed by not permitting an error
to be thrown into gRPC internals, but instead catch, cancel the
stream, and log the error. The third case is handled by following
the appropriate async write pattern required by servlets.

Fixes #3965
Fixes #3052
  • Loading branch information
niloc132 authored Jun 24, 2023
1 parent 3b6d176 commit d358f50
Show file tree
Hide file tree
Showing 5 changed files with 296 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,12 @@ private final class Sink implements AbstractServerStream.Sink {
@Override
public void writeHeaders(Metadata headers) {
writeHeadersToServletResponse(headers);
resp.setTrailerFields(trailerSupplier);
try {
resp.setTrailerFields(trailerSupplier);
} catch (IllegalStateException e) {
logger.log(WARNING, String.format("[{%s}] Exception writing trailers", logId), e);
cancel(Status.fromThrowable(e));
}
try {
writer.flush();
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,108 +1,32 @@
package io.grpc.servlet.jakarta.web;

import io.grpc.internal.GrpcUtil;
import jakarta.servlet.AsyncContext;
import jakarta.servlet.FilterChain;
import jakarta.servlet.ServletException;
import jakarta.servlet.ServletRequest;
import jakarta.servlet.ServletResponse;
import jakarta.servlet.http.HttpFilter;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletRequestWrapper;
import jakarta.servlet.http.HttpServletResponse;
import jakarta.servlet.http.HttpServletResponseWrapper;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Pattern;

/**
* Servlet filter that translates grpc-web on the fly to match what is expected by GrpcServlet. This work is done
* in-process with no addition copies to the request or response data - only the content type header and the trailer
* content is specially treated at this time.
*
* <p>
* Note that grpc-web-text is not yet supported.
*/
public class GrpcWebFilter extends HttpFilter {
private static final Logger logger = Logger.getLogger(GrpcWebFilter.class.getName());

public static final String CONTENT_TYPE_GRPC_WEB = GrpcUtil.CONTENT_TYPE_GRPC + "-web";

@Override
public void doFilter(HttpServletRequest request, HttpServletResponse response, FilterChain chain)
throws IOException, ServletException {
if (isGrpcWeb(request)) {
// wrap the request and response to paper over the grpc-web details
GrpcWebHttpResponse wrappedResponse = new GrpcWebHttpResponse(response);
HttpServletRequestWrapper wrappedRequest = new HttpServletRequestWrapper(request) {
@Override
public String getContentType() {
// Adapt the content-type to replace grpc-web with grpc
return super.getContentType().replaceFirst(Pattern.quote(CONTENT_TYPE_GRPC_WEB),
GrpcUtil.CONTENT_TYPE_GRPC);
}

@Override
public AsyncContext startAsync() throws IllegalStateException {
return startAsync(this, wrappedResponse);
}

@Override
public AsyncContext startAsync(ServletRequest servletRequest, ServletResponse servletResponse)
throws IllegalStateException {
AsyncContext delegate = super.startAsync(servletRequest, servletResponse);
return new DelegatingAsyncContext(delegate) {
@Override
public void complete() {
// Write any trailers out to the output stream as a payload, since grpc-web doesn't
// use proper trailers.
try {
if (wrappedResponse.trailers != null) {
Map<String, String> map = wrappedResponse.trailers.get();
if (map != null) {
// write a payload, even for an empty set of trailers, but not for
// the absence of trailers.
int trailerLength = map.entrySet().stream()
.mapToInt(e -> e.getKey().length() + e.getValue().length() + 4).sum();
ByteBuffer payload = ByteBuffer.allocate(5 + trailerLength);
payload.put((byte) 0x80);
payload.putInt(trailerLength);
for (Map.Entry<String, String> entry : map.entrySet()) {
payload.put(entry.getKey().getBytes(StandardCharsets.US_ASCII));
payload.put((byte) ':');
payload.put((byte) ' ');
payload.put(entry.getValue().getBytes(StandardCharsets.US_ASCII));
payload.put((byte) '\r');
payload.put((byte) '\n');
}
if (payload.hasRemaining()) {
// Normally we must not throw, but this is an exceptional case. Complete
// the stream, _then_ throw.
super.complete();
throw new IllegalStateException(
"Incorrectly sized buffer, trailer payload will be sized wrong");
}
wrappedResponse.getOutputStream().write(payload.array());
}
}
} catch (IOException e) {
// complete() should not throw, but instead just log the error. In this case,
// the connection has likely been lost, so there is no way to send the trailers,
// so we just let the exception slide.
logger.log(Level.FINE, "Error sending grpc-web trailers", e);
}

// Let the superclass complete the stream so we formally close it
super.complete();
}
};
}
};
GrpcWebServletResponse wrappedResponse = new GrpcWebServletResponse(response);
GrpcWebServletRequest wrappedRequest = new GrpcWebServletRequest(request, wrappedResponse);

chain.doFilter(wrappedRequest, wrappedResponse);
} else {
Expand All @@ -113,30 +37,4 @@ public void complete() {
private static boolean isGrpcWeb(ServletRequest request) {
return request.getContentType() != null && request.getContentType().startsWith(CONTENT_TYPE_GRPC_WEB);
}

private static class GrpcWebHttpResponse extends HttpServletResponseWrapper {
private Supplier<Map<String, String>> trailers;

public GrpcWebHttpResponse(HttpServletResponse response) {
super(response);
}

@Override
public void setContentType(String type) {
// Adapt the content-type to be grpc-web
super.setContentType(
type.replaceFirst(Pattern.quote(GrpcUtil.CONTENT_TYPE_GRPC), CONTENT_TYPE_GRPC_WEB));
}

// intercept trailers and write them out as a message just before we complete
@Override
public void setTrailerFields(Supplier<Map<String, String>> supplier) {
trailers = supplier;
}

@Override
public Supplier<Map<String, String>> getTrailerFields() {
return trailers;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package io.grpc.servlet.jakarta.web;

import jakarta.servlet.ServletOutputStream;
import jakarta.servlet.WriteListener;

import java.io.IOException;

/**
* Wraps the usual ServletOutputStream so as to allow downstream writers to use it according to the servlet spec, but
* still make it easy to write trailers as a payload instead of using HTTP trailers at the end of a stream.
*/
public class GrpcWebOutputStream extends ServletOutputStream implements WriteListener {
private final ServletOutputStream wrapped;

// Access to these are guarded by synchronized
private Runnable waiting;
private WriteListener writeListener;

public GrpcWebOutputStream(ServletOutputStream wrapped) {
this.wrapped = wrapped;
}

@Override
public boolean isReady() {
return wrapped.isReady();
}

/**
* Internal helper method to correctly write the given bytes, then complete the stream.
*
* @param bytes the bytes to write once writing is possible
* @param close a Runnable to invoke once this write is complete
*/
public synchronized void writeAndCloseWhenReady(byte[] bytes, Runnable close) {
if (writeListener == null) {
throw new IllegalStateException("writeListener");
}
if (isReady()) {
try {
write(bytes);
} catch (IOException ignored) {
// Ignore this error, we're closing anyway
} finally {
close.run();
}
} else {
waiting = () -> {
try {
write(bytes);
} catch (IOException e) {
// ignore this, we're closing anyway
} finally {
close.run();
}
};
}
}

@Override
public synchronized void setWriteListener(WriteListener writeListener) {
this.writeListener = writeListener;
wrapped.setWriteListener(this);
}

@Override
public void write(int i) throws IOException {
wrapped.write(i);
}

@Override
public void write(byte[] b) throws IOException {
wrapped.write(b);
}

@Override
public void write(byte[] b, int off, int len) throws IOException {
wrapped.write(b, off, len);
}

@Override
public void flush() throws IOException {
wrapped.flush();
}

@Override
public void close() throws IOException {
wrapped.close();
}

@Override
public synchronized void onWritePossible() throws IOException {
if (writeListener != null) {
writeListener.onWritePossible();
}
if (waiting != null) {
waiting.run();
waiting = null;
}
}

@Override
public void onError(Throwable t) {
if (writeListener != null) {
writeListener.onError(t);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package io.grpc.servlet.jakarta.web;

import io.grpc.internal.GrpcUtil;
import jakarta.servlet.AsyncContext;
import jakarta.servlet.ServletRequest;
import jakarta.servlet.ServletResponse;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletRequestWrapper;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Pattern;

/**
* Wraps an incoming gRPC-web request so that a downstream filter/servlet can read it instead as a gRPC payload. This
* currently involves changing the incoming content-type, and managing the wrapped request so that downstream operations
* to handle this request behave correctly.
*/
public class GrpcWebServletRequest extends HttpServletRequestWrapper {
private static final Logger logger = Logger.getLogger(GrpcWebServletRequest.class.getName());

private final GrpcWebServletResponse wrappedResponse;

public GrpcWebServletRequest(HttpServletRequest request, GrpcWebServletResponse wrappedResponse) {
super(request);
this.wrappedResponse = wrappedResponse;
}

@Override
public String getContentType() {
// Adapt the content-type to replace grpc-web with grpc
return super.getContentType().replaceFirst(Pattern.quote(GrpcWebFilter.CONTENT_TYPE_GRPC_WEB),
GrpcUtil.CONTENT_TYPE_GRPC);
}

@Override
public AsyncContext startAsync() throws IllegalStateException {
return startAsync(this, wrappedResponse);
}

@Override
public AsyncContext startAsync(ServletRequest servletRequest, ServletResponse servletResponse)
throws IllegalStateException {
AsyncContext delegate = super.startAsync(servletRequest, servletResponse);
return new DelegatingAsyncContext(delegate) {
private void safelyComplete() {
try {
// Let the superclass complete the stream so we formally close it
super.complete();
} catch (Exception e) {
// As above, complete() should not throw, so just log this failure and continue.
// This statement is somewhat dubious, since Jetty itself is clearly throwing in
// complete()... leading us to add this try/catch to begin with.
logger.log(Level.FINE, "Error invoking complete() on underlying stream", e);
}

}

@Override
public void complete() {
// Emit trailers as part of the response body, then complete the request. Note that this may mean
// that we don't actually call super.complete() synchronously.
wrappedResponse.writeTrailers(this::safelyComplete);
}
};
}
}
Loading

0 comments on commit d358f50

Please sign in to comment.