Skip to content

Commit

Permalink
Amendment: Fields should be normalized as part of merging schemas
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter committed Nov 22, 2024
1 parent 31a6e19 commit db04e30
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ object Migrations {
Generates list of all migration for the Schema pair. Top level of schema is always Struct.
*/
def assessSchemaMigration(source: Field, target: Field): ParquetSchemaMigrations =
MigrationFieldPair(Nil, source, Some(target)).migrations.migrations
MigrationFieldPair(Nil, Field.normalize(source), Some(Field.normalize(target))).migrations.migrations


// [parquet] to access this in tests
Expand All @@ -194,7 +194,7 @@ object Migrations {
})

def mergeSchemas(source: Field, target: Field): Either[List[Breaking], Field] = {
val merged = MigrationFieldPair(Nil, source, Some(target)).migrations
val merged = MigrationFieldPair(Nil, Field.normalize(source), Some(Field.normalize(target))).migrations
merged.result match {
case Some(field) => field.asRight
case None => merged.migrations.foldLeft(List.empty[Breaking])((accErr, migration) => migration match {
Expand All @@ -205,5 +205,5 @@ object Migrations {
}

def isSchemaMigrationBreaking(source: Field, target: Field): Boolean =
isSchemaMigrationBreakingFromMigrations(MigrationFieldPair(Nil, source, Some(target)).migrations.migrations)
isSchemaMigrationBreakingFromMigrations(MigrationFieldPair(Nil, Field.normalize(source), Some(Field.normalize(target))).migrations.migrations)
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class MigrationSpec extends org.specs2.Specification {
Produce migration for new top-level fields $e5
Error for type casting in top-level fields $e6
Produce migration for removal of top-level fields $e7
Error with invalid type change in arrays AND nullable change $e8
Error with invalid type change in arrays AND nullable change $e8
Produce migration for adding fields in structs in arrays $e9
Produce migration for removal fields in structs in arrays $e10
Error for type casting in nested arrays $e11
Expand All @@ -30,8 +30,9 @@ class MigrationSpec extends org.specs2.Specification {
Collapse field name collisions $e14
Drop not null constraint when field is removed in next generation $e15
Drop not null constraint when field is added in next generation $e16
Preserve ordering of fields in a struct $e17
Preserve ordering in nested arrays and structs $e18
Normalize the field order as part of merging schemas $e17
Preserve ordering of fields in a struct $e18
Preserve ordering in nested arrays and structs $e19
"""

def e1 = {
Expand Down Expand Up @@ -64,8 +65,8 @@ class MigrationSpec extends org.specs2.Specification {
|}
|}
""".stripMargin)
val schema2 = Field.normalize(Field.build("top", input2, enforceValuePresence = false).get)
Migrations.mergeSchemas(leanBase, schema2) should beRight(schema2) and (
val schema2 = Field.build("top", input2, enforceValuePresence = false).get
Migrations.mergeSchemas(leanBase, schema2) should beRight(Field.normalize(schema2)) and (
Migrations.assessSchemaMigration(leanBase, schema2).map(_.toString) shouldEqual Set("Schema key addition at /object_key/string2_key")
)
}
Expand All @@ -91,7 +92,7 @@ class MigrationSpec extends org.specs2.Specification {
|}
|}
""".stripMargin)
val schema2 = Field.normalize(Field.build("top", input2, enforceValuePresence = false).get)
val schema2 = Field.build("top", input2, enforceValuePresence = false).get
Migrations.mergeSchemas(leanBase, schema2).leftMap(_.map(_.toString)) should beLeft(List("Incompatible type change String to Long at /object_key/nested_key1")) and (
Migrations.assessSchemaMigration(leanBase, schema2).map(_.toString) shouldEqual Set("Incompatible type change String to Long at /object_key/nested_key1")
)
Expand All @@ -117,7 +118,7 @@ class MigrationSpec extends org.specs2.Specification {
|}
|}
""".stripMargin)
val schema2 = Field.normalize(Field.build("top", input2, enforceValuePresence = false).get)
val schema2 = Field.build("top", input2, enforceValuePresence = false).get

Migrations.mergeSchemas(leanBase, schema2) should beRight(leanBase) and (
Migrations.assessSchemaMigration(leanBase, schema2).map(_.toString) shouldEqual Set("Key removal at /object_key/nested_key3"))
Expand Down Expand Up @@ -148,8 +149,8 @@ class MigrationSpec extends org.specs2.Specification {
|}
|}
""".stripMargin)
val schema2 = Field.normalize(Field.build("top", input2, enforceValuePresence = false).get)
Migrations.mergeSchemas(leanBase, schema2) should beRight(schema2) and (
val schema2 = Field.build("top", input2, enforceValuePresence = false).get
Migrations.mergeSchemas(leanBase, schema2) should beRight(Field.normalize(schema2)) and (
Migrations.assessSchemaMigration(leanBase, schema2).map(_.toString) shouldEqual Set("Schema key addition at /string_key2")
)
}
Expand All @@ -174,7 +175,7 @@ class MigrationSpec extends org.specs2.Specification {
|}
|}
""".stripMargin)
val schema2 = Field.normalize(Field.build("top", input2, enforceValuePresence = false).get)
val schema2 = Field.build("top", input2, enforceValuePresence = false).get
Migrations.mergeSchemas(leanBase, schema2).leftMap(_.toString) should beLeft(Set("Incompatible type change String to Long at /string_key"))
Migrations.assessSchemaMigration(leanBase, schema2).map(_.toString) shouldEqual Set("Incompatible type change String to Long at /string_key")
}
Expand All @@ -196,7 +197,7 @@ class MigrationSpec extends org.specs2.Specification {
|}
|}
""".stripMargin)
val schema2 = Field.normalize(Field.build("top", input2, enforceValuePresence = false).get)
val schema2 = Field.build("top", input2, enforceValuePresence = false).get
Migrations.mergeSchemas(leanBase, schema2) should beRight(leanBase) and (
Migrations.assessSchemaMigration(leanBase, schema2).map(_.toString) shouldEqual Set("Key removal at /string_key"))
}
Expand Down Expand Up @@ -234,11 +235,11 @@ class MigrationSpec extends org.specs2.Specification {
val schema2 = Field.build("top", input2, enforceValuePresence = false).get

Migrations.mergeSchemas(schema1, schema2).leftMap(_.map(_.toString)) should beLeft(List(
"Incompatible type change Long to String at /arrayKey/[arrayDown]"
"Incompatible type change Long to String at /array_key/[arrayDown]"
)) and (
Migrations.assessSchemaMigration(schema1, schema2).map(_.toString) shouldEqual Set(
"Changing nullable property to required at /arrayKey/[arrayDown]",
"Incompatible type change Long to String at /arrayKey/[arrayDown]"
"Changing nullable property to required at /array_key/[arrayDown]",
"Incompatible type change Long to String at /array_key/[arrayDown]"
))
}

Expand All @@ -261,7 +262,7 @@ class MigrationSpec extends org.specs2.Specification {
|}
|}
""".stripMargin)
val schema1 = Field.normalize(Field.build("top", input1, enforceValuePresence = false).get)
val schema1 = Field.build("top", input1, enforceValuePresence = false).get

