Skip to content

Commit

Permalink
[refactor][proxy] Refactor Proxy code and fix connection stalling by …
Browse files Browse the repository at this point in the history
…switching to auto read mode (#14713)

### Motivation

Refactor Proxy code to make it easier to understand and maintain. In addition, switch to use auto read mode since the proxies connections seem to stall in some cases since the proxied connection doesn't use Netty's auto read mode and the read handling doesn't seem complete.

Currently, the proxy calls `.read()` when a message is written to the connection. There might be more messages flowing in the other direction and it could result in a blocked connection with the current solution that doesn't use Netty's auto read mode.

Currently auto read is disabled in DirectProxyHandler for the connection between the proxy and the broker:
https://github.com/apache/pulsar/blob/a26905371749798ec5288fb07a69978a36aacfaa/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java#L112

### Modifications

- replace broker host parsing with a simple solution
- pass remote host name to ProxyBackendHandler in the constructor
- rename "targetBrokerUrl" to "brokerHostAndPort" since the "targetBrokerUrl" is really "hostname:port" string
- move HA proxy message handling to ProxyBackendHandle and extract the logic to a method
- remove the static "inboundOutboundChannelMap" which was used for log level 2
  - make it obsolete by passing the peer channel id to ParserProxyHandler
 - Enable auto read in proxy and remove `ctx.read()` / `channel.read()` calls
- prepare for IPv6 support (reported as #14732) by improving the `host:port` parsing (pick last `:` since IPv6 address might contains multiple `:` characters)
- Handle backpressure properly by switching auto read off when channel writability changes
  - change auto read of the proxy-broker connection based on the writability of the client-proxy connection
  - change auto read of the client-proxy connection based on the writability of the proxy-broker connection
- Consistently handle write errors by delegating exception handling to exceptionCaught method by using `.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE)`

(cherry picked from commit a1037c7)
  • Loading branch information
lhotari committed Mar 24, 2022
1 parent dae653f commit 766e5fe
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 137 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ private static List<String> parseCommaSeparatedConfigValue(String configValue) {
}

public CompletableFuture<InetSocketAddress> resolveAndCheckTargetAddress(String hostAndPort) {
int pos = hostAndPort.indexOf(':');
int pos = hostAndPort.lastIndexOf(':');
String host = hostAndPort.substring(0, pos);
int port = Integer.parseInt(hostAndPort.substring(pos + 1));
if (!isPortAllowed(port)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.socket.SocketChannel;
Expand All @@ -40,14 +40,8 @@
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import javax.net.ssl.SSLSession;
Expand All @@ -71,11 +65,11 @@ public class DirectProxyHandler {

@Getter
private final Channel inboundChannel;
private final ProxyConnection proxyConnection;
@Getter
Channel outboundChannel;
@Getter
private final Rate inboundChannelRequestsRate;
protected static Map<ChannelId, ChannelId> inboundOutboundChannelMap = new ConcurrentHashMap<>();
private final String originalPrincipal;
private final AuthData clientAuthData;
private final String clientAuthMethod;
Expand All @@ -86,12 +80,13 @@ public class DirectProxyHandler {
private final ProxyService service;
private final Runnable onHandshakeCompleteAction;

public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection, String targetBrokerUrl,
public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection, String brokerHostAndPort,
InetSocketAddress targetBrokerAddress, int protocolVersion,
Supplier<SslHandler> sslHandlerSupplier) {
this.service = service;
this.authentication = proxyConnection.getClientAuthentication();
this.inboundChannel = proxyConnection.ctx().channel();
this.proxyConnection = proxyConnection;
this.inboundChannelRequestsRate = new Rate();
this.originalPrincipal = proxyConnection.clientAuthRole;
this.clientAuthData = proxyConnection.clientAuthData;
Expand All @@ -109,7 +104,18 @@ public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection,
if (brokerProxyConnectTimeoutMs > 0) {
b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, brokerProxyConnectTimeoutMs);
}
b.group(inboundChannel.eventLoop()).channel(inboundChannel.getClass()).option(ChannelOption.AUTO_READ, false);
b.group(inboundChannel.eventLoop())
.channel(inboundChannel.getClass());

String remoteHost;
try {
remoteHost = parseHost(brokerHostAndPort);
} catch (IllegalArgumentException e) {
log.warn("[{}] Failed to parse broker host '{}'", inboundChannel, brokerHostAndPort, e);
inboundChannel.close();
return;
}

b.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
Expand All @@ -123,69 +129,58 @@ protected void initChannel(SocketChannel ch) {
}
ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
Commands.DEFAULT_MAX_MESSAGE_SIZE + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
ch.pipeline().addLast("proxyOutboundHandler", new ProxyBackendHandler(config, protocolVersion));
ch.pipeline().addLast("proxyOutboundHandler",
new ProxyBackendHandler(config, protocolVersion, remoteHost));
}
});

URI targetBroker;
try {
// targetBrokerUrl is coming in the "hostname:6650" form, so we need
// to extract host and port
targetBroker = new URI("pulsar://" + targetBrokerUrl);
} catch (URISyntaxException e) {
log.warn("[{}] Failed to parse broker url '{}'", inboundChannel, targetBrokerUrl, e);
inboundChannel.close();
return;
}

ChannelFuture f = b.connect(targetBrokerAddress);
outboundChannel = f.channel();
f.addListener(future -> {
if (!future.isSuccess()) {
// Close the connection if the connection attempt has failed.
log.warn("[{}] Establishing connection to {} ({}) failed. Closing inbound channel.", inboundChannel,
targetBrokerAddress, targetBrokerUrl, future.cause());
targetBrokerAddress, brokerHostAndPort, future.cause());
inboundChannel.close();
return;
}
final ProxyBackendHandler cnx = (ProxyBackendHandler) outboundChannel.pipeline()
.get("proxyOutboundHandler");
cnx.setRemoteHostName(targetBroker.getHost());

// if enable full parsing feature
if (service.getProxyLogLevel() == 2) {
//Set a map between inbound and outbound,
//so can find inbound by outbound or find outbound by inbound
inboundOutboundChannelMap.put(outboundChannel.id(), inboundChannel.id());
}
});
}

