Skip to content

Commit

Permalink
Add query timeout to database connection (#3218)
Browse files Browse the repository at this point in the history
Co-authored-by: Sai Sriharsha Annepu <[email protected]>
  • Loading branch information
Paarth002 and gs-ssh16 authored Nov 12, 2024
1 parent 9f61247 commit 01e4528
Show file tree
Hide file tree
Showing 25 changed files with 223 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ public SQLUpdateResult execute(RelationalSaveNode node, Identity identity, Execu

this.prepareForSQLExecution(node.sqlQuery, node.sqlComment, connectionManagerConnection, databaseTimeZone, databaseType, tempTableList, identity, executionState, false);

return new SQLUpdateResult(executionState.activities, databaseType, connectionManagerConnection, identity, tempTableList, executionState.getRequestContext());
return new SQLUpdateResult(executionState.activities, databaseType, connectionManagerConnection, node.connection, identity, tempTableList, executionState.getRequestContext());
}

private void prepareForSQLExecution(String sqlQuery, String sqlComment, Connection connection, String databaseTimeZone, String databaseTypeName, List<String> tempTableList, Identity identity, ExecutionState executionState, boolean shouldLogSQL)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ public static org.finos.legend.engine.protocol.pure.v1.model.packageableElement.
db.type = DatabaseType.H2;
db.element = originalConnection.element;
db.timeZone = originalConnection instanceof DatabaseConnection ? ((DatabaseConnection) originalConnection).timeZone : null;
db.queryTimeOutInSeconds = originalConnection instanceof DatabaseConnection ? ((DatabaseConnection) originalConnection).queryTimeOutInSeconds : null;
db.quoteIdentifiers = originalConnection instanceof DatabaseConnection ? ((DatabaseConnection) originalConnection).quoteIdentifiers : null;
return db;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public SQLExecutionResult(List<ExecutionActivity> activities, SQLExecutionNode S

public SQLExecutionResult(List<ExecutionActivity> activities, SQLExecutionNode SQLExecutionNode, String databaseType, String databaseTimeZone, Connection connection, Identity identity, List<String> temporaryTables, Span topSpan, RequestContext requestContext, boolean logSQLWithParamValues)
{
super("success", connection, activities, databaseType, temporaryTables, requestContext);
super("success", connection, SQLExecutionNode.connection, activities, databaseType, temporaryTables, requestContext);
this.SQLExecutionNode = SQLExecutionNode;
this.databaseTimeZone = databaseTimeZone;
this.calendar = new GregorianCalendar(TimeZone.getTimeZone(databaseTimeZone));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.finos.legend.engine.plan.execution.stores.StoreExecutable;
import org.finos.legend.engine.plan.execution.stores.relational.connection.driver.DatabaseManager;
import org.finos.legend.engine.protocol.pure.v1.model.packageableElement.store.relational.connection.DatabaseType;
import org.finos.legend.engine.protocol.pure.v1.model.packageableElement.store.relational.connection.DatabaseConnection;
import org.finos.legend.engine.shared.core.api.request.RequestContext;
import org.finos.legend.engine.shared.core.identity.Identity;
import org.finos.legend.engine.shared.core.identity.factory.*;
Expand All @@ -45,7 +46,7 @@ public abstract class SQLResult extends Result implements StoreExecutable

private final RequestContext requestContext;

public SQLResult(String status, Connection connection, List<ExecutionActivity> activities, String databaseType, List<String> temporaryTables, RequestContext requestContext)
public SQLResult(String status, Connection connection, DatabaseConnection protocolConnection, List<ExecutionActivity> activities, String databaseType, List<String> temporaryTables, RequestContext requestContext)
{
super(status, activities);

Expand All @@ -61,6 +62,10 @@ public SQLResult(String status, Connection connection, List<ExecutionActivity> a
{
this.statement.setFetchSize(100);
}
if (protocolConnection.queryTimeOutInSeconds != null)
{
this.statement.setQueryTimeout(protocolConnection.queryTimeOutInSeconds);
}
}
catch (Throwable e)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.finos.legend.engine.plan.execution.stores.relational.activity.RelationalExecutionActivity;
import org.finos.legend.engine.shared.core.api.request.RequestContext;
import org.finos.legend.engine.shared.core.identity.Identity;
import org.finos.legend.engine.protocol.pure.v1.model.packageableElement.store.relational.connection.DatabaseConnection;
import org.finos.legend.engine.shared.core.operational.logs.LogInfo;
import org.finos.legend.engine.shared.core.operational.logs.LoggingEventType;
import org.slf4j.Logger;
Expand All @@ -32,9 +33,9 @@ public class SQLUpdateResult extends SQLResult

private final int updateCount;

public SQLUpdateResult(List<ExecutionActivity> activities, String databaseType, Connection connection, Identity identity, List<String> temporaryTables, RequestContext requestContext)
public SQLUpdateResult(List<ExecutionActivity> activities, String databaseType, Connection connection, DatabaseConnection dbConnection, Identity identity, List<String> temporaryTables, RequestContext requestContext)
{
super("success", connection, activities, databaseType, temporaryTables, requestContext);
super("success", connection, dbConnection, activities, databaseType, temporaryTables, requestContext);
try
{
long start = System.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
// Copyright 2022 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.plan.execution.stores.relational.connection;

import org.eclipse.collections.api.factory.Maps;
import org.finos.legend.engine.protocol.pure.v1.model.executionPlan.SingleExecutionPlan;
import org.junit.Assert;
import org.junit.Test;

import java.sql.SQLException;
import java.sql.Statement;
import java.time.LocalDate;
import java.time.LocalDateTime;

public class TestQueryTimeOut extends AlloyTestServer
{

private static final String TEST_FUNCTION = "###Pure\n" +
"function test::fetch(): Any[1]\n" +
"{\n" +
" {names:String[*] | test::Person.all()\n" +
" ->project([x | $x.fullName, x | $x.firmName], ['fullName', 'firmName'])\n" +
" ->extend([col(row:TDSRow[1] | $row.getString('fullName'), 'string')])\n" +
" ->olapGroupBy(['fullName'], asc('firmName'), func(y | $y->meta::pure::functions::math::olap::rank()), 'RANK 1')\n" +
" ->olapGroupBy(['fullName'], desc('firmName'), func(y | $y->meta::pure::functions::math::olap::rowNumber()), 'ROW 1')\n" +
" ->olapGroupBy(['fullName'], asc('firmName'), func(y | $y->meta::pure::functions::math::olap::denseRank()), 'DENSE RANK 1')\n" +
" ->olapGroupBy(['fullName'], desc('firmName'), func(y | $y->meta::pure::functions::math::olap::rank()), 'RANK 2')\n" +
" ->olapGroupBy(['firmName'], asc('fullName'), func(y | $y->meta::pure::functions::math::olap::rank()), 'RANK 3')\n" +
" ->olapGroupBy(['firmName'], desc('fullName'), func(y | $y->meta::pure::functions::math::olap::rowNumber()), 'ROW 2')\n" +
" ->olapGroupBy(['firmName'], asc('fullName'), func(y | $y->meta::pure::functions::math::olap::denseRank()), 'DENSE RANK 2')\n" +
" ->olapGroupBy(['firmName'], desc('fullName'), func(y | $y->meta::pure::functions::math::olap::rank()), 'RANK 3')\n" +
" }\n" +
"}";

private static final String LOGICAL_MODEL = "###Pure\n" +
"Class test::Person\n" +
"{\n" +
" fullName: String[1];\n" +
" addressName: String[1];\n" +
" firmName: String[1];\n" +
"}\n\n\n";

private static final String STORE_MODEL = "###Relational\n" +
"Database test::DB\n" +
"(\n" +
" Table PERSON (\n" +
" fullName VARCHAR(100) PRIMARY KEY,\n" +
" firmName VARCHAR(100),\n" +
" addressName VARCHAR(100)\n" +
" )\n" +
")\n\n\n";

private static final String MAPPING = "###Mapping\n" +
"Mapping test::Map\n" +
"(\n" +
" test::Person: Relational\n" +
" {\n" +
" ~primaryKey\n" +
" (\n" +
" [test::DB]PERSON.fullName\n" +
" )\n" +
" ~mainTable [test::DB]PERSON\n" +
" fullName: [test::DB]PERSON.fullName,\n" +
" firmName: [test::DB]PERSON.firmName,\n" +
" addressName: [test::DB]PERSON.addressName\n" +
" }\n" +
")\n\n\n";

private static final String RUNTIME = "###Runtime\n" +
"Runtime test::Runtime\n" +
"{\n" +
" mappings:\n" +
" [\n" +
" test::Map\n" +
" ];\n" +
" connections:\n" +
" [\n" +
" test::DB:\n" +
" [\n" +
" c1: #{\n" +
" RelationalDatabaseConnection\n" +
" {\n" +
" type: H2;\n" +
" specification: LocalH2 {};\n" +
" auth: DefaultH2;\n" +
" queryTimeOutInSeconds: 1;\n" +
" }\n" +
" }#\n" +
" ]\n" +
" ];\n" +
"}\n";

public static final String TEST_EXECUTION_PLAN = LOGICAL_MODEL + STORE_MODEL + MAPPING + RUNTIME + TEST_FUNCTION;


@Override
protected void insertTestData(Statement statement) throws SQLException
{
statement.execute("Drop table if exists PERSON;");
statement.execute("Create Table PERSON(fullName VARCHAR(100) NOT NULL,firmName VARCHAR(100) NULL,addressName VARCHAR(100) NULL,PRIMARY KEY(fullName));");

Integer personTableLength = 100000;
for (int i = 1; i <= personTableLength; i++)
{
statement.execute(String.format("insert into PERSON (fullName,firmName,addressName) values ('fullName%d','firmName%d','addressName%d');", personTableLength - i, i, personTableLength - i));
}
}

@Test
public void testQueryTimeOutInSeconds()
{
try
{
SingleExecutionPlan executionPlan = buildPlan(TEST_EXECUTION_PLAN);
Assert.assertNotNull(executionPlan);
Assert.assertNotNull(executePlan(executionPlan, Maps.mutable.empty()));
}
catch (Exception e)
{
Assert.assertEquals("org.h2.jdbc.JdbcSQLTimeoutException: Statement was canceled or the session timed out; SQL statement:", e.getMessage().substring(0, e.getMessage().indexOf('\n')));
return;
}
Assert.fail("Cannot test QueryTimeOut as query runs for less than 1 second");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ MODE: 'mode';
RELATIONAL_DATASOURCE_SPEC: 'specification';
RELATIONAL_AUTH_STRATEGY: 'auth';
RELATIONAL_POST_PROCESSORS: 'postProcessors';
QUERY_TIMEOUT: 'queryTimeOutInSeconds';

DB_TIMEZONE: 'timezone';
TIMEZONE: TimeZone;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ options

identifier: VALID_STRING | STRING
| STORE
| TYPE | RELATIONAL_DATASOURCE_SPEC | RELATIONAL_AUTH_STRATEGY
| TYPE | QUERY_TIMEOUT | RELATIONAL_DATASOURCE_SPEC | RELATIONAL_AUTH_STRATEGY
| DB_TIMEZONE | QUOTE_IDENTIFIERS
;

Expand All @@ -22,6 +22,7 @@ identifier: VALID_STRING | STRING
definition: (
connectionStore
| dbType
| queryTimeOutInSeconds
| connectionMode
| dbConnectionTimezone
| dbQuoteIdentifiers
Expand All @@ -39,6 +40,8 @@ dbQuoteIdentifiers: QUOTE_IDENTIFIERS COLON BOOLEAN SEMI_COL
;
dbType: TYPE COLON identifier SEMI_COLON
;
queryTimeOutInSeconds: QUERY_TIMEOUT COLON INTEGER SEMI_COLON
;

connectionMode: MODE COLON identifier SEMI_COLON
;
Expand Down Expand Up @@ -67,4 +70,5 @@ specificationValueBody: BRACE_OPEN (specificationValue)*
;

specificationValue: SPECIFICATION_BRACE_OPEN | SPECIFICATION_CONTENT | SPECIFICATION_BRACE_CLOSE
;
;

Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,14 @@ public static void addTestDataSetUp(Root_meta_external_store_relational_runtime_
}
}

public static void addDatabaseConnectionProperties(Root_meta_external_store_relational_runtime_DatabaseConnection pureConnection, String element, SourceInformation elementSourceInformation, String connectionType, String timeZone, Boolean quoteIdentifiers, CompileContext context)
public static void addDatabaseConnectionProperties(Root_meta_external_store_relational_runtime_DatabaseConnection pureConnection, String element, SourceInformation elementSourceInformation, String connectionType, String timeZone, Integer queryTimeOutInSeconds, Boolean quoteIdentifiers, CompileContext context)
{
Root_meta_external_store_relational_runtime_DatabaseConnection connection = pureConnection._type(context.pureModel.getEnumValue("meta::relational::runtime::DatabaseType", connectionType));
connection._timeZone(timeZone);
if (queryTimeOutInSeconds != null)
{
connection._queryTimeOutInSeconds(Long.valueOf(queryTimeOutInSeconds));
}
connection._quoteIdentifiers(quoteIdentifiers);
if (element != null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ public List<Function2<Connection, CompileContext, Root_meta_core_runtime_Connect
RelationalDatabaseConnection relationalDatabaseConnection = (RelationalDatabaseConnection) connectionValue;

Root_meta_external_store_relational_runtime_RelationalDatabaseConnection relational = new Root_meta_external_store_relational_runtime_RelationalDatabaseConnection_Impl("", SourceInformationHelper.toM3SourceInformation(relationalDatabaseConnection.sourceInformation), context.pureModel.getClass("meta::external::store::relational::runtime::RelationalDatabaseConnection"));
HelperRelationalDatabaseConnectionBuilder.addDatabaseConnectionProperties(relational, relationalDatabaseConnection.element, relationalDatabaseConnection.elementSourceInformation, relationalDatabaseConnection.type.name(), relationalDatabaseConnection.timeZone, relationalDatabaseConnection.quoteIdentifiers, context);
HelperRelationalDatabaseConnectionBuilder.addDatabaseConnectionProperties(relational, relationalDatabaseConnection.element, relationalDatabaseConnection.elementSourceInformation, relationalDatabaseConnection.type.name(), relationalDatabaseConnection.timeZone, relationalDatabaseConnection.queryTimeOutInSeconds, relationalDatabaseConnection.quoteIdentifiers, context);

List<IRelationalCompilerExtension> extensions = IRelationalCompilerExtension.getExtensions(context);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ public void visitRelationalDatabaseConnectionValue(RelationalDatabaseConnectionP
{
throw new EngineException("Unknown database type '" + PureGrammarParserUtility.fromIdentifier(dbTypeCtx.identifier()) + "'", this.walkerSourceInformation.getSourceInformation(dbTypeCtx), EngineErrorType.PARSER);
}
//queryTimeoutInSeconds (optional)
RelationalDatabaseConnectionParserGrammar.QueryTimeOutInSecondsContext queryTimeOutInSecondsCtx = PureGrammarParserUtility.validateAndExtractOptionalField(ctx.queryTimeOutInSeconds(), "queryTimeOutInSeconds", connectionValue.sourceInformation);
connectionValue.queryTimeOutInSeconds = queryTimeOutInSecondsCtx != null ? Integer.parseInt(queryTimeOutInSecondsCtx.INTEGER().getText()) : null;
// timezone (optional)
RelationalDatabaseConnectionParserGrammar.DbConnectionTimezoneContext timezoneCtx = PureGrammarParserUtility.validateAndExtractOptionalField(ctx.dbConnectionTimezone(), "timezone", connectionValue.sourceInformation);
connectionValue.timeZone = timezoneCtx != null ? timezoneCtx.TIMEZONE().getText() : null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ public List<Function2<Connection, PureGrammarComposerContext, Pair<String, Strin
context.getIndentationString() + getTabString(baseIndentation + 1) + "specification: " + specification + ";\n" +
context.getIndentationString() + getTabString(baseIndentation + 1) + "auth: " + authenticationStrategy + ";\n"
)) +
(relationalDatabaseConnection.queryTimeOutInSeconds != null ? (context.getIndentationString() + getTabString(baseIndentation + 1) + "queryTimeOutInSeconds: " + String.valueOf(relationalDatabaseConnection.queryTimeOutInSeconds) + ";\n") : "") +
(postProcessors != null
? context.getIndentationString() + getTabString(baseIndentation + 1) + postProcessors
: "") +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public void testSqlServerConnectionPropertiesPropagatedToCompiledGraph()
" userNameVaultReference: 'value';\n" +
" passwordVaultReference: 'value';\n" +
" };\n" +
" queryTimeOutInSeconds: 7777;\n" +
"}\n");
}

Expand Down Expand Up @@ -138,10 +139,13 @@ public void testH2ConnectionPropertiesPropagatedToCompiledGraph()
" auth: Test\n" +
" {\n" +
" };\n" +
" queryTimeOutInSeconds: 7777;\n" +
"}\n");
Root_meta_external_store_relational_runtime_RelationalDatabaseConnection connection = (Root_meta_external_store_relational_runtime_RelationalDatabaseConnection) compiledGraph.getTwo().getConnection("simple::H2Connection", SourceInformation.getUnknownSourceInformation());
Boolean quoteIdentifiers = connection._quoteIdentifiers();
Long expectedQueryTimeOutInSeconds = 7777L;

Assert.assertTrue(quoteIdentifiers);
Assert.assertEquals(expectedQueryTimeOutInSeconds, connection._queryTimeOutInSeconds());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,20 @@ private String getTemplateConnectionWithTz(String offsetOrCode)
"}\n\n";
}

@Test
public void testConnectionWithTimeOut()
{
test("###Connection\n" +
"RelationalDatabaseConnection meta::mySimpleConnection\n" +
"{\n" +
" store: model::firm::Person;\n" +
" queryTimeOutInSeconds: 5000;\n" +
" type: H2;\n" +
" specification: LocalH2 { testDataSetupCSV: 'testCSV'; };\n" +
" auth: DefaultH2;\n" +
"}\n\n");
}

@Test
public void testTimezoneConfiguration()
{
Expand Down
Loading

0 comments on commit 01e4528

Please sign in to comment.