val input2 = SpecHelpers.parseSchema(
"""
Expand All @@ -281,9 +282,9 @@ class MigrationSpec extends org.specs2.Specification {
|}
|}
""".stripMargin)
val schema2 = Field.normalize(Field.build("top", input2, enforceValuePresence = false).get)
val schema2 = Field.build("top", input2, enforceValuePresence = false).get

Migrations.mergeSchemas(schema1, schema2) should beRight(schema2) and (
Migrations.mergeSchemas(schema1, schema2) should beRight(Field.normalize(schema2)) and (
Migrations.assessSchemaMigration(schema1, schema2).map(_.toString) shouldEqual
Set("Schema key addition at /array_key/[arrayDown]/nested_key3")
)
Expand All @@ -308,7 +309,7 @@ class MigrationSpec extends org.specs2.Specification {
|}
|}
""".stripMargin)
val schema1 = Field.normalize(Field.build("top", input1, enforceValuePresence = false).get)
val schema1 = Field.build("top", input1, enforceValuePresence = false).get

val input2 = SpecHelpers.parseSchema(
"""
Expand All @@ -328,9 +329,9 @@ class MigrationSpec extends org.specs2.Specification {
|}
|}
""".stripMargin)
val schema2 = Field.normalize(Field.build("top", input2, enforceValuePresence = false).get)
val schema2 = Field.build("top", input2, enforceValuePresence = false).get

Migrations.mergeSchemas(schema2, schema1) should beRight(schema2) and (
Migrations.mergeSchemas(schema2, schema1) should beRight(Field.normalize(schema2)) and (
Migrations.assessSchemaMigration(schema2, schema1).map(_.toString) shouldEqual
Set("Key removal at /array_key/[arrayDown]/nested_key3")
)
Expand Down Expand Up @@ -377,10 +378,10 @@ class MigrationSpec extends org.specs2.Specification {
val schema2 = Field.build("top", input2, enforceValuePresence = false).get

Migrations.mergeSchemas(schema1, schema2).leftMap(_.map(_.toString)) should beLeft(List(
"Incompatible type change String to Long at /arrayKey/[arrayDown]/nestedKey1"
"Incompatible type change String to Long at /array_key/[arrayDown]/nested_key1"
)) and (
Migrations.assessSchemaMigration(schema1, schema2).map(_.toString) shouldEqual Set(
"Incompatible type change String to Long at /arrayKey/[arrayDown]/nestedKey1"
"Incompatible type change String to Long at /array_key/[arrayDown]/nested_key1"
))
}

Expand All @@ -403,7 +404,7 @@ class MigrationSpec extends org.specs2.Specification {
|}
|}
""".stripMargin)
val schema1 = Field.normalize(Field.build("top", input1, enforceValuePresence = false).get)
val schema1 = Field.build("top", input1, enforceValuePresence = false).get

