Skip to content

Commit

Permalink
add test connection execution support for duck db in pure runtime
Browse files Browse the repository at this point in the history
- use dbtype in testdbconnection to switch between h2 and duckDB test connection in legend-pure connection
manager
- use singleton connection for duckdb
  • Loading branch information
PrateekGarg-gs committed Oct 4, 2024
1 parent 96c6bea commit 0a1fdfe
Show file tree
Hide file tree
Showing 10 changed files with 271 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,66 @@ public void testExecuteInDbError()
assertPureException(PureExecutionException.class, Pattern.compile("Error executing sql query; SQL reason: Table \"TT\" not found \\(this database is empty\\); SQL statement:\n" +
"select \\* from tt \\[42104-\\d++]; SQL error code: 42104; SQL state: 42S04"), 8, 4, e);
}

@Test
public void testExecuteInDb_H2()
{
compileTestSource(
TEST_SOURCE_ID,
"import meta::external::store::relational::runtime::*;\n" +
"import meta::relational::metamodel::*;\n" +
"import meta::relational::metamodel::execute::*;\n" +
"import meta::relational::functions::toDDL::*;\n" +
"function test():Any[0..1]\n" +
"{\n" +
" let dbConnection = ^TestDatabaseConnection(type = meta::relational::runtime::DatabaseType.H2);\n" +
" let res = executeInDb('select H2VERSION();', $dbConnection, 0, 1000);\n" +
"}\n" +
"###Relational\n" +
"Database mydb()\n"
);
compileAndExecute("test():Any[0..1]");
}

@Test
public void testExecuteInb_DuckDB()
{
compileTestSource(
TEST_SOURCE_ID,
"import meta::external::store::relational::runtime::*;\n" +
"import meta::relational::metamodel::*;\n" +
"import meta::relational::metamodel::execute::*;\n" +
"import meta::relational::functions::toDDL::*;\n" +
"function test():Any[0..1]\n" +
"{\n" +
" let dbConnection = ^TestDatabaseConnection(type = meta::relational::runtime::DatabaseType.DuckDB);\n" +
" let res = executeInDb('select * from duckdb_settings();', $dbConnection, 0, 1000);\n" +
"}\n" +
"###Relational\n" +
"Database mydb()\n"
);
compileAndExecute("test():Any[0..1]");
}

