[Kernel] Verify that the readSchema is a subset of the snapshotSchema #4038
Conversation
kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaErrors.java
kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/SchemaUtils.java
}
StructType supersetStruct = (StructType) supersetType;
StructType subsetStruct = (StructType) subsetType;
return isSuperset(supersetStruct, subsetStruct);
No need to block on this, but it's a little weird that we recurse back and forth between these two functions. Wondering if the code in isSuperSet (i.e. iterating through the fields) should just sit here? It seems like that makes sense and matches the pattern we follow for the other two complex types.
That's a good observation. The desire for these two separate methods arises from wanting a distinct public API that just compares two StructTypes. For example, we wouldn't want to expose a public utility like isSupersetOrEquivalent(DataType supersetType, DataType subsetType). Exposing just one that takes a StructType seems most intuitive to me.
I was wondering if we even need this API (the isSuperSet) here in SchemaUtils. Do you think having StructType::isSuperSet is sufficient?
I added it to try to encourage us to move away from a very C-like style of programming, where you just pass structs/values into utils, and toward a more OOP style where you invoke methods on instances of classes. I find the former not very discoverable and the latter very discoverable, in terms of answering "what operations can I apply to a given struct/class?"
Open to thoughts!
I think I was asking for the opposite... I was wondering if we can just mainly have StructType::isSuperSet and then some more "internal" method here in SchemaUtils that it calls (like this recursive isSupersetOrEquivalent).
I'm definitely good with having StructType::isSuperSet -- not asking to remove that at all.
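For illustration, a minimal sketch of the split being discussed, using the method names from this thread (this is not the PR's actual code):
// Public, discoverable entry point on StructType (sketch only).
public boolean isSuperSetOf(StructType other) {
  // Delegate the recursive field-by-field comparison to an internal SchemaUtils helper.
  return SchemaUtils.isSupersetOrEquivalent(this, other);
}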
Agree that the recursion between the two functions feels non-ideal. What I was thinking (and maybe this is captured above -- it's not clear what the exact proposal is): since there is already a public interface for isSuperSetOf and isSubsetOf, can't there be a single isSupersetOrEquivalent that simply recurses into itself? (i.e. combine this and the isSuperset above)
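To make the proposal concrete, here is a minimal sketch of a single self-recursive helper, assuming the types from io.delta.kernel.types, that StructType.get returns null for a missing field (as the quoted snippets do), and that DataType exposes equivalent; the method name follows the discussion and is not the PR's actual code:
// Sketch only: one recursive method handling struct, map, array, and primitive cases.
private static boolean isSupersetOrEquivalent(DataType supersetType, DataType subsetType) {
  if (supersetType instanceof StructType && subsetType instanceof StructType) {
    StructType supersetStruct = (StructType) supersetType;
    StructType subsetStruct = (StructType) subsetType;
    for (StructField subsetField : subsetStruct.fields()) {
      // Assumes get(...) returns null when the field is absent.
      StructField supersetField = supersetStruct.get(subsetField.getName());
      if (supersetField == null
          || !isSupersetOrEquivalent(supersetField.getDataType(), subsetField.getDataType())) {
        return false;
      }
    }
    return true;
  } else if (supersetType instanceof MapType && subsetType instanceof MapType) {
    MapType supersetMap = (MapType) supersetType;
    MapType subsetMap = (MapType) subsetType;
    return isSupersetOrEquivalent(supersetMap.getKeyType(), subsetMap.getKeyType())
        && isSupersetOrEquivalent(supersetMap.getValueType(), subsetMap.getValueType());
  } else if (supersetType instanceof ArrayType && subsetType instanceof ArrayType) {
    ArrayType supersetArray = (ArrayType) supersetType;
    ArrayType subsetArray = (ArrayType) subsetType;
    return isSupersetOrEquivalent(supersetArray.getElementType(), subsetArray.getElementType());
  } else {
    // Primitive (or mismatched) types fall back to the equivalence check.
    return supersetType.equivalent(subsetType);
  }
}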
if (!isSupersetOrEquivalent(supersetMap.getKeyType(), subsetMap.getKeyType())) {
  return false;
}
return isSupersetOrEquivalent(supersetMap.getValueType(), subsetMap.getValueType());
Can these two checks just be combined with &&?
Will do!
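For concreteness, the &&-combined form of the quoted map branch might look like this (same identifiers as the snippet above; a sketch, not the committed change):
// Both the key and value types must pass the superset/equivalence check.
return isSupersetOrEquivalent(supersetMap.getKeyType(), subsetMap.getKeyType())
    && isSupersetOrEquivalent(supersetMap.getValueType(), subsetMap.getValueType());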
private val field_g = new StructField("g", new MapType(
  new StructType(Arrays.asList(field_g_key_5)),
  new StructType(Arrays.asList(field_g_value_zipzap)), true), true)
private val complexSchema =
I'm not sure if this is the exact schema you used in the other example, but could you maybe add a comment with a visual diagram of the schema? It's hard to reason about these just by looking at the variables.
Great callout, will do!
Changes LGTM. I just want to take a closer look at the tests after you add this :)
Some comments. Let me know if you have any questions.
/** Checks if the {@code supersetSchema} is a superset of the {@code subsetSchema}. */
public static boolean isSuperset(StructType supersetSchema, StructType subsetSchema) {
  for (StructField subsetField : subsetSchema.fields()) {
    final StructField supersetField = supersetSchema.get(subsetField.getName());
Shouldn't this be a case-insensitive check?
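If the lookup does need to ignore case, a minimal sketch could look like the following; getFieldIgnoreCase is a hypothetical helper, and Kernel may already provide a case-insensitive lookup that should be preferred:
// Hypothetical helper: linear scan comparing field names case-insensitively.
private static StructField getFieldIgnoreCase(StructType schema, String name) {
  for (StructField field : schema.fields()) {
    if (field.getName().equalsIgnoreCase(name)) {
      return field;
    }
  }
  return null; // caller treats null as "field not present"
}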
 * <li>Otherwise, use the standard {@link StructType#equivalent} check.
 * </ul>
 */
private static boolean isSupersetOrEquivalent(DataType supersetType, DataType subsetType) {
Can we have one utility method, something like:
T schemaTransformer(StructType schema1, StructType schema2, FieldVisitor transformField) {
}
FieldVisitor {
  visitMissingField(StructField missingField);
  visitField(StructField right, StructField left);
}
I haven't thought through the details (we may need to pass more arguments to FieldVisitor).
I wonder if we want something like:
- compareStructs(struct1, struct2, func), and
- transform(struct, func)
For some context, on the Rust side we have a SchemaTransform to encapsulate a generic framework for recursive bottom-up schema transforms (it might cover @scottsand-db's transform, and might also be useful for the comparison itself if you could somehow traverse both schemas simultaneously?)
Another thought of mine is that we could avoid having compareStructs(struct1, struct2) and only have transform(struct, func).
If we wanted to compare two structs, we could flatten one struct to get a map from each fullColumnPath (e.g. top.middle.bottom.nested.colA) to its StructField. Then we could visit the other struct to be compared; this visitor knows the column path of every field it visits and can simply look up in the other struct's path->field map whether that field exists.
This could let us avoid a compareStructs(struct1, struct2), which is rather complex.
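A rough sketch of the flattening step described above (helper names are assumptions, not existing Kernel APIs; escaping of dots inside field names is ignored for brevity):
import java.util.HashMap;
import java.util.Map;

// Flatten a schema into a map from full column path (dot-separated) to StructField.
private static Map<String, StructField> flattenToPathMap(StructType schema) {
  Map<String, StructField> pathToField = new HashMap<>();
  addFields("", schema, pathToField);
  return pathToField;
}

private static void addFields(String prefix, StructType schema, Map<String, StructField> acc) {
  for (StructField field : schema.fields()) {
    String path = prefix.isEmpty() ? field.getName() : prefix + "." + field.getName();
    acc.put(path, field);
    // Recurse into nested structs so that e.g. top.middle.bottom.nested.colA gets an entry.
    if (field.getDataType() instanceof StructType) {
      addFields(path, (StructType) field.getDataType(), acc);
    }
  }
}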
This code gets quite complex ...
public interface TwoStructVisitor {
abstract class BadResult {
public final String colNamePath;
public BadResult(String colNamePath) {
this.colNamePath = colNamePath;
}
}
class FieldMissingOnLeft extends BadResult {
public final StructField rightField;
public FieldMissingOnLeft(String colNamePath, StructField rightField) {
super(colNamePath);
this.rightField = rightField;
}
}
class FieldMissingOnRight extends BadResult {
public final StructField leftField;
public FieldMissingOnRight(String colNamePath, StructField leftField) {
super(colNamePath);
this.leftField = leftField;
}
}
class MismatchedDataType extends BadResult {
public final DataType leftDataType;
public final DataType rightDataType;
public MismatchedDataType(String colNamePath, DataType leftDataType, DataType rightDataType) {
super(colNamePath);
this.leftDataType = leftDataType;
this.rightDataType = rightDataType;
}
}
/** Do NOT recurse into nested structs. This will be done for you. */
default StructField transformLeftField(
String colNamePath, StructField leftField, StructField rightField) {
return leftField;
}
default DataType transformLeftPrimitiveDataType(
String colNamePath, DataType leftDataType, DataType rightDataType) {
return leftDataType;
}
/** Right struct has this field while the left struct does not. */
default FieldMissingOnLeft visitFieldMissingOnLeft(String colNamePath, StructField rightField) {
return new FieldMissingOnLeft(colNamePath, rightField);
}
/** Left struct has this field while the right struct does not. */
default FieldMissingOnRight visitFieldMissingOnRight(
String colNamePath, StructField leftField) {
return new FieldMissingOnRight(colNamePath, leftField);
}
/** The data types of the left and right fields do not match. */
default MismatchedDataType visitMismatchedDataType(
String colNamePath, DataType leftDataType, DataType rightDataType) {
return new MismatchedDataType(colNamePath, leftDataType, rightDataType);
}
}
public static Tuple2<Optional<DataType>, Optional<BadResult>> visitTwoStructTypes(
StructType structToVisit, StructType referenceStruct, TwoStructVisitor visitor) {
final Tuple2<Optional<DataType>, Optional<BadResult>> result =
visitTwoDataTypes("", structToVisit, referenceStruct, visitor);
if (result._1.isPresent() && !(result._1.get() instanceof StructType)) {
throw new RuntimeException("Expected a StructType");
}
return result;
}
private static Tuple2<Optional<DataType>, Optional<BadResult>> visitTwoDataTypes(
String colNamePathPrefix,
DataType leftDataType,
DataType rightDataType,
TwoStructVisitor visitor) {
if (leftDataType instanceof StructType) {
////////////////////////
// Case 1: StructType //
////////////////////////
if (!(rightDataType instanceof StructType)) {
// Assuming visitMismatchedDataType returns a BadResult object
return Tuple2.right(
visitor.visitMismatchedDataType(colNamePathPrefix, leftDataType, rightDataType));
}
final StructType leftStruct = (StructType) leftDataType;
final StructType rightStruct = (StructType) rightDataType;
final List<StructField> newFields = new ArrayList<>();
// Visit all of the fields that are (a) in BOTH structs and (b) ONLY in the left struct
for (StructField leftField : leftStruct.fields()) {
final String fullColNamePath = colNamePathPrefix + escapeDots(leftField.getName());
final StructField rightField = rightStruct.get(leftField.getName());
if (rightField == null) {
return Tuple2.right(visitor.visitFieldMissingOnRight(fullColNamePath, leftField));
} else {
final StructField newFieldOrig =
visitor.transformLeftField(fullColNamePath, leftField, rightField);
final Tuple2<Optional<DataType>, Optional<BadResult>> result =
visitTwoDataTypes(
fullColNamePath, leftField.getDataType(), rightField.getDataType(), visitor);
if (result._2.isPresent()) {
return result;
}
if (!result._1.isPresent()) {
throw new RuntimeException("Expected result._1 to be non-empty");
}
final StructField newFieldFinal =
new StructField(
newFieldOrig.getName(),
result._1.get(),
newFieldOrig.isNullable(),
newFieldOrig.getMetadata());
newFields.add(newFieldFinal);
}
}
// Visit all of the fields that are ONLY in the right struct
for (StructField rightField : rightStruct.fields()) {
final String fullColNamePath = colNamePathPrefix + escapeDots(rightField.getName());
if (leftStruct.get(rightField.getName()) == null) {
visitor.visitFieldMissingOnLeft(fullColNamePath, rightField);
}
}
return Tuple2.left(new StructType(newFields));
} else if (leftDataType instanceof ArrayType) {
///////////////////////
// Case 2: ArrayType //
///////////////////////
if (!(rightDataType instanceof ArrayType)) {
return Tuple2.right(
visitor.visitMismatchedDataType(colNamePathPrefix, leftDataType, rightDataType));
}
final ArrayType leftArray = (ArrayType) leftDataType;
final ArrayType rightArray = (ArrayType) rightDataType;
final Tuple2<Optional<DataType>, Optional<BadResult>> result =
visitTwoDataTypes(
colNamePathPrefix + ".element",
leftArray.getElementType(),
rightArray.getElementType(),
visitor);
if (result._2.isPresent()) {
return result;
}
if (!result._1.isPresent()) {
throw new RuntimeException("Expected result._1 to be non-empty");
}
// TODO: visit the left and right fields? and pick a better valueContainsNull?
return Tuple2.left(new ArrayType(result._1.get(), true));
} else if (leftDataType instanceof MapType) {
/////////////////////
// Case 3: MapType //
/////////////////////
if (!(rightDataType instanceof MapType)) {
return Tuple2.right(
visitor.visitMismatchedDataType(colNamePathPrefix, leftDataType, rightDataType));
}
final MapType leftMap = (MapType) leftDataType;
final MapType rightMap = (MapType) rightDataType;
final Tuple2<Optional<DataType>, Optional<BadResult>> leftResult =
visitTwoDataTypes(
colNamePathPrefix + ".key", leftMap.getKeyType(), rightMap.getKeyType(), visitor);
if (leftResult._2.isPresent()) {
return leftResult;
}
if (!leftResult._1.isPresent()) {
throw new RuntimeException("Expected result._1 to be non-empty");
}
final Tuple2<Optional<DataType>, Optional<BadResult>> rightResult =
visitTwoDataTypes(
colNamePathPrefix + ".value",
leftMap.getValueType(),
rightMap.getValueType(),
visitor);
if (rightResult._2.isPresent()) {
return rightResult;
}
if (!rightResult._1.isPresent()) {
throw new RuntimeException("Expected result._1 to be non-empty");
}
// TODO: visit the left and right fields? and pick a better valueContainsNull?
return Tuple2.left(new MapType(leftResult._1.get(), rightResult._1.get(), true));
} else {
/////////////////////////////
// Case 4: Primitive Types //
/////////////////////////////
if (!leftDataType.equivalent(rightDataType)) {
return Tuple2.right(
visitor.visitMismatchedDataType(colNamePathPrefix, leftDataType, rightDataType));
}
return Tuple2.left(
visitor.transformLeftPrimitiveDataType(colNamePathPrefix, leftDataType, rightDataType));
}
}
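As a hypothetical usage of the sketch above, a subset check could be expressed by visiting the read schema against the snapshot schema with an all-defaults visitor and treating any BadResult as failure (this relies entirely on the sketch's assumed Tuple2/Optional API, not on committed code):
// Sketch only: all visitor methods keep their defaults, so we only inspect BadResult.
Tuple2<Optional<DataType>, Optional<TwoStructVisitor.BadResult>> result =
    visitTwoStructTypes(readSchema, snapshotSchema, new TwoStructVisitor() {});
// Empty BadResult means every readSchema field was found with an equivalent type.
boolean readSchemaIsSubset = !result._2.isPresent();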
Update: I don't think we need a transform(schema1, schema2). Instead we could:
- flatten schema2 to a map of fullColumnPath --> StructField
- define a transformerFunc(fullPath, field) -> StructField
- call transform(schema1, transformerFunc)
- the transformerFunc is an interface; the concrete implementation can use the flattened schema2 to look up and compare fields.
@FunctionalInterface
public interface StructFieldMapper {
StructField apply(String fieldFullPath, StructField field);
}
public static StructType transformSchema(StructType schema, StructFieldMapper transformer) {
final DataType result = transformDataType("", schema, transformer);
if (!(result instanceof StructType)) {
throw new RuntimeException("transformDataType should return a StructType");
}
return (StructType) result;
}
private static DataType transformDataType(String fullPath, DataType dataType, StructFieldMapper transformer) {
if (dataType instanceof StructType) {
final StructType currStructType = (StructType) dataType;
final List<StructField> newFields = new ArrayList<>(currStructType.length());
for (StructField currField : currStructType.fields()) {
final String fieldFullPath = fullPath + "." + escapeDots(currField.getName());
final DataType newFieldDataType =
transformDataType(fieldFullPath, currField.getDataType(), transformer);
final StructField transformedField = transformer.apply(fieldFullPath, currField);
final StructField newField =
new StructField(
transformedField.getName(),
newFieldDataType,
transformedField.isNullable(),
transformedField.getMetadata());
newFields.add(newField);
}
return new StructType(newFields);
} else if (dataType instanceof ArrayType) {
final ArrayType currArrayType = (ArrayType) dataType;
final String elementFullPath = fullPath + ".element";
final DataType newElementType =
transformDataType(elementFullPath, currArrayType.getElementType(), transformer);
return new ArrayType(newElementType, currArrayType.containsNull());
} else if (dataType instanceof MapType) {
final MapType currMapType = (MapType) dataType;
final String keyFullPath = fullPath + ".key";
final String valueFullPath = fullPath + ".value";
final DataType newKeyType =
transformDataType(keyFullPath, currMapType.getKeyType(), transformer);
final DataType newValueType =
transformDataType(valueFullPath, currMapType.getValueType(), transformer);
return new MapType(newKeyType, newValueType, currMapType.isValueContainsNull());
} else {
return dataType;
}
}
// TODO: validate the readSchema is a subset of the table schema
if (!readSchema.isSubsetOf(snapshotSchema)) {
  throw DeltaErrors.invalidReadSchema(dataPath.toString(), readSchema, snapshotSchema);
}
Checking for the subset is fine, but we also want to add any missing metadata fields to the readSchema from the table schema. For example, the connector may just want to read colX, but the table has CM (column mapping) enabled and the field is missing metadata such as the CM physical name. We want to add those back.
I think we do this later anyway, when we generate the physical schema?
I had the same thought, but figured we took care of it later; I didn't look too far into it.
Although on second look I am very confused at what we are doing...
We do
// Physical equivalent of the logical read schema.
StructType physicalReadSchema =
ColumnMapping.convertToPhysicalSchema(
readSchema,
snapshotSchema,
ColumnMapping.getColumnMappingMode(metadata.getConfiguration()));
But in the column mapping utils, the parameter names and behavior don't seem as expected:
/**
* Helper method that converts the logical schema (requested by the connector) to physical schema
* of the data stored in data files based on the table's column mapping mode.
*
* @param logicalSchema Logical schema of the scan
* @param physicalSchema Physical schema of the scan
* @param columnMappingMode Column mapping mode
*/
public static StructType convertToPhysicalSchema(
StructType logicalSchema, StructType physicalSchema, ColumnMappingMode columnMappingMode) {
switch (columnMappingMode) {
case NONE:
return logicalSchema;
case ID: // fall through
case NAME:
boolean includeFieldIds = columnMappingMode == ColumnMappingMode.ID;
return convertToPhysicalSchema(logicalSchema, physicalSchema, includeFieldIds);
default:
throw new UnsupportedOperationException(
"Unsupported column mapping mode: " + columnMappingMode);
}
}
Although on second look I am very confused at what we are doing...
@allisonport-db -- set up a 15-min sync to get some answers here?
On a second glance (looking at the actual implementations), I think the functions do what we want them to do; the parameter naming is just a little confusing/misleading.
Here, we use the readSchema to generate the physical schema using the full schema from the table, and prune/keep metadata as needed.
I think:
- We should rename the parameters here: logicalSchema --> readSchema or connectorProvidedReadSchema, and physicalSchema --> fullSnapshotSchema or completeTableSchema, or something that reflects that it's the full schema with all the metadata read directly from the log (it is NOT the physical schema!) -- see the sketch after this comment.
- We should probably prune metadata from case NONE as well, since we do it for the other cases.
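For clarity, the suggested rename applied to the quoted method might look like this (a sketch only; the body is unchanged from the snippet above):
public static StructType convertToPhysicalSchema(
    StructType readSchema,           // was: logicalSchema
    StructType fullSnapshotSchema,   // was: physicalSchema (the full table schema read from the log)
    ColumnMappingMode columnMappingMode) {
  switch (columnMappingMode) {
    case NONE:
      return readSchema;
    case ID: // fall through
    case NAME:
      boolean includeFieldIds = columnMappingMode == ColumnMappingMode.ID;
      return convertToPhysicalSchema(readSchema, fullSnapshotSchema, includeFieldIds);
    default:
      throw new UnsupportedOperationException(
          "Unsupported column mapping mode: " + columnMappingMode);
  }
}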
@vkorukanti does this align with your understanding of this? I think this code was committed 2 years ago when we added the initial read support
This can be a separate PR. I can make these changes if you want.
Is the motivation for this change just to fail fast before doing any tangible work for the scan? In the existing flow, I assume the scan would fail deeper, at the point of attempting to read a column that doesn't exist?
Which Delta project/connector is this regarding?
Description
Verify that the readSchema is a subset of the snapshotSchema
How was this patch tested?
New UTs.
Does this PR introduce any user-facing changes?
No.