Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding support for required fields #42

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 30 additions & 47 deletions src/main/scala/org/zalando/spark/jsonschema/SchemaConverter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import scala.io.Source
* to the type given in the schema. If this is not possible the whole row will be null (!).
* A field can be null if its type is a 2-element array, one of which is "null". The converted
* schema doesn't check for 'enum' fields, i.e. fields which are limited to a given set.
* It also doesn't check for required fields or if additional properties are set to true
* It also doesn't check for additional properties are set to true
* or false. If a field is specified in the schema, than you can select it and it will
* be null if missing. If a field is not in the schema, it cannot be selected even if
* given in the dataset.
Expand All @@ -24,7 +24,6 @@ case class SchemaType(typeName: String, nullable: Boolean)
private case class NullableDataType(dataType: DataType, nullable: Boolean)

object SchemaConverter {

val SchemaFieldName = "name"
val SchemaFieldType = "type"
val SchemaFieldId = "id"
Expand All @@ -33,6 +32,7 @@ object SchemaConverter {
val SchemaRoot = "/"
val Definitions = "definitions"
val Reference = "$ref"
val Required = "required"
val TypeMap = Map(
"string" -> StringType,
"number" -> DoubleType,
Expand All @@ -44,50 +44,42 @@ object SchemaConverter {
)
var definitions: JsObject = JsObject(Seq.empty)
private var isStrictTypingEnabled: Boolean = true

def disableStrictTyping(): SchemaConverter.type = {
setStrictTyping(false)
}

def enableStrictTyping(): SchemaConverter.type = {
setStrictTyping(true)
}

private def setStrictTyping(b: Boolean) = {
isStrictTypingEnabled = b
this
}

def convertContent(schemaContent: String): StructType = convert(parseSchemaJson(schemaContent))

def convert(inputPath: String): StructType = convert(loadSchemaJson(inputPath))

def convert(inputSchema: JsObject): StructType = {
definitions = (inputSchema \ Definitions).asOpt[JsObject].getOrElse(definitions)
val name = getJsonName(inputSchema).getOrElse(SchemaRoot)
val typeName = getJsonType(inputSchema, name).typeName
if (name == SchemaRoot && typeName == "object") {
val properties = (inputSchema \ SchemaStructContents).asOpt[JsObject].getOrElse(
//TODO validation do something with this
(inputSchema \ SchemaStructContents).asOpt[JsObject].getOrElse(
throw new NoSuchElementException(
s"Root level of schema needs to have a [$SchemaStructContents]-field"
)
)
convertJsonStruct(new StructType, properties, properties.keys.toList)
//End validation do something with this
convertJsonStruct(new StructType, inputSchema)
} else {
throw new IllegalArgumentException(
s"schema needs root level called <$SchemaRoot> and root type <object>. " +
s"Current root is <$name> and type is <$typeName>"
)
}
}

def getJsonName(json: JsValue): Option[String] = (json \ SchemaFieldName).asOpt[String]

def getJsonId(json: JsValue): Option[String] = (json \ SchemaFieldId).asOpt[String]

def getJsonType(json: JsObject, name: String): SchemaType = {
val id = getJsonId(json).getOrElse(name)

(json \ SchemaFieldType).getOrElse(JsNull) match {
case JsString(s) => SchemaType(s, nullable = false)
case JsArray(array) =>
Expand All @@ -96,7 +88,7 @@ object SchemaConverter {
case 1 if nullable =>
throw new IllegalArgumentException("Null type only is not supported")
case 1 =>
SchemaType(array.apply(0).as[String], nullable = nullable)
SchemaType(array.head.as[String], nullable = nullable)
case 2 if nullable =>
array.find(_ != JsString("null"))
.map(i => SchemaType(i.as[String], nullable = nullable))
Expand All @@ -119,33 +111,38 @@ object SchemaConverter {
)
}
}

private def parseSchemaJson(schemaContent: String) = Json.parse(schemaContent).as[JsObject]

private def parseSchemaJson(schemaContent: String): JsObject = Json.parse(schemaContent).as[JsObject]
def loadSchemaJson(filePath: String): JsObject = {
Option(getClass.getResource(filePath)) match {
case Some(relPath) => parseSchemaJson(Source.fromURL(relPath).mkString)
case None => throw new IllegalArgumentException(s"Path can not be reached: $filePath")
}
}

@tailrec
private def convertJsonStruct(schema: StructType, json: JsObject, jsonKeys: List[String]): StructType = {
jsonKeys match {
case Nil => schema
case head :: tail =>
val enrichedSchema = addJsonField(schema, (json \ head).as[JsObject], head)
convertJsonStruct(enrichedSchema, json, tail)
private def convertJsonStruct(schema: StructType, objectDefinition: JsObject): StructType = {
val properties = (JsPath \ SchemaStructContents).asSingleJson(objectDefinition) match {
case JsDefined(v) => v.as[JsObject]
case _: JsUndefined => JsObject(Seq.empty)
}
val requiredProperies: Seq[String] = ((JsPath \ Required).asSingleJson(objectDefinition) match {
case JsDefined(v) => v.as[JsArray]
case _: JsUndefined => JsArray(Seq.empty)
}).as[Seq[String]]
properties.keys.toList.foldLeft(schema) {
(seedSchema, key) =>
addJsonField(
seedSchema,
(properties \ key).as[JsObject],
key,
requiredProperies.exists(k => k.equals(key))
)
}
}

def traversePath(loc: List[String], path: JsPath): JsPath = {
loc match {
case head :: tail => traversePath(tail, path \ head)
case Nil => path
}
}

private def checkRefs(inputJson: JsObject): JsObject = {
val schemaRef = (inputJson \ Reference).asOpt[JsString]
schemaRef match {
Expand All @@ -167,42 +164,28 @@ object SchemaConverter {
case None => inputJson
}
}

private def addJsonField(schema: StructType, inputJson: JsObject, name: String): StructType = {

private def addJsonField(schema: StructType, inputJson: JsObject, name: String, isRequired: Boolean): StructType = {
val json = checkRefs(inputJson)
val fieldType = getFieldType(json, name)

schema.add(getJsonName(json).getOrElse(name), fieldType.dataType, nullable = fieldType.nullable)
schema.add(getJsonName(json).getOrElse(name), fieldType.dataType, nullable = fieldType.nullable || (!isRequired))
}

private def getFieldType(json: JsObject, name: String): NullableDataType = {
val fieldType = getJsonType(json, name)
TypeMap(fieldType.typeName) match {

case dataType: DataType =>
NullableDataType(dataType, fieldType.nullable)

case ArrayType =>
val innerJson = checkRefs((json \ SchemaArrayContents).as[JsObject])
val innerJsonType = getFieldType(innerJson, "")
val dataType = ArrayType(innerJsonType.dataType, innerJsonType.nullable)
NullableDataType(dataType, fieldType.nullable)

case StructType =>
val dataType = getDataType(json, JsPath \ SchemaStructContents)
val dataType = getDataType(json)
NullableDataType(dataType, fieldType.nullable)
}
}

private def getDataType(inputJson: JsObject, contentPath: JsPath): DataType = {
private def getDataType(inputJson: JsObject): DataType = {
val json = checkRefs(inputJson)

val content = contentPath.asSingleJson(json) match {
case JsDefined(v) => v.as[JsObject]
case _: JsUndefined => JsObject(Seq.empty)
}

convertJsonStruct(new StructType, content, content.keys.toList)
convertJsonStruct(new StructType, inputJson)
}
}
1 change: 1 addition & 0 deletions src/test/resources/testJsonSchema4.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{
"$schema": "smallTestSchema",
"type": "object",
"required": ["name", "addressA"],
"properties": {
"name": {
"type": "string"
Expand Down
Loading