val input2 = SpecHelpers.parseSchema(
"""
Expand All @@ -423,9 +424,9 @@ class MigrationSpec extends org.specs2.Specification {
|}
|}
""".stripMargin)
val schema2 = Field.normalize(Field.build("top", input2, enforceValuePresence = false).get)
val schema2 = Field.build("top", input2, enforceValuePresence = false).get

Migrations.mergeSchemas(schema1, schema2) should beRight(schema1) and (
Migrations.mergeSchemas(schema1, schema2) should beRight(Field.normalize(schema1)) and (
Migrations.assessSchemaMigration(schema1, schema2).map(_.toString) shouldEqual Set(
"Changing nullable property to required at /array_key/[arrayDown]/nested_key2"
))
Expand All @@ -449,7 +450,7 @@ class MigrationSpec extends org.specs2.Specification {
|}
|}
""".stripMargin)
val schema1 = Field.normalize(Field.build("top", input1, enforceValuePresence = false).get)
val schema1 = Field.build("top", input1, enforceValuePresence = false).get

val input2 = SpecHelpers.parseSchema(
"""
Expand All @@ -459,7 +460,7 @@ class MigrationSpec extends org.specs2.Specification {
|}
|}
""".stripMargin)
val schema2 = Field.normalize(Field.build("top", input2, enforceValuePresence = false).get)
val schema2 = Field.build("top", input2, enforceValuePresence = false).get

Migrations.mergeSchemas(schema1, schema2).map(
f => f.fieldType.asInstanceOf[Struct].fields.head.accessors
Expand All @@ -476,7 +477,7 @@ class MigrationSpec extends org.specs2.Specification {
|"required" : ["k1"]
|}
""".stripMargin)
val schema1 = Field.normalize(Field.build("top", input1, enforceValuePresence = false).get)
val schema1 = Field.build("top", input1, enforceValuePresence = false).get

