Skip to content

Commit

Permalink
Review fix - created a test with kafka source using topic without schema
Browse files Browse the repository at this point in the history
When topic without schema is selected instead of version, user can now choose ContentType (which for now doesn't change anything, need to implement handling bytes array)
  • Loading branch information
Szymon Bogusz authored and ForrestFairy committed Nov 6, 2024
1 parent 6283b6e commit 4ef3b59
Show file tree
Hide file tree
Showing 11 changed files with 277 additions and 129 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ import pl.touk.nussknacker.engine.schemedkafka.KafkaUniversalComponentTransforme
import pl.touk.nussknacker.engine.schemedkafka.kryo.AvroSerializersRegistrar
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.universal.UniversalSchemaBasedSerdeProvider
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.{
DynamicSchemaVersion,
ExistingSchemaVersion,
LatestSchemaVersion,
PassedContentType,
SchemaRegistryClientFactory,
SchemaVersionOption
}
Expand Down Expand Up @@ -175,7 +175,7 @@ trait KafkaAvroSpecMixin
versionOption match {
case LatestSchemaVersion => s"'${SchemaVersionOption.LatestOptionName}'"
case ExistingSchemaVersion(version) => s"'$version'"
case DynamicSchemaVersion(typ) => s"'$typ'"
case PassedContentType(typ) => s"'$typ'"
}

protected def runAndVerifyResultSingleEvent(
Expand Down Expand Up @@ -312,9 +312,9 @@ trait KafkaAvroSpecMixin

protected def versionOptionToString(versionOption: SchemaVersionOption): String = {
versionOption match {
case LatestSchemaVersion => SchemaVersionOption.LatestOptionName
case ExistingSchemaVersion(v) => v.toString
case DynamicSchemaVersion(typ) => typ.toString
case LatestSchemaVersion => SchemaVersionOption.LatestOptionName
case ExistingSchemaVersion(v) => v.toString
case PassedContentType(typ) => typ.toString
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.Confluen
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.client.MockSchemaRegistryClient
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.universal.MockSchemaRegistryClientFactory
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.{
DynamicSchemaVersion,
ExistingSchemaVersion,
LatestSchemaVersion,
PassedContentType,
SchemaRegistryClientFactory,
SchemaVersionOption
}
Expand Down Expand Up @@ -179,7 +179,7 @@ abstract class FlinkWithKafkaSuite
versionOption match {
case LatestSchemaVersion => s"'${SchemaVersionOption.LatestOptionName}'"
case ExistingSchemaVersion(version) => s"'$version'"
case DynamicSchemaVersion(typ) => s"'$typ'"
case PassedContentType(typ) => s"'$typ'"
}

protected def createAndRegisterAvroTopicConfig(name: String, schemas: List[Schema]): TopicConfig =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package pl.touk.nussknacker.defaultmodel

import com.typesafe.scalalogging.LazyLogging
import io.confluent.kafka.schemaregistry.ParsedSchema
import org.apache.flink.formats.avro.typeutils.NkSerializableParsedSchema
import pl.touk.nussknacker.defaultmodel.SampleSchemas.RecordSchemaV2
import pl.touk.nussknacker.engine.api.process.TopicName.ForSource
import pl.touk.nussknacker.engine.api.validation.ValidationMode
import pl.touk.nussknacker.engine.build.ScenarioBuilder
import pl.touk.nussknacker.engine.graph.expression.Expression
import pl.touk.nussknacker.engine.kafka.KafkaTestUtils.richConsumer
import pl.touk.nussknacker.engine.schemedkafka.{KafkaUniversalComponentTransformer, RuntimeSchemaData}
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.{ContentTypes, SchemaId, SchemaWithMetadata}
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.client.OpenAPIJsonSchema
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.universal.UniversalSchemaSupportDispatcher
import pl.touk.nussknacker.engine.spel.SpelExtension.SpelExpresion
import pl.touk.nussknacker.test.PatientScalaFutures

import java.nio.ByteBuffer
import java.time.Instant
import java.util

class KafkaJsonItSpec extends FlinkWithKafkaSuite with PatientScalaFutures with LazyLogging {

private val givenMatchingAvroObjV2 = avroEncoder.encodeRecordOrError(
Map("first" -> "Jan", "middle" -> "Tomek", "last" -> "Kowalski"),
RecordSchemaV2
)

test("should read json message from kafka without provided schema") {
val inputTopic = "input-topic-without-schema"
val outputTopic = "output-topic-without-schema"

kafkaClient.createTopic(inputTopic, 1)
kafkaClient.createTopic(outputTopic, 1)
sendAsJson(givenMatchingAvroObjV2.toString, ForSource(inputTopic), Instant.now.toEpochMilli)

val process =
ScenarioBuilder
.streaming("without-schema")
.parallelism(1)
.source(
"start",
"kafka",
KafkaUniversalComponentTransformer.topicParamName.value -> Expression.spel(s"'$inputTopic'"),
KafkaUniversalComponentTransformer.contentTypeParamName.value -> s"'${ContentTypes.JSON.toString}'".spel
)
.emptySink(
"end",
"kafka",
KafkaUniversalComponentTransformer.sinkKeyParamName.value -> "".spel,
KafkaUniversalComponentTransformer.sinkRawEditorParamName.value -> "true".spel,
KafkaUniversalComponentTransformer.sinkValueParamName.value -> "#input".spel,
KafkaUniversalComponentTransformer.topicParamName.value -> s"'${outputTopic}'".spel,
KafkaUniversalComponentTransformer.contentTypeParamName.value -> s"'${ContentTypes.JSON.toString}'".spel,
KafkaUniversalComponentTransformer.sinkValidationModeParamName.value -> s"'${ValidationMode.lax.name}'".spel
)

run(process) {
val processed = kafkaClient.createConsumer().consumeWithConsumerRecord(outputTopic).take(1).head

val schema = SchemaWithMetadata(
OpenAPIJsonSchema("""{"type": "object"}"""),
SchemaId.fromString(ContentTypes.JSON.toString)
)
val runtimeSchema = new RuntimeSchemaData(new NkSerializableParsedSchema[ParsedSchema](schema.schema), None)
val response =
UniversalSchemaSupportDispatcher(kafkaConfig)
.forSchemaType("JSON")
.payloadDeserializer
.deserialize(
Some(runtimeSchema),
runtimeSchema,
ByteBuffer.wrap(processed.value())
)
.asInstanceOf[util.HashMap[String, String]]

response.forEach((key, value) => givenMatchingAvroObjV2.get(key) shouldBe value)

}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ object KafkaUniversalComponentTransformer {
final val sinkValueParamName = ParameterName("Value")
final val sinkValidationModeParamName = ParameterName("Value validation mode")
final val sinkRawEditorParamName = ParameterName("Raw editor")
final val contentTypeParamName = ParameterName("Content type")

def extractValidationMode(value: String): ValidationMode =
ValidationMode.fromString(value, sinkValidationModeParamName)
Expand Down Expand Up @@ -74,7 +75,6 @@ abstract class KafkaUniversalComponentTransformer[T, TN <: TopicName: TopicValid
): WithError[ParameterCreatorWithNoDependency with ParameterExtractor[String]] = {
val allTopics = getAllTopics
val topics = allTopics match {
// TODO: previously schemaRegistryClient made validation
case Some(topicsFromKafka) =>
// For test purposes mostly
topicSelectionStrategy
Expand Down Expand Up @@ -114,7 +114,7 @@ abstract class KafkaUniversalComponentTransformer[T, TN <: TopicName: TopicValid
)
}

protected def getVersionParam(
protected def getVersionOrContentTypeParam(
preparedTopic: PreparedKafkaTopic[TN],
)(implicit nodeId: NodeId): WithError[ParameterCreatorWithNoDependency with ParameterExtractor[String]] = {
val topicsWithSchema = topicSelectionStrategy.getTopics(schemaRegistryClient)
Expand All @@ -129,16 +129,16 @@ abstract class KafkaUniversalComponentTransformer[T, TN <: TopicName: TopicValid
)
}).map(getVersionParam)
} else {
val versionValues = List(
FixedExpressionValue("'Json'", "Json"),
FixedExpressionValue("'Plain'", "Plain")
val contentTypesValues = List(
FixedExpressionValue("'JSON'", "JSON"),
FixedExpressionValue("'PLAIN'", "PLAIN")
)

Writer[List[ProcessCompilationError], List[FixedExpressionValue]](Nil, versionValues).map(versions =>
Writer[List[ProcessCompilationError], List[FixedExpressionValue]](Nil, contentTypesValues).map(contentTypes =>
ParameterDeclaration
.mandatory[String](KafkaUniversalComponentTransformer.schemaVersionParamName)
.mandatory[String](KafkaUniversalComponentTransformer.contentTypeParamName)
.withCreator(
modify = _.copy(editor = Some(FixedValuesParameterEditor(versions)))
modify = _.copy(editor = Some(FixedValuesParameterEditor(contentTypes)))
)
)
}
Expand Down Expand Up @@ -224,13 +224,13 @@ abstract class KafkaUniversalComponentTransformer[T, TN <: TopicName: TopicValid
nextParams: List[Parameter]
)(implicit nodeId: NodeId): ContextTransformationDefinition = {
case TransformationStep((topicParamName, DefinedEagerParameter(topic: String, _)) :: Nil, _) =>
val preparedTopic = prepareTopic(topic)
val versionParam = getVersionParam(preparedTopic)
val preparedTopic = prepareTopic(topic)
val versionOrContentTypeParam = getVersionOrContentTypeParam(preparedTopic)
val topicValidationErrors =
validateTopic(preparedTopic.prepared).swap.toList.map(_.toCustomNodeError(nodeId.id, Some(topicParamName)))
NextParameters(
versionParam.value.createParameter() :: nextParams,
errors = versionParam.written ++ topicValidationErrors
versionOrContentTypeParam.value.createParameter() :: nextParams,
errors = versionOrContentTypeParam.written ++ topicValidationErrors
)
case TransformationStep((`topicParamName`, _) :: Nil, _) =>
NextParameters(parameters = fallbackVersionOptionParam.createParameter() :: nextParams)
Expand All @@ -245,12 +245,12 @@ abstract class KafkaUniversalComponentTransformer[T, TN <: TopicName: TopicValid

// override it if you use other parameter name for topic
@transient protected lazy val topicParamName: ParameterName = KafkaUniversalComponentTransformer.topicParamName
@transient protected lazy val contentTypeParamName: ParameterName =
KafkaUniversalComponentTransformer.contentTypeParamName

protected def getAllTopics: Option[List[UnspecializedTopicName]] = {
Try {
val validatorConfig = kafkaConfig.topicsExistenceValidationConfig.validatorConfig

// TODO: check here if can check before logging that server is unavailable
KafkaUtils
.usingAdminClient(kafkaConfig) {
_.listTopics(new ListTopicsOptions().timeoutMs(validatorConfig.adminClientTimeout.toMillis.toInt))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import io.confluent.kafka.schemaregistry.ParsedSchema
import io.confluent.kafka.schemaregistry.avro.AvroSchema
import org.apache.flink.formats.avro.typeutils.NkSerializableParsedSchema
import pl.touk.nussknacker.engine.kafka.UnspecializedTopicName
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.ContentTypes.ContentType
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.client.OpenAPIJsonSchema
import pl.touk.nussknacker.engine.schemedkafka.{AvroSchemaDeterminer, RuntimeSchemaData, SchemaDeterminerError}

Expand Down Expand Up @@ -57,8 +58,8 @@ class ParsedSchemaDeterminer(
case LatestSchemaVersion =>
val version = None
getTypedSchema(version)
case DynamicSchemaVersion(typ) =>
getDynamicSchema(typ)
case PassedContentType(typ) =>
getEmptyJsonSchema(typ)
}

}
Expand All @@ -76,9 +77,11 @@ class ParsedSchemaDeterminer(
)
}

private def getDynamicSchema(typ: JsonTypes): Validated[SchemaDeterminerError, RuntimeSchemaData[ParsedSchema]] = {
private def getEmptyJsonSchema(
typ: ContentType
): Validated[SchemaDeterminerError, RuntimeSchemaData[ParsedSchema]] = {
typ match {
case JsonTypes.Json =>
case ContentTypes.JSON =>
Valid(
RuntimeSchemaData[ParsedSchema](
new NkSerializableParsedSchema[ParsedSchema](
Expand All @@ -87,14 +90,14 @@ class ParsedSchemaDeterminer(
"{}"
)
),
Some(SchemaId.fromInt(JsonTypes.Json.value))
Some(SchemaId.fromString(ContentTypes.JSON.toString))
)
)
case JsonTypes.Plain =>
case ContentTypes.PLAIN =>
Valid(
RuntimeSchemaData[ParsedSchema](
new NkSerializableParsedSchema[ParsedSchema](OpenAPIJsonSchema("")),
Some(SchemaId.fromInt(JsonTypes.Plain.value))
Some(SchemaId.fromString(ContentTypes.PLAIN.toString))
)
)
case _ => Invalid(new SchemaDeterminerError("Wrong dynamic type", SchemaError.apply("Wrong dynamic type")))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package pl.touk.nussknacker.engine.schemedkafka.schemaregistry

import enumeratum.values.{IntEnum, IntEnumEntry}
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.ContentTypes.ContentType
import pl.touk.nussknacker.engine.util.convert.IntValue

sealed trait SchemaVersionOption
Expand All @@ -15,8 +15,8 @@ object SchemaVersionOption {
name match {
case `LatestOptionName` => LatestSchemaVersion
case IntValue(version) => ExistingSchemaVersion(version)
case `JsonOptionName` => DynamicSchemaVersion(JsonTypes.Json)
case `PlainOptionName` => DynamicSchemaVersion(JsonTypes.Plain)
case `JsonOptionName` => PassedContentType(ContentTypes.JSON)
case `PlainOptionName` => PassedContentType(ContentTypes.PLAIN)
case _ => throw new IllegalArgumentException(s"Unexpected schema version option: $name")
}
}
Expand All @@ -27,13 +27,10 @@ case class ExistingSchemaVersion(version: Int) extends SchemaVersionOption

case object LatestSchemaVersion extends SchemaVersionOption

case class DynamicSchemaVersion(typ: JsonTypes) extends SchemaVersionOption
case class PassedContentType(typ: ContentType) extends SchemaVersionOption

sealed abstract class JsonTypes(val value: Int) extends IntEnumEntry
object ContentTypes extends Enumeration {
type ContentType = Value

object JsonTypes extends IntEnum[JsonTypes] {
val values = findValues

case object Json extends JsonTypes(1)
case object Plain extends JsonTypes(2)
val JSON, PLAIN = Value
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ import pl.touk.nussknacker.engine.kafka.consumerrecord.SerializableConsumerRecor
import pl.touk.nussknacker.engine.kafka.{KafkaConfig, RecordFormatter, UnspecializedTopicName, serialization}
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.client.OpenAPIJsonSchema
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.{
IntSchemaId,
JsonTypes,
ContentTypes,
SchemaId,
SchemaIdFromMessageExtractor,
SchemaRegistryClient,
SchemaWithMetadata
SchemaWithMetadata,
StringSchemaId
}

import java.nio.charset.StandardCharsets
Expand Down Expand Up @@ -117,25 +117,12 @@ abstract class AbstractSchemaBasedRecordFormatter[K: ClassTag, V: ClassTag] exte
(keyBytes, valueBytes)
} else {
val valueSchemaOpt =
record.valueSchemaId match {
case Some(IntSchemaId(JsonTypes.Json.value)) =>
Option(
SchemaWithMetadata(
OpenAPIJsonSchema("""{"type": "object"}"""),
SchemaId.fromInt(JsonTypes.Json.value)
).schema
)
case Some(IntSchemaId(JsonTypes.Plain.value)) =>
Option(
SchemaWithMetadata(
OpenAPIJsonSchema("""{"type": "string"}"""),
SchemaId.fromInt(JsonTypes.Plain.value)
).schema
)
case None =>
Option(SchemaWithMetadata(OpenAPIJsonSchema("{}"), SchemaId.fromInt(JsonTypes.Json.value)).schema)
case _ => throw new IllegalStateException()
}
Option(
SchemaWithMetadata(
OpenAPIJsonSchema("""{"type": "object"}"""),
SchemaId.fromString(ContentTypes.JSON.toString)
).schema
)
val valueBytes = readValueMessage(valueSchemaOpt, topic, value)
(keyBytes, valueBytes)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,9 @@ object JsonSchemaSupport extends ParsedSchemaSupport[OpenAPIJsonSchema] {
(value: Any) => {
// In ad-hoc test without schema we create object `{ "Value" = userInputInAdHoc }`, so if present we should just take the input
Try {
val temp = value.asInstanceOf[Map[String, Map[String, Any]]].head
if (temp._1.equals("Value")) {
temp._2
val (key, values) = value.asInstanceOf[Map[String, Map[String, Any]]].head
if (key.equals("Value")) {
values
} else Failure
} match {
// For normal usage
Expand Down
Loading

0 comments on commit 4ef3b59

Please sign in to comment.