Skip to content

Commit

Permalink
Prevent background commons pool evict task from closing connections
Browse files Browse the repository at this point in the history
The background eviction logic can close a connection when we don't want
it to be, we need to assert more control over the eviction logic to
ensure only our own idle checks are used as a reason to evict.

This closes #31

(cherry picked from commit 66e50f6)
  • Loading branch information
tabish121 committed Oct 15, 2023
1 parent 72e468f commit 2da9053
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.messaginghub.pooled.jms;

import java.time.Duration;
import java.util.NoSuchElementException;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -37,6 +38,8 @@
import org.apache.commons.pool2.KeyedPooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.EvictionConfig;
import org.apache.commons.pool2.impl.EvictionPolicy;
import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;
import org.messaginghub.pooled.jms.pool.PooledConnection;
Expand Down Expand Up @@ -82,6 +85,8 @@ public class JmsPoolConnectionFactory implements ConnectionFactory, QueueConnect
private static final long EXHAUSTION_RECOVER_INITIAL_BACKOFF = 1_000L;
private static final long EXHAUSTION_RECOVER_BACKOFF_LIMIT = 10_000L;

private static final long DEFAULT_TIME_BETWEEN_EVICTION_RUNS = -1;

public static final int DEFAULT_MAX_SESSIONS_PER_CONNECTION = 500;
public static final int DEFAULT_MAX_CONNECTIONS = 1;

Expand Down Expand Up @@ -118,7 +123,7 @@ public PooledObject<PooledConnection> makeObject(PooledConnectionKey connectionK
connection.setIdleTimeout(getConnectionIdleTimeout());
connection.setMaxSessionsPerConnection(getMaxSessionsPerConnection());
connection.setMaxIdleSessionsPerConnection(
Math.min(getMaxIdleSessionsPerConnection(), getMaxSessionsPerConnection()));
Math.min(getMaxIdleSessionsPerConnection(), getMaxSessionsPerConnection()));
connection.setBlockIfSessionPoolIsFull(isBlockIfSessionPoolIsFull());
if (isBlockIfSessionPoolIsFull() && getBlockIfSessionPoolIsFullTimeout() > 0) {
connection.setBlockIfSessionPoolIsFullTimeout(getBlockIfSessionPoolIsFullTimeout());
Expand Down Expand Up @@ -177,13 +182,24 @@ public void passivateObject(PooledConnectionKey connectionKey, PooledObject<Pool

// Set max idle (not max active) since our connections always idle in the pool.
this.connectionsPool.setMaxIdlePerKey(DEFAULT_MAX_CONNECTIONS);
this.connectionsPool.setMinIdlePerKey(1); // Always want one connection pooled.
this.connectionsPool.setLifo(false);
this.connectionsPool.setMinIdlePerKey(1);
this.connectionsPool.setBlockWhenExhausted(false);

// We always want our validate method to control when idle objects are evicted.
this.connectionsPool.setTimeBetweenEvictionRuns(Duration.ofMillis(DEFAULT_TIME_BETWEEN_EVICTION_RUNS));
this.connectionsPool.setMinEvictableIdle(Duration.ofMillis(Long.MAX_VALUE));
this.connectionsPool.setTestOnBorrow(true);
this.connectionsPool.setTestWhileIdle(true);

// Don't use the default eviction policy as it ignores our own idle timeout option.
final EvictionPolicy<PooledConnection> policy = new EvictionPolicy<PooledConnection>() {

@Override
public boolean evict(EvictionConfig config, PooledObject<PooledConnection> underTest, int idleCount) {
return false; // We use the validation of the instance to check for idle.
}
};

this.connectionsPool.setEvictionPolicy(policy);
}
}

