From 86aaba5e496b29bbf4dac8bba189b9a5770da07f Mon Sep 17 00:00:00 2001 From: yujingwei Date: Mon, 15 Apr 2024 11:06:52 +0800 Subject: [PATCH] fix(java-client): fix client thread leak when kerberos relogin failed --- .../ReplicaSessionInterceptor.java | 4 ++- .../ReplicaSessionInterceptorManager.java | 1 + .../apache/pegasus/security/AuthProtocol.java | 1 + .../AuthReplicaSessionInterceptor.java | 1 + .../pegasus/security/KerberosProtocol.java | 35 ++++++++++--------- 5 files changed, 24 insertions(+), 18 deletions(-) diff --git a/java-client/src/main/java/org/apache/pegasus/rpc/interceptor/ReplicaSessionInterceptor.java b/java-client/src/main/java/org/apache/pegasus/rpc/interceptor/ReplicaSessionInterceptor.java index 0194f3bebd..5b7dcfe0bd 100644 --- a/java-client/src/main/java/org/apache/pegasus/rpc/interceptor/ReplicaSessionInterceptor.java +++ b/java-client/src/main/java/org/apache/pegasus/rpc/interceptor/ReplicaSessionInterceptor.java @@ -18,9 +18,10 @@ */ package org.apache.pegasus.rpc.interceptor; +import java.io.Closeable; import org.apache.pegasus.rpc.async.ReplicaSession; -public interface ReplicaSessionInterceptor { +public interface ReplicaSessionInterceptor extends Closeable { // The behavior when a rpc session is connected. void onConnected(ReplicaSession session); @@ -28,5 +29,6 @@ public interface ReplicaSessionInterceptor { // @returns false if this message shouldn't be sent. boolean onSendMessage(ReplicaSession session, final ReplicaSession.RequestEntry entry); + @Override void close(); } diff --git a/java-client/src/main/java/org/apache/pegasus/rpc/interceptor/ReplicaSessionInterceptorManager.java b/java-client/src/main/java/org/apache/pegasus/rpc/interceptor/ReplicaSessionInterceptorManager.java index fa44420f53..e08c44590a 100644 --- a/java-client/src/main/java/org/apache/pegasus/rpc/interceptor/ReplicaSessionInterceptorManager.java +++ b/java-client/src/main/java/org/apache/pegasus/rpc/interceptor/ReplicaSessionInterceptorManager.java @@ -51,6 +51,7 @@ public boolean onSendMessage(ReplicaSession session, final ReplicaSession.Reques return true; } + @Override public void close() { for (ReplicaSessionInterceptor interceptor : interceptors) { interceptor.close(); diff --git a/java-client/src/main/java/org/apache/pegasus/security/AuthProtocol.java b/java-client/src/main/java/org/apache/pegasus/security/AuthProtocol.java index b8f9be933a..9aefe330c2 100644 --- a/java-client/src/main/java/org/apache/pegasus/security/AuthProtocol.java +++ b/java-client/src/main/java/org/apache/pegasus/security/AuthProtocol.java @@ -29,5 +29,6 @@ public interface AuthProtocol extends Closeable { boolean isAuthRequest(final ReplicaSession.RequestEntry entry); + @Override void close(); } diff --git a/java-client/src/main/java/org/apache/pegasus/security/AuthReplicaSessionInterceptor.java b/java-client/src/main/java/org/apache/pegasus/security/AuthReplicaSessionInterceptor.java index 2e7e07e3f8..6f99527232 100644 --- a/java-client/src/main/java/org/apache/pegasus/security/AuthReplicaSessionInterceptor.java +++ b/java-client/src/main/java/org/apache/pegasus/security/AuthReplicaSessionInterceptor.java @@ -41,6 +41,7 @@ public boolean onSendMessage(ReplicaSession session, final ReplicaSession.Reques return protocol.isAuthRequest(entry) || !session.tryPendRequest(entry); } + @Override public void close() { protocol.close(); } diff --git a/java-client/src/main/java/org/apache/pegasus/security/KerberosProtocol.java b/java-client/src/main/java/org/apache/pegasus/security/KerberosProtocol.java index bdabf82c7c..59b10e97df 100644 --- a/java-client/src/main/java/org/apache/pegasus/security/KerberosProtocol.java +++ b/java-client/src/main/java/org/apache/pegasus/security/KerberosProtocol.java @@ -19,6 +19,8 @@ package org.apache.pegasus.security; import com.sun.security.auth.callback.TextCallbackHandler; +import java.text.SimpleDateFormat; +import java.util.Date; import java.util.HashMap; import java.util.Iterator; import java.util.Map; @@ -60,7 +62,8 @@ class KerberosProtocol implements AuthProtocol { new ThreadFactory() { @Override public Thread newThread(Runnable r) { - Thread t = new Thread(r, "TGT renew for pegasus"); + String timestamp = new SimpleDateFormat("HHmmss").format(new Date()); + Thread t = new Thread(r, "TGT renew for pegasus - " + timestamp); t.setDaemon(true); t.setUncaughtExceptionHandler( (thread, error) -> logger.error("Uncaught exception", error)); @@ -97,23 +100,20 @@ public boolean isAuthRequest(final ReplicaSession.RequestEntry entry) { } private void scheduleCheckTGTAndRelogin() { - Runnable checkTGTAndReLogin = - new Runnable() { - @Override - public void run() { - try { - checkTGTAndRelogin(); - } catch (Exception e) { - logger.warn( - "check TGT and ReLogin kerberos failed , will retry after {} seconds.", - CHECK_TGT_INTEVAL_SECONDS, - e); - } - } - }; - service.scheduleAtFixedRate( - checkTGTAndReLogin, CHECK_TGT_INTEVAL_SECONDS, CHECK_TGT_INTEVAL_SECONDS, TimeUnit.SECONDS); + () -> { + try { + checkTGTAndRelogin(); + } catch (Exception e) { + logger.warn( + "check TGT and ReLogin kerberos failed , will retry after {} seconds.", + CHECK_TGT_INTEVAL_SECONDS, + e); + } + }, + CHECK_TGT_INTEVAL_SECONDS, + CHECK_TGT_INTEVAL_SECONDS, + TimeUnit.SECONDS); } private void checkTGTAndRelogin() { @@ -213,6 +213,7 @@ public AppConfigurationEntry[] getAppConfigurationEntry(String name) { }; } + @Override public void close() { service.shutdown(); }