Skip to content

Commit

Permalink
fix(java-client): fix client thread leak when kerberos relogin failed
Browse files Browse the repository at this point in the history
  • Loading branch information
yujingwei committed Apr 15, 2024
1 parent 4b1e0fc commit 512a098
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,17 @@
*/
package org.apache.pegasus.rpc.interceptor;

import java.io.Closeable;
import org.apache.pegasus.rpc.async.ReplicaSession;

public interface ReplicaSessionInterceptor {
public interface ReplicaSessionInterceptor extends Closeable {
// The behavior when a rpc session is connected.
void onConnected(ReplicaSession session);

// The behavior when rpc session is sending a message.
// @returns false if this message shouldn't be sent.
boolean onSendMessage(ReplicaSession session, final ReplicaSession.RequestEntry entry);

@Override
void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public boolean onSendMessage(ReplicaSession session, final ReplicaSession.Reques
return true;
}

@Override
public void close() {
for (ReplicaSessionInterceptor interceptor : interceptors) {
interceptor.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,6 @@ public interface AuthProtocol extends Closeable {

boolean isAuthRequest(final ReplicaSession.RequestEntry entry);

@Override
void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public boolean onSendMessage(ReplicaSession session, final ReplicaSession.Reques
return protocol.isAuthRequest(entry) || !session.tryPendRequest(entry);
}

@Override
public void close() {
protocol.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.pegasus.security;

import com.sun.security.auth.callback.TextCallbackHandler;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
Expand Down Expand Up @@ -60,7 +62,8 @@ class KerberosProtocol implements AuthProtocol {
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "TGT renew for pegasus");
String timestamp = new SimpleDateFormat("HHmmss").format(new Date());
Thread t = new Thread(r, "TGT renew for pegasus - " + timestamp);
t.setDaemon(true);
t.setUncaughtExceptionHandler(
(thread, error) -> logger.error("Uncaught exception", error));
Expand Down Expand Up @@ -97,23 +100,20 @@ public boolean isAuthRequest(final ReplicaSession.RequestEntry entry) {
}

private void scheduleCheckTGTAndRelogin() {
Runnable checkTGTAndReLogin =
new Runnable() {
@Override
public void run() {
try {
checkTGTAndRelogin();
} catch (Exception e) {
logger.warn(
"check TGT and ReLogin kerberos failed , will retry after {} seconds.",
CHECK_TGT_INTEVAL_SECONDS,
e);
}
}
};

service.scheduleAtFixedRate(
checkTGTAndReLogin, CHECK_TGT_INTEVAL_SECONDS, CHECK_TGT_INTEVAL_SECONDS, TimeUnit.SECONDS);
() -> {
try {
checkTGTAndRelogin();
} catch (Exception e) {
logger.warn(
"check TGT and ReLogin kerberos failed , will retry after {} seconds.",
CHECK_TGT_INTEVAL_SECONDS,
e);
}
},
CHECK_TGT_INTEVAL_SECONDS,
CHECK_TGT_INTEVAL_SECONDS,
TimeUnit.SECONDS);
}

private void checkTGTAndRelogin() {
Expand Down Expand Up @@ -213,6 +213,7 @@ public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
};
}

@Override
public void close() {
service.shutdown();
}
Expand Down

0 comments on commit 512a098

Please sign in to comment.