Workaround for missing type info registration applied
Arek Burdach committed Nov 6, 2024
1 parent: 6dbf17c · commit: 3c0452a
Showing 4 changed files with 36 additions and 32 deletions.
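
The workaround touches two execution paths: in-process tests now call FlinkTypeInfoRegistrar.disableFlinkTypeInfoRegistration() before any LocalModelData or FlinkProcessRegistrar is built, and the dockerized Flink jobmanager/taskmanager containers get the same opt-out through the NU_DISABLE_FLINK_TYPE_INFO_REGISTRATION environment variable. A minimal sketch of the test-side call (the DisableTypeInfoRegistrationWorkaround wrapper is made up for illustration; only the imported FlinkTypeInfoRegistrar API comes from this diff):

import pl.touk.nussknacker.engine.flink.api.typeinformation.FlinkTypeInfoRegistrar

// Illustrative wrapper (name invented for this sketch): flips the opt-out switch
// once, before model data and the process registrar are constructed, mirroring the
// beforeAll change in BaseGenericITSpec further down.
object DisableTypeInfoRegistrationWorkaround {
  def apply(): Unit =
    FlinkTypeInfoRegistrar.disableFlinkTypeInfoRegistration()
}
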
build.sbt (21 additions, 27 deletions)
@@ -18,18 +18,18 @@ val scalaCollectionsCompatV = "2.9.0"
val silencerV_2_12 = "1.6.0"
val silencerV = "1.7.17"

val flink118V = "1.18.1"
val flink116V = "1.16.0"
val currentFlinkV = "1.19.1"
val flinkKafkaConnectorV = "3.2.0"
val sttpV = "3.8.11"
val kafkaV = "3.3.1"
val testContainersScalaV = "0.41.0"
val logbackV = "1.5.12"

val baseVersion = "1.0-SNAPSHOT"

// todo: for now we should regularly bump the version until we start publishing a single "latest" -SNAPSHOT version
val nussknackerV = settingKey[String]("Nussknacker version")
ThisBuild / nussknackerV := "1.18.0-RC1"
ThisBuild / nussknackerV := "1.18.0-preview_flink-typeinfo-registration-opt-out-2024-11-06-21368-c5a33a0cd-SNAPSHOT"
ThisBuild / version := codeVersion(baseVersion, nussknackerV.value)

// Global publish settings
@@ -166,12 +166,12 @@ lazy val flink116ModelCompat = (project in file("flink116/model"))
name := "nussknacker-flink-compatibility-1-16-model",
libraryDependencies ++= {
val nussknackerVersion = nussknackerV.value
deps(flink118V, nussknackerVersion)
deps(flink116V, nussknackerVersion)
},
dependencyOverrides ++= Seq(
"org.apache.kafka" % "kafka-clients" % kafkaV,
"org.apache.kafka" %% "kafka" % kafkaV
) ++ flinkOverrides(flink118V)
) ++ flinkOverrides(flink116V)
)
.dependsOn(commonTest % Test)

@@ -183,7 +183,7 @@ lazy val flink116ManagerCompat = (project in file("flink116/manager"))
name := "nussknacker-flink-compatibility-1-16-manager",
libraryDependencies ++= {
val nussknackerVersion = nussknackerV.value
managerDeps(flink118V, nussknackerVersion)
managerDeps(flink116V, nussknackerVersion)
},
dependencyOverrides ++= Seq(
// For some strange reason, docker client libraries have a conflict with the schema registry client :/
@@ -211,7 +211,7 @@ lazy val flink116KafkaComponents = (project in file("flink116/kafka-components")
"pl.touk.nussknacker" %% "nussknacker-flink-extensions-api" % nussknackerVersion % "provided",
"pl.touk.nussknacker" %% "nussknacker-utils" % nussknackerVersion % "provided",
"pl.touk.nussknacker" %% "nussknacker-components-utils" % nussknackerVersion % "provided",
"org.apache.flink" % "flink-streaming-java" % flink118V % "provided"
"org.apache.flink" % "flink-streaming-java" % flink116V % "provided"
)
},
dependencyOverrides ++= Seq(
@@ -226,6 +226,7 @@ def managerDeps(flinkV: String, nussknackerV: String) = Seq(
),
"pl.touk.nussknacker" %% "nussknacker-http-utils" % nussknackerV % "provided,it,test",
"pl.touk.nussknacker" %% "nussknacker-scenario-compiler" % nussknackerV % "provided,it,test",
"ch.qos.logback" % "logback-classic" % logbackV,
"pl.touk.nussknacker" %% "nussknacker-deployment-manager-api" % nussknackerV % "provided",
"org.apache.flink" % "flink-streaming-java" % flinkV excludeAll (
ExclusionRule("log4j", "log4j"),
@@ -248,26 +249,19 @@ def deps(flinkV: String, nussknackerV: String) = Seq(
"org.apache.flink" % "flink-metrics-dropwizard" % flinkV,
)

def flinkOverrides(flinkV: String) = {
val VersionPattern = """^(\d+\.\d+)\..*""".r("majorMinorPart")
val flinkMajorMinorVersionPart = flinkV match {
case VersionPattern(majorMinorPart) => majorMinorPart
case _ => "Version format not recognized"
}
Seq(
"org.apache.flink" %% "flink-streaming-scala" % flinkV % "provided",
"org.apache.flink" % "flink-streaming-java" % flinkV % "provided",
"org.apache.flink" % "flink-core" % flinkV % "provided",
"org.apache.flink" % "flink-rpc-akka-loader" % flinkV % "provided",
"org.apache.flink" %% "flink-scala" % flinkV % "provided",
"org.apache.flink" % "flink-avro" % flinkV % "provided",
"org.apache.flink" % "flink-runtime" % flinkV % "provided",
"org.apache.flink" % "flink-test-utils" % flinkV % "provided",
"org.apache.flink" % "flink-statebackend-rocksdb" % flinkV % "provided",
"org.apache.flink" % "flink-connector-kafka" % s"$flinkKafkaConnectorV-$flinkMajorMinorVersionPart" % "provided",
"org.apache.flink" % "flink-metrics-dropwizard" % flinkV % "test",
)
}
def flinkOverrides(flinkV: String) = Seq(
"org.apache.flink" %% "flink-streaming-scala" % flinkV % "provided",
"org.apache.flink" % "flink-streaming-java" % flinkV % "provided",
"org.apache.flink" % "flink-core" % flinkV % "provided",
"org.apache.flink" % "flink-rpc-akka-loader" % flinkV % "provided",
"org.apache.flink" %% "flink-scala" % flinkV % "provided",
"org.apache.flink" % "flink-avro" % flinkV % "provided",
"org.apache.flink" % "flink-runtime" % flinkV % "provided",
"org.apache.flink" % "flink-test-utils" % flinkV % "provided",
"org.apache.flink" % "flink-statebackend-rocksdb" % flinkV % "provided",
"org.apache.flink" % "flink-connector-kafka" % flinkV % "provided",
"org.apache.flink" % "flink-metrics-dropwizard" % flinkV % "test",
)

def nussknackerAssemblyStrategy: String => MergeStrategy = {
case PathList(ps @ _*) if ps.last == "NumberUtils.class" => MergeStrategy.first
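
The flinkOverrides simplification above also changes how the kafka connector is resolved: the removed helper parsed the Flink major.minor out of flinkV and glued it onto flinkKafkaConnectorV (the separately released connector line used with Flink 1.18), whereas the new code reuses flinkV directly, which fits the 1.16 series where flink-connector-kafka still tracked the Flink version. A rough before/after sketch of the resulting coordinates, as an sbt fragment built from the values in this diff (treat the exact artifact versions as assumptions):

// Before: externalized connector release, versioned "<connectorV>-<flink major.minor>",
// e.g. flinkKafkaConnectorV = "3.2.0" combined with the Flink 1.18.x line.
val kafkaConnectorBefore = "org.apache.flink" % "flink-connector-kafka" % "3.2.0-1.18" % "provided"
// After: on the Flink 1.16 line the connector version follows Flink itself.
val kafkaConnectorAfter = "org.apache.flink" % "flink-connector-kafka" % "1.16.0" % "provided"
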
@@ -17,6 +17,7 @@ import pl.touk.nussknacker.engine.api.{JobData, ProcessVersion}
import pl.touk.nussknacker.engine.build.ScenarioBuilder
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.deployment.DeploymentData
import pl.touk.nussknacker.engine.flink.api.typeinformation.FlinkTypeInfoRegistrar
import pl.touk.nussknacker.engine.flink.test.FlinkMiniClusterHolder
import pl.touk.nussknacker.engine.flink.util.transformer.{FlinkBaseComponentProvider, FlinkKafkaComponentProvider}
import pl.touk.nussknacker.engine.kafka.{KafkaConfig, KafkaSpec, KafkaTestUtils, UnspecializedTopicName}
@@ -255,11 +256,15 @@ trait BaseGenericITSpec extends AnyFunSuiteLike with Matchers with KafkaSpec wit

override protected def beforeAll(): Unit = {
super.beforeAll()

FlinkTypeInfoRegistrar.disableFlinkTypeInfoRegistration()

val components = FlinkBaseComponentProvider.Components :::
new MockFlinkKafkaComponentProvider().create(
config.getConfig("components.mockKafka"),
ProcessObjectDependencies.withConfig(ConfigFactory.empty())
)

val modelData = LocalModelData(config, components, creator)
registrar = FlinkProcessRegistrar(
new FlinkProcessCompilerDataFactory(
@@ -5,12 +5,13 @@ import org.apache.flink.api.common.ExecutionConfig
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers
import pl.touk.nussknacker.engine.api.typed.typing.Typed
import pl.touk.nussknacker.engine.flink.api.typeinformation.TypeInformationDetection
import pl.touk.nussknacker.engine.flink.api.typeinformation.{FlinkTypeInfoRegistrar, TypeInformationDetection}

@silent("deprecated")
trait BaseSchemaCompatibilityTest extends AnyFunSuite with Matchers {

test("Cheks schema compatibility") {
test("Checks schema compatibility") {
FlinkTypeInfoRegistrar.disableFlinkTypeInfoRegistration()
val detection = TypeInformationDetection.instance
val typingResult = Typed.record(Map("int" -> Typed[Int]))
val executionConfigWithoutKryo: ExecutionConfig = new ExecutionConfig
@@ -47,7 +47,10 @@ object JobManagerContainer {
GenericContainer(
FlinkContainer.flinkImage(flinkVersion),
command = List("jobmanager"),
env = Map("SAVEPOINT_DIR_NAME" -> savepointDir.getFileName.toString),
env = Map(
"SAVEPOINT_DIR_NAME" -> savepointDir.getFileName.toString,
"NU_DISABLE_FLINK_TYPE_INFO_REGISTRATION" -> "true"
),
waitStrategy = new LogMessageWaitStrategy()
.withRegEx(".*Recover all persisted job graphs.*")
.withStartupTimeout(Duration.ofSeconds(250)),
Expand All @@ -74,8 +77,9 @@ object TaskManagerContainer {
FlinkContainer.flinkImage(flinkVersion),
command = List("taskmanager"),
env = Map(
"TASK_MANAGER_NUMBER_OF_TASK_SLOTS" -> TaskManagerSlots.toString,
"JOB_MANAGER_RPC_ADDRESS" -> jobmanagerRpcAddress
"TASK_MANAGER_NUMBER_OF_TASK_SLOTS" -> TaskManagerSlots.toString,
"JOB_MANAGER_RPC_ADDRESS" -> jobmanagerRpcAddress,
"NU_DISABLE_FLINK_TYPE_INFO_REGISTRATION" -> "true"
),
waitStrategy = new LogMessageWaitStrategy().withRegEx(".*Successful registration at resource manager.*")
),