if (!config.isHaProxyProtocolEnabled()) {
return;
}
private static String parseHost(String brokerPortAndHost) {
int pos = brokerPortAndHost.lastIndexOf(':');
if (pos > 0) {
return brokerPortAndHost.substring(0, pos);
} else {
throw new IllegalArgumentException("Illegal broker host:port '" + brokerPortAndHost + "'");
}
}

if (proxyConnection.hasHAProxyMessage()) {
outboundChannel.writeAndFlush(encodeProxyProtocolMessage(proxyConnection.getHAProxyMessage()));
} else {
if (!(inboundChannel.remoteAddress() instanceof InetSocketAddress)) {
return;
}
if (!(outboundChannel.localAddress() instanceof InetSocketAddress)) {
return;
}
private void writeHAProxyMessage() {
if (proxyConnection.hasHAProxyMessage()) {
outboundChannel.writeAndFlush(encodeProxyProtocolMessage(proxyConnection.getHAProxyMessage()))
.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
} else {
if (inboundChannel.remoteAddress() instanceof InetSocketAddress
&& outboundChannel.localAddress() instanceof InetSocketAddress) {
InetSocketAddress clientAddress = (InetSocketAddress) inboundChannel.remoteAddress();
String sourceAddress = clientAddress.getAddress().getHostAddress();
int sourcePort = clientAddress.getPort();
InetSocketAddress proxyAddress = (InetSocketAddress) inboundChannel.remoteAddress();
String destinationAddress = proxyAddress.getAddress().getHostAddress();
int destinationPort = proxyAddress.getPort();
HAProxyMessage msg = new HAProxyMessage(HAProxyProtocolVersion.V1, HAProxyCommand.PROXY,
HAProxyProxiedProtocol.TCP4, sourceAddress, destinationAddress, sourcePort, destinationPort);
outboundChannel.writeAndFlush(encodeProxyProtocolMessage(msg));
HAProxyProxiedProtocol.TCP4, sourceAddress, destinationAddress, sourcePort,
destinationPort);
outboundChannel.writeAndFlush(encodeProxyProtocolMessage(msg))
.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
msg.release();
}
});
}
}



