Skip to content

Commit

Permalink
Merge pull request #6285 from TouK/ignite-problem-backport
Browse files Browse the repository at this point in the history
[NU-1721] Fix for DatabaseLookupEnricher mixing fields values when it is connected to ignite db, backport of pr #6264
  • Loading branch information
pielas authored Jun 28, 2024
2 parents 05aa38e + bb5bb71 commit 6545237
Show file tree
Hide file tree
Showing 15 changed files with 367 additions and 110 deletions.
14 changes: 9 additions & 5 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,7 @@ val monocleV = "2.1.0"
val jmxPrometheusJavaagentV = "0.18.0"
val wireMockV = "2.35.0"
val findBugsV = "3.0.2"
val igniteV = "2.10.0"

// depending on scala version one of this jar lays in Flink lib dir
def flinkLibScalaDeps(scalaVersion: String, configurations: Option[String] = None) = forScalaVersion(
Expand Down Expand Up @@ -1647,12 +1648,15 @@ lazy val sqlComponents = (project in component("sql"))
.settings(
name := "nussknacker-sql",
libraryDependencies ++= Seq(
"com.zaxxer" % "HikariCP" % hikariCpV,
"com.zaxxer" % "HikariCP" % hikariCpV,
// It won't run on Java 16 as Hikari will fail while trying to load IgniteJdbcThinDriver https://issues.apache.org/jira/browse/IGNITE-14888
"org.apache.ignite" % "ignite-core" % "2.10.0" % Provided,
"org.apache.ignite" % "ignite-indexing" % "2.10.0" % Provided,
"org.scalatest" %% "scalatest" % scalaTestV % "test",
"org.hsqldb" % "hsqldb" % hsqldbV % "test",
"org.apache.ignite" % "ignite-core" % igniteV % "test",
"org.apache.ignite" % "ignite-indexing" % igniteV % "test",
"org.postgresql" % "postgresql" % postgresV % "test",
"org.scalatest" %% "scalatest" % scalaTestV % "test",
"org.hsqldb" % "hsqldb" % hsqldbV % "test",
"com.dimafeng" %% "testcontainers-scala-scalatest" % testContainersScalaV % "test",
"com.dimafeng" %% "testcontainers-scala-postgresql" % testContainersScalaV % "test",
),
)
.dependsOn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class IgniteQueryHelper(getConnection: () => Connection) extends LazyLogging {
columnName -> Typed.typedClass(Class.forName(klassName))
}

tableName -> TableDefinition(typedObjectDefinition = TypedObjectDefinition(columnTypings.toMap))
tableName -> TableDefinition.applyList(columnTypings)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ trait QueryExecutor {

protected def toTypedMap(tableDef: TableDefinition, resultSet: ResultSet): TypedMap = {
val fields = tableDef.columnDefs.map { columnDef =>
columnDef.name -> resultSet.getObject(columnDef.no)
// we could here use method resultSet.getObject(Int) and pass column number as argument
// but in case of ignite db it is not certain which column index corresponds to which column.
columnDef.name -> resultSet.getObject(columnDef.name)
}.toMap
TypedMap(fields)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,16 @@ object ColumnDefinition {

def apply(columnNo: Int, resultMeta: ResultSetMetaData): ColumnDefinition =
ColumnDefinition(
no = columnNo,
name = resultMeta.getColumnName(columnNo),
typing = Typed(Class.forName(resultMeta.getColumnClassName(columnNo)))
)

def apply(columnNo: Int, typing: (String, TypingResult)): ColumnDefinition =
def apply(typing: (String, TypingResult)): ColumnDefinition =
ColumnDefinition(
no = columnNo,
name = typing._1,
typing = typing._2
)

}

final case class ColumnDefinition(no: Int, name: String, typing: TypingResult)
final case class ColumnDefinition(name: String, typing: TypingResult)
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package pl.touk.nussknacker.sql.db.schema

import pl.touk.nussknacker.engine.api.typed.TypedObjectDefinition
import pl.touk.nussknacker.engine.api.typed.typing.{Typed, TypedObjectTypingResult, TypingResult}

import java.sql.ResultSetMetaData
Expand All @@ -12,13 +11,13 @@ object TableDefinition {
columnDefs = (1 to resultMeta.getColumnCount).map(ColumnDefinition(_, resultMeta)).toList
)

def apply(typedObjectDefinition: TypedObjectDefinition): TableDefinition = {
val columnDefinitions = typedObjectDefinition.fields.zipWithIndex
.map { case (typing, index) =>
ColumnDefinition(index + 1, typing)
def applyList(fields: List[(String, TypingResult)]): TableDefinition = {
val columnDefinitions = fields
.map { typing =>
ColumnDefinition(typing)
}
TableDefinition(
columnDefs = columnDefinitions.toList
columnDefs = columnDefinitions
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,29 +229,38 @@ class DatabaseQueryEnricher(val dbPoolConfig: DBPoolConfig, val dbMetaDataProvid
): ServiceInvoker = {
val state = finalState.get
val cacheTTLOption = extractOptional[Duration](params, CacheTTLParamName)

val query = state.query
val argsCount = state.argsCount
val tableDef = state.tableDef
val strategy = state.strategy
val outputType = state.outputType
val getConnectionCallback = () => dataSource.getConnection()
val timeMeasurementCallback = () => timeMeasurement

cacheTTLOption match {
case Some(cacheTTL) =>
new DatabaseEnricherInvokerWithCache(
state.query,
state.argsCount,
state.tableDef,
state.strategy,
query,
argsCount,
tableDef,
strategy,
queryArgumentsExtractor,
cacheTTL,
state.outputType,
() => dataSource.getConnection(),
() => timeMeasurement
outputType,
getConnectionCallback,
timeMeasurementCallback,
)
case None =>
new DatabaseEnricherInvoker(
state.query,
state.argsCount,
state.tableDef,
state.strategy,
query,
argsCount,
tableDef,
strategy,
queryArgumentsExtractor,
state.outputType,
() => dataSource.getConnection(),
() => timeMeasurement
outputType,
getConnectionCallback,
timeMeasurementCallback,
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import pl.touk.nussknacker.sql.DatabaseEnricherComponentProvider
import pl.touk.nussknacker.sql.utils._
import pl.touk.nussknacker.test.ValidatedValuesDetailedMessage

import java.util
import scala.jdk.CollectionConverters._

class DatabaseLookupLiteRuntimeTest
Expand Down Expand Up @@ -63,13 +64,17 @@ class DatabaseLookupLiteRuntimeTest
"Key value" -> "#input",
"Cache TTL" -> ""
)
.emptySink("response", TestScenarioRunner.testResultSink, "value" -> "#output.NAME")
.emptySink("response", TestScenarioRunner.testResultSink, "value" -> "#output")

val validatedResult = testScenarioRunner.runWithData[Int, String](process, List(1))
val validatedResult = testScenarioRunner.runWithData[Int, AnyRef](process, List(1))

val resultList = validatedResult.validValue.successes
resultList should have length 1
resultList.head shouldEqual "John"
val resultScalaMap = resultList.head.asInstanceOf[util.HashMap[String, AnyRef]].asScala.map { case (key, value) =>
(key, value.toString)
}
resultScalaMap.get("ID") shouldEqual Some("1")
resultScalaMap.get("NAME") shouldEqual Some("John")
}

test("should enrich input with table with lower cases in column names") {
Expand All @@ -85,13 +90,17 @@ class DatabaseLookupLiteRuntimeTest
"Key value" -> "#input",
"Cache TTL" -> ""
)
.emptySink("response", TestScenarioRunner.testResultSink, "value" -> "#output.name")
.emptySink("response", TestScenarioRunner.testResultSink, "value" -> "#output")

val validatedResult = testScenarioRunner.runWithData[Int, String](process, List(1))
val validatedResult = testScenarioRunner.runWithData[Int, AnyRef](process, List(1))

val resultList = validatedResult.validValue.successes
resultList should have length 1
resultList.head shouldEqual "John"
val resultScalaMap = resultList.head.asInstanceOf[util.HashMap[String, AnyRef]].asScala.map { case (key, value) =>
(key, value.toString)
}
resultScalaMap.get("name") shouldEqual Some("John")
resultScalaMap.get("id") shouldEqual Some("1")
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package pl.touk.nussknacker.sql.service

import pl.touk.nussknacker.engine.api.typed.TypedMap
import pl.touk.nussknacker.sql.db.schema.{MetaDataProviderFactory, TableDefinition}
import pl.touk.nussknacker.sql.utils.BaseHsqlQueryEnricherTest
import org.scalatest.BeforeAndAfterEach

class DatabaseQueryEnricherHsqlTest
extends BaseHsqlQueryEnricherTest
with DatabaseQueryEnricherQueryWithEnricher
with BeforeAndAfterEach {

override val service =
new DatabaseQueryEnricher(hsqlDbPoolConfig, new MetaDataProviderFactory().create(hsqlDbPoolConfig))

override val prepareHsqlDDLs: List[String] = List(
"CREATE TABLE people (id INT, name VARCHAR(40));",
"INSERT INTO people (id, name) VALUES (1, 'John')"
)

override protected def afterEach(): Unit = {
val cleanupStatements = List(
"TRUNCATE TABLE people;",
"INSERT INTO people (id, name) VALUES (1, 'John')"
)
cleanupStatements.foreach { ddlStr =>
val ddlStatement = conn.prepareStatement(ddlStr)
try ddlStatement.execute()
finally ddlStatement.close()
}
}

test("DatabaseQueryEnricher#implementation without cache") {
val result = queryWithEnricher(
"select * from people where id = ?",
Map("arg1" -> 1.asInstanceOf[AnyRef]),
conn,
service,
"List[Record{ID: Integer, NAME: String}]"
)
result shouldBe List(
TypedMap(Map("ID" -> 1, "NAME" -> "John"))
)
}

test("DatabaseQueryEnricher#implementation without cache and with mixed lowercase and uppercase characters") {
val result = queryWithEnricher(
"select iD, NaMe from people where id = ?",
Map("arg1" -> 1.asInstanceOf[AnyRef]),
conn,
service,
"List[Record{ID: Integer, NAME: String}]"
)
result shouldBe List(
TypedMap(Map("NAME" -> "John", "ID" -> 1))
)
}

test("DatabaseQueryEnricher#implementation update query") {
val query = "UPDATE people SET name = 'Don' where id = ?"
updateWithEnricher(query, conn, Map("arg1" -> 1.asInstanceOf[AnyRef]), service)

val queryResultSet = conn.prepareStatement("SELECT * FROM people WHERE id = 1").executeQuery()
queryResultSet.next()
queryResultSet.getObject("name") shouldBe "Don"
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package pl.touk.nussknacker.sql.service

import org.scalatest.BeforeAndAfterEach
import pl.touk.nussknacker.engine.api.typed.TypedMap
import pl.touk.nussknacker.sql.db.schema.{MetaDataProviderFactory, TableDefinition}
import pl.touk.nussknacker.sql.utils.BasePostgresqlQueryEnricherTest

class DatabaseQueryEnricherPostgresqlTest
extends BasePostgresqlQueryEnricherTest
with DatabaseQueryEnricherQueryWithEnricher
with BeforeAndAfterEach {

override val service =
new DatabaseQueryEnricher(postgresqlDbPoolConfig, new MetaDataProviderFactory().create(postgresqlDbPoolConfig))

override val preparePostgresqlDDLs: List[String] = List(
"CREATE TABLE people (id INT, name VARCHAR(40));",
"INSERT INTO people (id, name) VALUES (1, 'John')"
)

override protected def afterEach(): Unit = {
val cleanupStatements = List(
"TRUNCATE TABLE people;",
"INSERT INTO people (id, name) VALUES (1, 'John')"
)
cleanupStatements.foreach { ddlStr =>
val ddlStatement = conn.prepareStatement(ddlStr)
try ddlStatement.execute()
finally ddlStatement.close()
}
}

test("DatabaseQueryEnricherPostgresqlTest#implementation without cache") {
val result = queryWithEnricher(
"select * from people where id = ?",
Map("arg1" -> 1.asInstanceOf[AnyRef]),
conn,
service,
"List[Record{id: Integer, name: String}]"
)
result shouldBe List(
TypedMap(Map("name" -> "John", "id" -> 1))
)
}

test(
"DatabaseQueryEnricherPostgresqlTest#implementation without cache and with mixed lowercase and uppercase characters"
) {
val result = queryWithEnricher(
"select iD, NaMe from people where id = ?",
Map("arg1" -> 1.asInstanceOf[AnyRef]),
conn,
service,
"List[Record{id: Integer, name: String}]"
)
result shouldBe List(
TypedMap(Map("name" -> "John", "id" -> 1))
)
}

test("DatabaseQueryEnricherPostgresqlTest#implementation update query") {
val query = "UPDATE people SET name = 'Don' where id = ?"
updateWithEnricher(query, conn, Map("arg1" -> 1.asInstanceOf[AnyRef]), service)

val queryResultSet = conn.prepareStatement("SELECT * FROM people WHERE id = 1").executeQuery()
queryResultSet.next()
queryResultSet.getObject("name") shouldBe "Don"
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package pl.touk.nussknacker.sql.service

import pl.touk.nussknacker.engine.api.typed.TypedMap
import pl.touk.nussknacker.sql.db.query.{ResultSetStrategy, UpdateResultStrategy}
import pl.touk.nussknacker.sql.db.schema.TableDefinition
import pl.touk.nussknacker.sql.utils.BaseDatabaseQueryEnricherTest

import java.sql.Connection
import scala.concurrent.Await

trait DatabaseQueryEnricherQueryWithEnricher extends BaseDatabaseQueryEnricherTest {

import scala.concurrent.duration._
import scala.jdk.CollectionConverters._

def queryWithEnricher(
query: String,
parameters: Map[String, AnyRef],
connection: Connection,
databaseQueryEnricher: DatabaseQueryEnricher,
expectedDisplayType: String
): List[TypedMap] = {
val st = connection.prepareStatement(query)
val meta = st.getMetaData
val state = DatabaseQueryEnricher.TransformationState(
query = query,
argsCount = 1,
tableDef = TableDefinition(meta),
strategy = ResultSetStrategy
)
st.close()
val invoker = databaseQueryEnricher.implementation(Map.empty, dependencies = Nil, Some(state))
returnType(databaseQueryEnricher, state).display shouldBe expectedDisplayType
val resultF = invoker.invokeService(parameters)
Await.result(resultF, 5 seconds).asInstanceOf[java.util.List[TypedMap]].asScala.toList
}

def updateWithEnricher(
query: String,
connection: Connection,
parameters: Map[String, AnyRef],
databaseQueryEnricher: DatabaseQueryEnricher
): Unit = {
val st = connection.prepareStatement(query)
st.close()
val state = DatabaseQueryEnricher.TransformationState(
query = query,
argsCount = 1,
tableDef = TableDefinition(Nil),
strategy = UpdateResultStrategy
)
val invoker = databaseQueryEnricher.implementation(Map.empty, dependencies = Nil, Some(state))
returnType(databaseQueryEnricher, state).display shouldBe "Integer"
val resultF = invoker.invokeService(parameters)
Await.result(resultF, 5 seconds).asInstanceOf[Integer]
}

}
Loading

0 comments on commit 6545237

Please sign in to comment.