
[Kernel] Verify that the readSchema is a subset of the snapshotSchema #4038

Open · wants to merge 7 commits into master
Conversation

scottsand-db
Collaborator

Which Delta project/connector is this regarding?

  • [ ] Spark
  • [ ] Standalone
  • [ ] Flink
  • [x] Kernel
  • [ ] Other (fill in here)

Description

Verify that the readSchema is a subset of the snapshotSchema

How was this patch tested?

New UTs.

Does this PR introduce any user-facing changes?

No.

}
StructType supersetStruct = (StructType) supersetType;
StructType subsetStruct = (StructType) subsetType;
return isSuperset(supersetStruct, subsetStruct);
Collaborator

No need to block on this, but it's a little weird that we recurse back and forth between these two functions. Wondering if the code in isSuperSet (i.e. iterating through the fields) should just sit here? Seems like that makes sense and matches the pattern we follow for the other two complex types.

Collaborator Author

That's a good observation. The desire for these two separate methods arises from us wanting a distinct public API that just compares two StructTypes.

For example, we wouldn't want to expose a public utility like isSupersetOrEquivalent(DataType supersetType, DataType subsetType). Just exposing one that takes in a StructType seems most intuitive for me.

Collaborator
@allisonport-db Jan 14, 2025

I was wondering if we even need this API (the isSuperSet) here in SchemaUtils. Do you think having StructType::isSuperSet is sufficient?

Collaborator Author

I added it to try and encourage us to move away from a very C-style of programming, where you just pass structs/values into utils, and instead use a more OOP style, where you invoke methods on instances of classes.

I find the former not very discoverable and the latter very discoverable, in terms of answering: what operations can I apply to a given struct/class?

open to thoughts!

Collaborator

I think I was asking for the opposite... I was wondering if we can just mainly have StructType::isSuperSet and then some more "internal" method here in SchemaUtils that it calls (like this recursive isSupersetOrEquivalent).

I'm definitely good with having StructType::isSuperSet; I'm not asking to remove that at all.

Collaborator

Agree the recursion between the two functions feels non-ideal. What I was thinking (and maybe this is captured above; it's not clear what the exact proposal is):

Since there is already a public interface for isSuperSetOf and isSubsetOf, can't there be a single isSupersetOrEquivalent that just recurses itself? (i.e. combine this and the isSuperset above)
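To make the single-self-recursion proposal concrete, here is a minimal, self-contained sketch. Note the types below (Primitive, ArrayType, MapType, StructType) are simplified stand-ins for Kernel's actual DataType hierarchy, not the real classes; the point is only the shape of one method that recurses into itself for all complex types:

```java
import java.util.*;

public class SupersetSketch {
  // Simplified stand-ins for Kernel's DataType/StructType/ArrayType/MapType.
  interface DataType {}
  static final class Primitive implements DataType {
    final String name;
    Primitive(String name) { this.name = name; }
    public boolean equals(Object o) { return o instanceof Primitive && ((Primitive) o).name.equals(name); }
    public int hashCode() { return name.hashCode(); }
  }
  static final class ArrayType implements DataType {
    final DataType element;
    ArrayType(DataType element) { this.element = element; }
  }
  static final class MapType implements DataType {
    final DataType key, value;
    MapType(DataType key, DataType value) { this.key = key; this.value = value; }
  }
  static final class StructType implements DataType {
    final LinkedHashMap<String, DataType> fields;
    StructType(LinkedHashMap<String, DataType> fields) { this.fields = fields; }
  }

  /** One self-recursive check: structs compare field-by-field, other complex types recurse directly. */
  static boolean isSupersetOrEquivalent(DataType superset, DataType subset) {
    if (superset instanceof StructType && subset instanceof StructType) {
      StructType sup = (StructType) superset, sub = (StructType) subset;
      // Every subset field must exist in the superset with a compatible type.
      for (Map.Entry<String, DataType> f : sub.fields.entrySet()) {
        DataType supField = sup.fields.get(f.getKey());
        if (supField == null || !isSupersetOrEquivalent(supField, f.getValue())) return false;
      }
      return true;
    } else if (superset instanceof ArrayType && subset instanceof ArrayType) {
      return isSupersetOrEquivalent(((ArrayType) superset).element, ((ArrayType) subset).element);
    } else if (superset instanceof MapType && subset instanceof MapType) {
      MapType supM = (MapType) superset, subM = (MapType) subset;
      return isSupersetOrEquivalent(supM.key, subM.key)
          && isSupersetOrEquivalent(supM.value, subM.value);
    }
    // Primitives (and mismatched kinds): require equivalence.
    return superset.equals(subset);
  }

  public static void main(String[] args) {
    LinkedHashMap<String, DataType> supF = new LinkedHashMap<>();
    supF.put("a", new Primitive("int"));
    supF.put("b", new Primitive("string"));
    LinkedHashMap<String, DataType> subF = new LinkedHashMap<>();
    subF.put("a", new Primitive("int"));
    if (!isSupersetOrEquivalent(new StructType(supF), new StructType(subF)))
      throw new AssertionError("expected superset");
    if (isSupersetOrEquivalent(new StructType(subF), new StructType(supF)))
      throw new AssertionError("expected not superset");
    System.out.println("ok");
  }
}
```

With this shape, the public StructType::isSuperSetOf / isSubsetOf entry points can stay StructType-only while delegating to the one private recursive method.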

Comment on lines +234 to +237
if (!isSupersetOrEquivalent(supersetMap.getKeyType(), subsetMap.getKeyType())) {
return false;
}
return isSupersetOrEquivalent(supersetMap.getValueType(), subsetMap.getValueType());
Collaborator

Can these just be && together?

Collaborator Author

Will do!

private val field_g = new StructField("g", new MapType(
new StructType(Arrays.asList(field_g_key_5)),
new StructType(Arrays.asList(field_g_value_zipzap)), true), true)
private val complexSchema =
Collaborator

I'm not sure if this is the exact schema you used in the other example, but could you add a comment with a visual diagram of the schema?

It's hard to reason about these just by looking at the variables.

Collaborator Author

Great callout, will do!

Collaborator

Changes LGTM I just want to take a closer look at the tests after you add this :)

Collaborator
@vkorukanti left a comment

Some comments. Let me know if you have any questions.

/** Checks if the {@code supersetSchema} is a superset of the {@code subsetSchema}. */
public static boolean isSuperset(StructType supersetSchema, StructType subsetSchema) {
for (StructField subsetField : subsetSchema.fields()) {
final StructField supersetField = supersetSchema.get(subsetField.getName());
Collaborator

shouldn't this be a case-insensitive check?
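For context, a minimal sketch of what a case-insensitive field lookup could look like. This is a hypothetical illustration using plain strings in place of Kernel's StructField; a real version would likely build a map keyed by lower-cased name once, rather than scan per lookup:

```java
import java.util.*;

public class CaseInsensitiveLookup {
  /** Returns the stored (case-preserved) field name matching `name` ignoring case, or null. */
  static String get(List<String> fieldNames, String name) {
    for (String f : fieldNames) {
      if (f.equalsIgnoreCase(name)) return f;
    }
    return null;
  }

  public static void main(String[] args) {
    List<String> fields = Arrays.asList("Id", "eventTime");
    if (!"Id".equals(get(fields, "id"))) throw new AssertionError();
    if (get(fields, "missing") != null) throw new AssertionError();
    System.out.println("ok");
  }
}
```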

* <li>Otherwise, use the standard {@link StructType#equivalent} check.
* </ul>
*/
private static boolean isSupersetOrEquivalent(DataType supersetType, DataType subsetType) {
Collaborator
@vkorukanti Jan 15, 2025

can we have one utility method something like:

T schemaTransformer(StructType schema1, StructType schema2, FieldVisitor transformField) {
  
}

FieldVisitor {
   visitMissingField(StructField missingField);

   visitField(StructField right, StructField left);
}

Haven't thought through the details (may need to pass more arguments to FieldVisitor)

Collaborator Author

I wonder if we want something like:

  • compareStructs(struct1, struct2, func), and
  • transform(struct, func)

Collaborator

For some context: on the Rust side, we have a SchemaTransform to encapsulate a generic framework for recursive bottom-up schema transforms (might cover @scottsand-db's transform, and might also be useful for the comparison itself if you could somehow traverse simultaneously?)

Collaborator Author

Another thought: we could avoid having compareStructs(struct1, struct2) and only have transform(struct, func).

Now, if we wanted to compare two structs, we could flatten one struct to get a map from each field's fullColumnPath (e.g. top.middle.bottom.nested.colA) to its StructField.

Then we could visit the other struct to be compared; this visitor knows the column path of every field it visits, and can just look up in the other struct's path->field map whether that field exists.

This could let us avoid a compareStructs(struct1, struct2), which is rather complex.
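The flatten step could be sketched as follows. This is a simplified illustration, not Kernel code: nested LinkedHashMaps stand in for StructType, and leaf strings stand in for primitive DataTypes; a real version would also need to escape dots in field names (the escapeDots helper mentioned below):

```java
import java.util.*;

public class FlattenSketch {
  /** Flattens a nested "struct" into a map of fullColumnPath -> field value. */
  static Map<String, Object> flatten(Map<String, Object> struct) {
    Map<String, Object> out = new LinkedHashMap<>();
    flattenInto("", struct, out);
    return out;
  }

  @SuppressWarnings("unchecked")
  private static void flattenInto(String prefix, Map<String, Object> struct, Map<String, Object> out) {
    for (Map.Entry<String, Object> e : struct.entrySet()) {
      String path = prefix.isEmpty() ? e.getKey() : prefix + "." + e.getKey();
      out.put(path, e.getValue());
      if (e.getValue() instanceof Map) {
        // Recurse into nested structs, accumulating the dotted path.
        flattenInto(path, (Map<String, Object>) e.getValue(), out);
      }
    }
  }

  public static void main(String[] args) {
    Map<String, Object> bottom = new LinkedHashMap<>();
    bottom.put("colA", "int");
    Map<String, Object> middle = new LinkedHashMap<>();
    middle.put("bottom", bottom);
    Map<String, Object> top = new LinkedHashMap<>();
    top.put("middle", middle);
    Map<String, Object> schema = new LinkedHashMap<>();
    schema.put("top", top);
    if (!flatten(schema).containsKey("top.middle.bottom.colA")) throw new AssertionError();
    System.out.println("ok");
  }
}
```

The comparison then reduces to a single traversal of the other struct, with each visited field's path looked up in this map.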

Collaborator Author
@scottsand-db Jan 16, 2025

This code gets quite complex ...

public interface TwoStructVisitor {
    abstract class BadResult {
      public final String colNamePath;

      public BadResult(String colNamePath) {
        this.colNamePath = colNamePath;
      }
    }

    class FieldMissingOnLeft extends BadResult {
      public final StructField rightField;

      public FieldMissingOnLeft(String colNamePath, StructField rightField) {
        super(colNamePath);
        this.rightField = rightField;
      }
    }

    class FieldMissingOnRight extends BadResult {
      public final StructField leftField;

      public FieldMissingOnRight(String colNamePath, StructField leftField) {
        super(colNamePath);
        this.leftField = leftField;
      }
    }

    class MismatchedDataType extends BadResult {
      public final DataType leftDataType;
      public final DataType rightDataType;

      public MismatchedDataType(String colNamePath, DataType leftDataType, DataType rightDataType) {
        super(colNamePath);
        this.leftDataType = leftDataType;
        this.rightDataType = rightDataType;
      }
    }

    /** Do NOT recurse into nested structs. This will be done for you. */
    default StructField transformLeftField(
        String colNamePath, StructField leftField, StructField rightField) {
      return leftField;
    }

    default DataType transformLeftPrimitiveDataType(
        String colNamePath, DataType leftDataType, DataType rightDataType) {
      return leftDataType;
    }

    /** Right struct has this field while the left struct does not. */
    default FieldMissingOnLeft visitFieldMissingOnLeft(String colNamePath, StructField rightField) {
      return new FieldMissingOnLeft(colNamePath, rightField);
    }

    /** Left struct has this field while the right struct does not. */
    default FieldMissingOnRight visitFieldMissingOnRight(
        String colNamePath, StructField leftField) {
      return new FieldMissingOnRight(colNamePath, leftField);
    }

    /** The data types of the left and right fields do not match. */
    default MismatchedDataType visitMismatchedDataType(
        String colNamePath, DataType leftDataType, DataType rightDataType) {
      return new MismatchedDataType(colNamePath, leftDataType, rightDataType);
    }
  }

  public static Tuple2<Optional<DataType>, Optional<BadResult>> visitTwoStructTypes(
      StructType structToVisit, StructType referenceStruct, TwoStructVisitor visitor) {
    final Tuple2<Optional<DataType>, Optional<BadResult>> result =
        visitTwoDataTypes("", structToVisit, referenceStruct, visitor);

    if (result._1.isPresent() && !(result._1.get() instanceof StructType)) {
      throw new RuntimeException("Expected a StructType");
    }

    return result;
  }

  private static Tuple2<Optional<DataType>, Optional<BadResult>> visitTwoDataTypes(
      String colNamePathPrefix,
      DataType leftDataType,
      DataType rightDataType,
      TwoStructVisitor visitor) {
    if (leftDataType instanceof StructType) {
      ////////////////////////
      // Case 1: StructType //
      ////////////////////////
      if (!(rightDataType instanceof StructType)) {
        // Assuming visitMismatchedDataType returns a BadResult object
        return Tuple2.right(
            visitor.visitMismatchedDataType(colNamePathPrefix, leftDataType, rightDataType));
      }

      final StructType leftStruct = (StructType) leftDataType;
      final StructType rightStruct = (StructType) rightDataType;
      final List<StructField> newFields = new ArrayList<>();

      // Visit all of the fields that are (a) in BOTH structs and (b) ONLY in the left struct
      for (StructField leftField : leftStruct.fields()) {
        final String fullColNamePath = colNamePathPrefix + escapeDots(leftField.getName());
        final StructField rightField = rightStruct.get(leftField.getName());
        if (rightField == null) {
          return Tuple2.right(visitor.visitFieldMissingOnRight(fullColNamePath, leftField));
        } else {
          final StructField newFieldOrig =
              visitor.transformLeftField(fullColNamePath, leftField, rightField);
          final Tuple2<Optional<DataType>, Optional<BadResult>> result =
              visitTwoDataTypes(
                  fullColNamePath, leftField.getDataType(), rightField.getDataType(), visitor);
          if (result._2.isPresent()) {
            return result;
          }
          if (!result._1.isPresent()) {
            throw new RuntimeException("Expected result._1 to be non-empty");
          }

          final StructField newFieldFinal =
              new StructField(
                  newFieldOrig.getName(),
                  result._1.get(),
                  newFieldOrig.isNullable(),
                  newFieldOrig.getMetadata());
          newFields.add(newFieldFinal);
        }
      }

      // Visit all of the fields that are ONLY in the right struct
      for (StructField rightField : rightStruct.fields()) {
        final String fullColNamePath = colNamePathPrefix + escapeDots(rightField.getName());
        if (leftStruct.get(rightField.getName()) == null) {
          // Surface the missing-field result instead of discarding it,
          // mirroring the visitFieldMissingOnRight case above.
          return Tuple2.right(visitor.visitFieldMissingOnLeft(fullColNamePath, rightField));
        }
      }

      return Tuple2.left(new StructType(newFields));
    } else if (leftDataType instanceof ArrayType) {
      ///////////////////////
      // Case 2: ArrayType //
      ///////////////////////
      if (!(rightDataType instanceof ArrayType)) {
        return Tuple2.right(
            visitor.visitMismatchedDataType(colNamePathPrefix, leftDataType, rightDataType));
      }

      final ArrayType leftArray = (ArrayType) leftDataType;
      final ArrayType rightArray = (ArrayType) rightDataType;

      final Tuple2<Optional<DataType>, Optional<BadResult>> result =
          visitTwoDataTypes(
              colNamePathPrefix + ".element",
              leftArray.getElementType(),
              rightArray.getElementType(),
              visitor);
      if (result._2.isPresent()) {
        return result;
      }
      if (!result._1.isPresent()) {
        throw new RuntimeException("Expected result._1 to be non-empty");
      }

      // TODO: visit the left and right fields? and pick a better valueContainsNull?
      return Tuple2.left(new ArrayType(result._1.get(), true));
    } else if (leftDataType instanceof MapType) {
      /////////////////////
      // Case 3: MapType //
      /////////////////////
      if (!(rightDataType instanceof MapType)) {
        return Tuple2.right(
            visitor.visitMismatchedDataType(colNamePathPrefix, leftDataType, rightDataType));
      }

      final MapType leftMap = (MapType) leftDataType;
      final MapType rightMap = (MapType) rightDataType;

      final Tuple2<Optional<DataType>, Optional<BadResult>> leftResult =
          visitTwoDataTypes(
              colNamePathPrefix + ".key", leftMap.getKeyType(), rightMap.getKeyType(), visitor);

      if (leftResult._2.isPresent()) {
        return leftResult;
      }
      if (!leftResult._1.isPresent()) {
        throw new RuntimeException("Expected result._1 to be non-empty");
      }

      final Tuple2<Optional<DataType>, Optional<BadResult>> rightResult =
          visitTwoDataTypes(
              colNamePathPrefix + ".value",
              leftMap.getValueType(),
              rightMap.getValueType(),
              visitor);

      if (rightResult._2.isPresent()) {
        return rightResult;
      }
      if (!rightResult._1.isPresent()) {
        throw new RuntimeException("Expected result._1 to be non-empty");
      }

      // TODO: visit the left and right fields? and pick a better valueContainsNull?
      return Tuple2.left(new MapType(leftResult._1.get(), rightResult._1.get(), true));
    } else {
      /////////////////////////////
      // Case 4: Primitive Types //
      /////////////////////////////

      if (!leftDataType.equivalent(rightDataType)) {
        return Tuple2.right(
            visitor.visitMismatchedDataType(colNamePathPrefix, leftDataType, rightDataType));
      }

      return Tuple2.left(
          visitor.transformLeftPrimitiveDataType(colNamePathPrefix, leftDataType, rightDataType));
    }
  }

Collaborator Author

Update: I don't think we need a transform(schema1, schema2).

Instead we could:

  1. flatten schema2 to a map of fullColumnPath --> structField
  2. define a transformerFunc(fullPath, field) -> StructField
  3. transform(schema1, transformerFunc)
  4. the transformerFunc is an interface; the concrete implementation can use the flattened schema2 to look up and compare fields.

@FunctionalInterface
  public interface StructFieldMapper {
    StructField apply(String fieldFullPath, StructField field);
  }

  public static StructType transformSchema(StructType schema, StructFieldMapper transformer) {
    final DataType result = transformDataType("", schema, transformer);
    if (!(result instanceof StructType)) {
      throw new RuntimeException("transformDataType should return a StructType");
    }
    return (StructType) result;
  }

  private static DataType transformDataType(String fullPath, DataType dataType, StructFieldMapper transformer) {
    if (dataType instanceof StructType) {
      final StructType currStructType = (StructType) dataType;
      final List<StructField> newFields = new ArrayList<>(currStructType.length());
      for (StructField currField : currStructType.fields()) {
        final String fieldFullPath = fullPath + "." + escapeDots(currField.getName());
        final DataType newFieldDataType =
            transformDataType(fieldFullPath, currField.getDataType(), transformer);
        final StructField transformedField = transformer.apply(fieldFullPath, currField);
        final StructField newField =
            new StructField(
                transformedField.getName(),
                newFieldDataType,
                transformedField.isNullable(),
                transformedField.getMetadata());
        newFields.add(newField);
      }
      return new StructType(newFields);
    } else if (dataType instanceof ArrayType) {
      final ArrayType currArrayType = (ArrayType) dataType;
      final String elementFullPath = fullPath + ".element";
      final DataType newElementType =
          transformDataType(elementFullPath, currArrayType.getElementType(), transformer);
      return new ArrayType(newElementType, currArrayType.containsNull());
    } else if (dataType instanceof MapType) {
      final MapType currMapType = (MapType) dataType;
      final String keyFullPath = fullPath + ".key";
      final String valueFullPath = fullPath + ".value";
      final DataType newKeyType =
          transformDataType(keyFullPath, currMapType.getKeyType(), transformer);
      final DataType newValueType =
          transformDataType(valueFullPath, currMapType.getValueType(), transformer);
      return new MapType(newKeyType, newValueType, currMapType.isValueContainsNull());
    } else {
      return dataType;
    }
  }

// TODO: validate the readSchema is a subset of the table schema
if (!readSchema.isSubsetOf(snapshotSchema)) {
throw DeltaErrors.invalidReadSchema(dataPath.toString(), readSchema, snapshotSchema);
}
Collaborator

Checking for subset is fine, but we also want to add back any metadata fields missing from the readSchema, taking them from the table schema. For example, the connector may just want to read colX, but the table has CM enabled and the readSchema is missing metadata fields such as the CM physical name. We want to add them back.

Collaborator

I think we do this later anyways? When we generate the physical schema

I had this same thought but thought we took care of it later; didn't look too far into it.

Collaborator

Although on second look I am very confused at what we are doing...

We do

    // Physical equivalent of the logical read schema.
    StructType physicalReadSchema =
        ColumnMapping.convertToPhysicalSchema(
            readSchema,
            snapshotSchema,
            ColumnMapping.getColumnMappingMode(metadata.getConfiguration()));

But in the column mapping utils, the parameter names and behavior don't seem as expected

  /**
   * Helper method that converts the logical schema (requested by the connector) to physical schema
   * of the data stored in data files based on the table's column mapping mode.
   *
   * @param logicalSchema Logical schema of the scan
   * @param physicalSchema Physical schema of the scan
   * @param columnMappingMode Column mapping mode
   */
  public static StructType convertToPhysicalSchema(
      StructType logicalSchema, StructType physicalSchema, ColumnMappingMode columnMappingMode) {
    switch (columnMappingMode) {
      case NONE:
        return logicalSchema;
      case ID: // fall through
      case NAME:
        boolean includeFieldIds = columnMappingMode == ColumnMappingMode.ID;
        return convertToPhysicalSchema(logicalSchema, physicalSchema, includeFieldIds);
      default:
        throw new UnsupportedOperationException(
            "Unsupported column mapping mode: " + columnMappingMode);
    }
  }

Collaborator Author

Although on second look I am very confused at what we are doing...

@allisonport-db -- set up a 15-min sync to get some answers here?

Collaborator

On a second glance (looking at the actual implementations), I think the functions do what we want them to do; the parameter naming is just a little confusing/misleading.

Here, we use the readSchema to generate the physical schema using the full schema from the table, and prune/keep metadata as needed.

I think

  1. We should rename the parameters here: logicalSchema --> readSchema or connectorProvidedReadSchema, and physicalSchema --> fullSnapshotSchema or completeTableSchema, or something that reflects that it's the full schema with all the metadata read directly from the log (it is NOT the physical schema!)
  2. We should probably prune metadata from case NONE as well since we do it for the other cases.

Collaborator

@vkorukanti does this align with your understanding of this? I think this code was committed 2 years ago when we added the initial read support

Collaborator

This can be a separate PR. I can make these changes if you want.

Collaborator
@zachschuermann left a comment

Is the motivation for this change just to fail fast before doing any tangible work for the scan? In the existing flow, I assume the scan would fail deeper, at the point of attempting to read a column that doesn't exist?
