From 889c24d9bf2cbd52caf4b12052290f4c462dc3b6 Mon Sep 17 00:00:00 2001 From: Pawel Czajka Date: Fri, 28 Jun 2024 11:17:41 +0200 Subject: [PATCH 1/5] Fix issue NU-1721 --- build.sbt | 14 ++-- .../sql/db/ignite/IgniteQueryHelper.scala | 2 +- .../sql/db/query/QueryExecutor.scala | 4 +- .../sql/db/schema/ColumnDefinition.scala | 6 +- .../sql/db/schema/TableDefinition.scala | 10 +-- .../sql/service/DatabaseQueryEnricher.scala | 41 ++++++---- .../DatabaseLookupLiteRuntimeTest.scala | 21 +++-- .../DatabaseQueryEnricherHsqlTest.scala | 68 +++++++++++++++++ .../DatabaseQueryEnricherPostgresqlTest.scala | 70 +++++++++++++++++ ...tabaseQueryEnricherQueryWithEnricher.scala | 76 +++++++++++++++++++ .../service/DatabaseQueryEnricherTest.scala | 70 ----------------- .../IgniteEnrichmentLiteRuntimeTest.scala | 14 +++- .../BasePostgresqlQueryEnricherTest.scala | 49 ++++++++++++ .../sql/utils/WithPostgresqlDB.scala | 52 +++++++++++++ docs/Changelog.md | 3 + 15 files changed, 390 insertions(+), 110 deletions(-) create mode 100644 components/sql/src/test/scala/pl/touk/nussknacker/sql/service/DatabaseQueryEnricherHsqlTest.scala create mode 100644 components/sql/src/test/scala/pl/touk/nussknacker/sql/service/DatabaseQueryEnricherPostgresqlTest.scala create mode 100644 components/sql/src/test/scala/pl/touk/nussknacker/sql/service/DatabaseQueryEnricherQueryWithEnricher.scala delete mode 100644 components/sql/src/test/scala/pl/touk/nussknacker/sql/service/DatabaseQueryEnricherTest.scala create mode 100644 components/sql/src/test/scala/pl/touk/nussknacker/sql/utils/BasePostgresqlQueryEnricherTest.scala create mode 100644 components/sql/src/test/scala/pl/touk/nussknacker/sql/utils/WithPostgresqlDB.scala diff --git a/build.sbt b/build.sbt index b9e870e79e3..2753dab5df4 100644 --- a/build.sbt +++ b/build.sbt @@ -369,6 +369,7 @@ val monocleV = "2.1.0" val jmxPrometheusJavaagentV = "0.18.0" val wireMockV = "2.35.0" val findBugsV = "3.0.2" +val igniteV = "2.10.0" // depending on scala version one of this jar lays in Flink lib dir def flinkLibScalaDeps(scalaVersion: String, configurations: Option[String] = None) = forScalaVersion( @@ -1647,12 +1648,15 @@ lazy val sqlComponents = (project in component("sql")) .settings( name := "nussknacker-sql", libraryDependencies ++= Seq( - "com.zaxxer" % "HikariCP" % hikariCpV, + "com.zaxxer" % "HikariCP" % hikariCpV, // It won't run on Java 16 as Hikari will fail while trying to load IgniteJdbcThinDriver https://issues.apache.org/jira/browse/IGNITE-14888 - "org.apache.ignite" % "ignite-core" % "2.10.0" % Provided, - "org.apache.ignite" % "ignite-indexing" % "2.10.0" % Provided, - "org.scalatest" %% "scalatest" % scalaTestV % "test", - "org.hsqldb" % "hsqldb" % hsqldbV % "test", + "org.apache.ignite" % "ignite-core" % igniteV % "test", + "org.apache.ignite" % "ignite-indexing" % igniteV % "test", + "org.postgresql" % "postgresql" % postgresV % "test", + "org.scalatest" %% "scalatest" % scalaTestV % "test", + "org.hsqldb" % "hsqldb" % hsqldbV % "test", + "com.dimafeng" %% "testcontainers-scala-scalatest" % testContainersScalaV % "test", + "com.dimafeng" %% "testcontainers-scala-postgresql" % testContainersScalaV % "test", ), ) .dependsOn( diff --git a/components/sql/src/main/scala/pl/touk/nussknacker/sql/db/ignite/IgniteQueryHelper.scala b/components/sql/src/main/scala/pl/touk/nussknacker/sql/db/ignite/IgniteQueryHelper.scala index 9ae00a0e83a..cc47ee6563f 100644 --- a/components/sql/src/main/scala/pl/touk/nussknacker/sql/db/ignite/IgniteQueryHelper.scala +++ b/components/sql/src/main/scala/pl/touk/nussknacker/sql/db/ignite/IgniteQueryHelper.scala @@ -33,7 +33,7 @@ class IgniteQueryHelper(getConnection: () => Connection) extends LazyLogging { columnName -> Typed.typedClass(Class.forName(klassName)) } - tableName -> TableDefinition(typedObjectDefinition = TypedObjectDefinition(columnTypings.toMap)) + tableName -> TableDefinition.applyList(columnTypings) } } } diff --git a/components/sql/src/main/scala/pl/touk/nussknacker/sql/db/query/QueryExecutor.scala b/components/sql/src/main/scala/pl/touk/nussknacker/sql/db/query/QueryExecutor.scala index 5dc6e1bf7fd..a4941685b49 100644 --- a/components/sql/src/main/scala/pl/touk/nussknacker/sql/db/query/QueryExecutor.scala +++ b/components/sql/src/main/scala/pl/touk/nussknacker/sql/db/query/QueryExecutor.scala @@ -14,7 +14,9 @@ trait QueryExecutor { protected def toTypedMap(tableDef: TableDefinition, resultSet: ResultSet): TypedMap = { val fields = tableDef.columnDefs.map { columnDef => - columnDef.name -> resultSet.getObject(columnDef.no) + // we could here use method resultSet.getObject(Int) and pass column number as argument + // but in case of ignite db it is not certain which column index corresponds to which column. + columnDef.name -> resultSet.getObject(columnDef.name) }.toMap TypedMap(fields) } diff --git a/components/sql/src/main/scala/pl/touk/nussknacker/sql/db/schema/ColumnDefinition.scala b/components/sql/src/main/scala/pl/touk/nussknacker/sql/db/schema/ColumnDefinition.scala index 22a246206a9..8ca94f3a268 100644 --- a/components/sql/src/main/scala/pl/touk/nussknacker/sql/db/schema/ColumnDefinition.scala +++ b/components/sql/src/main/scala/pl/touk/nussknacker/sql/db/schema/ColumnDefinition.scala @@ -8,18 +8,16 @@ object ColumnDefinition { def apply(columnNo: Int, resultMeta: ResultSetMetaData): ColumnDefinition = ColumnDefinition( - no = columnNo, name = resultMeta.getColumnName(columnNo), typing = Typed(Class.forName(resultMeta.getColumnClassName(columnNo))) ) - def apply(columnNo: Int, typing: (String, TypingResult)): ColumnDefinition = + def apply(typing: (String, TypingResult)): ColumnDefinition = ColumnDefinition( - no = columnNo, name = typing._1, typing = typing._2 ) } -final case class ColumnDefinition(no: Int, name: String, typing: TypingResult) +final case class ColumnDefinition(name: String, typing: TypingResult) diff --git a/components/sql/src/main/scala/pl/touk/nussknacker/sql/db/schema/TableDefinition.scala b/components/sql/src/main/scala/pl/touk/nussknacker/sql/db/schema/TableDefinition.scala index 34b328d1892..30bb8b67125 100644 --- a/components/sql/src/main/scala/pl/touk/nussknacker/sql/db/schema/TableDefinition.scala +++ b/components/sql/src/main/scala/pl/touk/nussknacker/sql/db/schema/TableDefinition.scala @@ -12,13 +12,13 @@ object TableDefinition { columnDefs = (1 to resultMeta.getColumnCount).map(ColumnDefinition(_, resultMeta)).toList ) - def apply(typedObjectDefinition: TypedObjectDefinition): TableDefinition = { - val columnDefinitions = typedObjectDefinition.fields.zipWithIndex - .map { case (typing, index) => - ColumnDefinition(index + 1, typing) + def applyList(fields: List[(String, TypingResult)]): TableDefinition = { + val columnDefinitions = fields + .map { typing => + ColumnDefinition(typing) } TableDefinition( - columnDefs = columnDefinitions.toList + columnDefs = columnDefinitions ) } diff --git a/components/sql/src/main/scala/pl/touk/nussknacker/sql/service/DatabaseQueryEnricher.scala b/components/sql/src/main/scala/pl/touk/nussknacker/sql/service/DatabaseQueryEnricher.scala index f51ea1a4389..17e8ddc7ced 100644 --- a/components/sql/src/main/scala/pl/touk/nussknacker/sql/service/DatabaseQueryEnricher.scala +++ b/components/sql/src/main/scala/pl/touk/nussknacker/sql/service/DatabaseQueryEnricher.scala @@ -229,29 +229,40 @@ class DatabaseQueryEnricher(val dbPoolConfig: DBPoolConfig, val dbMetaDataProvid ): ServiceInvoker = { val state = finalState.get val cacheTTLOption = extractOptional[Duration](params, CacheTTLParamName) - cacheTTLOption match { + + val query = state.query + val argsCount = state.argsCount + val tableDef = state.tableDef + val strategy = state.strategy + val outputType = state.outputType + val getConnectionCallback = () => dataSource.getConnection() + val timeMeasurementCallback = () => timeMeasurement + + val createInvoker = cacheTTLOption match { case Some(cacheTTL) => new DatabaseEnricherInvokerWithCache( - state.query, - state.argsCount, - state.tableDef, - state.strategy, + query, + argsCount, + tableDef, + strategy, queryArgumentsExtractor, cacheTTL, - state.outputType, - () => dataSource.getConnection(), - () => timeMeasurement + outputType, + getConnectionCallback, + timeMeasurementCallback, + params ) case None => new DatabaseEnricherInvoker( - state.query, - state.argsCount, - state.tableDef, - state.strategy, + query, + argsCount, + tableDef, + strategy, queryArgumentsExtractor, - state.outputType, - () => dataSource.getConnection(), - () => timeMeasurement + outputType, + getConnectionCallback, + timeMeasurementCallback, + params ) } } diff --git a/components/sql/src/test/scala/pl/touk/nussknacker/sql/service/DatabaseLookupLiteRuntimeTest.scala b/components/sql/src/test/scala/pl/touk/nussknacker/sql/service/DatabaseLookupLiteRuntimeTest.scala index 340a4730cbe..d645dc0b08c 100644 --- a/components/sql/src/test/scala/pl/touk/nussknacker/sql/service/DatabaseLookupLiteRuntimeTest.scala +++ b/components/sql/src/test/scala/pl/touk/nussknacker/sql/service/DatabaseLookupLiteRuntimeTest.scala @@ -12,6 +12,7 @@ import pl.touk.nussknacker.sql.DatabaseEnricherComponentProvider import pl.touk.nussknacker.sql.utils._ import pl.touk.nussknacker.test.ValidatedValuesDetailedMessage +import java.util import scala.jdk.CollectionConverters._ class DatabaseLookupLiteRuntimeTest @@ -63,13 +64,17 @@ class DatabaseLookupLiteRuntimeTest "Key value" -> "#input", "Cache TTL" -> "" ) - .emptySink("response", TestScenarioRunner.testResultSink, "value" -> "#output.NAME") + .emptySink("response", TestScenarioRunner.testResultSink, "value" -> "#output") - val validatedResult = testScenarioRunner.runWithData[Int, String](process, List(1)) + val validatedResult = testScenarioRunner.runWithData[Int, AnyRef](process, List(1)) val resultList = validatedResult.validValue.successes resultList should have length 1 - resultList.head shouldEqual "John" + val resultScalaMap = resultList.head.asInstanceOf[util.HashMap[String, AnyRef]].asScala.map { case (key, value) => + (key, value.toString) + } + resultScalaMap.get("ID") shouldEqual Some("1") + resultScalaMap.get("NAME") shouldEqual Some("John") } test("should enrich input with table with lower cases in column names") { @@ -85,13 +90,17 @@ class DatabaseLookupLiteRuntimeTest "Key value" -> "#input", "Cache TTL" -> "" ) - .emptySink("response", TestScenarioRunner.testResultSink, "value" -> "#output.name") + .emptySink("response", TestScenarioRunner.testResultSink, "value" -> "#output") - val validatedResult = testScenarioRunner.runWithData[Int, String](process, List(1)) + val validatedResult = testScenarioRunner.runWithData[Int, AnyRef](process, List(1)) val resultList = validatedResult.validValue.successes resultList should have length 1 - resultList.head shouldEqual "John" + val resultScalaMap = resultList.head.asInstanceOf[util.HashMap[String, AnyRef]].asScala.map { case (key, value) => + (key, value.toString) + } + resultScalaMap.get("name") shouldEqual Some("John") + resultScalaMap.get("id") shouldEqual Some("1") } } diff --git a/components/sql/src/test/scala/pl/touk/nussknacker/sql/service/DatabaseQueryEnricherHsqlTest.scala b/components/sql/src/test/scala/pl/touk/nussknacker/sql/service/DatabaseQueryEnricherHsqlTest.scala new file mode 100644 index 00000000000..72e3b230be9 --- /dev/null +++ b/components/sql/src/test/scala/pl/touk/nussknacker/sql/service/DatabaseQueryEnricherHsqlTest.scala @@ -0,0 +1,68 @@ +package pl.touk.nussknacker.sql.service + +import pl.touk.nussknacker.engine.api.typed.TypedMap +import pl.touk.nussknacker.sql.db.schema.{MetaDataProviderFactory, TableDefinition} +import pl.touk.nussknacker.sql.utils.BaseHsqlQueryEnricherTest +import org.scalatest.BeforeAndAfterEach + +class DatabaseQueryEnricherHsqlTest + extends BaseHsqlQueryEnricherTest + with DatabaseQueryEnricherQueryWithEnricher + with BeforeAndAfterEach { + + override val service = + new DatabaseQueryEnricher(hsqlDbPoolConfig, new MetaDataProviderFactory().create(hsqlDbPoolConfig)) + + override val prepareHsqlDDLs: List[String] = List( + "CREATE TABLE people (id INT, name VARCHAR(40));", + "INSERT INTO people (id, name) VALUES (1, 'John')" + ) + + override protected def afterEach(): Unit = { + val cleanupStatements = List( + "TRUNCATE TABLE people;", + "INSERT INTO people (id, name) VALUES (1, 'John')" + ) + cleanupStatements.foreach { ddlStr => + val ddlStatement = conn.prepareStatement(ddlStr) + try ddlStatement.execute() + finally ddlStatement.close() + } + } + + test("DatabaseQueryEnricher#implementation without cache") { + val result = queryWithEnricher( + "select * from people where id = ?", + Map("arg1" -> 1.asInstanceOf[AnyRef]), + conn, + service, + "List[Record{ID: Integer, NAME: String}]" + ) + result shouldBe List( + TypedMap(Map("ID" -> 1, "NAME" -> "John")) + ) + } + + test("DatabaseQueryEnricher#implementation without cache and with mixed lowercase and uppercase characters") { + val result = queryWithEnricher( + "select iD, NaMe from people where id = ?", + Map("arg1" -> 1.asInstanceOf[AnyRef]), + conn, + service, + "List[Record{ID: Integer, NAME: String}]" + ) + result shouldBe List( + TypedMap(Map("NAME" -> "John", "ID" -> 1)) + ) + } + + test("DatabaseQueryEnricher#implementation update query") { + val query = "UPDATE people SET name = 'Don' where id = ?" + updateWithEnricher(query, conn, Map("arg1" -> 1.asInstanceOf[AnyRef]), service) + + val queryResultSet = conn.prepareStatement("SELECT * FROM people WHERE id = 1").executeQuery() + queryResultSet.next() + queryResultSet.getObject("name") shouldBe "Don" + } + +} diff --git a/components/sql/src/test/scala/pl/touk/nussknacker/sql/service/DatabaseQueryEnricherPostgresqlTest.scala b/components/sql/src/test/scala/pl/touk/nussknacker/sql/service/DatabaseQueryEnricherPostgresqlTest.scala new file mode 100644 index 00000000000..f674077afef --- /dev/null +++ b/components/sql/src/test/scala/pl/touk/nussknacker/sql/service/DatabaseQueryEnricherPostgresqlTest.scala @@ -0,0 +1,70 @@ +package pl.touk.nussknacker.sql.service + +import org.scalatest.BeforeAndAfterEach +import pl.touk.nussknacker.engine.api.typed.TypedMap +import pl.touk.nussknacker.sql.db.schema.{MetaDataProviderFactory, TableDefinition} +import pl.touk.nussknacker.sql.utils.BasePostgresqlQueryEnricherTest + +class DatabaseQueryEnricherPostgresqlTest + extends BasePostgresqlQueryEnricherTest + with DatabaseQueryEnricherQueryWithEnricher + with BeforeAndAfterEach { + + override val service = + new DatabaseQueryEnricher(postgresqlDbPoolConfig, new MetaDataProviderFactory().create(postgresqlDbPoolConfig)) + + override val preparePostgresqlDDLs: List[String] = List( + "CREATE TABLE people (id INT, name VARCHAR(40));", + "INSERT INTO people (id, name) VALUES (1, 'John')" + ) + + override protected def afterEach(): Unit = { + val cleanupStatements = List( + "TRUNCATE TABLE people;", + "INSERT INTO people (id, name) VALUES (1, 'John')" + ) + cleanupStatements.foreach { ddlStr => + val ddlStatement = conn.prepareStatement(ddlStr) + try ddlStatement.execute() + finally ddlStatement.close() + } + } + + test("DatabaseQueryEnricherPostgresqlTest#implementation without cache") { + val result = queryWithEnricher( + "select * from people where id = ?", + Map("arg1" -> 1.asInstanceOf[AnyRef]), + conn, + service, + "List[Record{id: Integer, name: String}]" + ) + result shouldBe List( + TypedMap(Map("name" -> "John", "id" -> 1)) + ) + } + + test( + "DatabaseQueryEnricherPostgresqlTest#implementation without cache and with mixed lowercase and uppercase characters" + ) { + val result = queryWithEnricher( + "select iD, NaMe from people where id = ?", + Map("arg1" -> 1.asInstanceOf[AnyRef]), + conn, + service, + "List[Record{id: Integer, name: String}]" + ) + result shouldBe List( + TypedMap(Map("name" -> "John", "id" -> 1)) + ) + } + + test("DatabaseQueryEnricherPostgresqlTest#implementation update query") { + val query = "UPDATE people SET name = 'Don' where id = ?" + updateWithEnricher(query, conn, Map("arg1" -> 1.asInstanceOf[AnyRef]), service) + + val queryResultSet = conn.prepareStatement("SELECT * FROM people WHERE id = 1").executeQuery() + queryResultSet.next() + queryResultSet.getObject("name") shouldBe "Don" + } + +} diff --git a/components/sql/src/test/scala/pl/touk/nussknacker/sql/service/DatabaseQueryEnricherQueryWithEnricher.scala b/components/sql/src/test/scala/pl/touk/nussknacker/sql/service/DatabaseQueryEnricherQueryWithEnricher.scala new file mode 100644 index 00000000000..79ea2b30af7 --- /dev/null +++ b/components/sql/src/test/scala/pl/touk/nussknacker/sql/service/DatabaseQueryEnricherQueryWithEnricher.scala @@ -0,0 +1,76 @@ +package pl.touk.nussknacker.sql.service + +import pl.touk.nussknacker.engine.api.Params +import pl.touk.nussknacker.engine.api.parameter.ParameterName +import pl.touk.nussknacker.engine.api.typed.TypedMap +import pl.touk.nussknacker.engine.api.Context +import pl.touk.nussknacker.sql.db.query.{ResultSetStrategy, UpdateResultStrategy} +import pl.touk.nussknacker.sql.db.schema.TableDefinition +import pl.touk.nussknacker.sql.utils.BaseDatabaseQueryEnricherTest + +import java.sql.Connection +import scala.concurrent.Await + +trait DatabaseQueryEnricherQueryWithEnricher extends BaseDatabaseQueryEnricherTest { + + import scala.concurrent.duration._ + import scala.jdk.CollectionConverters._ + + def queryWithEnricher( + query: String, + parameters: Map[String, AnyRef], + connection: Connection, + databaseQueryEnricher: DatabaseQueryEnricher, + expectedDisplayType: String + ): List[TypedMap] = { + val st = connection.prepareStatement(query) + val meta = st.getMetaData + val state = DatabaseQueryEnricher.TransformationState( + query = query, + argsCount = 1, + tableDef = TableDefinition(meta), + strategy = ResultSetStrategy + ) + st.close() + val implementation = databaseQueryEnricher.implementation( + params = Params( + parameters.map { case (k, v) => (ParameterName(k), v) } + + (DatabaseQueryEnricher.cacheTTLParamName -> null) + ), + dependencies = Nil, + finalState = Some(state) + ) + returnType(databaseQueryEnricher, state).display shouldBe expectedDisplayType + val resultFuture = implementation.invoke(Context.withInitialId) + Await.result(resultFuture, 5 seconds).asInstanceOf[java.util.List[TypedMap]].asScala.toList + } + + def updateWithEnricher( + query: String, + connection: Connection, + parameters: Map[String, AnyRef], + databaseQueryEnricher: DatabaseQueryEnricher + ): Unit = { + val st = connection.prepareStatement(query) + st.close() + val state = DatabaseQueryEnricher.TransformationState( + query = query, + argsCount = 1, + tableDef = TableDefinition(Nil), + strategy = UpdateResultStrategy + ) + val implementation = databaseQueryEnricher.implementation( + params = Params( + parameters.map { case (k, v) => (ParameterName(k), v) } + + (DatabaseQueryEnricher.cacheTTLParamName -> null) + ), + dependencies = Nil, + finalState = Some(state) + ) + returnType(databaseQueryEnricher, state).display shouldBe "Integer" + val resultFuture = implementation.invoke(Context.withInitialId) + val result = Await.result(resultFuture, 5 seconds).asInstanceOf[Integer] + result shouldBe 1 + } + +} diff --git a/components/sql/src/test/scala/pl/touk/nussknacker/sql/service/DatabaseQueryEnricherTest.scala b/components/sql/src/test/scala/pl/touk/nussknacker/sql/service/DatabaseQueryEnricherTest.scala deleted file mode 100644 index c50d4cc715d..00000000000 --- a/components/sql/src/test/scala/pl/touk/nussknacker/sql/service/DatabaseQueryEnricherTest.scala +++ /dev/null @@ -1,70 +0,0 @@ -package pl.touk.nussknacker.sql.service - -import pl.touk.nussknacker.engine.api.typed.TypedMap -import pl.touk.nussknacker.sql.db.query.{ResultSetStrategy, UpdateResultStrategy} -import pl.touk.nussknacker.sql.db.schema.{MetaDataProviderFactory, TableDefinition} -import pl.touk.nussknacker.sql.utils.BaseHsqlQueryEnricherTest - -import scala.concurrent.Await - -class DatabaseQueryEnricherTest extends BaseHsqlQueryEnricherTest { - - import scala.jdk.CollectionConverters._ - import scala.concurrent.duration._ - - override val service = - new DatabaseQueryEnricher(hsqlDbPoolConfig, new MetaDataProviderFactory().create(hsqlDbPoolConfig)) - - override val prepareHsqlDDLs: List[String] = List( - "CREATE TABLE persons (id INT, name VARCHAR(40));", - "INSERT INTO persons (id, name) VALUES (1, 'John')" - ) - - test("DatabaseQueryEnricher#implementation without cache") { - val query = "select * from persons where id = ?" - val st = conn.prepareStatement(query) - val meta = st.getMetaData - st.close() - val state = DatabaseQueryEnricher.TransformationState( - query = query, - argsCount = 1, - tableDef = TableDefinition(meta), - strategy = ResultSetStrategy - ) - val invoker = service.implementation(Map.empty, dependencies = Nil, Some(state)) - returnType(service, state).display shouldBe "List[Record{ID: Integer, NAME: String}]" - val resultF = invoker.invokeService(Map("arg1" -> 1)) - val result = Await.result(resultF, 5 seconds).asInstanceOf[java.util.List[TypedMap]].asScala.toList - result shouldBe List( - TypedMap(Map("ID" -> 1, "NAME" -> "John")) - ) - - conn.prepareStatement("UPDATE persons SET name = 'Alex' WHERE id = 1").execute() - val resultF2 = invoker.invokeService(Map("arg1" -> 1)) - val result2 = Await.result(resultF2, 5 seconds).asInstanceOf[java.util.List[TypedMap]].asScala.toList - result2 shouldBe List( - TypedMap(Map("ID" -> 1, "NAME" -> "Alex")) - ) - } - - test("DatabaseQueryEnricher#implementation update query") { - val query = "UPDATE persons SET name = 'Don' where id = ?" - val st = conn.prepareStatement(query) - st.close() - val state = DatabaseQueryEnricher.TransformationState( - query = query, - argsCount = 1, - tableDef = TableDefinition(Nil), - strategy = UpdateResultStrategy - ) - val invoker = service.implementation(Map.empty, dependencies = Nil, Some(state)) - returnType(service, state).display shouldBe "Integer" - val resultF = invoker.invokeService(Map("arg1" -> 1)) - val result = Await.result(resultF, 5 seconds).asInstanceOf[Integer] - result shouldBe 1 - val queryResultSet = conn.prepareStatement("SELECT * FROM persons WHERE id = 1").executeQuery() - queryResultSet.next() - queryResultSet.getObject("name") shouldBe "Don" - } - -} diff --git a/components/sql/src/test/scala/pl/touk/nussknacker/sql/service/IgniteEnrichmentLiteRuntimeTest.scala b/components/sql/src/test/scala/pl/touk/nussknacker/sql/service/IgniteEnrichmentLiteRuntimeTest.scala index cc7e254d0a5..a7423f6bbfb 100644 --- a/components/sql/src/test/scala/pl/touk/nussknacker/sql/service/IgniteEnrichmentLiteRuntimeTest.scala +++ b/components/sql/src/test/scala/pl/touk/nussknacker/sql/service/IgniteEnrichmentLiteRuntimeTest.scala @@ -12,6 +12,7 @@ import pl.touk.nussknacker.sql.DatabaseEnricherComponentProvider import pl.touk.nussknacker.sql.utils.ignite.WithIgniteDB import pl.touk.nussknacker.test.ValidatedValuesDetailedMessage +import java.util import scala.jdk.CollectionConverters._ class IgniteEnrichmentLiteRuntimeTest @@ -59,13 +60,20 @@ class IgniteEnrichmentLiteRuntimeTest "Key value" -> "#input", "Cache TTL" -> "" ) - .emptySink("response", TestScenarioRunner.testResultSink, "value" -> "#output.NAME") + .emptySink("response", TestScenarioRunner.testResultSink, "value" -> "#output") - val validatedResult = testScenarioRunner.runWithData[Int, String](process, List(1)) + val validatedResult = testScenarioRunner.runWithData[Int, AnyRef](process, List(1)) val resultList = validatedResult.validValue.successes resultList should have length 1 - resultList.head shouldEqual "Warszawa" + val resultScalaMap = resultList.head.asInstanceOf[util.HashMap[String, AnyRef]].asScala.map { case (key, value) => + (key, value.toString) + } + + resultScalaMap.get("POPULATION") shouldEqual Some("1793579") + resultScalaMap.get("ID") shouldEqual Some("1") + resultScalaMap.get("COUNTRY") shouldEqual Some("Poland") + resultScalaMap.get("NAME") shouldEqual Some("Warszawa") } } diff --git a/components/sql/src/test/scala/pl/touk/nussknacker/sql/utils/BasePostgresqlQueryEnricherTest.scala b/components/sql/src/test/scala/pl/touk/nussknacker/sql/utils/BasePostgresqlQueryEnricherTest.scala new file mode 100644 index 00000000000..aa9f046f913 --- /dev/null +++ b/components/sql/src/test/scala/pl/touk/nussknacker/sql/utils/BasePostgresqlQueryEnricherTest.scala @@ -0,0 +1,49 @@ +package pl.touk.nussknacker.sql.utils + +import com.dimafeng.testcontainers.ForAllTestContainer +import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory} +import org.scalatest.BeforeAndAfterAll +import org.scalatest.time.{Second, Seconds, Span} +import pl.touk.nussknacker.engine.lite.api.runtimecontext.LiteEngineRuntimeContextPreparer +import pl.touk.nussknacker.sql.db.pool.DBPoolConfig +import pl.touk.nussknacker.test.PatientScalaFutures + +import scala.jdk.CollectionConverters._ + +trait BasePostgresqlQueryEnricherTest + extends BaseDatabaseQueryEnricherTest + with PatientScalaFutures + with BeforeAndAfterAll + with ForAllTestContainer + with WithPostgresqlDB { + + val pc: PatienceConfig = PatienceConfig(Span(20, Seconds), Span(1, Second)) + + val postgresqlDbPoolConfig: DBPoolConfig = DBPoolConfig( + driverClassName = postgresqlConfigValues("driverClassName"), + url = postgresqlConfigValues("url"), + username = postgresqlConfigValues("username"), + password = postgresqlConfigValues("password") + ) + + val dbEnricherConfig: Config = ConfigFactory + .load() + .withValue("name", ConfigValueFactory.fromAnyRef("db-enricher")) + .withValue( + "dbPool", + ConfigValueFactory.fromMap( + postgresqlConfigValues.asJava + ) + ) + + override def beforeAll(): Unit = { + service.open(LiteEngineRuntimeContextPreparer.noOp.prepare(jobData)) + super.beforeAll() + } + + override protected def afterAll(): Unit = { + service.close() + super.afterAll() + } + +} diff --git a/components/sql/src/test/scala/pl/touk/nussknacker/sql/utils/WithPostgresqlDB.scala b/components/sql/src/test/scala/pl/touk/nussknacker/sql/utils/WithPostgresqlDB.scala new file mode 100644 index 00000000000..7aea7ce6041 --- /dev/null +++ b/components/sql/src/test/scala/pl/touk/nussknacker/sql/utils/WithPostgresqlDB.scala @@ -0,0 +1,52 @@ +package pl.touk.nussknacker.sql.utils + +import com.dimafeng.testcontainers.{ForAllTestContainer, PostgreSQLContainer} +import org.hsqldb.jdbcDriver +import org.scalatest.BeforeAndAfterAll +import org.testcontainers.utility.DockerImageName +import scala.jdk.CollectionConverters._ + +import java.sql.{Connection, DriverManager} +import java.util.UUID + +trait WithPostgresqlDB { + self: BeforeAndAfterAll with ForAllTestContainer => + + var conn: Connection = _ + + override val container: PostgreSQLContainer = + PostgreSQLContainer(DockerImageName.parse("postgres:11.2")) + + { + container.container.setPortBindings(List("5432:5432").asJava) + } + + private val driverClassName = "org.postgresql.Driver" + private val username = container.username + private val password = container.password + // this url can be read as container.jdbcUrl when service is started, but it is hard to postpone this step until this service is started + private val url = "jdbc:postgresql://localhost:5432/test?loggerLevel=OFF" + + val postgresqlConfigValues: Map[String, String] = Map( + "driverClassName" -> driverClassName, + "username" -> username, + "password" -> password, + "url" -> url + ) + + def preparePostgresqlDDLs: List[String] + + override protected def beforeAll(): Unit = { + conn = DriverManager.getConnection(url, username, password) + preparePostgresqlDDLs.foreach { ddlStr => + val ddlStatement = conn.prepareStatement(ddlStr) + try ddlStatement.execute() + finally ddlStatement.close() + } + } + + override protected def afterAll(): Unit = { + Option(conn).foreach(_.close()) + } + +} diff --git a/docs/Changelog.md b/docs/Changelog.md index 5747abf9493..747696fd06d 100644 --- a/docs/Changelog.md +++ b/docs/Changelog.md @@ -4,6 +4,9 @@ 1.13.2 (7 Mar 2024) ------------------------ * [#5447](https://github.com/TouK/nussknacker/pull/5447) Fixed `java.lang.reflect.InaccessibleObjectException: Unable to make public java.lang.Object` exception by downgrade of JRE from 17 to 11 in lite runner image for scala 2.13 + +[//]: # ( // TODO_PAWEL should it be new pr number?) +* [#6264](https://github.com/TouK/nussknacker/pull/6264) Fix for DatabaseLookupEnricher mixing fields values when it is connected to ignite db 1.13.1 (7 Mar 2024) ------------------------- From 39b1e9bfbc20d6f139223f3b9f5061b4d2e52432 Mon Sep 17 00:00:00 2001 From: Pawel Czajka Date: Fri, 28 Jun 2024 11:59:40 +0200 Subject: [PATCH 2/5] compilation fixes --- .../sql/service/DatabaseQueryEnricher.scala | 4 +-- ...tabaseQueryEnricherQueryWithEnricher.scala | 29 ++++--------------- 2 files changed, 7 insertions(+), 26 deletions(-) diff --git a/components/sql/src/main/scala/pl/touk/nussknacker/sql/service/DatabaseQueryEnricher.scala b/components/sql/src/main/scala/pl/touk/nussknacker/sql/service/DatabaseQueryEnricher.scala index 17e8ddc7ced..188db799ea4 100644 --- a/components/sql/src/main/scala/pl/touk/nussknacker/sql/service/DatabaseQueryEnricher.scala +++ b/components/sql/src/main/scala/pl/touk/nussknacker/sql/service/DatabaseQueryEnricher.scala @@ -238,7 +238,7 @@ class DatabaseQueryEnricher(val dbPoolConfig: DBPoolConfig, val dbMetaDataProvid val getConnectionCallback = () => dataSource.getConnection() val timeMeasurementCallback = () => timeMeasurement - val createInvoker = cacheTTLOption match { + cacheTTLOption match { case Some(cacheTTL) => new DatabaseEnricherInvokerWithCache( query, @@ -250,7 +250,6 @@ class DatabaseQueryEnricher(val dbPoolConfig: DBPoolConfig, val dbMetaDataProvid outputType, getConnectionCallback, timeMeasurementCallback, - params ) case None => new DatabaseEnricherInvoker( @@ -262,7 +261,6 @@ class DatabaseQueryEnricher(val dbPoolConfig: DBPoolConfig, val dbMetaDataProvid outputType, getConnectionCallback, timeMeasurementCallback, - params ) } } diff --git a/components/sql/src/test/scala/pl/touk/nussknacker/sql/service/DatabaseQueryEnricherQueryWithEnricher.scala b/components/sql/src/test/scala/pl/touk/nussknacker/sql/service/DatabaseQueryEnricherQueryWithEnricher.scala index 79ea2b30af7..e63deb840ad 100644 --- a/components/sql/src/test/scala/pl/touk/nussknacker/sql/service/DatabaseQueryEnricherQueryWithEnricher.scala +++ b/components/sql/src/test/scala/pl/touk/nussknacker/sql/service/DatabaseQueryEnricherQueryWithEnricher.scala @@ -1,7 +1,5 @@ package pl.touk.nussknacker.sql.service -import pl.touk.nussknacker.engine.api.Params -import pl.touk.nussknacker.engine.api.parameter.ParameterName import pl.touk.nussknacker.engine.api.typed.TypedMap import pl.touk.nussknacker.engine.api.Context import pl.touk.nussknacker.sql.db.query.{ResultSetStrategy, UpdateResultStrategy} @@ -32,17 +30,10 @@ trait DatabaseQueryEnricherQueryWithEnricher extends BaseDatabaseQueryEnricherTe strategy = ResultSetStrategy ) st.close() - val implementation = databaseQueryEnricher.implementation( - params = Params( - parameters.map { case (k, v) => (ParameterName(k), v) } - + (DatabaseQueryEnricher.cacheTTLParamName -> null) - ), - dependencies = Nil, - finalState = Some(state) - ) + val invoker = databaseQueryEnricher.implementation(Map.empty, dependencies = Nil, Some(state)) returnType(databaseQueryEnricher, state).display shouldBe expectedDisplayType - val resultFuture = implementation.invoke(Context.withInitialId) - Await.result(resultFuture, 5 seconds).asInstanceOf[java.util.List[TypedMap]].asScala.toList + val resultF = invoker.invokeService(parameters) + Await.result(resultF, 5 seconds).asInstanceOf[java.util.List[TypedMap]].asScala.toList } def updateWithEnricher( @@ -59,18 +50,10 @@ trait DatabaseQueryEnricherQueryWithEnricher extends BaseDatabaseQueryEnricherTe tableDef = TableDefinition(Nil), strategy = UpdateResultStrategy ) - val implementation = databaseQueryEnricher.implementation( - params = Params( - parameters.map { case (k, v) => (ParameterName(k), v) } - + (DatabaseQueryEnricher.cacheTTLParamName -> null) - ), - dependencies = Nil, - finalState = Some(state) - ) + val invoker = databaseQueryEnricher.implementation(Map.empty, dependencies = Nil, Some(state)) returnType(databaseQueryEnricher, state).display shouldBe "Integer" - val resultFuture = implementation.invoke(Context.withInitialId) - val result = Await.result(resultFuture, 5 seconds).asInstanceOf[Integer] - result shouldBe 1 + val resultF = invoker.invokeService(parameters) + Await.result(resultF, 5 seconds).asInstanceOf[Integer] } } From 23fe48c3dc9b3100e49a9dbcc8694feefc8fef3e Mon Sep 17 00:00:00 2001 From: Pawel Czajka Date: Fri, 28 Jun 2024 12:06:01 +0200 Subject: [PATCH 3/5] fix pr number in changelog --- docs/Changelog.md | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/docs/Changelog.md b/docs/Changelog.md index 747696fd06d..67d74a7a74e 100644 --- a/docs/Changelog.md +++ b/docs/Changelog.md @@ -4,9 +4,7 @@ 1.13.2 (7 Mar 2024) ------------------------ * [#5447](https://github.com/TouK/nussknacker/pull/5447) Fixed `java.lang.reflect.InaccessibleObjectException: Unable to make public java.lang.Object` exception by downgrade of JRE from 17 to 11 in lite runner image for scala 2.13 - -[//]: # ( // TODO_PAWEL should it be new pr number?) -* [#6264](https://github.com/TouK/nussknacker/pull/6264) Fix for DatabaseLookupEnricher mixing fields values when it is connected to ignite db +* [#6285](https://github.com/TouK/nussknacker/pull/6264) Fix for DatabaseLookupEnricher mixing fields values when it is connected to ignite db 1.13.1 (7 Mar 2024) ------------------------- From 609963f2a980e0d8dc26ee86fe0aef7fb8f8ba2d Mon Sep 17 00:00:00 2001 From: Pawel Czajka Date: Fri, 28 Jun 2024 12:11:00 +0200 Subject: [PATCH 4/5] imports clean up --- .../pl/touk/nussknacker/sql/db/schema/TableDefinition.scala | 1 - .../sql/service/DatabaseQueryEnricherQueryWithEnricher.scala | 1 - .../scala/pl/touk/nussknacker/sql/utils/WithPostgresqlDB.scala | 2 -- 3 files changed, 4 deletions(-) diff --git a/components/sql/src/main/scala/pl/touk/nussknacker/sql/db/schema/TableDefinition.scala b/components/sql/src/main/scala/pl/touk/nussknacker/sql/db/schema/TableDefinition.scala index 30bb8b67125..58aae59950a 100644 --- a/components/sql/src/main/scala/pl/touk/nussknacker/sql/db/schema/TableDefinition.scala +++ b/components/sql/src/main/scala/pl/touk/nussknacker/sql/db/schema/TableDefinition.scala @@ -1,6 +1,5 @@ package pl.touk.nussknacker.sql.db.schema -import pl.touk.nussknacker.engine.api.typed.TypedObjectDefinition import pl.touk.nussknacker.engine.api.typed.typing.{Typed, TypedObjectTypingResult, TypingResult} import java.sql.ResultSetMetaData diff --git a/components/sql/src/test/scala/pl/touk/nussknacker/sql/service/DatabaseQueryEnricherQueryWithEnricher.scala b/components/sql/src/test/scala/pl/touk/nussknacker/sql/service/DatabaseQueryEnricherQueryWithEnricher.scala index e63deb840ad..d59f9765b00 100644 --- a/components/sql/src/test/scala/pl/touk/nussknacker/sql/service/DatabaseQueryEnricherQueryWithEnricher.scala +++ b/components/sql/src/test/scala/pl/touk/nussknacker/sql/service/DatabaseQueryEnricherQueryWithEnricher.scala @@ -1,7 +1,6 @@ package pl.touk.nussknacker.sql.service import pl.touk.nussknacker.engine.api.typed.TypedMap -import pl.touk.nussknacker.engine.api.Context import pl.touk.nussknacker.sql.db.query.{ResultSetStrategy, UpdateResultStrategy} import pl.touk.nussknacker.sql.db.schema.TableDefinition import pl.touk.nussknacker.sql.utils.BaseDatabaseQueryEnricherTest diff --git a/components/sql/src/test/scala/pl/touk/nussknacker/sql/utils/WithPostgresqlDB.scala b/components/sql/src/test/scala/pl/touk/nussknacker/sql/utils/WithPostgresqlDB.scala index 7aea7ce6041..7d4e856cbae 100644 --- a/components/sql/src/test/scala/pl/touk/nussknacker/sql/utils/WithPostgresqlDB.scala +++ b/components/sql/src/test/scala/pl/touk/nussknacker/sql/utils/WithPostgresqlDB.scala @@ -1,13 +1,11 @@ package pl.touk.nussknacker.sql.utils import com.dimafeng.testcontainers.{ForAllTestContainer, PostgreSQLContainer} -import org.hsqldb.jdbcDriver import org.scalatest.BeforeAndAfterAll import org.testcontainers.utility.DockerImageName import scala.jdk.CollectionConverters._ import java.sql.{Connection, DriverManager} -import java.util.UUID trait WithPostgresqlDB { self: BeforeAndAfterAll with ForAllTestContainer => From bb5bb71f933fec843d8f3c84b6f5c37c95c1de5c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Bigorajski?= Date: Fri, 28 Jun 2024 13:39:46 +0200 Subject: [PATCH 5/5] Fix changelong entry --- docs/Changelog.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/docs/Changelog.md b/docs/Changelog.md index 67d74a7a74e..083ae2a3d32 100644 --- a/docs/Changelog.md +++ b/docs/Changelog.md @@ -1,10 +1,12 @@ # Changelog +1.13.3 (28 June 2024) This fix was introduced in 1.16.0 version and has only been backported to this version. +------------------------ +* [#6285](https://github.com/TouK/nussknacker/pull/6285) Fix for DatabaseLookupEnricher mixing fields values when it is connected to ignite db 1.13.2 (7 Mar 2024) ------------------------ * [#5447](https://github.com/TouK/nussknacker/pull/5447) Fixed `java.lang.reflect.InaccessibleObjectException: Unable to make public java.lang.Object` exception by downgrade of JRE from 17 to 11 in lite runner image for scala 2.13 -* [#6285](https://github.com/TouK/nussknacker/pull/6264) Fix for DatabaseLookupEnricher mixing fields values when it is connected to ignite db 1.13.1 (7 Mar 2024) -------------------------