Skip to content

Commit

Permalink
fix(java-client): fix Negotiation failed after a session with authe…
Browse files Browse the repository at this point in the history
…ntication enabled is closed (#2132)

Resolve #2133.

After the session with authentication enabled to remote server is closed(e.g.
the server is killed), the flag(i.e. `authSucceed`) marking whether the session
has been authenticated is still kept `true`. Afterwards, while trying to creating
another new connection to the same remote server for this session, some requests
would be pending before it is connected successfully. Once the session is connected,
the pending requests would be sent to the remote server. Typically, the first element
of the pending queue would be a non-negotiation request(`query_cfg_request` to
the meta server for example), and the second one would be a negotiation request.
Normally, the non-negotiation request would be moved to another pending queue
for authentication; however, since the flag still marks that the session has been
authenticated successfully, the non-negotiation request would be directed to the
remote server which would never reply to the client since the server think that the
negotiation has not been launched. However, since the client has actually launched
the negotiation, it would close the session due to timeout or having not receiving
response from the server for long time. Therefore, the above process would repeat
again.

To fix this problem, the flag should be reset after the session is closed. Then, the
client would never send non-negotiation requests before the negotiation with the
server becomes successful.
  • Loading branch information
empiredan authored Oct 24, 2024
1 parent d91a6e7 commit f13084a
Show file tree
Hide file tree
Showing 3 changed files with 303 additions and 169 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -122,19 +122,20 @@ public void asyncSend(
VolatileFields cache = fields;
if (cache.state == ConnState.CONNECTED) {
write(entry, cache);
} else {
synchronized (pendingSend) {
cache = fields;
if (cache.state == ConnState.CONNECTED) {
write(entry, cache);
} else {
if (!pendingSend.offer(entry)) {
logger.warn("pendingSend queue is full, drop the request");
}
return;
}

synchronized (pendingSend) {
cache = fields;
if (cache.state == ConnState.CONNECTED) {
write(entry, cache);
} else {
if (!pendingSend.offer(entry)) {
logger.warn("pendingSend queue is full for session {}, drop the request", name());
}
}
tryConnect();
}
tryConnect();
}

public void closeSession() {
Expand All @@ -145,9 +146,9 @@ public void closeSession() {
// but the connection may not be completely closed then, that is,
// the state may not be marked as DISCONNECTED immediately.
f.nettyChannel.close().sync();
logger.info("channel to {} closed", address.toString());
logger.info("channel to {} closed", name());
} catch (Exception ex) {
logger.warn("close channel {} failed: ", address.toString(), ex);
logger.warn("close channel {} failed: ", name(), ex);
}
} else if (f.state == ConnState.CONNECTING) { // f.nettyChannel == null
// If our actively-close strategy fails to reconnect the session due to
Expand Down Expand Up @@ -189,9 +190,7 @@ public ChannelFuture tryConnect() {
boolean needConnect = false;
synchronized (pendingSend) {
if (fields.state == ConnState.DISCONNECTED) {
VolatileFields cache = new VolatileFields();
cache.state = ConnState.CONNECTING;
fields = cache;
fields = new VolatileFields(ConnState.CONNECTING);
needConnect = true;
}
}
Expand Down Expand Up @@ -221,79 +220,96 @@ public void operationComplete(ChannelFuture channelFuture) throws Exception {
}
});
} catch (UnknownHostException ex) {
logger.error("invalid address: {}", address.toString());
logger.error("invalid address: {}", name());
assert false;
return null; // unreachable
}
}

private void markSessionConnected(Channel activeChannel) {
VolatileFields newCache = new VolatileFields();
newCache.state = ConnState.CONNECTED;
newCache.nettyChannel = activeChannel;
VolatileFields newCache = new VolatileFields(ConnState.CONNECTED, activeChannel);

// Note that actions in interceptor such as Negotiation might send request by
// ReplicaSession#asyncSend(), inside which ReplicaSession#tryConnect() would
// also be called since current state of this session is still CONNECTING.
// However, it would never create another connection, thus it is safe to do
// `fields = newCache` at the end.
interceptorManager.onConnected(this);

synchronized (pendingSend) {
if (fields.state != ConnState.CONNECTING) {
// this session may have been closed or connected already
// This session may have been closed or connected already.
logger.info("{}: session is {}, skip to mark it connected", name(), fields.state);
return;
}

while (!pendingSend.isEmpty()) {
RequestEntry e = pendingSend.poll();
if (pendingResponse.get(e.sequenceId) != null) {
write(e, newCache);
} else {
logger.info("{}: {} is removed from pending, perhaps timeout", name(), e.sequenceId);
}
}
// Once the authentication is enabled, any request except Negotiation such as
// query_cfg_operator for meta would be cached in authPendingSend and sent after
// Negotiation is successful. Negotiation would be performed first before any other
// request for the reason that AuthProtocol#isAuthRequest() would return true for
// negotiation_operator.
sendPendingRequests(pendingSend, newCache);
fields = newCache;
}
}

void markSessionDisconnect() {
VolatileFields cache = fields;
if (cache.state == ConnState.DISCONNECTED) {
logger.warn("{}: session is closed already", name());
resetAuth();
return;
}

synchronized (pendingSend) {
if (cache.state != ConnState.DISCONNECTED) {
// NOTICE:
// 1. when a connection is reset, the timeout response
// is not answered in the order they query
// 2. It's likely that when the session is disconnecting
// but the caller of the api query/asyncQuery didn't notice
// this. In this case, we are relying on the timeout task.
try {
while (!pendingSend.isEmpty()) {
RequestEntry e = pendingSend.poll();
tryNotifyFailureWithSeqID(
e.sequenceId, error_code.error_types.ERR_SESSION_RESET, false);
}
List<RequestEntry> l = new LinkedList<RequestEntry>();
for (Map.Entry<Integer, RequestEntry> entry : pendingResponse.entrySet()) {
l.add(entry.getValue());
}
for (RequestEntry e : l) {
tryNotifyFailureWithSeqID(
e.sequenceId, error_code.error_types.ERR_SESSION_RESET, false);
}
} catch (Exception e) {
logger.error(
"failed to notify callers due to unexpected exception [state={}]: ",
cache.state.toString(),
e);
} finally {
logger.info("{}: mark the session to be disconnected from state={}", name(), cache.state);
// ensure the state must be set DISCONNECTED
cache = new VolatileFields();
cache.state = ConnState.DISCONNECTED;
cache.nettyChannel = null;
fields = cache;
// NOTICE:
// 1. when a connection is reset, the timeout response
// is not answered in the order they query
// 2. It's likely that when the session is disconnecting
// but the caller of the api query/asyncQuery didn't notice
// this. In this case, we are relying on the timeout task.
try {
while (!pendingSend.isEmpty()) {
RequestEntry entry = pendingSend.poll();
tryNotifyFailureWithSeqID(
entry.sequenceId, error_code.error_types.ERR_SESSION_RESET, false);
}
} else {
logger.warn("{}: session is closed already", name());
List<RequestEntry> pendingEntries = new LinkedList<>();
for (Map.Entry<Integer, RequestEntry> entry : pendingResponse.entrySet()) {
pendingEntries.add(entry.getValue());
}
for (RequestEntry entry : pendingEntries) {
tryNotifyFailureWithSeqID(
entry.sequenceId, error_code.error_types.ERR_SESSION_RESET, false);
}
} catch (Exception ex) {
logger.error(
"{}: failed to notify callers due to unexpected exception [state={}]: ",
name(),
cache.state.toString(),
ex);
} finally {
logger.info("{}: mark the session to be disconnected from state={}", name(), cache.state);
fields = new VolatileFields(ConnState.DISCONNECTED);
}
}

// Reset the authentication once the connection is closed.
resetAuth();
}

// After the authentication is reset, a new Negotiation would be launched.
private void resetAuth() {
int pendingSize;
synchronized (authPendingSend) {
authSucceed = false;
pendingSize = authPendingSend.size();
}

logger.info(
"authentication is reset for session {}, with still {} request entries pending",
name(),
pendingSize);
}

// Notify the RPC sender if failure occurred.
Expand Down Expand Up @@ -342,6 +358,7 @@ void tryNotifyFailureWithSeqID(int seqID, error_code.error_types errno, boolean
}

private void write(final RequestEntry entry, VolatileFields cache) {
// Under some circumstances requests are not allowed to be sent or delayed.
if (!interceptorManager.onSendMessage(this, entry)) {
return;
}
Expand Down Expand Up @@ -378,8 +395,8 @@ private ScheduledFuture<?> addTimer(final int seqID, long timeoutInMillseconds)
public void run() {
try {
tryNotifyFailureWithSeqID(seqID, error_code.error_types.ERR_TIMEOUT, true);
} catch (Exception e) {
logger.warn("try notify with sequenceID {} exception!", seqID, e);
} catch (Exception ex) {
logger.warn("{}: try notify with sequenceID {} exception!", name(), seqID, ex);
}
}
},
Expand All @@ -388,43 +405,52 @@ public void run() {
}

public void onAuthSucceed() {
Queue<RequestEntry> swappedPendingSend = new LinkedList<>();
Queue<RequestEntry> swappedPendingSend;
synchronized (authPendingSend) {
authSucceed = true;
swappedPendingSend.addAll(authPendingSend);
swappedPendingSend = new LinkedList<>(authPendingSend);
authPendingSend.clear();
}

while (!swappedPendingSend.isEmpty()) {
RequestEntry e = swappedPendingSend.poll();
if (pendingResponse.get(e.sequenceId) != null) {
write(e, fields);
logger.info(
"authentication is successful for session {}, then {} pending request entries would be sent",
name(),
swappedPendingSend.size());

sendPendingRequests(swappedPendingSend, fields);
}

private void sendPendingRequests(Queue<RequestEntry> pendingEntries, VolatileFields cache) {
while (!pendingEntries.isEmpty()) {
RequestEntry entry = pendingEntries.poll();
if (pendingResponse.get(entry.sequenceId) != null) {
write(entry, cache);
} else {
logger.info("{}: {} is removed from pending, perhaps timeout", name(), e.sequenceId);
logger.info("{}: {} is removed from pending, perhaps timeout", name(), entry.sequenceId);
}
}
}

// return value:
// Return value:
// true - pend succeed
// false - pend failed
public boolean tryPendRequest(RequestEntry entry) {
// double check. the first one doesn't lock the lock.
// Because authSucceed only transfered from false to true.
// So if it is true now, it will not change in the later.
// But if it is false now, maybe it will change soon. So we should use lock to protect it.
if (!this.authSucceed) {
synchronized (authPendingSend) {
if (!this.authSucceed) {
if (!authPendingSend.offer(entry)) {
logger.warn("{}: pend request {} failed", name(), entry.sequenceId);
}
return true;
}
// Double check.
if (this.authSucceed) {
return false;
}

synchronized (authPendingSend) {
if (this.authSucceed) {
return false;
}

if (!authPendingSend.offer(entry)) {
logger.warn("{}: pend request {} failed", name(), entry.sequenceId);
}
}

return false;
return true;
}

final class DefaultHandler extends SimpleChannelInboundHandler<RequestEntry> {
Expand Down Expand Up @@ -463,38 +489,56 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
}
}

// for test
// Only for test.
ConnState getState() {
return fields.state;
}

interface AuthPendingChecker {
void onCheck(Queue<RequestEntry> realAuthPendingSend);
}

void checkAuthPending(AuthPendingChecker checker) {
synchronized (authPendingSend) {
checker.onCheck(authPendingSend);
}
}

interface MessageResponseFilter {
boolean abandonIt(error_code.error_types err, TMessage header);
}

MessageResponseFilter filter = null;

final ConcurrentHashMap<Integer, RequestEntry> pendingResponse =
new ConcurrentHashMap<Integer, RequestEntry>();
final ConcurrentHashMap<Integer, RequestEntry> pendingResponse = new ConcurrentHashMap<>();
private final AtomicInteger seqId = new AtomicInteger(0);

final Queue<RequestEntry> pendingSend = new LinkedList<RequestEntry>();
final Queue<RequestEntry> pendingSend = new LinkedList<>();

static final class VolatileFields {
public ConnState state = ConnState.DISCONNECTED;
public Channel nettyChannel = null;
public VolatileFields(ConnState state, Channel nettyChannel) {
this.state = state;
this.nettyChannel = nettyChannel;
}

public VolatileFields(ConnState state) {
this(state, null);
}

public ConnState state;
public Channel nettyChannel;
}

volatile VolatileFields fields = new VolatileFields();
volatile VolatileFields fields = new VolatileFields(ConnState.DISCONNECTED);

private final rpc_address address;
private Bootstrap boot;
private EventLoopGroup timeoutTaskGroup;
private ReplicaSessionInterceptorManager interceptorManager;
private final Bootstrap boot;
private final EventLoopGroup timeoutTaskGroup;
private final ReplicaSessionInterceptorManager interceptorManager;
private volatile boolean authSucceed;
final Queue<RequestEntry> authPendingSend = new LinkedList<>();
private final Queue<RequestEntry> authPendingSend = new LinkedList<>();

// Session will be actively closed if all the rpcs across `sessionResetTimeWindowMs`
// Session will be actively closed if all the RPCs across `sessionResetTimeWindowMs`
// are timed out, in that case we suspect that the server is unavailable.

// Timestamp of the first timed out rpc.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.pegasus.rpc.interceptor.ReplicaSessionInterceptor;

public class AuthReplicaSessionInterceptor implements ReplicaSessionInterceptor, Closeable {
private AuthProtocol protocol;
private final AuthProtocol protocol;

public AuthReplicaSessionInterceptor(ClientOptions options) throws IllegalArgumentException {
this.protocol = options.getCredential().getProtocol();
Expand All @@ -37,7 +37,7 @@ public void onConnected(ReplicaSession session) {

@Override
public boolean onSendMessage(ReplicaSession session, final ReplicaSession.RequestEntry entry) {
// tryPendRequest returns false means that the negotiation is succeed now
// tryPendRequest returns false means that the negotiation is successful now.
return protocol.isAuthRequest(entry) || !session.tryPendRequest(entry);
}

Expand Down
Loading

0 comments on commit f13084a

Please sign in to comment.