Skip to content

Commit

Permalink
Nu bump 1.17.0 -> 1.18.0-RC1 (#67)
Browse files Browse the repository at this point in the history
* Nu bump 1.17.0 -> 1.18.0-RC1
* touk's nexus removed
* workaround for missing type info registration applied
  • Loading branch information
arkadius authored Nov 7, 2024
1 parent a546a26 commit 15d0741
Show file tree
Hide file tree
Showing 7 changed files with 24 additions and 11 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@ _book/
.DS_Store
*.log.logstashjson
out/
dumps/
8 changes: 4 additions & 4 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@ val currentFlinkV = "1.19.1"
val sttpV = "3.8.11"
val kafkaV = "3.3.1"
val testContainersScalaV = "0.41.0"
val logbackV = "1.5.12"

val baseVersion = "1.0-SNAPSHOT"

// todo: for now we should regularly bump the version until we start publish single "latest" -SNAPSHOT version
val nussknackerV = settingKey[String]("Nussknacker version")
ThisBuild / nussknackerV := "1.18.0-staging-2024-10-10-20916-2224d333f-SNAPSHOT"
ThisBuild / nussknackerV := "1.18.0-preview_flink-typeinfo-registration-opt-out-2024-11-06-21368-c5a33a0cd-SNAPSHOT"
ThisBuild / version := codeVersion(baseVersion, nussknackerV.value)

// Global publish settings
Expand Down Expand Up @@ -67,9 +68,7 @@ lazy val commonSettings = Seq(
resolvers ++= Seq(
Resolver.sonatypeRepo("public"),
Opts.resolver.sonatypeSnapshots,
"confluent" at "https://packages.confluent.io/maven",
"nexus" at sys.env
.getOrElse("nexus", "https://nexus.touk.pl/nexus/content/groups/public")
"confluent" at "https://packages.confluent.io/maven"
),
crossScalaVersions := supportedScalaVersions,
scalacOptions := Seq(
Expand Down Expand Up @@ -227,6 +226,7 @@ def managerDeps(flinkV: String, nussknackerV: String) = Seq(
),
"pl.touk.nussknacker" %% "nussknacker-http-utils" % nussknackerV % "provided,it,test",
"pl.touk.nussknacker" %% "nussknacker-scenario-compiler" % nussknackerV % "provided,it,test",
"ch.qos.logback" % "logback-classic" % logbackV,
"pl.touk.nussknacker" %% "nussknacker-deployment-manager-api" % nussknackerV % "provided",
"org.apache.flink" % "flink-streaming-java" % flinkV excludeAll (
ExclusionRule("log4j", "log4j"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import pl.touk.nussknacker.engine.api.{JobData, ProcessVersion}
import pl.touk.nussknacker.engine.build.ScenarioBuilder
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.deployment.DeploymentData
import pl.touk.nussknacker.engine.flink.api.typeinformation.FlinkTypeInfoRegistrar
import pl.touk.nussknacker.engine.flink.test.FlinkMiniClusterHolder
import pl.touk.nussknacker.engine.flink.util.transformer.{FlinkBaseComponentProvider, FlinkKafkaComponentProvider}
import pl.touk.nussknacker.engine.kafka.{KafkaConfig, KafkaSpec, KafkaTestUtils, UnspecializedTopicName}
Expand Down Expand Up @@ -255,11 +256,15 @@ trait BaseGenericITSpec extends AnyFunSuiteLike with Matchers with KafkaSpec wit

override protected def beforeAll(): Unit = {
super.beforeAll()

FlinkTypeInfoRegistrar.disableFlinkTypeInfoRegistration()

val components = FlinkBaseComponentProvider.Components :::
new MockFlinkKafkaComponentProvider().create(
config.getConfig("components.mockKafka"),
ProcessObjectDependencies.withConfig(ConfigFactory.empty())
)

val modelData = LocalModelData(config, components, creator)
registrar = FlinkProcessRegistrar(
new FlinkProcessCompilerDataFactory(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ import org.apache.flink.api.common.ExecutionConfig
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers
import pl.touk.nussknacker.engine.api.typed.typing.Typed
import pl.touk.nussknacker.engine.flink.api.typeinformation.TypeInformationDetection
import pl.touk.nussknacker.engine.flink.api.typeinformation.{FlinkTypeInfoRegistrar, TypeInformationDetection}

@silent("deprecated")
trait BaseSchemaCompatibilityTest extends AnyFunSuite with Matchers {

test("Cheks schema compatibility") {
test("Checks schema compatibility") {
FlinkTypeInfoRegistrar.disableFlinkTypeInfoRegistration()
val detection = TypeInformationDetection.instance
val typingResult = Typed.record(Map("int" -> Typed[Int]))
val executionConfigWithoutKryo: ExecutionConfig = new ExecutionConfig
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ trait CommonFlinkStreamingDeploymentManagerSpec extends AnyFunSuite with Matcher
val deploymentManager = createDeploymentManager(jobManager.jobmanagerRestUrl)
val processId = "runningFlink"

val version = ProcessVersion(VersionId(15), ProcessName(processId), ProcessId(1), List.empty, "user1", Some(13))
val version = ProcessVersion(VersionId(15), ProcessName(processId), ProcessId(1), labels = List.empty, user = "user1", modelVersion = Some(13))
val process = prepareProcess(processId, Some(1))

deployProcessAndWaitIfRunning(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ object JobManagerContainer {
GenericContainer(
FlinkContainer.flinkImage(flinkVersion),
command = List("jobmanager"),
env = Map("SAVEPOINT_DIR_NAME" -> savepointDir.getFileName.toString),
env = Map(
"SAVEPOINT_DIR_NAME" -> savepointDir.getFileName.toString,
"NU_DISABLE_FLINK_TYPE_INFO_REGISTRATION" -> "true"
),
waitStrategy = new LogMessageWaitStrategy()
.withRegEx(".*Recover all persisted job graphs.*")
.withStartupTimeout(Duration.ofSeconds(250)),
Expand All @@ -74,8 +77,9 @@ object TaskManagerContainer {
FlinkContainer.flinkImage(flinkVersion),
command = List("taskmanager"),
env = Map(
"TASK_MANAGER_NUMBER_OF_TASK_SLOTS" -> TaskManagerSlots.toString,
"JOB_MANAGER_RPC_ADDRESS" -> jobmanagerRpcAddress
"TASK_MANAGER_NUMBER_OF_TASK_SLOTS" -> TaskManagerSlots.toString,
"JOB_MANAGER_RPC_ADDRESS" -> jobmanagerRpcAddress,
"NU_DISABLE_FLINK_TYPE_INFO_REGISTRATION" -> "true"
),
waitStrategy = new LogMessageWaitStrategy().withRegEx(".*Successful registration at resource manager.*")
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import pl.touk.nussknacker.engine.api.deployment._
import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus
import pl.touk.nussknacker.engine.api.process.ProcessName
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.definition.component.Components.ComponentDefinitionExtractionMode.FinalDefinition
import pl.touk.nussknacker.engine.deployment.{DeploymentData, User}
import pl.touk.nussknacker.engine.management.FlinkStreamingDeploymentManagerProvider
import sttp.client3.SttpBackend
Expand Down Expand Up @@ -78,7 +79,8 @@ trait StreamingDockerTest
additionalConfigsFromProvider = Map.empty,
determineDesignerWideId = componentId => DesignerWideComponentId(componentId.toString),
workingDirectoryOpt = None,
shouldIncludeComponentProvider = _ => true
shouldIncludeComponentProvider = _ => true,
componentDefinitionExtractionMode = FinalDefinition
)
}
val deploymentManagerDependencies = DeploymentManagerDependencies(
Expand Down

0 comments on commit 15d0741

Please sign in to comment.