From d6edb9db0d2d9314c6a3547f08de3bb89c974df2 Mon Sep 17 00:00:00 2001 From: Dan Wang Date: Wed, 16 Oct 2024 15:27:42 +0800 Subject: [PATCH] add comments --- .../apache/pegasus/rpc/async/ReplicaSession.java | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/java-client/src/main/java/org/apache/pegasus/rpc/async/ReplicaSession.java b/java-client/src/main/java/org/apache/pegasus/rpc/async/ReplicaSession.java index a212322958..0a85a75bc3 100644 --- a/java-client/src/main/java/org/apache/pegasus/rpc/async/ReplicaSession.java +++ b/java-client/src/main/java/org/apache/pegasus/rpc/async/ReplicaSession.java @@ -229,16 +229,25 @@ public void operationComplete(ChannelFuture channelFuture) throws Exception { private void markSessionConnected(Channel activeChannel) { VolatileFields newCache = new VolatileFields(ConnState.CONNECTED, activeChannel); - // + // Note that actions in interceptor such as Negotiation might send request by + // ReplicaSession#asyncSend(), inside which ReplicaSession#tryConnect() would + // also be called since current state of this session is still CONNECTING. + // However, it would never create another connection, thus it is safe to do + // `fields = newCache` at the end. interceptorManager.onConnected(this); synchronized (pendingSend) { if (fields.state != ConnState.CONNECTING) { - // this session may have been closed or connected already + // This session may have been closed or connected already. logger.info("{}: session is {}, skip to mark it connected", name(), fields.state); return; } + // Once the authentication is enabled, any request except Negotiation such as + // query_cfg_operator for meta would be cached in authPendingSend and sent after + // Negotiation is successful. Negotiation would be performed first before any other + // request for the reason that AuthProtocol#isAuthRequest() would return true for + // negotiation_operator. sendPendingRequests(pendingSend, newCache); fields = newCache; } @@ -284,9 +293,11 @@ void markSessionDisconnect() { } } + // Reset the authentication once the connection is closed. resetAuth(); } + // After the authentication is reset, a new Negotiation would be launched. private void resetAuth() { synchronized (authPendingSend) { authSucceed = false; @@ -340,6 +351,7 @@ void tryNotifyFailureWithSeqID(int seqID, error_code.error_types errno, boolean } private void write(final RequestEntry entry, VolatileFields cache) { + // Under some circumstances requests are not allowed to be sent or delayed. if (!interceptorManager.onSendMessage(this, entry)) { return; }