diff --git a/engine/flink/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/helpers/KafkaAvroSpecMixin.scala b/engine/flink/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/helpers/KafkaAvroSpecMixin.scala index 17132cbfa62..9da9ccf9398 100644 --- a/engine/flink/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/helpers/KafkaAvroSpecMixin.scala +++ b/engine/flink/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/helpers/KafkaAvroSpecMixin.scala @@ -37,9 +37,9 @@ import pl.touk.nussknacker.engine.schemedkafka.KafkaUniversalComponentTransforme import pl.touk.nussknacker.engine.schemedkafka.kryo.AvroSerializersRegistrar import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.universal.UniversalSchemaBasedSerdeProvider import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.{ - DynamicSchemaVersion, ExistingSchemaVersion, LatestSchemaVersion, + PassedContentType, SchemaRegistryClientFactory, SchemaVersionOption } @@ -175,7 +175,7 @@ trait KafkaAvroSpecMixin versionOption match { case LatestSchemaVersion => s"'${SchemaVersionOption.LatestOptionName}'" case ExistingSchemaVersion(version) => s"'$version'" - case DynamicSchemaVersion(typ) => s"'$typ'" + case PassedContentType(typ) => s"'$typ'" } protected def runAndVerifyResultSingleEvent( @@ -312,9 +312,9 @@ trait KafkaAvroSpecMixin protected def versionOptionToString(versionOption: SchemaVersionOption): String = { versionOption match { - case LatestSchemaVersion => SchemaVersionOption.LatestOptionName - case ExistingSchemaVersion(v) => v.toString - case DynamicSchemaVersion(typ) => typ.toString + case LatestSchemaVersion => SchemaVersionOption.LatestOptionName + case ExistingSchemaVersion(v) => v.toString + case PassedContentType(typ) => typ.toString } } diff --git a/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/FlinkWithKafkaSuite.scala b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/FlinkWithKafkaSuite.scala index b61272a0e9c..b72326ed841 100644 --- a/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/FlinkWithKafkaSuite.scala +++ b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/FlinkWithKafkaSuite.scala @@ -39,9 +39,9 @@ import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.Confluen import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.client.MockSchemaRegistryClient import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.universal.MockSchemaRegistryClientFactory import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.{ - DynamicSchemaVersion, ExistingSchemaVersion, LatestSchemaVersion, + PassedContentType, SchemaRegistryClientFactory, SchemaVersionOption } @@ -179,7 +179,7 @@ abstract class FlinkWithKafkaSuite versionOption match { case LatestSchemaVersion => s"'${SchemaVersionOption.LatestOptionName}'" case ExistingSchemaVersion(version) => s"'$version'" - case DynamicSchemaVersion(typ) => s"'$typ'" + case PassedContentType(typ) => s"'$typ'" } protected def createAndRegisterAvroTopicConfig(name: String, schemas: List[Schema]): TopicConfig = diff --git a/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/KafkaJsonItSpec.scala b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/KafkaJsonItSpec.scala new file mode 100644 index 00000000000..7ed18f6bb74 --- /dev/null +++ 
b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/KafkaJsonItSpec.scala @@ -0,0 +1,83 @@ +package pl.touk.nussknacker.defaultmodel + +import com.typesafe.scalalogging.LazyLogging +import io.confluent.kafka.schemaregistry.ParsedSchema +import org.apache.flink.formats.avro.typeutils.NkSerializableParsedSchema +import pl.touk.nussknacker.defaultmodel.SampleSchemas.RecordSchemaV2 +import pl.touk.nussknacker.engine.api.process.TopicName.ForSource +import pl.touk.nussknacker.engine.api.validation.ValidationMode +import pl.touk.nussknacker.engine.build.ScenarioBuilder +import pl.touk.nussknacker.engine.graph.expression.Expression +import pl.touk.nussknacker.engine.kafka.KafkaTestUtils.richConsumer +import pl.touk.nussknacker.engine.schemedkafka.{KafkaUniversalComponentTransformer, RuntimeSchemaData} +import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.{ContentTypes, SchemaId, SchemaWithMetadata} +import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.client.OpenAPIJsonSchema +import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.universal.UniversalSchemaSupportDispatcher +import pl.touk.nussknacker.engine.spel.SpelExtension.SpelExpresion +import pl.touk.nussknacker.test.PatientScalaFutures + +import java.nio.ByteBuffer +import java.time.Instant +import java.util + +class KafkaJsonItSpec extends FlinkWithKafkaSuite with PatientScalaFutures with LazyLogging { + + private val givenMatchingAvroObjV2 = avroEncoder.encodeRecordOrError( + Map("first" -> "Jan", "middle" -> "Tomek", "last" -> "Kowalski"), + RecordSchemaV2 + ) + + test("should read json message from kafka without provided schema") { + val inputTopic = "input-topic-without-schema" + val outputTopic = "output-topic-without-schema" + + kafkaClient.createTopic(inputTopic, 1) + kafkaClient.createTopic(outputTopic, 1) + sendAsJson(givenMatchingAvroObjV2.toString, ForSource(inputTopic), Instant.now.toEpochMilli) + + val process = + ScenarioBuilder + .streaming("without-schema") + .parallelism(1) + .source( + "start", + "kafka", + KafkaUniversalComponentTransformer.topicParamName.value -> Expression.spel(s"'$inputTopic'"), + KafkaUniversalComponentTransformer.contentTypeParamName.value -> s"'${ContentTypes.JSON.toString}'".spel + ) + .emptySink( + "end", + "kafka", + KafkaUniversalComponentTransformer.sinkKeyParamName.value -> "".spel, + KafkaUniversalComponentTransformer.sinkRawEditorParamName.value -> "true".spel, + KafkaUniversalComponentTransformer.sinkValueParamName.value -> "#input".spel, + KafkaUniversalComponentTransformer.topicParamName.value -> s"'${outputTopic}'".spel, + KafkaUniversalComponentTransformer.contentTypeParamName.value -> s"'${ContentTypes.JSON.toString}'".spel, + KafkaUniversalComponentTransformer.sinkValidationModeParamName.value -> s"'${ValidationMode.lax.name}'".spel + ) + + run(process) { + val processed = kafkaClient.createConsumer().consumeWithConsumerRecord(outputTopic).take(1).head + + val schema = SchemaWithMetadata( + OpenAPIJsonSchema("""{"type": "object"}"""), + SchemaId.fromString(ContentTypes.JSON.toString) + ) + val runtimeSchema = new RuntimeSchemaData(new NkSerializableParsedSchema[ParsedSchema](schema.schema), None) + val response = + UniversalSchemaSupportDispatcher(kafkaConfig) + .forSchemaType("JSON") + .payloadDeserializer + .deserialize( + Some(runtimeSchema), + runtimeSchema, + ByteBuffer.wrap(processed.value()) + ) + .asInstanceOf[util.HashMap[String, String]] + + response.forEach((key, value) => givenMatchingAvroObjV2.get(key) shouldBe value) + 
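+        // Since the topic has no schema in the registry, deserialization falls back to the
+        // content-type pseudo schema (OpenAPIJsonSchema with id ContentTypes.JSON), so each
+        // field read back should equal the corresponding field of the original Avro record.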
+ } + } + +} diff --git a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/KafkaUniversalComponentTransformer.scala b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/KafkaUniversalComponentTransformer.scala index 561831736fa..e184235cfab 100644 --- a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/KafkaUniversalComponentTransformer.scala +++ b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/KafkaUniversalComponentTransformer.scala @@ -36,6 +36,7 @@ object KafkaUniversalComponentTransformer { final val sinkValueParamName = ParameterName("Value") final val sinkValidationModeParamName = ParameterName("Value validation mode") final val sinkRawEditorParamName = ParameterName("Raw editor") + final val contentTypeParamName = ParameterName("Content type") def extractValidationMode(value: String): ValidationMode = ValidationMode.fromString(value, sinkValidationModeParamName) @@ -74,7 +75,6 @@ abstract class KafkaUniversalComponentTransformer[T, TN <: TopicName: TopicValid ): WithError[ParameterCreatorWithNoDependency with ParameterExtractor[String]] = { val allTopics = getAllTopics val topics = allTopics match { - // TODO: previously schemaRegistryClient made validation case Some(topicsFromKafka) => // For test purposes mostly topicSelectionStrategy @@ -114,7 +114,7 @@ abstract class KafkaUniversalComponentTransformer[T, TN <: TopicName: TopicValid ) } - protected def getVersionParam( + protected def getVersionOrContentTypeParam( preparedTopic: PreparedKafkaTopic[TN], )(implicit nodeId: NodeId): WithError[ParameterCreatorWithNoDependency with ParameterExtractor[String]] = { val topicsWithSchema = topicSelectionStrategy.getTopics(schemaRegistryClient) @@ -129,16 +129,16 @@ abstract class KafkaUniversalComponentTransformer[T, TN <: TopicName: TopicValid ) }).map(getVersionParam) } else { - val versionValues = List( - FixedExpressionValue("'Json'", "Json"), - FixedExpressionValue("'Plain'", "Plain") + val contentTypesValues = List( + FixedExpressionValue("'JSON'", "JSON"), + FixedExpressionValue("'PLAIN'", "PLAIN") ) - Writer[List[ProcessCompilationError], List[FixedExpressionValue]](Nil, versionValues).map(versions => + Writer[List[ProcessCompilationError], List[FixedExpressionValue]](Nil, contentTypesValues).map(contentTypes => ParameterDeclaration - .mandatory[String](KafkaUniversalComponentTransformer.schemaVersionParamName) + .mandatory[String](KafkaUniversalComponentTransformer.contentTypeParamName) .withCreator( - modify = _.copy(editor = Some(FixedValuesParameterEditor(versions))) + modify = _.copy(editor = Some(FixedValuesParameterEditor(contentTypes))) ) ) } @@ -224,13 +224,13 @@ abstract class KafkaUniversalComponentTransformer[T, TN <: TopicName: TopicValid nextParams: List[Parameter] )(implicit nodeId: NodeId): ContextTransformationDefinition = { case TransformationStep((topicParamName, DefinedEagerParameter(topic: String, _)) :: Nil, _) => - val preparedTopic = prepareTopic(topic) - val versionParam = getVersionParam(preparedTopic) + val preparedTopic = prepareTopic(topic) + val versionOrContentTypeParam = getVersionOrContentTypeParam(preparedTopic) val topicValidationErrors = validateTopic(preparedTopic.prepared).swap.toList.map(_.toCustomNodeError(nodeId.id, Some(topicParamName))) NextParameters( - versionParam.value.createParameter() :: nextParams, - errors = versionParam.written ++ topicValidationErrors + 
versionOrContentTypeParam.value.createParameter() :: nextParams, + errors = versionOrContentTypeParam.written ++ topicValidationErrors ) case TransformationStep((`topicParamName`, _) :: Nil, _) => NextParameters(parameters = fallbackVersionOptionParam.createParameter() :: nextParams) @@ -245,12 +245,12 @@ abstract class KafkaUniversalComponentTransformer[T, TN <: TopicName: TopicValid // override it if you use other parameter name for topic @transient protected lazy val topicParamName: ParameterName = KafkaUniversalComponentTransformer.topicParamName + @transient protected lazy val contentTypeParamName: ParameterName = + KafkaUniversalComponentTransformer.contentTypeParamName protected def getAllTopics: Option[List[UnspecializedTopicName]] = { Try { val validatorConfig = kafkaConfig.topicsExistenceValidationConfig.validatorConfig - - // TODO: check here if can check before logging that server is unavailable KafkaUtils .usingAdminClient(kafkaConfig) { _.listTopics(new ListTopicsOptions().timeoutMs(validatorConfig.adminClientTimeout.toMillis.toInt)) diff --git a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/BasedOnVersionAvroSchemaDeterminer.scala b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/BasedOnVersionAvroSchemaDeterminer.scala index e334181c06e..fa685f41563 100644 --- a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/BasedOnVersionAvroSchemaDeterminer.scala +++ b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/BasedOnVersionAvroSchemaDeterminer.scala @@ -6,6 +6,7 @@ import io.confluent.kafka.schemaregistry.ParsedSchema import io.confluent.kafka.schemaregistry.avro.AvroSchema import org.apache.flink.formats.avro.typeutils.NkSerializableParsedSchema import pl.touk.nussknacker.engine.kafka.UnspecializedTopicName +import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.ContentTypes.ContentType import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.client.OpenAPIJsonSchema import pl.touk.nussknacker.engine.schemedkafka.{AvroSchemaDeterminer, RuntimeSchemaData, SchemaDeterminerError} @@ -57,8 +58,8 @@ class ParsedSchemaDeterminer( case LatestSchemaVersion => val version = None getTypedSchema(version) - case DynamicSchemaVersion(typ) => - getDynamicSchema(typ) + case PassedContentType(typ) => + getEmptyJsonSchema(typ) } } @@ -76,9 +77,11 @@ class ParsedSchemaDeterminer( ) } - private def getDynamicSchema(typ: JsonTypes): Validated[SchemaDeterminerError, RuntimeSchemaData[ParsedSchema]] = { + private def getEmptyJsonSchema( + typ: ContentType + ): Validated[SchemaDeterminerError, RuntimeSchemaData[ParsedSchema]] = { typ match { - case JsonTypes.Json => + case ContentTypes.JSON => Valid( RuntimeSchemaData[ParsedSchema]( new NkSerializableParsedSchema[ParsedSchema]( @@ -87,14 +90,14 @@ class ParsedSchemaDeterminer( "{}" ) ), - Some(SchemaId.fromInt(JsonTypes.Json.value)) + Some(SchemaId.fromString(ContentTypes.JSON.toString)) ) ) - case JsonTypes.Plain => + case ContentTypes.PLAIN => Valid( RuntimeSchemaData[ParsedSchema]( new NkSerializableParsedSchema[ParsedSchema](OpenAPIJsonSchema("")), - Some(SchemaId.fromInt(JsonTypes.Plain.value)) + Some(SchemaId.fromString(ContentTypes.PLAIN.toString)) ) ) case _ => Invalid(new SchemaDeterminerError("Wrong dynamic type", SchemaError.apply("Wrong dynamic type"))) diff --git 
a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/SchemaVersionOption.scala b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/SchemaVersionOption.scala index 9b56bf72e2c..5f4165dca01 100644 --- a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/SchemaVersionOption.scala +++ b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/SchemaVersionOption.scala @@ -1,6 +1,6 @@ package pl.touk.nussknacker.engine.schemedkafka.schemaregistry -import enumeratum.values.{IntEnum, IntEnumEntry} +import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.ContentTypes.ContentType import pl.touk.nussknacker.engine.util.convert.IntValue sealed trait SchemaVersionOption @@ -15,8 +15,8 @@ object SchemaVersionOption { name match { case `LatestOptionName` => LatestSchemaVersion case IntValue(version) => ExistingSchemaVersion(version) - case `JsonOptionName` => DynamicSchemaVersion(JsonTypes.Json) - case `PlainOptionName` => DynamicSchemaVersion(JsonTypes.Plain) + case `JsonOptionName` => PassedContentType(ContentTypes.JSON) + case `PlainOptionName` => PassedContentType(ContentTypes.PLAIN) case _ => throw new IllegalArgumentException(s"Unexpected schema version option: $name") } } @@ -27,13 +27,10 @@ case class ExistingSchemaVersion(version: Int) extends SchemaVersionOption case object LatestSchemaVersion extends SchemaVersionOption -case class DynamicSchemaVersion(typ: JsonTypes) extends SchemaVersionOption +case class PassedContentType(typ: ContentType) extends SchemaVersionOption -sealed abstract class JsonTypes(val value: Int) extends IntEnumEntry +object ContentTypes extends Enumeration { + type ContentType = Value -object JsonTypes extends IntEnum[JsonTypes] { - val values = findValues - - case object Json extends JsonTypes(1) - case object Plain extends JsonTypes(2) + val JSON, PLAIN = Value } diff --git a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/formatter/AbstractSchemaBasedRecordFormatter.scala b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/formatter/AbstractSchemaBasedRecordFormatter.scala index 62a712866ff..d63fbe4790f 100644 --- a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/formatter/AbstractSchemaBasedRecordFormatter.scala +++ b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/formatter/AbstractSchemaBasedRecordFormatter.scala @@ -10,12 +10,12 @@ import pl.touk.nussknacker.engine.kafka.consumerrecord.SerializableConsumerRecor import pl.touk.nussknacker.engine.kafka.{KafkaConfig, RecordFormatter, UnspecializedTopicName, serialization} import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.client.OpenAPIJsonSchema import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.{ - IntSchemaId, - JsonTypes, + ContentTypes, SchemaId, SchemaIdFromMessageExtractor, SchemaRegistryClient, - SchemaWithMetadata + SchemaWithMetadata, + StringSchemaId } import java.nio.charset.StandardCharsets @@ -117,25 +117,12 @@ abstract class AbstractSchemaBasedRecordFormatter[K: ClassTag, V: ClassTag] exte (keyBytes, valueBytes) } else { val valueSchemaOpt = - record.valueSchemaId match { - case Some(IntSchemaId(JsonTypes.Json.value)) 
=> - Option( - SchemaWithMetadata( - OpenAPIJsonSchema("""{"type": "object"}"""), - SchemaId.fromInt(JsonTypes.Json.value) - ).schema - ) - case Some(IntSchemaId(JsonTypes.Plain.value)) => - Option( - SchemaWithMetadata( - OpenAPIJsonSchema("""{"type": "string"}"""), - SchemaId.fromInt(JsonTypes.Plain.value) - ).schema - ) - case None => - Option(SchemaWithMetadata(OpenAPIJsonSchema("{}"), SchemaId.fromInt(JsonTypes.Json.value)).schema) - case _ => throw new IllegalStateException() - } + Option( + SchemaWithMetadata( + OpenAPIJsonSchema("""{"type": "object"}"""), + SchemaId.fromString(ContentTypes.JSON.toString) + ).schema + ) val valueBytes = readValueMessage(valueSchemaOpt, topic, value) (keyBytes, valueBytes) } diff --git a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/ParsedSchemaSupport.scala b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/ParsedSchemaSupport.scala index b770ed0c356..91c455b007d 100644 --- a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/ParsedSchemaSupport.scala +++ b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/ParsedSchemaSupport.scala @@ -161,9 +161,9 @@ object JsonSchemaSupport extends ParsedSchemaSupport[OpenAPIJsonSchema] { (value: Any) => { // In ad-hoc test without schema we create object `{ "Value" = userInputInAdHoc }`, so if present we should just take the input Try { - val temp = value.asInstanceOf[Map[String, Map[String, Any]]].head - if (temp._1.equals("Value")) { - temp._2 + val (key, values) = value.asInstanceOf[Map[String, Map[String, Any]]].head + if (key.equals("Value")) { + values } else Failure } match { // For normal usage diff --git a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/UniversalKafkaDeserializer.scala b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/UniversalKafkaDeserializer.scala index 45ba798acf8..7e6fbda228f 100644 --- a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/UniversalKafkaDeserializer.scala +++ b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/UniversalKafkaDeserializer.scala @@ -10,7 +10,7 @@ import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.client.O import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.serialization.SchemaRegistryBasedDeserializerFactory import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.{ ChainedSchemaIdFromMessageExtractor, - JsonTypes, + ContentTypes, SchemaId, SchemaRegistryClient, SchemaWithMetadata @@ -44,20 +44,11 @@ class UniversalKafkaDeserializer[T]( if (schemaRegistryClient.getAllTopics.exists(_.contains(UnspecializedTopicName(topic)))) { schemaRegistryClient.getSchemaById(writerSchemaId.value) } else { - writerSchemaId.value.asInt match { - case JsonTypes.Json.value => - SchemaWithMetadata( - // I don't know how these schemas affect deserialization later - OpenAPIJsonSchema("""{"type": "object"}"""), - SchemaId.fromInt(JsonTypes.Json.value) - ) - case JsonTypes.Plain.value => - SchemaWithMetadata( - OpenAPIJsonSchema("""{"type": "string"}"""), - SchemaId.fromInt(JsonTypes.Plain.value) - ) - } - + 
SchemaWithMetadata( + // TODO: verify how this fallback schema affects later deserialization + OpenAPIJsonSchema("""{"type": "object"}"""), + SchemaId.fromString(ContentTypes.JSON.toString) + ) } } diff --git a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/sink/UniversalKafkaSinkFactory.scala b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/sink/UniversalKafkaSinkFactory.scala index a672d844e35..1b0e721eb74 100644 --- a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/sink/UniversalKafkaSinkFactory.scala +++ b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/sink/UniversalKafkaSinkFactory.scala @@ -2,6 +2,7 @@ package pl.touk.nussknacker.engine.schemedkafka.sink import cats.data.NonEmptyList import io.confluent.kafka.schemaregistry.ParsedSchema +import org.apache.flink.formats.avro.typeutils.NkSerializableParsedSchema import pl.touk.nussknacker.engine.api.component.Component.AllowedProcessingModes import pl.touk.nussknacker.engine.api.component.ProcessingMode import pl.touk.nussknacker.engine.api.context.ProcessCompilationError.CustomNodeError @@ -18,6 +19,7 @@ import pl.touk.nussknacker.engine.api.validation.ValidationMode import pl.touk.nussknacker.engine.api.{LazyParameter, MetaData, NodeId, Params} import pl.touk.nussknacker.engine.graph.expression.Expression import pl.touk.nussknacker.engine.schemedkafka.KafkaUniversalComponentTransformer._ +import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.client.OpenAPIJsonSchema import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.{SchemaBasedSerdeProvider, SchemaRegistryClientFactory} import pl.touk.nussknacker.engine.schemedkafka.sink.UniversalKafkaSinkFactory.TransformationState import pl.touk.nussknacker.engine.schemedkafka.{ @@ -71,12 +73,18 @@ class UniversalKafkaSinkFactory( ) ) + private val jsonSchema = RuntimeSchemaData[ParsedSchema]( + new NkSerializableParsedSchema[ParsedSchema](OpenAPIJsonSchema("{}")), + None + ) + private val restrictedParamNames: Set[ParameterName] = Set( topicParamName, schemaVersionParamName, sinkKeyParamName, sinkRawEditorParamName, - sinkValidationModeParamName + sinkValidationModeParamName, + contentTypeParamName ) override protected def topicFrom(value: String): TopicName.ForSink = TopicName.ForSink(value) @@ -84,6 +92,16 @@ class UniversalKafkaSinkFactory( protected def rawEditorParameterStep( context: ValidationContext )(implicit nodeId: NodeId): ContextTransformationDefinition = { + case TransformationStep( + (`topicParamName`, _) :: + (`contentTypeParamName`, _) :: + (`sinkKeyParamName`, _) :: + (`sinkRawEditorParamName`, DefinedEagerParameter(true, _)) :: Nil, + _ + ) => + NextParameters( + validationModeParamDeclaration.createParameter() :: rawValueParamDeclaration.createParameter() :: Nil + ) case TransformationStep( (`topicParamName`, _) :: (`schemaVersionParamName`, _) :: (`sinkKeyParamName`, _) :: (`sinkRawEditorParamName`, DefinedEagerParameter(true, _)) :: Nil, _ ) => @@ -137,6 +155,40 @@ class UniversalKafkaSinkFactory( .valueOr { errors => FinalResults(context, errors.toList) } + case TransformationStep( + (`topicParamName`, DefinedEagerParameter(_: String, _)) :: + (`contentTypeParamName`, DefinedEagerParameter(contentType: String, _)) :: + (`sinkKeyParamName`, _) :: + (`sinkRawEditorParamName`, _) :: + (`sinkValidationModeParamName`, DefinedEagerParameter(mode: String, _)) :: + (`sinkValueParamName`, value: BaseDefinedParameter) :: Nil, + _ + ) => + val runtimeSchemaData = jsonSchema + 
schemaSupportDispatcher + .forSchemaType(runtimeSchemaData.schema.schemaType()) + .extractParameter( + runtimeSchemaData.schema, + rawMode = true, + validationMode = extractValidationMode(mode), + rawParameter = rawValueParamDeclaration.createParameter(), + restrictedParamNames + ) + .map { extractedSinkParameter => + val validationAgainstSchemaErrors = extractedSinkParameter + .validateParams(Map(sinkValueParamName -> value)) + .swap + .map(_.toList) + .getOrElse(List.empty) + FinalResults( + context, + validationAgainstSchemaErrors, + Some(TransformationState(runtimeSchemaData, extractedSinkParameter)) + ) + } + .valueOr { errors => + FinalResults(context, errors.toList) + } case TransformationStep( (`topicParamName`, _) :: (`schemaVersionParamName`, _) :: @@ -190,6 +242,36 @@ class UniversalKafkaSinkFactory( .valueOr { errors => FinalResults(context, errors.toList) } + case TransformationStep( + (`topicParamName`, DefinedEagerParameter(_: String, _)) :: + (`contentTypeParamName`, DefinedEagerParameter(contentType: String, _)) :: + (`sinkKeyParamName`, _) :: + (`sinkRawEditorParamName`, DefinedEagerParameter(false, _)) :: Nil, + _ + ) => + val schemaData = jsonSchema + + schemaSupportDispatcher + .forSchemaType(schemaData.schema.schemaType()) + .extractParameter( + schemaData.schema, + rawMode = false, + validationMode = ValidationMode.lax, + rawValueParamDeclaration.createParameter(), + restrictedParamNames + ) + .map[TransformationStepResult] { valueParam => + val state = TransformationState(schemaData, valueParam) + // shouldn't happen except for empty schema, but it can lead to infinite loop... + if (valueParam.toParameters.isEmpty) { + FinalResults(context, Nil, Some(state)) + } else { + NextParameters(valueParam.toParameters, state = Some(state)) + } + } + .valueOr { errors => + FinalResults(context, errors.toList) + } case TransformationStep( (`topicParamName`, _) :: (`schemaVersionParamName`, _) :: @@ -200,6 +282,16 @@ class UniversalKafkaSinkFactory( ) => val errors = state.schemaBasedParameter.validateParams(valueParams.toMap).swap.map(_.toList).getOrElse(Nil) FinalResults(context, errors, Some(state)) + case TransformationStep( + (`topicParamName`, _) :: + (`contentTypeParamName`, _) :: + (`sinkKeyParamName`, _) :: + (`sinkRawEditorParamName`, DefinedEagerParameter(false, _)) :: + valueParams, + Some(state) + ) => + val errors = state.schemaBasedParameter.validateParams(valueParams.toMap).swap.map(_.toList).getOrElse(Nil) + FinalResults(context, errors, Some(state)) } private def getSchema(topic: String, version: String)(implicit nodeId: NodeId) = { diff --git a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/source/UniversalKafkaSourceFactory.scala b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/source/UniversalKafkaSourceFactory.scala index 12a6c6cf9d1..96c5bd9c5e5 100644 --- a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/source/UniversalKafkaSourceFactory.scala +++ b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/source/UniversalKafkaSourceFactory.scala @@ -1,6 +1,6 @@ package pl.touk.nussknacker.engine.schemedkafka.source -import cats.data.Validated.{Invalid, Valid} +import cats.data.Validated.Valid import cats.data.{NonEmptyList, Validated} import io.circe.Json import io.circe.syntax._ @@ -10,7 +10,7 @@ import org.apache.flink.formats.avro.typeutils.NkSerializableParsedSchema import 
org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.record.TimestampType import pl.touk.nussknacker.engine.api.component.UnboundedStreamComponent -import pl.touk.nussknacker.engine.api.context.ProcessCompilationError.{CustomNodeError, FatalUnknownError} +import pl.touk.nussknacker.engine.api.context.ProcessCompilationError.CustomNodeError import pl.touk.nussknacker.engine.api.context.transformation.{DefinedEagerParameter, NodeDependencyValue} import pl.touk.nussknacker.engine.api.context.{ProcessCompilationError, ValidationContext} import pl.touk.nussknacker.engine.api.definition._ @@ -66,61 +66,56 @@ class UniversalKafkaSourceFactory( protected def nextSteps(context: ValidationContext, dependencies: List[NodeDependencyValue])( implicit nodeId: NodeId ): ContextTransformationDefinition = { + case step @ TransformationStep( + (`topicParamName`, DefinedEagerParameter(topic: String, _)) :: + (`contentTypeParamName`, DefinedEagerParameter(contentType: String, _)) :: _, + _ + ) => + val preparedTopic = prepareTopic(topic) + val valueValidationResult = if (contentType.equals("JSON")) { + Valid( + ( + Some( + RuntimeSchemaData[ParsedSchema]( + new NkSerializableParsedSchema[ParsedSchema](OpenAPIJsonSchema("{}")), + Some(SchemaId.fromString(ContentTypes.JSON.toString)) + ) + ), + // This is the type after it leaves source + Unknown + ) + ) + } else { + Valid( + ( + Some( + RuntimeSchemaData[ParsedSchema]( + new NkSerializableParsedSchema[ParsedSchema](OpenAPIJsonSchema("")), + Some(SchemaId.fromString(ContentTypes.PLAIN.toString)) + ) + ), + // This is the type after it leaves source + // TODO: Should be Array[Byte] when handling is implemented + Unknown + ) + ) + } + prepareSourceFinalResults(preparedTopic, valueValidationResult, context, dependencies, step.parameters, Nil) case step @ TransformationStep( (`topicParamName`, DefinedEagerParameter(topic: String, _)) :: (`schemaVersionParamName`, DefinedEagerParameter(version: String, _)) :: _, state ) => - val preparedTopic = prepareTopic(topic) - val topicsWithSchema = topicSelectionStrategy.getTopics(schemaRegistryClient) - val hasSchema: Boolean = - topicsWithSchema.exists(_.contains(UnspecializedTopicName(topic))) + val preparedTopic = prepareTopic(topic) val versionOption = parseVersionOption(version) - val valueValidationResult = - if (hasSchema) { - state match { - case Some(PrecalculatedValueSchemaUniversalKafkaSourceFactoryState(results)) => results - case _ => - determineSchemaAndType( - prepareUniversalValueSchemaDeterminer(preparedTopic, versionOption), - Some(schemaVersionParamName) - ) - } - } else { - versionOption match { - case DynamicSchemaVersion(JsonTypes.Json) => - Valid( - ( - Some( - RuntimeSchemaData[ParsedSchema]( - new NkSerializableParsedSchema[ParsedSchema](OpenAPIJsonSchema("{}")), - Some(SchemaId.fromInt(JsonTypes.Json.value)) - ) - ), - // This is the type after it leaves source - Unknown - ) - ) - case DynamicSchemaVersion(JsonTypes.Plain) => - Valid( - ( - Some( - RuntimeSchemaData[ParsedSchema]( - new NkSerializableParsedSchema[ParsedSchema](OpenAPIJsonSchema("")), - Some(SchemaId.fromInt(JsonTypes.Plain.value)) - ) - ), - // This is the type after it leaves source - Unknown - ) - ) - case _ => - determineSchemaAndType( - prepareUniversalValueSchemaDeterminer(preparedTopic, versionOption), - Some(schemaVersionParamName) - ) - } + state match { + case Some(PrecalculatedValueSchemaUniversalKafkaSourceFactoryState(results)) => results + case _ => + determineSchemaAndType( + 
prepareUniversalValueSchemaDeterminer(preparedTopic, versionOption), + Some(schemaVersionParamName) + ) } prepareSourceFinalResults(preparedTopic, valueValidationResult, context, dependencies, step.parameters, Nil)
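
Note (illustration, not part of the patch): taken together, the changes above replace DynamicSchemaVersion/JsonTypes with PassedContentType/ContentTypes and switch the pseudo schema ids from magic ints (SchemaId.fromInt(1|2)) to content-type names (SchemaId.fromString("JSON"|"PLAIN")). Below is a minimal, self-contained Scala sketch of the resulting option parsing; the method name `parse` and the literal values of LatestOptionName/JsonOptionName/PlainOptionName are assumptions, since the diff shows the match arms but not those definitions.

object ContentTypes extends Enumeration {
  type ContentType = Value
  val JSON, PLAIN = Value
}

sealed trait SchemaVersionOption
case object LatestSchemaVersion extends SchemaVersionOption
final case class ExistingSchemaVersion(version: Int) extends SchemaVersionOption
final case class PassedContentType(typ: ContentTypes.ContentType) extends SchemaVersionOption

object SchemaVersionOptionParser {
  // Assumed literal values; the real constants are defined outside this diff.
  private val LatestOptionName = "latest"
  private val JsonOptionName   = "JSON"
  private val PlainOptionName  = "PLAIN"

  // Mirrors the match in SchemaVersionOption.scala; the digit guard stands in
  // for the IntValue extractor used there.
  def parse(name: String): SchemaVersionOption = name match {
    case `LatestOptionName`                     => LatestSchemaVersion
    case n if n.nonEmpty && n.forall(_.isDigit) => ExistingSchemaVersion(n.toInt)
    case `JsonOptionName`                       => PassedContentType(ContentTypes.JSON)
    case `PlainOptionName`                      => PassedContentType(ContentTypes.PLAIN)
    case other => throw new IllegalArgumentException(s"Unexpected schema version option: $other")
  }
}

Downstream, PassedContentType(ContentTypes.JSON) resolves to an empty OpenAPIJsonSchema("{}") and PLAIN to OpenAPIJsonSchema(""); in both cases the value type after the source stays Unknown, with Array[Byte] handling for PLAIN left as a TODO in the source factory.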