Skip to content

Commit

Permalink
Shutdown recycler when closing ChannelFactory to avoid resource leak (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Jun 22, 2022
1 parent d59f247 commit ce9eb5c
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class ChannelFactory implements AutoCloseable {
private final long connRecycleTime;
private final CertContext certContext;
private final CertWatcher certWatcher;
private final ScheduledExecutorService recycler = Executors.newSingleThreadScheduledExecutor();
private final ScheduledExecutorService recycler;
private final ReadWriteLock lock = new ReentrantReadWriteLock();

public static class CertWatcher implements AutoCloseable {
Expand Down Expand Up @@ -191,6 +191,7 @@ public ChannelFactory(int maxFrameSize) {
this.maxFrameSize = maxFrameSize;
this.certWatcher = null;
this.certContext = null;
this.recycler = null;
this.connRecycleTime = 0;
}

Expand All @@ -205,6 +206,7 @@ public ChannelFactory(
this.connRecycleTime = connRecycleTime;
this.certContext =
new OpenSslContext(trustCertCollectionFilePath, keyCertChainFilePath, keyFilePath);
this.recycler = Executors.newSingleThreadScheduledExecutor();

File trustCert = new File(trustCertCollectionFilePath);
File keyCert = new File(keyCertChainFilePath);
Expand Down Expand Up @@ -232,6 +234,8 @@ public ChannelFactory(
this.maxFrameSize = maxFrameSize;
this.connRecycleTime = connRecycleTime;
this.certContext = new JksContext(jksKeyPath, jksKeyPassword, jksTrustPath, jksTrustPassword);
this.recycler = Executors.newSingleThreadScheduledExecutor();

File jksKey = new File(jksKeyPath);
File jksTrust = new File(jksTrustPath);
if (certReloadInterval > 0) {
Expand Down Expand Up @@ -331,9 +335,12 @@ public synchronized void close() {
});
connPool.clear();

if (certContext != null) {
if (recycler != null) {
recycler.shutdown();
if (certWatcher != null) certWatcher.close();
}

if (certWatcher != null) {
certWatcher.close();
}
}
}
28 changes: 17 additions & 11 deletions tikv-client/src/test/java/com/pingcap/tikv/ChannelFactoryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import com.google.common.collect.ImmutableList;
import com.pingcap.tikv.util.ChannelFactory;
import com.pingcap.tikv.util.ChannelFactory.CertWatcher;
import io.grpc.ManagedChannel;
import java.io.File;
import java.util.ArrayList;
Expand Down Expand Up @@ -54,23 +55,28 @@ public void testCertWatcher() throws InterruptedException {
File a = new File(caPath);
File b = new File(clientCertPath);
File c = new File(clientKeyPath);
new ChannelFactory.CertWatcher(2, ImmutableList.of(a, b, c), () -> changed.set(true));
Thread.sleep(5000);
assertTrue(changed.get());
try (CertWatcher certWatcher =
new CertWatcher(2, ImmutableList.of(a, b, c), () -> changed.set(true))) {
Thread.sleep(5000);
assertTrue(changed.get());
}
}

@Test
public void testCertWatcherWithExceptionTask() throws InterruptedException {
AtomicInteger timesOfReloadTask = new AtomicInteger(0);
new ChannelFactory.CertWatcher(
1,
ImmutableList.of(new File(caPath), new File(clientCertPath), new File(clientKeyPath)),
() -> {
timesOfReloadTask.getAndIncrement();
touchCert();
throw new RuntimeException("Mock exception in reload task");
});
CertWatcher certWatcher =
new CertWatcher(
1,
ImmutableList.of(new File(caPath), new File(clientCertPath), new File(clientKeyPath)),
() -> {
timesOfReloadTask.getAndIncrement();
touchCert();
throw new RuntimeException("Mock exception in reload task");
});

Thread.sleep(5000);
certWatcher.close();
assertTrue(timesOfReloadTask.get() > 1);
}

Expand Down

0 comments on commit ce9eb5c

Please sign in to comment.