Skip to content

Commit

Permalink
fix(java-client): fix Negotiation failed after session is disconnected
Browse files Browse the repository at this point in the history
  • Loading branch information
empiredan committed Oct 16, 2024
1 parent 73d4bf7 commit 6055811
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -122,19 +122,20 @@ public void asyncSend(
VolatileFields cache = fields;
if (cache.state == ConnState.CONNECTED) {
write(entry, cache);
} else {
synchronized (pendingSend) {
cache = fields;
if (cache.state == ConnState.CONNECTED) {
write(entry, cache);
} else {
if (!pendingSend.offer(entry)) {
logger.warn("pendingSend queue is full, drop the request");
}
return;
}

synchronized (pendingSend) {
cache = fields;
if (cache.state == ConnState.CONNECTED) {
write(entry, cache);
} else {
if (!pendingSend.offer(entry)) {
logger.warn("pendingSend queue is full, drop the request");
}
}
tryConnect();
}
tryConnect();
}

public void closeSession() {
Expand Down Expand Up @@ -189,9 +190,7 @@ public ChannelFuture tryConnect() {
boolean needConnect = false;
synchronized (pendingSend) {
if (fields.state == ConnState.DISCONNECTED) {
VolatileFields cache = new VolatileFields();
cache.state = ConnState.CONNECTING;
fields = cache;
fields = new VolatileFields(ConnState.CONNECTING);
needConnect = true;
}
}
Expand Down Expand Up @@ -228,10 +227,9 @@ public void operationComplete(ChannelFuture channelFuture) throws Exception {
}

private void markSessionConnected(Channel activeChannel) {
VolatileFields newCache = new VolatileFields();
newCache.state = ConnState.CONNECTED;
newCache.nettyChannel = activeChannel;
VolatileFields newCache = new VolatileFields(ConnState.CONNECTED, activeChannel);

//
interceptorManager.onConnected(this);

synchronized (pendingSend) {
Expand All @@ -241,59 +239,59 @@ private void markSessionConnected(Channel activeChannel) {
return;
}

while (!pendingSend.isEmpty()) {
RequestEntry e = pendingSend.poll();
if (pendingResponse.get(e.sequenceId) != null) {
write(e, newCache);
} else {
logger.info("{}: {} is removed from pending, perhaps timeout", name(), e.sequenceId);
}
}
sendPendingRequests(pendingSend, newCache);
fields = newCache;
}
}

void markSessionDisconnect() {
VolatileFields cache = fields;
if (cache.state == ConnState.DISCONNECTED) {
logger.warn("{}: session is closed already", name());
resetAuth();
return;
}

synchronized (pendingSend) {
if (cache.state != ConnState.DISCONNECTED) {
// NOTICE:
// 1. when a connection is reset, the timeout response
// is not answered in the order they query
// 2. It's likely that when the session is disconnecting
// but the caller of the api query/asyncQuery didn't notice
// this. In this case, we are relying on the timeout task.
try {
while (!pendingSend.isEmpty()) {
RequestEntry e = pendingSend.poll();
tryNotifyFailureWithSeqID(
e.sequenceId, error_code.error_types.ERR_SESSION_RESET, false);
}
List<RequestEntry> l = new LinkedList<RequestEntry>();
for (Map.Entry<Integer, RequestEntry> entry : pendingResponse.entrySet()) {
l.add(entry.getValue());
}
for (RequestEntry e : l) {
tryNotifyFailureWithSeqID(
e.sequenceId, error_code.error_types.ERR_SESSION_RESET, false);
}
} catch (Exception e) {
logger.error(
"failed to notify callers due to unexpected exception [state={}]: ",
cache.state.toString(),
e);
} finally {
logger.info("{}: mark the session to be disconnected from state={}", name(), cache.state);
// ensure the state must be set DISCONNECTED
cache = new VolatileFields();
cache.state = ConnState.DISCONNECTED;
cache.nettyChannel = null;
fields = cache;
// NOTICE:
// 1. when a connection is reset, the timeout response
// is not answered in the order they query
// 2. It's likely that when the session is disconnecting
// but the caller of the api query/asyncQuery didn't notice
// this. In this case, we are relying on the timeout task.
try {
while (!pendingSend.isEmpty()) {
RequestEntry e = pendingSend.poll();
tryNotifyFailureWithSeqID(
e.sequenceId, error_code.error_types.ERR_SESSION_RESET, false);
}
} else {
logger.warn("{}: session is closed already", name());
List<RequestEntry> l = new LinkedList<RequestEntry>();
for (Map.Entry<Integer, RequestEntry> entry : pendingResponse.entrySet()) {
l.add(entry.getValue());
}
for (RequestEntry e : l) {
tryNotifyFailureWithSeqID(
e.sequenceId, error_code.error_types.ERR_SESSION_RESET, false);
}
} catch (Exception e) {
logger.error(
"failed to notify callers due to unexpected exception [state={}]: ",
cache.state.toString(),
e);
} finally {
logger.info("{}: mark the session to be disconnected from state={}", name(), cache.state);
fields = new VolatileFields(ConnState.DISCONNECTED);
}
}

resetAuth();
}

/**
 * Resets the authentication state of this session: marks authentication as not succeeded and
 * discards every entry still queued in {@code authPendingSend}.
 *
 * <p>Both updates are made while holding the {@code authPendingSend} monitor. This method only
 * clears the queue; it does not itself notify the senders of the discarded entries.
 */
private void resetAuth() {
synchronized (authPendingSend) {
// After this write, tryPendRequest() will queue requests again instead of letting them through.
authSucceed = false;
authPendingSend.clear();
}
}

// Notify the RPC sender if failure occurred.
Expand Down Expand Up @@ -388,43 +386,47 @@ public void run() {
}

public void onAuthSucceed() {
Queue<RequestEntry> swappedPendingSend = new LinkedList<>();
Queue<RequestEntry> swappedPendingSend;
synchronized (authPendingSend) {
authSucceed = true;
swappedPendingSend.addAll(authPendingSend);
swappedPendingSend = new LinkedList<>(authPendingSend);
authPendingSend.clear();
}

while (!swappedPendingSend.isEmpty()) {
RequestEntry e = swappedPendingSend.poll();
if (pendingResponse.get(e.sequenceId) != null) {
write(e, fields);
sendPendingRequests(swappedPendingSend, fields);
}

private void sendPendingRequests(Queue<RequestEntry> pendingEntries, VolatileFields cache) {
while (!pendingEntries.isEmpty()) {
RequestEntry entry = pendingEntries.poll();
if (pendingResponse.get(entry.sequenceId) != null) {
write(entry, cache);
} else {
logger.info("{}: {} is removed from pending, perhaps timeout", name(), e.sequenceId);
logger.info("{}: {} is removed from pending, perhaps timeout", name(), entry.sequenceId);
}
}
}

// return value:
// Return value:
// true - pend succeed
// false - pend failed
public boolean tryPendRequest(RequestEntry entry) {
// double check. the first one doesn't lock the lock.
// Because authSucceed only transfered from false to true.
// So if it is true now, it will not change in the later.
// But if it is false now, maybe it will change soon. So we should use lock to protect it.
if (!this.authSucceed) {
synchronized (authPendingSend) {
if (!this.authSucceed) {
if (!authPendingSend.offer(entry)) {
logger.warn("{}: pend request {} failed", name(), entry.sequenceId);
}
return true;
}
// Double check.
if (this.authSucceed) {
return false;
}

synchronized (authPendingSend) {
if (this.authSucceed) {
return false;
}

if (!authPendingSend.offer(entry)) {
logger.warn("{}: pend request {} failed", name(), entry.sequenceId);
}
}

return false;
return true;
}

final class DefaultHandler extends SimpleChannelInboundHandler<RequestEntry> {
Expand Down Expand Up @@ -463,7 +465,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
}
}

// for test
// For test.
ConnState getState() {
// Reads the state from whatever VolatileFields snapshot `fields` currently references.
return fields.state;
}
Expand All @@ -481,20 +483,29 @@ interface MessageResponseFilter {
final Queue<RequestEntry> pendingSend = new LinkedList<RequestEntry>();

static final class VolatileFields {
public ConnState state = ConnState.DISCONNECTED;
public Channel nettyChannel = null;
public VolatileFields(ConnState state, Channel nettyChannel) {
this.state = state;
this.nettyChannel = nettyChannel;
}

public VolatileFields(ConnState state) {
this(state, null);
}

public ConnState state;
public Channel nettyChannel;
}

volatile VolatileFields fields = new VolatileFields();
volatile VolatileFields fields = new VolatileFields(ConnState.DISCONNECTED);

private final rpc_address address;
private Bootstrap boot;
private EventLoopGroup timeoutTaskGroup;
private ReplicaSessionInterceptorManager interceptorManager;
private final Bootstrap boot;
private final EventLoopGroup timeoutTaskGroup;
private final ReplicaSessionInterceptorManager interceptorManager;
private volatile boolean authSucceed;
final Queue<RequestEntry> authPendingSend = new LinkedList<>();
private final Queue<RequestEntry> authPendingSend = new LinkedList<>();

// Session will be actively closed if all the rpcs across `sessionResetTimeWindowMs`
// Session will be actively closed if all the RPCs across `sessionResetTimeWindowMs`
// are timed out, in that case we suspect that the server is unavailable.

// Timestamp of the first timed out rpc.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.pegasus.rpc.interceptor.ReplicaSessionInterceptor;

public class AuthReplicaSessionInterceptor implements ReplicaSessionInterceptor, Closeable {
private AuthProtocol protocol;
private final AuthProtocol protocol;

public AuthReplicaSessionInterceptor(ClientOptions options) throws IllegalArgumentException {
this.protocol = options.getCredential().getProtocol();
Expand All @@ -37,7 +37,7 @@ public void onConnected(ReplicaSession session) {

@Override
public boolean onSendMessage(ReplicaSession session, final ReplicaSession.RequestEntry entry) {
// tryPendRequest returns false means that the negotiation is succeed now
// tryPendRequest returns false means that the negotiation is successful now.
return protocol.isAuthRequest(entry) || !session.tryPendRequest(entry);
}

Expand Down

0 comments on commit 6055811

Please sign in to comment.