diff --git a/build.sbt b/build.sbt index b9e870e79e3..2753dab5df4 100644 --- a/build.sbt +++ b/build.sbt @@ -369,6 +369,7 @@ val monocleV = "2.1.0" val jmxPrometheusJavaagentV = "0.18.0" val wireMockV = "2.35.0" val findBugsV = "3.0.2" +val igniteV = "2.10.0" // depending on scala version one of this jar lays in Flink lib dir def flinkLibScalaDeps(scalaVersion: String, configurations: Option[String] = None) = forScalaVersion( @@ -1647,12 +1648,15 @@ lazy val sqlComponents = (project in component("sql")) .settings( name := "nussknacker-sql", libraryDependencies ++= Seq( - "com.zaxxer" % "HikariCP" % hikariCpV, + "com.zaxxer" % "HikariCP" % hikariCpV, // It won't run on Java 16 as Hikari will fail while trying to load IgniteJdbcThinDriver https://issues.apache.org/jira/browse/IGNITE-14888 - "org.apache.ignite" % "ignite-core" % "2.10.0" % Provided, - "org.apache.ignite" % "ignite-indexing" % "2.10.0" % Provided, - "org.scalatest" %% "scalatest" % scalaTestV % "test", - "org.hsqldb" % "hsqldb" % hsqldbV % "test", + "org.apache.ignite" % "ignite-core" % igniteV % "test", + "org.apache.ignite" % "ignite-indexing" % igniteV % "test", + "org.postgresql" % "postgresql" % postgresV % "test", + "org.scalatest" %% "scalatest" % scalaTestV % "test", + "org.hsqldb" % "hsqldb" % hsqldbV % "test", + "com.dimafeng" %% "testcontainers-scala-scalatest" % testContainersScalaV % "test", + "com.dimafeng" %% "testcontainers-scala-postgresql" % testContainersScalaV % "test", ), ) .dependsOn( diff --git a/components/sql/src/main/scala/pl/touk/nussknacker/sql/db/ignite/IgniteQueryHelper.scala b/components/sql/src/main/scala/pl/touk/nussknacker/sql/db/ignite/IgniteQueryHelper.scala index 9ae00a0e83a..cc47ee6563f 100644 --- a/components/sql/src/main/scala/pl/touk/nussknacker/sql/db/ignite/IgniteQueryHelper.scala +++ b/components/sql/src/main/scala/pl/touk/nussknacker/sql/db/ignite/IgniteQueryHelper.scala @@ -33,7 +33,7 @@ class IgniteQueryHelper(getConnection: () => Connection) extends LazyLogging { columnName -> Typed.typedClass(Class.forName(klassName)) } - tableName -> TableDefinition(typedObjectDefinition = TypedObjectDefinition(columnTypings.toMap)) + tableName -> TableDefinition.applyList(columnTypings) } } } diff --git a/components/sql/src/main/scala/pl/touk/nussknacker/sql/db/query/QueryExecutor.scala b/components/sql/src/main/scala/pl/touk/nussknacker/sql/db/query/QueryExecutor.scala index 5dc6e1bf7fd..a4941685b49 100644 --- a/components/sql/src/main/scala/pl/touk/nussknacker/sql/db/query/QueryExecutor.scala +++ b/components/sql/src/main/scala/pl/touk/nussknacker/sql/db/query/QueryExecutor.scala @@ -14,7 +14,9 @@ trait QueryExecutor { protected def toTypedMap(tableDef: TableDefinition, resultSet: ResultSet): TypedMap = { val fields = tableDef.columnDefs.map { columnDef => - columnDef.name -> resultSet.getObject(columnDef.no) + // we could here use method resultSet.getObject(Int) and pass column number as argument + // but in case of ignite db it is not certain which column index corresponds to which column. + columnDef.name -> resultSet.getObject(columnDef.name) }.toMap TypedMap(fields) } diff --git a/components/sql/src/main/scala/pl/touk/nussknacker/sql/db/schema/ColumnDefinition.scala b/components/sql/src/main/scala/pl/touk/nussknacker/sql/db/schema/ColumnDefinition.scala index 22a246206a9..8ca94f3a268 100644 --- a/components/sql/src/main/scala/pl/touk/nussknacker/sql/db/schema/ColumnDefinition.scala +++ b/components/sql/src/main/scala/pl/touk/nussknacker/sql/db/schema/ColumnDefinition.scala @@ -8,18 +8,16 @@ object ColumnDefinition { def apply(columnNo: Int, resultMeta: ResultSetMetaData): ColumnDefinition = ColumnDefinition( - no = columnNo, name = resultMeta.getColumnName(columnNo), typing = Typed(Class.forName(resultMeta.getColumnClassName(columnNo))) ) - def apply(columnNo: Int, typing: (String, TypingResult)): ColumnDefinition = + def apply(typing: (String, TypingResult)): ColumnDefinition = ColumnDefinition( - no = columnNo, name = typing._1, typing = typing._2 ) } -final case class ColumnDefinition(no: Int, name: String, typing: TypingResult) +final case class ColumnDefinition(name: String, typing: TypingResult) diff --git a/components/sql/src/main/scala/pl/touk/nussknacker/sql/db/schema/TableDefinition.scala b/components/sql/src/main/scala/pl/touk/nussknacker/sql/db/schema/TableDefinition.scala index 34b328d1892..58aae59950a 100644 --- a/components/sql/src/main/scala/pl/touk/nussknacker/sql/db/schema/TableDefinition.scala +++ b/components/sql/src/main/scala/pl/touk/nussknacker/sql/db/schema/TableDefinition.scala @@ -1,6 +1,5 @@ package pl.touk.nussknacker.sql.db.schema -import pl.touk.nussknacker.engine.api.typed.TypedObjectDefinition import pl.touk.nussknacker.engine.api.typed.typing.{Typed, TypedObjectTypingResult, TypingResult} import java.sql.ResultSetMetaData @@ -12,13 +11,13 @@ object TableDefinition { columnDefs = (1 to resultMeta.getColumnCount).map(ColumnDefinition(_, resultMeta)).toList ) - def apply(typedObjectDefinition: TypedObjectDefinition): TableDefinition = { - val columnDefinitions = typedObjectDefinition.fields.zipWithIndex - .map { case (typing, index) => - ColumnDefinition(index + 1, typing) + def applyList(fields: List[(String, TypingResult)]): TableDefinition = { + val columnDefinitions = fields + .map { typing => + ColumnDefinition(typing) } TableDefinition( - columnDefs = columnDefinitions.toList + columnDefs = columnDefinitions ) } diff --git a/components/sql/src/main/scala/pl/touk/nussknacker/sql/service/DatabaseQueryEnricher.scala b/components/sql/src/main/scala/pl/touk/nussknacker/sql/service/DatabaseQueryEnricher.scala index f51ea1a4389..188db799ea4 100644 --- a/components/sql/src/main/scala/pl/touk/nussknacker/sql/service/DatabaseQueryEnricher.scala +++ b/components/sql/src/main/scala/pl/touk/nussknacker/sql/service/DatabaseQueryEnricher.scala @@ -229,29 +229,38 @@ class DatabaseQueryEnricher(val dbPoolConfig: DBPoolConfig, val dbMetaDataProvid ): ServiceInvoker = { val state = finalState.get val cacheTTLOption = extractOptional[Duration](params, CacheTTLParamName) + + val query = state.query + val argsCount = state.argsCount + val tableDef = state.tableDef + val strategy = state.strategy + val outputType = state.outputType + val getConnectionCallback = () => dataSource.getConnection() + val timeMeasurementCallback = () => timeMeasurement + cacheTTLOption match { case Some(cacheTTL) => new DatabaseEnricherInvokerWithCache( - state.query, - state.argsCount, - state.tableDef, - state.strategy, + query, + argsCount, + tableDef, + strategy, queryArgumentsExtractor, cacheTTL, - state.outputType, - () => dataSource.getConnection(), - () => timeMeasurement + outputType, + getConnectionCallback, + timeMeasurementCallback, ) case None => new DatabaseEnricherInvoker( - state.query, - state.argsCount, - state.tableDef, - state.strategy, + query, + argsCount, + tableDef, + strategy, queryArgumentsExtractor, - state.outputType, - () => dataSource.getConnection(), - () => timeMeasurement + outputType, + getConnectionCallback, + timeMeasurementCallback, ) } } diff --git a/components/sql/src/test/scala/pl/touk/nussknacker/sql/service/DatabaseLookupLiteRuntimeTest.scala b/components/sql/src/test/scala/pl/touk/nussknacker/sql/service/DatabaseLookupLiteRuntimeTest.scala index 340a4730cbe..d645dc0b08c 100644 --- a/components/sql/src/test/scala/pl/touk/nussknacker/sql/service/DatabaseLookupLiteRuntimeTest.scala +++ b/components/sql/src/test/scala/pl/touk/nussknacker/sql/service/DatabaseLookupLiteRuntimeTest.scala @@ -12,6 +12,7 @@ import pl.touk.nussknacker.sql.DatabaseEnricherComponentProvider import pl.touk.nussknacker.sql.utils._ import pl.touk.nussknacker.test.ValidatedValuesDetailedMessage +import java.util import scala.jdk.CollectionConverters._ class DatabaseLookupLiteRuntimeTest @@ -63,13 +64,17 @@ class DatabaseLookupLiteRuntimeTest "Key value" -> "#input", "Cache TTL" -> "" ) - .emptySink("response", TestScenarioRunner.testResultSink, "value" -> "#output.NAME") + .emptySink("response", TestScenarioRunner.testResultSink, "value" -> "#output") - val validatedResult = testScenarioRunner.runWithData[Int, String](process, List(1)) + val validatedResult = testScenarioRunner.runWithData[Int, AnyRef](process, List(1)) val resultList = validatedResult.validValue.successes resultList should have length 1 - resultList.head shouldEqual "John" + val resultScalaMap = resultList.head.asInstanceOf[util.HashMap[String, AnyRef]].asScala.map { case (key, value) => + (key, value.toString) + } + resultScalaMap.get("ID") shouldEqual Some("1") + resultScalaMap.get("NAME") shouldEqual Some("John") } test("should enrich input with table with lower cases in column names") { @@ -85,13 +90,17 @@ class DatabaseLookupLiteRuntimeTest "Key value" -> "#input", "Cache TTL" -> "" ) - .emptySink("response", TestScenarioRunner.testResultSink, "value" -> "#output.name") + .emptySink("response", TestScenarioRunner.testResultSink, "value" -> "#output") - val validatedResult = testScenarioRunner.runWithData[Int, String](process, List(1)) + val validatedResult = testScenarioRunner.runWithData[Int, AnyRef](process, List(1)) val resultList = validatedResult.validValue.successes resultList should have length 1 - resultList.head shouldEqual "John" + val resultScalaMap = resultList.head.asInstanceOf[util.HashMap[String, AnyRef]].asScala.map { case (key, value) => + (key, value.toString) + } + resultScalaMap.get("name") shouldEqual Some("John") + resultScalaMap.get("id") shouldEqual Some("1") } } diff --git a/components/sql/src/test/scala/pl/touk/nussknacker/sql/service/DatabaseQueryEnricherHsqlTest.scala b/components/sql/src/test/scala/pl/touk/nussknacker/sql/service/DatabaseQueryEnricherHsqlTest.scala new file mode 100644 index 00000000000..72e3b230be9 --- /dev/null +++ b/components/sql/src/test/scala/pl/touk/nussknacker/sql/service/DatabaseQueryEnricherHsqlTest.scala @@ -0,0 +1,68 @@ +package pl.touk.nussknacker.sql.service + +import pl.touk.nussknacker.engine.api.typed.TypedMap +import pl.touk.nussknacker.sql.db.schema.{MetaDataProviderFactory, TableDefinition} +import pl.touk.nussknacker.sql.utils.BaseHsqlQueryEnricherTest +import org.scalatest.BeforeAndAfterEach + +class DatabaseQueryEnricherHsqlTest + extends BaseHsqlQueryEnricherTest + with DatabaseQueryEnricherQueryWithEnricher + with BeforeAndAfterEach { + + override val service = + new DatabaseQueryEnricher(hsqlDbPoolConfig, new MetaDataProviderFactory().create(hsqlDbPoolConfig)) + + override val prepareHsqlDDLs: List[String] = List( + "CREATE TABLE people (id INT, name VARCHAR(40));", + "INSERT INTO people (id, name) VALUES (1, 'John')" + ) + + override protected def afterEach(): Unit = { + val cleanupStatements = List( + "TRUNCATE TABLE people;", + "INSERT INTO people (id, name) VALUES (1, 'John')" + ) + cleanupStatements.foreach { ddlStr => + val ddlStatement = conn.prepareStatement(ddlStr) + try ddlStatement.execute() + finally ddlStatement.close() + } + } + + test("DatabaseQueryEnricher#implementation without cache") { + val result = queryWithEnricher( + "select * from people where id = ?", + Map("arg1" -> 1.asInstanceOf[AnyRef]), + conn, + service, + "List[Record{ID: Integer, NAME: String}]" + ) + result shouldBe List( + TypedMap(Map("ID" -> 1, "NAME" -> "John")) + ) + } + + test("DatabaseQueryEnricher#implementation without cache and with mixed lowercase and uppercase characters") { + val result = queryWithEnricher( + "select iD, NaMe from people where id = ?", + Map("arg1" -> 1.asInstanceOf[AnyRef]), + conn, + service, + "List[Record{ID: Integer, NAME: String}]" + ) + result shouldBe List( + TypedMap(Map("NAME" -> "John", "ID" -> 1)) + ) + } + + test("DatabaseQueryEnricher#implementation update query") { + val query = "UPDATE people SET name = 'Don' where id = ?" + updateWithEnricher(query, conn, Map("arg1" -> 1.asInstanceOf[AnyRef]), service) + + val queryResultSet = conn.prepareStatement("SELECT * FROM people WHERE id = 1").executeQuery() + queryResultSet.next() + queryResultSet.getObject("name") shouldBe "Don" + } + +} diff --git a/components/sql/src/test/scala/pl/touk/nussknacker/sql/service/DatabaseQueryEnricherPostgresqlTest.scala b/components/sql/src/test/scala/pl/touk/nussknacker/sql/service/DatabaseQueryEnricherPostgresqlTest.scala new file mode 100644 index 00000000000..f674077afef --- /dev/null +++ b/components/sql/src/test/scala/pl/touk/nussknacker/sql/service/DatabaseQueryEnricherPostgresqlTest.scala @@ -0,0 +1,70 @@ +package pl.touk.nussknacker.sql.service + +import org.scalatest.BeforeAndAfterEach +import pl.touk.nussknacker.engine.api.typed.TypedMap +import pl.touk.nussknacker.sql.db.schema.{MetaDataProviderFactory, TableDefinition} +import pl.touk.nussknacker.sql.utils.BasePostgresqlQueryEnricherTest + +class DatabaseQueryEnricherPostgresqlTest + extends BasePostgresqlQueryEnricherTest + with DatabaseQueryEnricherQueryWithEnricher + with BeforeAndAfterEach { + + override val service = + new DatabaseQueryEnricher(postgresqlDbPoolConfig, new MetaDataProviderFactory().create(postgresqlDbPoolConfig)) + + override val preparePostgresqlDDLs: List[String] = List( + "CREATE TABLE people (id INT, name VARCHAR(40));", + "INSERT INTO people (id, name) VALUES (1, 'John')" + ) + + override protected def afterEach(): Unit = { + val cleanupStatements = List( + "TRUNCATE TABLE people;", + "INSERT INTO people (id, name) VALUES (1, 'John')" + ) + cleanupStatements.foreach { ddlStr => + val ddlStatement = conn.prepareStatement(ddlStr) + try ddlStatement.execute() + finally ddlStatement.close() + } + } + + test("DatabaseQueryEnricherPostgresqlTest#implementation without cache") { + val result = queryWithEnricher( + "select * from people where id = ?", + Map("arg1" -> 1.asInstanceOf[AnyRef]), + conn, + service, + "List[Record{id: Integer, name: String}]" + ) + result shouldBe List( + TypedMap(Map("name" -> "John", "id" -> 1)) + ) + } + + test( + "DatabaseQueryEnricherPostgresqlTest#implementation without cache and with mixed lowercase and uppercase characters" + ) { + val result = queryWithEnricher( + "select iD, NaMe from people where id = ?", + Map("arg1" -> 1.asInstanceOf[AnyRef]), + conn, + service, + "List[Record{id: Integer, name: String}]" + ) + result shouldBe List( + TypedMap(Map("name" -> "John", "id" -> 1)) + ) + } + + test("DatabaseQueryEnricherPostgresqlTest#implementation update query") { + val query = "UPDATE people SET name = 'Don' where id = ?" + updateWithEnricher(query, conn, Map("arg1" -> 1.asInstanceOf[AnyRef]), service) + + val queryResultSet = conn.prepareStatement("SELECT * FROM people WHERE id = 1").executeQuery() + queryResultSet.next() + queryResultSet.getObject("name") shouldBe "Don" + } + +} diff --git a/components/sql/src/test/scala/pl/touk/nussknacker/sql/service/DatabaseQueryEnricherQueryWithEnricher.scala b/components/sql/src/test/scala/pl/touk/nussknacker/sql/service/DatabaseQueryEnricherQueryWithEnricher.scala new file mode 100644 index 00000000000..d59f9765b00 --- /dev/null +++ b/components/sql/src/test/scala/pl/touk/nussknacker/sql/service/DatabaseQueryEnricherQueryWithEnricher.scala @@ -0,0 +1,58 @@ +package pl.touk.nussknacker.sql.service + +import pl.touk.nussknacker.engine.api.typed.TypedMap +import pl.touk.nussknacker.sql.db.query.{ResultSetStrategy, UpdateResultStrategy} +import pl.touk.nussknacker.sql.db.schema.TableDefinition +import pl.touk.nussknacker.sql.utils.BaseDatabaseQueryEnricherTest + +import java.sql.Connection +import scala.concurrent.Await + +trait DatabaseQueryEnricherQueryWithEnricher extends BaseDatabaseQueryEnricherTest { + + import scala.concurrent.duration._ + import scala.jdk.CollectionConverters._ + + def queryWithEnricher( + query: String, + parameters: Map[String, AnyRef], + connection: Connection, + databaseQueryEnricher: DatabaseQueryEnricher, + expectedDisplayType: String + ): List[TypedMap] = { + val st = connection.prepareStatement(query) + val meta = st.getMetaData + val state = DatabaseQueryEnricher.TransformationState( + query = query, + argsCount = 1, + tableDef = TableDefinition(meta), + strategy = ResultSetStrategy + ) + st.close() + val invoker = databaseQueryEnricher.implementation(Map.empty, dependencies = Nil, Some(state)) + returnType(databaseQueryEnricher, state).display shouldBe expectedDisplayType + val resultF = invoker.invokeService(parameters) + Await.result(resultF, 5 seconds).asInstanceOf[java.util.List[TypedMap]].asScala.toList + } + + def updateWithEnricher( + query: String, + connection: Connection, + parameters: Map[String, AnyRef], + databaseQueryEnricher: DatabaseQueryEnricher + ): Unit = { + val st = connection.prepareStatement(query) + st.close() + val state = DatabaseQueryEnricher.TransformationState( + query = query, + argsCount = 1, + tableDef = TableDefinition(Nil), + strategy = UpdateResultStrategy + ) + val invoker = databaseQueryEnricher.implementation(Map.empty, dependencies = Nil, Some(state)) + returnType(databaseQueryEnricher, state).display shouldBe "Integer" + val resultF = invoker.invokeService(parameters) + Await.result(resultF, 5 seconds).asInstanceOf[Integer] + } + +} diff --git a/components/sql/src/test/scala/pl/touk/nussknacker/sql/service/DatabaseQueryEnricherTest.scala b/components/sql/src/test/scala/pl/touk/nussknacker/sql/service/DatabaseQueryEnricherTest.scala deleted file mode 100644 index c50d4cc715d..00000000000 --- a/components/sql/src/test/scala/pl/touk/nussknacker/sql/service/DatabaseQueryEnricherTest.scala +++ /dev/null @@ -1,70 +0,0 @@ -package pl.touk.nussknacker.sql.service - -import pl.touk.nussknacker.engine.api.typed.TypedMap -import pl.touk.nussknacker.sql.db.query.{ResultSetStrategy, UpdateResultStrategy} -import pl.touk.nussknacker.sql.db.schema.{MetaDataProviderFactory, TableDefinition} -import pl.touk.nussknacker.sql.utils.BaseHsqlQueryEnricherTest - -import scala.concurrent.Await - -class DatabaseQueryEnricherTest extends BaseHsqlQueryEnricherTest { - - import scala.jdk.CollectionConverters._ - import scala.concurrent.duration._ - - override val service = - new DatabaseQueryEnricher(hsqlDbPoolConfig, new MetaDataProviderFactory().create(hsqlDbPoolConfig)) - - override val prepareHsqlDDLs: List[String] = List( - "CREATE TABLE persons (id INT, name VARCHAR(40));", - "INSERT INTO persons (id, name) VALUES (1, 'John')" - ) - - test("DatabaseQueryEnricher#implementation without cache") { - val query = "select * from persons where id = ?" - val st = conn.prepareStatement(query) - val meta = st.getMetaData - st.close() - val state = DatabaseQueryEnricher.TransformationState( - query = query, - argsCount = 1, - tableDef = TableDefinition(meta), - strategy = ResultSetStrategy - ) - val invoker = service.implementation(Map.empty, dependencies = Nil, Some(state)) - returnType(service, state).display shouldBe "List[Record{ID: Integer, NAME: String}]" - val resultF = invoker.invokeService(Map("arg1" -> 1)) - val result = Await.result(resultF, 5 seconds).asInstanceOf[java.util.List[TypedMap]].asScala.toList - result shouldBe List( - TypedMap(Map("ID" -> 1, "NAME" -> "John")) - ) - - conn.prepareStatement("UPDATE persons SET name = 'Alex' WHERE id = 1").execute() - val resultF2 = invoker.invokeService(Map("arg1" -> 1)) - val result2 = Await.result(resultF2, 5 seconds).asInstanceOf[java.util.List[TypedMap]].asScala.toList - result2 shouldBe List( - TypedMap(Map("ID" -> 1, "NAME" -> "Alex")) - ) - } - - test("DatabaseQueryEnricher#implementation update query") { - val query = "UPDATE persons SET name = 'Don' where id = ?" - val st = conn.prepareStatement(query) - st.close() - val state = DatabaseQueryEnricher.TransformationState( - query = query, - argsCount = 1, - tableDef = TableDefinition(Nil), - strategy = UpdateResultStrategy - ) - val invoker = service.implementation(Map.empty, dependencies = Nil, Some(state)) - returnType(service, state).display shouldBe "Integer" - val resultF = invoker.invokeService(Map("arg1" -> 1)) - val result = Await.result(resultF, 5 seconds).asInstanceOf[Integer] - result shouldBe 1 - val queryResultSet = conn.prepareStatement("SELECT * FROM persons WHERE id = 1").executeQuery() - queryResultSet.next() - queryResultSet.getObject("name") shouldBe "Don" - } - -} diff --git a/components/sql/src/test/scala/pl/touk/nussknacker/sql/service/IgniteEnrichmentLiteRuntimeTest.scala b/components/sql/src/test/scala/pl/touk/nussknacker/sql/service/IgniteEnrichmentLiteRuntimeTest.scala index cc7e254d0a5..a7423f6bbfb 100644 --- a/components/sql/src/test/scala/pl/touk/nussknacker/sql/service/IgniteEnrichmentLiteRuntimeTest.scala +++ b/components/sql/src/test/scala/pl/touk/nussknacker/sql/service/IgniteEnrichmentLiteRuntimeTest.scala @@ -12,6 +12,7 @@ import pl.touk.nussknacker.sql.DatabaseEnricherComponentProvider import pl.touk.nussknacker.sql.utils.ignite.WithIgniteDB import pl.touk.nussknacker.test.ValidatedValuesDetailedMessage +import java.util import scala.jdk.CollectionConverters._ class IgniteEnrichmentLiteRuntimeTest @@ -59,13 +60,20 @@ class IgniteEnrichmentLiteRuntimeTest "Key value" -> "#input", "Cache TTL" -> "" ) - .emptySink("response", TestScenarioRunner.testResultSink, "value" -> "#output.NAME") + .emptySink("response", TestScenarioRunner.testResultSink, "value" -> "#output") - val validatedResult = testScenarioRunner.runWithData[Int, String](process, List(1)) + val validatedResult = testScenarioRunner.runWithData[Int, AnyRef](process, List(1)) val resultList = validatedResult.validValue.successes resultList should have length 1 - resultList.head shouldEqual "Warszawa" + val resultScalaMap = resultList.head.asInstanceOf[util.HashMap[String, AnyRef]].asScala.map { case (key, value) => + (key, value.toString) + } + + resultScalaMap.get("POPULATION") shouldEqual Some("1793579") + resultScalaMap.get("ID") shouldEqual Some("1") + resultScalaMap.get("COUNTRY") shouldEqual Some("Poland") + resultScalaMap.get("NAME") shouldEqual Some("Warszawa") } } diff --git a/components/sql/src/test/scala/pl/touk/nussknacker/sql/utils/BasePostgresqlQueryEnricherTest.scala b/components/sql/src/test/scala/pl/touk/nussknacker/sql/utils/BasePostgresqlQueryEnricherTest.scala new file mode 100644 index 00000000000..aa9f046f913 --- /dev/null +++ b/components/sql/src/test/scala/pl/touk/nussknacker/sql/utils/BasePostgresqlQueryEnricherTest.scala @@ -0,0 +1,49 @@ +package pl.touk.nussknacker.sql.utils + +import com.dimafeng.testcontainers.ForAllTestContainer +import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory} +import org.scalatest.BeforeAndAfterAll +import org.scalatest.time.{Second, Seconds, Span} +import pl.touk.nussknacker.engine.lite.api.runtimecontext.LiteEngineRuntimeContextPreparer +import pl.touk.nussknacker.sql.db.pool.DBPoolConfig +import pl.touk.nussknacker.test.PatientScalaFutures + +import scala.jdk.CollectionConverters._ + +trait BasePostgresqlQueryEnricherTest + extends BaseDatabaseQueryEnricherTest + with PatientScalaFutures + with BeforeAndAfterAll + with ForAllTestContainer + with WithPostgresqlDB { + + val pc: PatienceConfig = PatienceConfig(Span(20, Seconds), Span(1, Second)) + + val postgresqlDbPoolConfig: DBPoolConfig = DBPoolConfig( + driverClassName = postgresqlConfigValues("driverClassName"), + url = postgresqlConfigValues("url"), + username = postgresqlConfigValues("username"), + password = postgresqlConfigValues("password") + ) + + val dbEnricherConfig: Config = ConfigFactory + .load() + .withValue("name", ConfigValueFactory.fromAnyRef("db-enricher")) + .withValue( + "dbPool", + ConfigValueFactory.fromMap( + postgresqlConfigValues.asJava + ) + ) + + override def beforeAll(): Unit = { + service.open(LiteEngineRuntimeContextPreparer.noOp.prepare(jobData)) + super.beforeAll() + } + + override protected def afterAll(): Unit = { + service.close() + super.afterAll() + } + +} diff --git a/components/sql/src/test/scala/pl/touk/nussknacker/sql/utils/WithPostgresqlDB.scala b/components/sql/src/test/scala/pl/touk/nussknacker/sql/utils/WithPostgresqlDB.scala new file mode 100644 index 00000000000..7d4e856cbae --- /dev/null +++ b/components/sql/src/test/scala/pl/touk/nussknacker/sql/utils/WithPostgresqlDB.scala @@ -0,0 +1,50 @@ +package pl.touk.nussknacker.sql.utils + +import com.dimafeng.testcontainers.{ForAllTestContainer, PostgreSQLContainer} +import org.scalatest.BeforeAndAfterAll +import org.testcontainers.utility.DockerImageName +import scala.jdk.CollectionConverters._ + +import java.sql.{Connection, DriverManager} + +trait WithPostgresqlDB { + self: BeforeAndAfterAll with ForAllTestContainer => + + var conn: Connection = _ + + override val container: PostgreSQLContainer = + PostgreSQLContainer(DockerImageName.parse("postgres:11.2")) + + { + container.container.setPortBindings(List("5432:5432").asJava) + } + + private val driverClassName = "org.postgresql.Driver" + private val username = container.username + private val password = container.password + // this url can be read as container.jdbcUrl when service is started, but it is hard to postpone this step until this service is started + private val url = "jdbc:postgresql://localhost:5432/test?loggerLevel=OFF" + + val postgresqlConfigValues: Map[String, String] = Map( + "driverClassName" -> driverClassName, + "username" -> username, + "password" -> password, + "url" -> url + ) + + def preparePostgresqlDDLs: List[String] + + override protected def beforeAll(): Unit = { + conn = DriverManager.getConnection(url, username, password) + preparePostgresqlDDLs.foreach { ddlStr => + val ddlStatement = conn.prepareStatement(ddlStr) + try ddlStatement.execute() + finally ddlStatement.close() + } + } + + override protected def afterAll(): Unit = { + Option(conn).foreach(_.close()) + } + +} diff --git a/docs/Changelog.md b/docs/Changelog.md index 5747abf9493..083ae2a3d32 100644 --- a/docs/Changelog.md +++ b/docs/Changelog.md @@ -1,5 +1,8 @@ # Changelog +1.13.3 (28 June 2024) This fix was introduced in 1.16.0 version and has only been backported to this version. +------------------------ +* [#6285](https://github.com/TouK/nussknacker/pull/6285) Fix for DatabaseLookupEnricher mixing fields values when it is connected to ignite db 1.13.2 (7 Mar 2024) ------------------------