private ByteBuf encodeProxyProtocolMessage(HAProxyMessage msg) {
// Max length of v1 version proxy protocol message is 108
ByteBuf out = Unpooled.buffer(108);
Expand Down Expand Up @@ -217,30 +212,45 @@ enum BackendState {
Init, HandshakeCompleted
}

public class ProxyBackendHandler extends PulsarDecoder implements FutureListener<Void> {
public class ProxyBackendHandler extends PulsarDecoder {

private BackendState state = BackendState.Init;
private String remoteHostName;
private final String remoteHostName;
protected ChannelHandlerContext ctx;
private final ProxyConfiguration config;
private final int protocolVersion;

public ProxyBackendHandler(ProxyConfiguration config, int protocolVersion) {
public ProxyBackendHandler(ProxyConfiguration config, int protocolVersion, String remoteHostName) {
this.config = config;
this.protocolVersion = protocolVersion;
this.remoteHostName = remoteHostName;
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx;

if (config.isHaProxyProtocolEnabled()) {
writeHAProxyMessage();
}

// Send the Connect command to broker
authenticationDataProvider = authentication.getAuthData(remoteHostName);
AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
ByteBuf command;
command = Commands.newConnect(authentication.getAuthMethodName(), authData, protocolVersion, "Pulsar proxy",
null /* target broker */, originalPrincipal, clientAuthData, clientAuthMethod);
outboundChannel.writeAndFlush(command);
outboundChannel.read();
outboundChannel.writeAndFlush(command)
.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
}

@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
// handle backpressure
// stop/resume reading input from connection between the client and the proxy
// when the writability of the connection between the proxy and the broker changes
inboundChannel.config().setAutoRead(ctx.channel().isWritable());
super.channelWritabilityChanged(ctx);
}

@Override
Expand All @@ -261,7 +271,8 @@ public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exce
if (msg instanceof ByteBuf) {
ProxyService.BYTES_COUNTER.inc(((ByteBuf) msg).readableBytes());
}
inboundChannel.writeAndFlush(msg).addListener(this);
inboundChannel.writeAndFlush(msg)
.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
break;

default:
Expand Down Expand Up @@ -300,26 +311,13 @@ protected void handleAuthChallenge(CommandAuthChallenge authChallenge) {
log.debug("{} Mutual auth {}", ctx.channel(), authentication.getAuthMethodName());
}

outboundChannel.writeAndFlush(request);
outboundChannel.read();
outboundChannel.writeAndFlush(request)
.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
} catch (Exception e) {
log.error("Error mutual verify", e);
}
}

@Override
public void operationComplete(Future<Void> future) {
// This is invoked when the write operation on the paired connection
// is completed
if (future.isSuccess()) {
outboundChannel.read();
} else {
log.warn("[{}] [{}] Failed to write on proxy connection. Closing both connections.", inboundChannel,
outboundChannel, future.cause());
inboundChannel.close();
}
}

@Override
protected void messageReceived() {
// no-op
Expand Down Expand Up @@ -349,18 +347,7 @@ protected void handleConnected(CommandConnected connected) {
int maxMessageSize =
connected.hasMaxMessageSize() ? connected.getMaxMessageSize() : Commands.INVALID_MAX_MESSAGE_SIZE;
inboundChannel.writeAndFlush(Commands.newConnected(connected.getProtocolVersion(), maxMessageSize))
.addListener(future -> {
if (future.isSuccess()) {
// Start reading from both connections
inboundChannel.read();
outboundChannel.read();
} else {
log.warn("[{}] [{}] Failed to write to inbound connection. Closing both connections.",
inboundChannel,
outboundChannel, future.cause());
inboundChannel.close();
}
});
.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
}

private void startDirectProxying(CommandConnected connected) {
Expand Down Expand Up @@ -389,20 +376,20 @@ private void startDirectProxying(CommandConnected connected) {
inboundChannel.pipeline().addBefore("handler", "inboundParser",
new ParserProxyHandler(service, inboundChannel,
ParserProxyHandler.FRONTEND_CONN,
connected.getMaxMessageSize()));
connected.getMaxMessageSize(), outboundChannel.id()));
outboundChannel.pipeline().addBefore("proxyOutboundHandler", "outboundParser",
new ParserProxyHandler(service, outboundChannel,
ParserProxyHandler.BACKEND_CONN,
connected.getMaxMessageSize()));
connected.getMaxMessageSize(), inboundChannel.id()));
} else {
inboundChannel.pipeline().addBefore("handler", "inboundParser",
new ParserProxyHandler(service, inboundChannel,
ParserProxyHandler.FRONTEND_CONN,
Commands.DEFAULT_MAX_MESSAGE_SIZE));
Commands.DEFAULT_MAX_MESSAGE_SIZE, outboundChannel.id()));
outboundChannel.pipeline().addBefore("proxyOutboundHandler", "outboundParser",
new ParserProxyHandler(service, outboundChannel,
ParserProxyHandler.BACKEND_CONN,
Commands.DEFAULT_MAX_MESSAGE_SIZE));
Commands.DEFAULT_MAX_MESSAGE_SIZE, inboundChannel.id()));
}
}
}
Expand All @@ -418,10 +405,6 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.close();
}

public void setRemoteHostName(String remoteHostName) {
this.remoteHostName = remoteHostName;
}

private boolean verifyTlsHostName(String hostname, ChannelHandlerContext ctx) {
ChannelHandler sslHandler = ctx.channel().pipeline().get("tls");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
Expand Down Expand Up @@ -53,6 +54,7 @@ public class ParserProxyHandler extends ChannelInboundHandlerAdapter {
private final String connType;

private final int maxMessageSize;
private final ChannelId peerChannelId;
private final ProxyService service;


Expand All @@ -66,11 +68,13 @@ public class ParserProxyHandler extends ChannelInboundHandlerAdapter {
*/
private static final Map<String, String> consumerHashMap = new ConcurrentHashMap<>();

public ParserProxyHandler(ProxyService service, Channel channel, String type, int maxMessageSize) {
public ParserProxyHandler(ProxyService service, Channel channel, String type, int maxMessageSize,
ChannelId peerChannelId) {
this.service = service;
this.channel = channel;
this.connType = type;
this.maxMessageSize = maxMessageSize;
this.peerChannelId = peerChannelId;
}

private void logging(Channel conn, BaseCommand.Type cmdtype, String info, List<RawMessage> messages) {
Expand Down Expand Up @@ -154,7 +158,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
break;
}
topicName = TopicName.get(ParserProxyHandler.consumerHashMap.get(cmd.getMessage().getConsumerId()
+ "," + DirectProxyHandler.inboundOutboundChannelMap.get(ctx.channel().id())));
+ "," + peerChannelId));
msgBytes = new MutableLong(0);
MessageParser.parseMessage(topicName, -1L,
-1L, buffer, (message) -> {
Expand Down
Loading

0 comments on commit 766e5fe

Please sign in to comment.