Skip to content

Commit

Permalink
Evict connection pool in case of data source build errors (finos#2909)
Browse files Browse the repository at this point in the history
  • Loading branch information
prabodhtr authored Jun 14, 2024
1 parent 686cfb7 commit 29f2aab
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public class DataSourceStatistics
{
private final AtomicInteger builtConnections;
private final AtomicInteger requestedConnections;
private final AtomicInteger connectionErrors = new AtomicInteger();
private final AtomicInteger connectionErrors;
private final AtomicLong firstConnectionRequest;
private AtomicLong lastConnectionRequest;

Expand All @@ -34,19 +34,22 @@ public DataSourceStatistics()
this.builtConnections = new AtomicInteger(0);
this.lastConnectionRequest = new AtomicLong(getCurrentTimeInInMillis());
this.requestedConnections = new AtomicInteger(0);
this.connectionErrors = new AtomicInteger(0);
}

private DataSourceStatistics(int builtConnections, long firstConnectionRequest, long lastConnectionRequest, int requestedConnections)
private DataSourceStatistics(int builtConnections, long firstConnectionRequest, long lastConnectionRequest, int requestedConnections, int connectionErrors)
{
this.builtConnections = new AtomicInteger(builtConnections);
this.firstConnectionRequest = new AtomicLong(firstConnectionRequest);
this.lastConnectionRequest = new AtomicLong(lastConnectionRequest);
this.requestedConnections = new AtomicInteger(requestedConnections);
this.connectionErrors = new AtomicInteger(connectionErrors);
}

public static DataSourceStatistics clone(DataSourceStatistics statistics)
{
return new DataSourceStatistics(statistics.builtConnections.get(), statistics.firstConnectionRequest.get(), statistics.lastConnectionRequest.get(), statistics.requestedConnections.get());
return new DataSourceStatistics(statistics.builtConnections.get(), statistics.firstConnectionRequest.get(),
statistics.lastConnectionRequest.get(), statistics.requestedConnections.get(), statistics.connectionErrors.get());
}

public int getRequestedConnections()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,17 @@ public void evictUnusedPoolsOlderThan(Duration duration)
});
}

private void evictPool(String poolName)
{
LOGGER.info("Manually evicting pool {}", poolName);
DataSourceStatistics ds = getDataSourceByPoolName(poolName).getStatistics();

// we call atomicallyRemovePool() to make sure pool is not used by other connections by using lock
this.atomicallyRemovePool(poolName, DataSourceStatistics.clone(ds));
MetricsHandler.removeConnectionMetrics(poolName);
LOGGER.info("Evicted pool {}", poolName);
}

public int size()
{
return this.connectionPools.size();
Expand Down Expand Up @@ -334,7 +345,7 @@ public DataSourceWithStatistics getDataSourceForIdentityIfAbsentBuild(IdentitySt

if (!identityState.isValid())
{
throw new RuntimeException(String.format("Invalid Identity found, cannot build connection pool for %s for %s",principal,connectionKey.shortId()));
throw new RuntimeException(String.format("Invalid Identity found, cannot build connection pool for %s for %s", principal, connectionKey.shortId()));
}

//why do we need getIfAbsentPut? the first ever pool creation request will create a new Hikari Data Source
Expand Down Expand Up @@ -365,6 +376,8 @@ public DataSourceWithStatistics getDataSourceForIdentityIfAbsentBuild(IdentitySt
catch (Exception e)
{
LOGGER.error("Error creating pool {} {}", poolName, e);
//evict pool having invalid or null datasource caused by having exception thrown while building datasource
evictPool(poolName);
throw new RuntimeException(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,11 @@ void assertPoolStateExists(String... poolNames)
assertNotNull("State not found for pool=" + poolName, connectionStateManager.getConnectionStateManagerPOJO(poolName));
}
}

DataSourceWithStatistics getDataSourceWithStatistics(String user, ConnectionKey key)
{
Identity identity = new Identity(user);
String poolName = connectionStateManager.poolNameFor(identity, key);
return connectionStateManager.getDataSourceByPoolName(poolName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
package org.finos.legend.engine.plan.execution.stores.relational.connection.ds.state;

import io.prometheus.client.CollectorRegistry;
import org.finos.legend.engine.plan.execution.stores.relational.connection.authentication.strategy.TestDatabaseAuthenticationStrategy;
import org.finos.legend.engine.plan.execution.stores.relational.connection.driver.vendors.h2.H2Manager;
import org.finos.legend.engine.plan.execution.stores.relational.connection.ds.DataSourceSpecification;
import org.finos.legend.engine.plan.execution.stores.relational.connection.ds.DataSourceWithStatistics;
import org.finos.legend.engine.plan.execution.stores.relational.connection.ds.specifications.LocalH2DataSourceSpecification;
import org.finos.legend.engine.shared.core.identity.Credential;
import org.finos.legend.engine.shared.core.identity.Identity;
import org.junit.Assert;
Expand Down Expand Up @@ -378,6 +381,66 @@ public void testDataSourceEvictionWithUnclosedConnection() throws SQLException
assertPoolStateExists(pool3);
}

@Test
public void testDataSourceEvictionForDatasourceWithConnectionErrors() throws SQLException
{
Identity user1 = new Identity("user1");

DataSourceSpecification ds1 = buildLocalDataSourceSpecification(Collections.singletonList("DROP TABLE IF EXISTS T1"));

Assert.assertEquals(0, connectionStateManager.size());
assertPoolExists(false, user1.getName(), ds1.getConnectionKey());

ConnectionStateManager.ConnectionStateHousekeepingTask houseKeeper = new ConnectionStateManager.ConnectionStateHousekeepingTask(Duration.ofMinutes(5).getSeconds());

Connection connection1 = requestConnection(user1, ds1);
//assume a connection error happened while building/connecting to datasource
DataSourceWithStatistics ds1WithStatistics = getDataSourceWithStatistics(user1.getName(), ds1.getConnectionKey());
ds1WithStatistics.getStatistics().logConnectionError();

connection1.close();

// advance clock by 4 minutes and run housekeeper
clock.advance(Duration.ofMinutes(4));
houseKeeper.run();

// pool should still exist as eviction duration is 5min
Assert.assertEquals(1, connectionStateManager.size());
assertPoolExists(true, user1.getName(), ds1.getConnectionKey());

clock.advance(Duration.ofMinutes(2));
houseKeeper.run();

// eviction time is 5 min, no new connection for user1 , should be removed
Assert.assertEquals(0, connectionStateManager.size());
assertPoolExists(false, user1.getName(), ds1.getConnectionKey());
}

@Test
public void testDataSourceManualEvictionOnExceptionsDuringDatasourceBuild()
{
Identity user1 = new Identity("user1");

// creating invalid datasource without db manager to enact situations where an exception is thrown while building datasource
// ideal scenario: exception thrown while building HikariDataSource in DataSourceSpecification.buildDataSource()
DataSourceSpecification invalidDs = new LocalH2DataSourceSpecification(Collections.emptyList(), null, new TestDatabaseAuthenticationStrategy());

Assert.assertEquals(0, connectionStateManager.size());
assertPoolExists(false, user1.getName(), invalidDs.getConnectionKey());

try
{
requestConnection(user1, invalidDs);
Assert.fail("Exception not thrown while building datasource");
}
catch (Exception e)
{
// pool should have been evicted on exceptions during datasource build
Assert.assertEquals(0, connectionStateManager.size());
assertPoolExists(false, user1.getName(), invalidDs.getConnectionKey());
}
}

@Test
public void canGetAggregatedStats()
{
Expand Down

0 comments on commit 29f2aab

Please sign in to comment.