From 29f2aab4524752779bdfa272acc72b89ed1056c2 Mon Sep 17 00:00:00 2001 From: Prabodh T R <43966094+prabodhtr@users.noreply.github.com> Date: Fri, 14 Jun 2024 09:50:33 +0530 Subject: [PATCH] Evict connection pool in case of data source build errors (#2909) --- .../connection/ds/DataSourceStatistics.java | 9 ++- .../ds/state/ConnectionStateManager.java | 15 ++++- .../ds/state/TestConnectionManagement.java | 7 +++ .../ds/state/TestConnectionStateManager.java | 63 +++++++++++++++++++ 4 files changed, 90 insertions(+), 4 deletions(-) diff --git a/legend-engine-xts-relationalStore/legend-engine-xt-relationalStore-execution/legend-engine-xt-relationalStore-executionPlan-connection/src/main/java/org/finos/legend/engine/plan/execution/stores/relational/connection/ds/DataSourceStatistics.java b/legend-engine-xts-relationalStore/legend-engine-xt-relationalStore-execution/legend-engine-xt-relationalStore-executionPlan-connection/src/main/java/org/finos/legend/engine/plan/execution/stores/relational/connection/ds/DataSourceStatistics.java index 43c8d08db10..c0617a35504 100644 --- a/legend-engine-xts-relationalStore/legend-engine-xt-relationalStore-execution/legend-engine-xt-relationalStore-executionPlan-connection/src/main/java/org/finos/legend/engine/plan/execution/stores/relational/connection/ds/DataSourceStatistics.java +++ b/legend-engine-xts-relationalStore/legend-engine-xt-relationalStore-execution/legend-engine-xt-relationalStore-executionPlan-connection/src/main/java/org/finos/legend/engine/plan/execution/stores/relational/connection/ds/DataSourceStatistics.java @@ -24,7 +24,7 @@ public class DataSourceStatistics { private final AtomicInteger builtConnections; private final AtomicInteger requestedConnections; - private final AtomicInteger connectionErrors = new AtomicInteger(); + private final AtomicInteger connectionErrors; private final AtomicLong firstConnectionRequest; private AtomicLong lastConnectionRequest; @@ -34,19 +34,22 @@ public DataSourceStatistics() this.builtConnections = new AtomicInteger(0); this.lastConnectionRequest = new AtomicLong(getCurrentTimeInInMillis()); this.requestedConnections = new AtomicInteger(0); + this.connectionErrors = new AtomicInteger(0); } - private DataSourceStatistics(int builtConnections, long firstConnectionRequest, long lastConnectionRequest, int requestedConnections) + private DataSourceStatistics(int builtConnections, long firstConnectionRequest, long lastConnectionRequest, int requestedConnections, int connectionErrors) { this.builtConnections = new AtomicInteger(builtConnections); this.firstConnectionRequest = new AtomicLong(firstConnectionRequest); this.lastConnectionRequest = new AtomicLong(lastConnectionRequest); this.requestedConnections = new AtomicInteger(requestedConnections); + this.connectionErrors = new AtomicInteger(connectionErrors); } public static DataSourceStatistics clone(DataSourceStatistics statistics) { - return new DataSourceStatistics(statistics.builtConnections.get(), statistics.firstConnectionRequest.get(), statistics.lastConnectionRequest.get(), statistics.requestedConnections.get()); + return new DataSourceStatistics(statistics.builtConnections.get(), statistics.firstConnectionRequest.get(), + statistics.lastConnectionRequest.get(), statistics.requestedConnections.get(), statistics.connectionErrors.get()); } public int getRequestedConnections() diff --git a/legend-engine-xts-relationalStore/legend-engine-xt-relationalStore-execution/legend-engine-xt-relationalStore-executionPlan-connection/src/main/java/org/finos/legend/engine/plan/execution/stores/relational/connection/ds/state/ConnectionStateManager.java b/legend-engine-xts-relationalStore/legend-engine-xt-relationalStore-execution/legend-engine-xt-relationalStore-executionPlan-connection/src/main/java/org/finos/legend/engine/plan/execution/stores/relational/connection/ds/state/ConnectionStateManager.java index 0654b5943f6..277977f7cae 100644 --- a/legend-engine-xts-relationalStore/legend-engine-xt-relationalStore-execution/legend-engine-xt-relationalStore-executionPlan-connection/src/main/java/org/finos/legend/engine/plan/execution/stores/relational/connection/ds/state/ConnectionStateManager.java +++ b/legend-engine-xts-relationalStore/legend-engine-xt-relationalStore-execution/legend-engine-xt-relationalStore-executionPlan-connection/src/main/java/org/finos/legend/engine/plan/execution/stores/relational/connection/ds/state/ConnectionStateManager.java @@ -244,6 +244,17 @@ public void evictUnusedPoolsOlderThan(Duration duration) }); } + private void evictPool(String poolName) + { + LOGGER.info("Manually evicting pool {}", poolName); + DataSourceStatistics ds = getDataSourceByPoolName(poolName).getStatistics(); + + // we call atomicallyRemovePool() to make sure pool is not used by other connections by using lock + this.atomicallyRemovePool(poolName, DataSourceStatistics.clone(ds)); + MetricsHandler.removeConnectionMetrics(poolName); + LOGGER.info("Evicted pool {}", poolName); + } + public int size() { return this.connectionPools.size(); @@ -334,7 +345,7 @@ public DataSourceWithStatistics getDataSourceForIdentityIfAbsentBuild(IdentitySt if (!identityState.isValid()) { - throw new RuntimeException(String.format("Invalid Identity found, cannot build connection pool for %s for %s",principal,connectionKey.shortId())); + throw new RuntimeException(String.format("Invalid Identity found, cannot build connection pool for %s for %s", principal, connectionKey.shortId())); } //why do we need getIfAbsentPut? the first ever pool creation request will create a new Hikari Data Source @@ -365,6 +376,8 @@ public DataSourceWithStatistics getDataSourceForIdentityIfAbsentBuild(IdentitySt catch (Exception e) { LOGGER.error("Error creating pool {} {}", poolName, e); + //evict pool having invalid or null datasource caused by having exception thrown while building datasource + evictPool(poolName); throw new RuntimeException(e); } } diff --git a/legend-engine-xts-relationalStore/legend-engine-xt-relationalStore-execution/legend-engine-xt-relationalStore-executionPlan-connection/src/test/java/org/finos/legend/engine/plan/execution/stores/relational/connection/ds/state/TestConnectionManagement.java b/legend-engine-xts-relationalStore/legend-engine-xt-relationalStore-execution/legend-engine-xt-relationalStore-executionPlan-connection/src/test/java/org/finos/legend/engine/plan/execution/stores/relational/connection/ds/state/TestConnectionManagement.java index a43d9adc622..9220ff876a6 100644 --- a/legend-engine-xts-relationalStore/legend-engine-xt-relationalStore-execution/legend-engine-xt-relationalStore-executionPlan-connection/src/test/java/org/finos/legend/engine/plan/execution/stores/relational/connection/ds/state/TestConnectionManagement.java +++ b/legend-engine-xts-relationalStore/legend-engine-xt-relationalStore-execution/legend-engine-xt-relationalStore-executionPlan-connection/src/test/java/org/finos/legend/engine/plan/execution/stores/relational/connection/ds/state/TestConnectionManagement.java @@ -91,4 +91,11 @@ void assertPoolStateExists(String... poolNames) assertNotNull("State not found for pool=" + poolName, connectionStateManager.getConnectionStateManagerPOJO(poolName)); } } + + DataSourceWithStatistics getDataSourceWithStatistics(String user, ConnectionKey key) + { + Identity identity = new Identity(user); + String poolName = connectionStateManager.poolNameFor(identity, key); + return connectionStateManager.getDataSourceByPoolName(poolName); + } } diff --git a/legend-engine-xts-relationalStore/legend-engine-xt-relationalStore-execution/legend-engine-xt-relationalStore-executionPlan-connection/src/test/java/org/finos/legend/engine/plan/execution/stores/relational/connection/ds/state/TestConnectionStateManager.java b/legend-engine-xts-relationalStore/legend-engine-xt-relationalStore-execution/legend-engine-xt-relationalStore-executionPlan-connection/src/test/java/org/finos/legend/engine/plan/execution/stores/relational/connection/ds/state/TestConnectionStateManager.java index cfd4bc4ebaa..9b079a26dea 100644 --- a/legend-engine-xts-relationalStore/legend-engine-xt-relationalStore-execution/legend-engine-xt-relationalStore-executionPlan-connection/src/test/java/org/finos/legend/engine/plan/execution/stores/relational/connection/ds/state/TestConnectionStateManager.java +++ b/legend-engine-xts-relationalStore/legend-engine-xt-relationalStore-execution/legend-engine-xt-relationalStore-executionPlan-connection/src/test/java/org/finos/legend/engine/plan/execution/stores/relational/connection/ds/state/TestConnectionStateManager.java @@ -15,8 +15,11 @@ package org.finos.legend.engine.plan.execution.stores.relational.connection.ds.state; import io.prometheus.client.CollectorRegistry; +import org.finos.legend.engine.plan.execution.stores.relational.connection.authentication.strategy.TestDatabaseAuthenticationStrategy; +import org.finos.legend.engine.plan.execution.stores.relational.connection.driver.vendors.h2.H2Manager; import org.finos.legend.engine.plan.execution.stores.relational.connection.ds.DataSourceSpecification; import org.finos.legend.engine.plan.execution.stores.relational.connection.ds.DataSourceWithStatistics; +import org.finos.legend.engine.plan.execution.stores.relational.connection.ds.specifications.LocalH2DataSourceSpecification; import org.finos.legend.engine.shared.core.identity.Credential; import org.finos.legend.engine.shared.core.identity.Identity; import org.junit.Assert; @@ -378,6 +381,66 @@ public void testDataSourceEvictionWithUnclosedConnection() throws SQLException assertPoolStateExists(pool3); } + @Test + public void testDataSourceEvictionForDatasourceWithConnectionErrors() throws SQLException + { + Identity user1 = new Identity("user1"); + + DataSourceSpecification ds1 = buildLocalDataSourceSpecification(Collections.singletonList("DROP TABLE IF EXISTS T1")); + + Assert.assertEquals(0, connectionStateManager.size()); + assertPoolExists(false, user1.getName(), ds1.getConnectionKey()); + + ConnectionStateManager.ConnectionStateHousekeepingTask houseKeeper = new ConnectionStateManager.ConnectionStateHousekeepingTask(Duration.ofMinutes(5).getSeconds()); + + Connection connection1 = requestConnection(user1, ds1); + //assume a connection error happened while building/connecting to datasource + DataSourceWithStatistics ds1WithStatistics = getDataSourceWithStatistics(user1.getName(), ds1.getConnectionKey()); + ds1WithStatistics.getStatistics().logConnectionError(); + + connection1.close(); + + // advance clock by 4 minutes and run housekeeper + clock.advance(Duration.ofMinutes(4)); + houseKeeper.run(); + + // pool should still exist as eviction duration is 5min + Assert.assertEquals(1, connectionStateManager.size()); + assertPoolExists(true, user1.getName(), ds1.getConnectionKey()); + + clock.advance(Duration.ofMinutes(2)); + houseKeeper.run(); + + // eviction time is 5 min, no new connection for user1 , should be removed + Assert.assertEquals(0, connectionStateManager.size()); + assertPoolExists(false, user1.getName(), ds1.getConnectionKey()); + } + + @Test + public void testDataSourceManualEvictionOnExceptionsDuringDatasourceBuild() + { + Identity user1 = new Identity("user1"); + + // creating invalid datasource without db manager to enact situations where an exception is thrown while building datasource + // ideal scenario: exception thrown while building HikariDataSource in DataSourceSpecification.buildDataSource() + DataSourceSpecification invalidDs = new LocalH2DataSourceSpecification(Collections.emptyList(), null, new TestDatabaseAuthenticationStrategy()); + + Assert.assertEquals(0, connectionStateManager.size()); + assertPoolExists(false, user1.getName(), invalidDs.getConnectionKey()); + + try + { + requestConnection(user1, invalidDs); + Assert.fail("Exception not thrown while building datasource"); + } + catch (Exception e) + { + // pool should have been evicted on exceptions during datasource build + Assert.assertEquals(0, connectionStateManager.size()); + assertPoolExists(false, user1.getName(), invalidDs.getConnectionKey()); + } + } + @Test public void canGetAggregatedStats() {