Skip to content

Commit

Permalink
[NU-1848] Work around for broken compatibility with Flink < 1.19: env…
Browse files Browse the repository at this point in the history
…ironment variable allowing to disable Flink TypeInfos
  • Loading branch information
arkadius committed Nov 6, 2024
1 parent 062fdaf commit 6847a1c
Showing 1 changed file with 11 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,25 @@ import java.util

object FlinkTypeInfoRegistrar {

private case class RegistrationEntry[T, K <: TypeInfoFactory[T]](klass: Class[T], factoryClass: Class[K])
private val DisableFlinkTypeInfosRegistrationEnvVarName = "NU_DISABLE_FLINK_TYPE_INFOS_REGISTRATION"

private val typesToRegister = List(
private case class RegistrationEntry[T](klass: Class[T], factoryClass: Class[_ <: TypeInfoFactory[T]])

private val typeInfosToRegister = List(
RegistrationEntry(classOf[LocalDate], classOf[LocalDateTypeInfoFactory]),
RegistrationEntry(classOf[LocalTime], classOf[LocalTimeTypeInfoFactory]),
RegistrationEntry(classOf[LocalDateTime], classOf[LocalDateTimeTypeInfoFactory]),
)

def ensureBaseTypesAreRegistered(): Unit =
typesToRegister.foreach { base =>
register(base)
def ensureBaseTypesAreRegistered(): Unit = {
if (!Option(System.getenv(DisableFlinkTypeInfosRegistrationEnvVarName)).exists(java.lang.Boolean.parseBoolean)) {
typeInfosToRegister.foreach { entry =>
register(entry)
}
}
}

private def register(entry: RegistrationEntry[_, _ <: TypeInfoFactory[_]]): Unit = {
private def register(entry: RegistrationEntry[_]): Unit = {
val opt = Option(TypeExtractor.getTypeInfoFactory(entry.klass))
if (opt.isEmpty) {
TypeExtractor.registerFactory(entry.klass, entry.factoryClass)
Expand Down

0 comments on commit 6847a1c

Please sign in to comment.