diff --git a/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkTypeInfoRegistrar.scala b/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkTypeInfoRegistrar.scala index 80832fb5790..a7b024d47df 100644 --- a/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkTypeInfoRegistrar.scala +++ b/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkTypeInfoRegistrar.scala @@ -6,27 +6,38 @@ import org.apache.flink.api.java.typeutils.TypeExtractor import java.lang.reflect.Type import java.time.{LocalDate, LocalDateTime, LocalTime} import java.util +import java.util.concurrent.atomic.AtomicBoolean +// This class contains registers TypeInfoFactory for commonly used classes in Nussknacker. +// It is a singleton as Flink's only contains a global registry for such purpose object FlinkTypeInfoRegistrar { - private val DisableFlinkTypeInfosRegistrationEnvVarName = "NU_DISABLE_FLINK_TYPE_INFOS_REGISTRATION" + private val typeInfoRegistrationEnabled = new AtomicBoolean(true) + + private val DisableFlinkTypeInfoRegistrationEnvVarName = "NU_DISABLE_FLINK_TYPE_INFO_REGISTRATION" private case class RegistrationEntry[T](klass: Class[T], factoryClass: Class[_ <: TypeInfoFactory[T]]) - private val typeInfosToRegister = List( + private val typeInfoToRegister = List( RegistrationEntry(classOf[LocalDate], classOf[LocalDateTypeInfoFactory]), RegistrationEntry(classOf[LocalTime], classOf[LocalTimeTypeInfoFactory]), RegistrationEntry(classOf[LocalDateTime], classOf[LocalDateTimeTypeInfoFactory]), ) - def ensureBaseTypesAreRegistered(): Unit = { - if (!Option(System.getenv(DisableFlinkTypeInfosRegistrationEnvVarName)).exists(java.lang.Boolean.parseBoolean)) { - typeInfosToRegister.foreach { entry => + def ensureTypeInfosAreRegistered(): Unit = { + // TypeInfo registration is available in Flink >= 1.19. For backward compatibility purpose we allow + // to disable this by either environment variable or programmatically + if (typeInfoRegistrationEnabled.get() && !typeInfoRegistrationDisabledByEnvVariable) { + typeInfoToRegister.foreach { entry => register(entry) } } } + private def typeInfoRegistrationDisabledByEnvVariable = { + Option(System.getenv(DisableFlinkTypeInfoRegistrationEnvVarName)).exists(_.toBoolean) + } + private def register(entry: RegistrationEntry[_]): Unit = { val opt = Option(TypeExtractor.getTypeInfoFactory(entry.klass)) if (opt.isEmpty) { @@ -34,6 +45,16 @@ object FlinkTypeInfoRegistrar { } } + // These methods are mainly for purpose of tests in nussknacker-flink-compatibility project + // It should be used in caution as it changes the global state + def enableFlinkTypeInfoRegistration(): Unit = { + typeInfoRegistrationEnabled.set(true) + } + + def disableFlinkTypeInfoRegistration(): Unit = { + typeInfoRegistrationEnabled.set(false) + } + class LocalDateTypeInfoFactory extends TypeInfoFactory[LocalDate] { override def createTypeInfo( diff --git a/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/TypeInformationDetection.scala b/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/TypeInformationDetection.scala index 35a14eab36e..50ad362f158 100644 --- a/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/TypeInformationDetection.scala +++ b/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/TypeInformationDetection.scala @@ -52,7 +52,7 @@ object TypeInformationDetection { // We use SPI to provide implementation of TypeInformationDetection because we don't want to make // implementation classes available in flink-components-api module. val instance: TypeInformationDetection = { - FlinkTypeInfoRegistrar.ensureBaseTypesAreRegistered() + FlinkTypeInfoRegistrar.ensureTypeInfosAreRegistered() val classloader = Thread.currentThread().getContextClassLoader ServiceLoader diff --git a/engine/flink/components-api/src/test/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkTypeInfoRegistrarTest.scala b/engine/flink/components-api/src/test/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkTypeInfoRegistrarTest.scala index 71bc968124a..6514ba0c21d 100644 --- a/engine/flink/components-api/src/test/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkTypeInfoRegistrarTest.scala +++ b/engine/flink/components-api/src/test/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkTypeInfoRegistrarTest.scala @@ -42,7 +42,7 @@ class FlinkTypeInfoRegistrarTest extends AnyFunSuite with Matchers { } test("Looking for TypeInformation for a NU types with registrar should return a specific TypeInformation") { - FlinkTypeInfoRegistrar.ensureBaseTypesAreRegistered() + FlinkTypeInfoRegistrar.ensureTypeInfosAreRegistered() nuTypesMapping.foreach { case (klass, expected) => val typeInfo = TypeInformation.of(klass) diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/ExecutionConfigPreparer.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/ExecutionConfigPreparer.scala index e605dfc5a4d..d614af52f67 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/ExecutionConfigPreparer.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/ExecutionConfigPreparer.scala @@ -101,7 +101,7 @@ object ExecutionConfigPreparer extends LazyLogging { override def prepareExecutionConfig( config: ExecutionConfig )(jobData: JobData, deploymentData: DeploymentData): Unit = { - FlinkTypeInfoRegistrar.ensureBaseTypesAreRegistered() + FlinkTypeInfoRegistrar.ensureTypeInfosAreRegistered() Serializers.registerSerializers(modelData, config) if (enableObjectReuse) { config.enableObjectReuse()