// Duck db implicitly converts result of int sum to hugeint >> test to ensure we can read duckdb specific hugeint into pure (as int)
@Test
public void testExecuteInb_DuckDB_HugeInt()
{
compileTestSource(
TEST_SOURCE_ID,
"import meta::external::store::relational::runtime::*;\n" +
"import meta::relational::metamodel::*;\n" +
"import meta::relational::metamodel::execute::*;\n" +
"import meta::relational::functions::toDDL::*;\n" +
"function test():Any[0..1]\n" +
"{\n" +
" let dbConnection = ^TestDatabaseConnection(type = meta::relational::runtime::DatabaseType.DuckDB);\n" +
" let res = executeInDb('select 1+1;', $dbConnection, 0, 1000);\n" +
" assertEquals(2, $res.rows->at(0).values->at(0));\n" +
"}\n" +
"###Relational\n" +
"Database mydb()\n"
);
compileAndExecute("test():Any[0..1]");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,11 @@
<artifactId>h2</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.duckdb</groupId>
<artifactId>duckdb_jdbc</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.eclipse.collections.impl.factory.Lists;
import org.eclipse.collections.impl.map.mutable.primitive.IntObjectHashMap;
import org.finos.legend.pure.m3.exception.PureExecutionException;
import org.finos.legend.pure.m3.navigation.M3Paths;
import org.finos.legend.pure.m3.tools.BinaryUtils;
import org.finos.legend.pure.m4.coreinstance.CoreInstance;
import org.finos.legend.pure.m4.coreinstance.primitive.date.DateFunctions;
Expand All @@ -33,6 +34,9 @@
import java.sql.Timestamp;
import java.sql.Types;
import java.util.Calendar;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public final class ResultSetValueHandlers
{
Expand Down Expand Up @@ -137,6 +141,7 @@ public Object value(ResultSet rs, int i, CoreInstance nullSqlInstance, Calendar


private static final MutableIntObjectMap<ResultSetValueHandler> HANDLERS = IntObjectHashMap.newMap();
private static Map<Integer, Map<String, ResultSetValueHandler>> DB_SPECIFIC_HANDLERS = new HashMap<>();

static
{
Expand Down Expand Up @@ -165,6 +170,8 @@ public Object value(ResultSet rs, int i, CoreInstance nullSqlInstance, Calendar
HANDLERS.put(Types.BINARY, BINARY);
HANDLERS.put(Types.VARBINARY, BINARY);
HANDLERS.put(Types.LONGVARBINARY, BINARY);

DB_SPECIFIC_HANDLERS.put(Types.JAVA_OBJECT, Collections.singletonMap("HUGEINT", LONG));
}


Expand All @@ -186,13 +193,21 @@ public static ListIterable<ResultSetValueHandler> getHandlers(ResultSetMetaData
for (int i = 1; i <= count; i++)
{
ResultSetValueHandler handler = HANDLERS.get(metaData.getColumnType(i));

if (handler == null)
{
throw new PureExecutionException("Unhandled SQL data type (java.sql.Types): " + metaData.getColumnType(i) + ", column: " + i + " " + metaData.getColumnName(i) + " " + metaData.getColumnTypeName(i));
ResultSetValueHandler handlerDbSpecific = DB_SPECIFIC_HANDLERS.get(metaData.getColumnType(i)).get(metaData.getColumnTypeName(i));
if (handler == null && handlerDbSpecific == null)
{
throw new PureExecutionException("Unhandled SQL data type (java.sql.Types): " + metaData.getColumnType(i) + ", column: " + i + " " + metaData.getColumnName(i) + " " + metaData.getColumnTypeName(i));
}
handlers.add(handlerDbSpecific);
}
else
{
handlers.add(handler); //core type handlers take preference
}
handlers.add(handler);
}

return handlers;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@
<artifactId>h2</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.duckdb</groupId>
<artifactId>duckdb_jdbc</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.finos.legend.pure</groupId>
<artifactId>legend-pure-m2-store-relational-grammar</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.Collections;
import java.util.GregorianCalendar;
import java.util.HashMap;
import java.util.Map;
import java.util.Stack;
import java.util.TimeZone;

Expand Down Expand Up @@ -91,6 +94,13 @@ public class ExecuteInDb extends NativeFunction
.withKeyValue(Types.LONGVARBINARY, M3Paths.String)
.toImmutable();

private static Map<Integer, Map<String, String>> dbSpecificTypeToPureType = new HashMap<>();

static
{
dbSpecificTypeToPureType.put(Types.JAVA_OBJECT, Collections.singletonMap("HUGEINT", M3Paths.Integer));
}

private static final IConnectionManagerHandler connectionManagerHandler = IConnectionManagerHandler.CONNECTION_MANAGER_HANDLER;

private final ModelRepository repository;
Expand Down Expand Up @@ -364,6 +374,19 @@ public static void createPureResultSetFromDatabaseResultSet(CoreInstance pureRes
}
break;
}
case Types.JAVA_OBJECT:
{
if (metaData.getColumnTypeName(i).equals("HUGEINT")) // DuckDB Specific datatype
{
long num = rs.getLong(i);
if (!rs.wasNull())
{
value = repository.newIntegerCoreInstance(num);
}
break;
}
break;
}
case Types.NULL:
{
// do nothing: value is already assigned to null
Expand Down Expand Up @@ -462,10 +485,18 @@ private static String pathFromColumnType(ResultSetMetaData metaData, int columnI

if (pureType == null)
{
throw new RuntimeException("No compatible PURE type found for column type (java.sql.Types): " + sqlType + ", column: " + columnIndex +
" " + metaData.getColumnName(columnIndex) + " " + metaData.getColumnTypeName(columnIndex));
}
String pureTypeDbSpecific = dbSpecificTypeToPureType.get(sqlType).get(metaData.getColumnTypeName(columnIndex));

return pureType;
if (pureType == null && pureTypeDbSpecific == null)
{
throw new RuntimeException("No compatible PURE type found for column type (java.sql.Types): " + sqlType + ", column: " + columnIndex +
" " + metaData.getColumnName(columnIndex) + " " + metaData.getColumnTypeName(columnIndex));
}
return pureTypeDbSpecific;
}
else
{
return pureType;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.eclipse.collections.impl.factory.Lists;
import org.eclipse.collections.impl.list.mutable.SynchronizedMutableList;
import org.eclipse.collections.impl.map.mutable.ConcurrentHashMap;
import org.finos.legend.pure.m3.navigation.Instance;
import org.finos.legend.pure.m3.navigation.ProcessorSupport;
import org.finos.legend.pure.m4.coreinstance.CoreInstance;
import org.finos.legend.pure.runtime.java.shared.identity.IdentityManager;
Expand Down Expand Up @@ -69,6 +70,7 @@ public boolean accept(ConnectionManager.StatementProperties each, Statement stat

private static final String TestDatabaseConnection = "meta::external::store::relational::runtime::TestDatabaseConnection";
private static final TestDatabaseConnect testDatabaseConnect = new TestDatabaseConnect();
private static final TestDatabaseConnectDuckDB testDatabaseConnectDuckDB = new TestDatabaseConnectDuckDB();

private ConnectionManager()
{
Expand All @@ -78,7 +80,16 @@ public static ConnectionWithDataSourceInfo getConnectionWithDataSourceInfo(CoreI
{
if (processorSupport.instance_instanceOf(connectionInformation, TestDatabaseConnection))
{
return testDatabaseConnect.getConnectionWithDataSourceInfo(IdentityManager.getAuthenticatedUserId());
CoreInstance dbType = Instance.getValueForMetaPropertyToOneResolved(connectionInformation, "type", processorSupport);
String dbTypeStr = dbType.getName();
if (dbTypeStr.equals("DuckDB"))
{
return testDatabaseConnectDuckDB.getConnectionWithDataSourceInfo(IdentityManager.getAuthenticatedUserId());
}
else // default to H2
{
return testDatabaseConnect.getConnectionWithDataSourceInfo(IdentityManager.getAuthenticatedUserId());
}
}

throw new RuntimeException(connectionInformation + " is not supported for execution!!");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright 2024 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.pure.runtime.java.extension.store.relational.shared.connectionManager;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.SQLException;

public class DuckDBConnectionWrapper extends ConnectionWrapper
{
Logger logger = LoggerFactory.getLogger(DuckDBConnectionWrapper.class);

private int borrowedCounter;
String user;

public DuckDBConnectionWrapper(Connection connection, String user)
{
super(connection);
this.user = user;
}

public void incrementBorrowedCounter()
{
borrowedCounter++;
}

private void decrementBorrowedCounter()
{
borrowedCounter--;
}

@Override
public void close() throws SQLException
{
this.decrementBorrowedCounter();
//never actually close duck db connection and re-use same connection over
// if (borrowedCounter <= 0)
// {
// this.closeConnection();
// }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public ConnectionWithDataSourceInfo getConnectionWithDataSourceInfo(String user)
}
catch (SQLException ex)
{
throw new PureExecutionException("Unable to create TestDatabaseConnection for user: " + user, ex);
throw new PureExecutionException("Unable to create TestDatabaseConnection of type: H2 for user: " + user + ", message: " + ex.getMessage(), ex);
}

pcw.incrementBorrowedCounter();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright 2024 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.pure.runtime.java.extension.store.relational.shared.connectionManager;

import org.finos.legend.pure.m3.exception.PureExecutionException;
import org.finos.legend.pure.runtime.java.extension.store.relational.shared.ConnectionWithDataSourceInfo;
import org.finos.legend.pure.runtime.java.extension.store.relational.shared.DataSource;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class TestDatabaseConnectDuckDB
{
private DuckDBConnectionWrapper singletonConnection; // disabled thread-level and user-level segregation,
public static final String TEST_DB_HOST_NAME = "local";
private static final String TEST_DB_NAME = "pure-duckDB-test-Db";
private static final DataSource TEST_DATA_SOURCE = new DataSource(TEST_DB_HOST_NAME, -1, TEST_DB_NAME, null);

public TestDatabaseConnectDuckDB()
{
try
{
Class.forName("org.duckdb.DuckDBDriver");
}
catch (ClassNotFoundException ignore)
{
// ignore exception about not finding the duckDB driver
}
}

public ConnectionWithDataSourceInfo getConnectionWithDataSourceInfo(String user)
{
try
{
if (this.singletonConnection == null || this.singletonConnection.isClosed())
{
Connection connection = DriverManager.getConnection(getConnectionURL());
//there is no way to configure timezone as jdbc url param or system properties
connection.createStatement().execute("SET TimeZone = 'UTC'");

this.singletonConnection = new DuckDBConnectionWrapper(connection, user);
}
}
catch (SQLException ex)
{
throw new PureExecutionException("Unable to create TestDatabaseConnection of type: DuckDB for user: " + user + ", message: " + ex.getMessage(), ex);
}
this.singletonConnection.incrementBorrowedCounter();
return new ConnectionWithDataSourceInfo(this.singletonConnection, TEST_DATA_SOURCE, this.getClass().getSimpleName());
}


private static String getConnectionURL()
{
return "jdbc:duckdb:" + TEST_DB_NAME;
}

}
Loading

0 comments on commit 0a1fdfe

Please sign in to comment.