val input2 = SpecHelpers.parseSchema(
"""
Expand All @@ -486,7 +487,7 @@ class MigrationSpec extends org.specs2.Specification {
|}
|}
""".stripMargin)
val schema2 = Field.normalize(Field.build("top", input2, enforceValuePresence = false).get)
val schema2 = Field.build("top", input2, enforceValuePresence = false).get

Migrations.mergeSchemas(schema1, schema2).map(
f => f.fieldType.asInstanceOf[Struct].fields.last.nullability.nullable
Expand All @@ -503,7 +504,7 @@ class MigrationSpec extends org.specs2.Specification {
|}
|}
""".stripMargin)
val schema1 = Field.normalize(Field.build("top", input1, enforceValuePresence = false).get)
val schema1 = Field.build("top", input1, enforceValuePresence = false).get

val input2 = SpecHelpers.parseSchema(
"""
Expand All @@ -514,16 +515,39 @@ class MigrationSpec extends org.specs2.Specification {
| "required" : ["k2"]
|}
""".stripMargin)
val schema2 = Field.normalize(Field.build("top", input2, enforceValuePresence = false).get)
val schema2 = Field.build("top", input2, enforceValuePresence = false).get

Migrations.mergeSchemas(schema1, schema2).map(
f => f.fieldType.asInstanceOf[Struct].fields.forall(_.nullability.nullable)
) should beRight(true)

}

// Preserve ordering of fields in a struct $e17
// Normalize field order as part of merging schemas $e17
def e17 = {
val inputStruct = Type.Struct(
NonEmptyVector.of(
Field("ccc", Type.String, Nullable),
Field("aaa", Type.String, Nullable),
Field("bbb", Type.String, Nullable),
)
)

val expectedStruct = Type.Struct(
NonEmptyVector.of(
Field("aaa", Type.String, Nullable),
Field("bbb", Type.String, Nullable),
Field("ccc", Type.String, Nullable),
)
)
val input = Field("top", inputStruct, Required)
val expected = Field("top", expectedStruct, Required)

Migrations.mergeSchemas(input, input) must beRight(expected)
}

// Preserve ordering of fields in a struct $e18
def e18 = {
val struct1 = Type.Struct(
NonEmptyVector.of(
Field("vvv", Type.String, Nullable),
Expand Down Expand Up @@ -561,8 +585,8 @@ class MigrationSpec extends org.specs2.Specification {
Migrations.mergeSchemas(field1, field2) must beRight(expected)
}

// Preserve ordering in nested arrays and structs $e18
def e18 = {
// Preserve ordering in nested arrays and structs $e19
def e19 = {
val input1 = SpecHelpers.parseSchema(
"""
|{"type": "object",
Expand Down Expand Up @@ -593,7 +617,7 @@ class MigrationSpec extends org.specs2.Specification {
|}
|}
""".stripMargin)
val schema1 = Field.normalize((Field.build("top", input1, enforceValuePresence = true).get))
val schema1 = (Field.build("top", input1, enforceValuePresence = true).get)

val input2 = SpecHelpers.parseSchema(
"""
Expand Down Expand Up @@ -626,7 +650,7 @@ class MigrationSpec extends org.specs2.Specification {
|}
|}
""".stripMargin)
val schema2 = Field.normalize((Field.build("top", input2, enforceValuePresence = true).get))
val schema2 = (Field.build("top", input2, enforceValuePresence = true).get)

val expected = {

Expand Down

0 comments on commit db04e30

Please sign in to comment.