diff --git a/.gitignore b/.gitignore index 2d20a2f..5e83283 100644 --- a/.gitignore +++ b/.gitignore @@ -23,3 +23,4 @@ _book/ .DS_Store *.log.logstashjson out/ +dumps/ diff --git a/build.sbt b/build.sbt index 83acb3f..67eaf01 100644 --- a/build.sbt +++ b/build.sbt @@ -23,12 +23,13 @@ val currentFlinkV = "1.19.1" val sttpV = "3.8.11" val kafkaV = "3.3.1" val testContainersScalaV = "0.41.0" +val logbackV = "1.5.12" val baseVersion = "1.0-SNAPSHOT" // todo: for now we should regularly bump the version until we start publish single "latest" -SNAPSHOT version val nussknackerV = settingKey[String]("Nussknacker version") -ThisBuild / nussknackerV := "1.18.0-staging-2024-10-10-20916-2224d333f-SNAPSHOT" +ThisBuild / nussknackerV := "1.18.0-preview_flink-typeinfo-registration-opt-out-2024-11-06-21368-c5a33a0cd-SNAPSHOT" ThisBuild / version := codeVersion(baseVersion, nussknackerV.value) // Global publish settings @@ -67,9 +68,7 @@ lazy val commonSettings = Seq( resolvers ++= Seq( Resolver.sonatypeRepo("public"), Opts.resolver.sonatypeSnapshots, - "confluent" at "https://packages.confluent.io/maven", - "nexus" at sys.env - .getOrElse("nexus", "https://nexus.touk.pl/nexus/content/groups/public") + "confluent" at "https://packages.confluent.io/maven" ), crossScalaVersions := supportedScalaVersions, scalacOptions := Seq( @@ -227,6 +226,7 @@ def managerDeps(flinkV: String, nussknackerV: String) = Seq( ), "pl.touk.nussknacker" %% "nussknacker-http-utils" % nussknackerV % "provided,it,test", "pl.touk.nussknacker" %% "nussknacker-scenario-compiler" % nussknackerV % "provided,it,test", + "ch.qos.logback" % "logback-classic" % logbackV, "pl.touk.nussknacker" %% "nussknacker-deployment-manager-api" % nussknackerV % "provided", "org.apache.flink" % "flink-streaming-java" % flinkV excludeAll ( ExclusionRule("log4j", "log4j"), diff --git a/commonTest/src/main/scala/pl/touk/nussknacker/compatibility/common/BaseGenericITSpec.scala b/commonTest/src/main/scala/pl/touk/nussknacker/compatibility/common/BaseGenericITSpec.scala index 2a0b614..a0b648a 100644 --- a/commonTest/src/main/scala/pl/touk/nussknacker/compatibility/common/BaseGenericITSpec.scala +++ b/commonTest/src/main/scala/pl/touk/nussknacker/compatibility/common/BaseGenericITSpec.scala @@ -17,6 +17,7 @@ import pl.touk.nussknacker.engine.api.{JobData, ProcessVersion} import pl.touk.nussknacker.engine.build.ScenarioBuilder import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.deployment.DeploymentData +import pl.touk.nussknacker.engine.flink.api.typeinformation.FlinkTypeInfoRegistrar import pl.touk.nussknacker.engine.flink.test.FlinkMiniClusterHolder import pl.touk.nussknacker.engine.flink.util.transformer.{FlinkBaseComponentProvider, FlinkKafkaComponentProvider} import pl.touk.nussknacker.engine.kafka.{KafkaConfig, KafkaSpec, KafkaTestUtils, UnspecializedTopicName} @@ -255,11 +256,15 @@ trait BaseGenericITSpec extends AnyFunSuiteLike with Matchers with KafkaSpec wit override protected def beforeAll(): Unit = { super.beforeAll() + + FlinkTypeInfoRegistrar.disableFlinkTypeInfoRegistration() + val components = FlinkBaseComponentProvider.Components ::: new MockFlinkKafkaComponentProvider().create( config.getConfig("components.mockKafka"), ProcessObjectDependencies.withConfig(ConfigFactory.empty()) ) + val modelData = LocalModelData(config, components, creator) registrar = FlinkProcessRegistrar( new FlinkProcessCompilerDataFactory( diff --git a/commonTest/src/main/scala/pl/touk/nussknacker/compatibility/common/BaseSchemaCompatibilityTest.scala b/commonTest/src/main/scala/pl/touk/nussknacker/compatibility/common/BaseSchemaCompatibilityTest.scala index 72139ab..c75aa61 100644 --- a/commonTest/src/main/scala/pl/touk/nussknacker/compatibility/common/BaseSchemaCompatibilityTest.scala +++ b/commonTest/src/main/scala/pl/touk/nussknacker/compatibility/common/BaseSchemaCompatibilityTest.scala @@ -5,12 +5,13 @@ import org.apache.flink.api.common.ExecutionConfig import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers import pl.touk.nussknacker.engine.api.typed.typing.Typed -import pl.touk.nussknacker.engine.flink.api.typeinformation.TypeInformationDetection +import pl.touk.nussknacker.engine.flink.api.typeinformation.{FlinkTypeInfoRegistrar, TypeInformationDetection} @silent("deprecated") trait BaseSchemaCompatibilityTest extends AnyFunSuite with Matchers { - test("Cheks schema compatibility") { + test("Checks schema compatibility") { + FlinkTypeInfoRegistrar.disableFlinkTypeInfoRegistration() val detection = TypeInformationDetection.instance val typingResult = Typed.record(Map("int" -> Typed[Int])) val executionConfigWithoutKryo: ExecutionConfig = new ExecutionConfig diff --git a/commonTest/src/main/scala/pl/touk/nussknacker/engine/management/common/CommonFlinkStreamingDeploymentManagerSpec.scala b/commonTest/src/main/scala/pl/touk/nussknacker/engine/management/common/CommonFlinkStreamingDeploymentManagerSpec.scala index 1048671..2d14be8 100644 --- a/commonTest/src/main/scala/pl/touk/nussknacker/engine/management/common/CommonFlinkStreamingDeploymentManagerSpec.scala +++ b/commonTest/src/main/scala/pl/touk/nussknacker/engine/management/common/CommonFlinkStreamingDeploymentManagerSpec.scala @@ -18,7 +18,7 @@ trait CommonFlinkStreamingDeploymentManagerSpec extends AnyFunSuite with Matcher val deploymentManager = createDeploymentManager(jobManager.jobmanagerRestUrl) val processId = "runningFlink" - val version = ProcessVersion(VersionId(15), ProcessName(processId), ProcessId(1), List.empty, "user1", Some(13)) + val version = ProcessVersion(VersionId(15), ProcessName(processId), ProcessId(1), labels = List.empty, user = "user1", modelVersion = Some(13)) val process = prepareProcess(processId, Some(1)) deployProcessAndWaitIfRunning( diff --git a/commonTest/src/main/scala/pl/touk/nussknacker/engine/management/common/FlinkContainer.scala b/commonTest/src/main/scala/pl/touk/nussknacker/engine/management/common/FlinkContainer.scala index 7e094f1..a5cfcc0 100644 --- a/commonTest/src/main/scala/pl/touk/nussknacker/engine/management/common/FlinkContainer.scala +++ b/commonTest/src/main/scala/pl/touk/nussknacker/engine/management/common/FlinkContainer.scala @@ -47,7 +47,10 @@ object JobManagerContainer { GenericContainer( FlinkContainer.flinkImage(flinkVersion), command = List("jobmanager"), - env = Map("SAVEPOINT_DIR_NAME" -> savepointDir.getFileName.toString), + env = Map( + "SAVEPOINT_DIR_NAME" -> savepointDir.getFileName.toString, + "NU_DISABLE_FLINK_TYPE_INFO_REGISTRATION" -> "true" + ), waitStrategy = new LogMessageWaitStrategy() .withRegEx(".*Recover all persisted job graphs.*") .withStartupTimeout(Duration.ofSeconds(250)), @@ -74,8 +77,9 @@ object TaskManagerContainer { FlinkContainer.flinkImage(flinkVersion), command = List("taskmanager"), env = Map( - "TASK_MANAGER_NUMBER_OF_TASK_SLOTS" -> TaskManagerSlots.toString, - "JOB_MANAGER_RPC_ADDRESS" -> jobmanagerRpcAddress + "TASK_MANAGER_NUMBER_OF_TASK_SLOTS" -> TaskManagerSlots.toString, + "JOB_MANAGER_RPC_ADDRESS" -> jobmanagerRpcAddress, + "NU_DISABLE_FLINK_TYPE_INFO_REGISTRATION" -> "true" ), waitStrategy = new LogMessageWaitStrategy().withRegEx(".*Successful registration at resource manager.*") ), diff --git a/commonTest/src/main/scala/pl/touk/nussknacker/engine/management/common/StreamingDockerTest.scala b/commonTest/src/main/scala/pl/touk/nussknacker/engine/management/common/StreamingDockerTest.scala index 408827e..f75216b 100644 --- a/commonTest/src/main/scala/pl/touk/nussknacker/engine/management/common/StreamingDockerTest.scala +++ b/commonTest/src/main/scala/pl/touk/nussknacker/engine/management/common/StreamingDockerTest.scala @@ -19,6 +19,7 @@ import pl.touk.nussknacker.engine.api.deployment._ import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus import pl.touk.nussknacker.engine.api.process.ProcessName import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess +import pl.touk.nussknacker.engine.definition.component.Components.ComponentDefinitionExtractionMode.FinalDefinition import pl.touk.nussknacker.engine.deployment.{DeploymentData, User} import pl.touk.nussknacker.engine.management.FlinkStreamingDeploymentManagerProvider import sttp.client3.SttpBackend @@ -78,7 +79,8 @@ trait StreamingDockerTest additionalConfigsFromProvider = Map.empty, determineDesignerWideId = componentId => DesignerWideComponentId(componentId.toString), workingDirectoryOpt = None, - shouldIncludeComponentProvider = _ => true + shouldIncludeComponentProvider = _ => true, + componentDefinitionExtractionMode = FinalDefinition ) } val deploymentManagerDependencies = DeploymentManagerDependencies(