Expand Down Expand Up @@ -347,46 +363,46 @@ public void clear() {

//----- Connection Pool Configuration ------------------------------------//

/**
* Returns the currently configured maximum idle sessions per connection which by
* default matches the configured maximum active sessions per connection.
*
* @return the number if idle sessions allowed per connection before they are closed.
*
* @see setMaxSessionsPerConnection
* @see setMaxIdleSessionsPerConnection
*/
public int getMaxIdleSessionsPerConnection() {
return maxIdleSessionsPerConnection;
}

/**
* Sets the configured maximum idle sessions per connection which by default matches the
* configured maximum active sessions per connection. This option allows the pool to be
* configured to close sessions that are returned to the pool if the number of idle (not
* in use Sessions) exceeds this amount which can reduce the amount of resources that are
* allocated but not in use.
* <p>
* If the application in use opens and closes large amounts of sessions then leaving this
* option at the default means that there is a higher chance that an idle session will be
* available in the pool without the need to create a new instance however this does allow
* for more idle resources to exist so in cases where turnover is low with only occasional
* bursts in workloads it can be advantageous to lower this value to allow sessions to be
* fully closed on return to the pool if there are already enough idle sessions to exceed
* this amount.
* <p>
* If the max idle sessions per connection is configured larger than the max sessions value
* it will be truncated to the max sessions value to conform to the total limit on how many
* sessions can exists at any given time on a per connection basis.
*
* @param maxIdleSessionsPerConnection
* the number if idle sessions allowed per connection before they are closed.
*
* @see setMaxSessionsPerConnection
*/
public void setMaxIdleSessionsPerConnection(int maxIdleSessionsPerConnection) {
this.maxIdleSessionsPerConnection = maxIdleSessionsPerConnection;
}
/**
* Returns the currently configured maximum idle sessions per connection which by
* default matches the configured maximum active sessions per connection.
*
* @return the number if idle sessions allowed per connection before they are closed.
*
* @see setMaxSessionsPerConnection
* @see setMaxIdleSessionsPerConnection
*/
public int getMaxIdleSessionsPerConnection() {
return maxIdleSessionsPerConnection;
}

/**
* Sets the configured maximum idle sessions per connection which by default matches the
* configured maximum active sessions per connection. This option allows the pool to be
* configured to close sessions that are returned to the pool if the number of idle (not
* in use Sessions) exceeds this amount which can reduce the amount of resources that are
* allocated but not in use.
* <p>
* If the application in use opens and closes large amounts of sessions then leaving this
* option at the default means that there is a higher chance that an idle session will be
* available in the pool without the need to create a new instance however this does allow
* for more idle resources to exist so in cases where turnover is low with only occasional
* bursts in workloads it can be advantageous to lower this value to allow sessions to be
* fully closed on return to the pool if there are already enough idle sessions to exceed
* this amount.
* <p>
* If the max idle sessions per connection is configured larger than the max sessions value
* it will be truncated to the max sessions value to conform to the total limit on how many
* sessions can exists at any given time on a per connection basis.
*
* @param maxIdleSessionsPerConnection
* the number if idle sessions allowed per connection before they are closed.
*
* @see setMaxSessionsPerConnection
*/
public void setMaxIdleSessionsPerConnection(int maxIdleSessionsPerConnection) {
this.maxIdleSessionsPerConnection = maxIdleSessionsPerConnection;
}

/**
* Returns the currently configured maximum number of sessions a pooled Connection will
Expand Down Expand Up @@ -499,6 +515,8 @@ public int getConnectionIdleTimeout() {
*
* @param connectionIdleTimeout
* The maximum time a pooled Connection can sit unused before it is eligible for removal.
*
* @see #setConnectionCheckInterval(long)
*/
public void setConnectionIdleTimeout(int connectionIdleTimeout) {
this.connectionIdleTimeout = connectionIdleTimeout;
Expand Down Expand Up @@ -571,6 +589,8 @@ public void setExplicitProducerCacheSize(int cacheSize) {
*
* @param connectionCheckInterval
* The time to wait between runs of the Connection check thread.
*
* @see #setConnectionIdleTimeout(int)
*/
public void setConnectionCheckInterval(long connectionCheckInterval) {
getConnectionsPool().setTimeBetweenEvictionRunsMillis(connectionCheckInterval);
Expand Down Expand Up @@ -616,6 +636,8 @@ public long getBlockIfSessionPoolIsFullTimeout() {
* @param blockIfSessionPoolIsFullTimeout
* if blockIfSessionPoolIsFullTimeout is true then use this setting
* to configure how long to block before an error is thrown.
*
* @see #setMaxSessionsPerConnection(int)
*/
public void setBlockIfSessionPoolIsFullTimeout(long blockIfSessionPoolIsFullTimeout) {
this.blockIfSessionPoolIsFullTimeout = blockIfSessionPoolIsFullTimeout;
Expand Down Expand Up @@ -790,14 +812,14 @@ private synchronized JmsPoolConnection createJmsPoolConnection(String userName,
connection = connectionsPool.borrowObject(key);
} catch (NoSuchElementException nse) {
if (exhaustedPoolRecoveryAttempts++ < EXHUASTION_RECOVER_RETRY_LIMIT) {
LOG.trace("Recover attempt {} from exhausted pool by refilling pool key and creating new Connection", exhaustedPoolRecoveryAttempts);
if (exhaustedPoolRecoveryAttempts > 1) {
LockSupport.parkNanos(exhaustedPoolRecoveryBackoff);
exhaustedPoolRecoveryBackoff = Math.min(EXHAUSTION_RECOVER_BACKOFF_LIMIT,
exhaustedPoolRecoveryBackoff + exhaustedPoolRecoveryBackoff);
} else {
Thread.yield();
}
LOG.trace("Recover attempt {} from exhausted pool by refilling pool key and creating new Connection", exhaustedPoolRecoveryAttempts);
if (exhaustedPoolRecoveryAttempts > 1) {
LockSupport.parkNanos(exhaustedPoolRecoveryBackoff);
exhaustedPoolRecoveryBackoff = Math.min(EXHAUSTION_RECOVER_BACKOFF_LIMIT,
exhaustedPoolRecoveryBackoff + exhaustedPoolRecoveryBackoff);
} else {
Thread.yield();
}

connectionsPool.addObject(key);
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public class PooledConnection implements ExceptionListener {
private final AtomicBoolean started = new AtomicBoolean(false);
private final GenericKeyedObjectPool<PooledSessionKey, PooledSessionHolder> sessionPool;
private final List<JmsPoolSession> loanedSessions = new CopyOnWriteArrayList<JmsPoolSession>();
private final String connectionId;
private ExceptionListener parentExceptionListener;

public PooledConnection(Connection connection) {
Expand All @@ -72,6 +73,7 @@ public PooledConnection(Connection connection) {
poolConfig.setTestOnBorrow(true);

this.connection = wrap(connection);
this.connectionId = connection.toString();

try {
// Check if wrapped connection already had an exception listener and preserve it
Expand Down Expand Up @@ -429,7 +431,7 @@ public void onException(JMSException exception) {

@Override
public String toString() {
return "ConnectionPool[" + connection + "]";
return "ConnectionPool[" + connectionId + "]";
}

public void checkClientJMSVersionSupport(int requiredMajor, int requiredMinor) throws JMSException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.util.concurrent.TimeUnit;

Expand All @@ -34,6 +35,8 @@ public class JmsPoolConnectionIdleEvictionsFromPoolTest extends JmsPoolTestSuppo
@Test(timeout = 60000)
public void testEvictionOfIdle() throws Exception {
cf.setConnectionIdleTimeout(10);
cf.setConnectionCheckInterval(10);

JmsPoolConnection connection = (JmsPoolConnection) cf.createConnection();
Connection amq1 = connection.getConnection();

Expand All @@ -50,6 +53,8 @@ public void testEvictionOfIdle() throws Exception {
@Test(timeout = 60000)
public void testEvictionOfSeeminglyClosedConnection() throws Exception {
cf.setConnectionIdleTimeout(10);
cf.setConnectionCheckInterval(50_000); // Validation check should capture and evicit this

JmsPoolConnection connection = (JmsPoolConnection) cf.createConnection();
MockJMSConnection mockConnection1 = (MockJMSConnection) connection.getConnection();

Expand All @@ -66,6 +71,8 @@ public void testEvictionOfSeeminglyClosedConnection() throws Exception {
@Test(timeout = 60000)
public void testNotIdledWhenInUse() throws Exception {
cf.setConnectionIdleTimeout(10);
cf.setConnectionCheckInterval(10);

JmsPoolConnection connection = (JmsPoolConnection) cf.createConnection();
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

Expand All @@ -82,7 +89,7 @@ public void testNotIdledWhenInUse() throws Exception {
// any operation on session first checks whether session is closed
s.getTransacted();
} catch (IllegalStateException e) {
assertTrue("Session should be fine, instead: " + e.getMessage(), false);
fail("Session should be fine, instead: " + e.getMessage());
}

Connection original = connection.getConnection();
Expand All @@ -98,4 +105,52 @@ public void testNotIdledWhenInUse() throws Exception {
JmsPoolConnection connection3 = (JmsPoolConnection) cf.createConnection();
assertNotSame(original, connection3.getConnection());
}

@Test
public void testConnectionsAboveProtectedMinIdleAreNotIdledOutIfActive() throws Exception {
cf.setConnectionIdleTimeout(15);
cf.setConnectionCheckInterval(20);
cf.setMaxConnections(10);

final JmsPoolConnection safeConnection = (JmsPoolConnection) cf.createConnection();

// get a connection from pool again, it should be another connection which is now
// above the in-built minimum idle protections which guard the first connection and
// would be subject to reaping on idle timeout if not active but should never be
// reaped if active.
final JmsPoolConnection connection = (JmsPoolConnection) cf.createConnection();
assertNotSame(connection.getConnection(), safeConnection.getConnection());

// Session we will use to check on liveness.
final Session s = connection.createSession(Session.SESSION_TRANSACTED);

TimeUnit.MILLISECONDS.sleep(40); // Test that connection is not idled out when borrowed

try {
// any operation on session first checks whether session is closed and we
// are not expecting that here.
s.rollback();
} catch (IllegalStateException e) {
fail("Session should be fine, instead: " + e.getMessage());
}

final Connection underlying = connection.getConnection();

connection.close();

TimeUnit.MILLISECONDS.sleep(40); // Test that connection is idled out when not borrowed

try {
// Now the connection was idle and not loaned out for long enough that
// we expect it to have been closed.
s.rollback();
fail("Session should be closed because its connection was expected to closed");
} catch (IllegalStateException e) {
}

// get a connection from pool again, it should be a new Connection instance as the
// old one should have been inactive and idled out.
final JmsPoolConnection another = (JmsPoolConnection) cf.createConnection();
assertNotSame(underlying, another.getConnection());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,13 @@ public void close() throws JMSException {
connected.set(false);
started.set(false);

sessions.forEach((k, v) -> {
try {
v.close();
} catch (JMSException e) {
}
});

// Refuse any new work, and let any existing work complete.
executor.shutdown();
}
Expand Down

0 comments on commit 2da9053

Please sign in to comment.