Skip to content

Commit

Permalink
fix(timeline): fixes primary key change events (datahub-project#11819)
Browse files Browse the repository at this point in the history
  • Loading branch information
RyanHolstien authored Nov 7, 2024
1 parent b3d3b63 commit ca063dd
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -515,8 +515,8 @@ private static List<ChangeEvent> getPrimaryKeyChangeEvents(
SchemaMetadata targetSchema,
Urn datasetUrn,
AuditStamp auditStamp) {
List<ChangeEvent> primaryKeyChangeEvents = new ArrayList<>();
if (changeCategories != null && changeCategories.contains(ChangeCategory.TECHNICAL_SCHEMA)) {
List<ChangeEvent> primaryKeyChangeEvents = new ArrayList<>();
Set<String> basePrimaryKeys =
(baseSchema != null && baseSchema.getPrimaryKeys() != null)
? new HashSet<>(baseSchema.getPrimaryKeys())
Expand All @@ -529,51 +529,53 @@ private static List<ChangeEvent> getPrimaryKeyChangeEvents(
basePrimaryKeys.stream()
.filter(key -> !targetPrimaryKeys.contains(key))
.collect(Collectors.toSet());
for (String removedBaseKeyField : removedBaseKeys) {
Urn schemaFieldUrn = getSchemaFieldUrn(datasetUrn.toString(), removedBaseKeyField);
primaryKeyChangeEvents.add(
DatasetSchemaFieldChangeEvent.schemaFieldChangeEventBuilder()
.category(ChangeCategory.TECHNICAL_SCHEMA)
.modifier(schemaFieldUrn.toString())
.fieldUrn(schemaFieldUrn)
.fieldPath(removedBaseKeyField)
.entityUrn(datasetUrn.toString())
.operation(ChangeOperation.MODIFY)
.semVerChange(SemanticChangeType.MAJOR)
.description(
BACKWARDS_INCOMPATIBLE_DESC
+ " removal of the primary key field '"
+ removedBaseKeyField
+ "'")
.auditStamp(auditStamp)
.modificationCategory(SchemaFieldModificationCategory.OTHER)
.build());
}

Set<String> addedTargetKeys =
targetPrimaryKeys.stream()
.filter(key -> !basePrimaryKeys.contains(key))
.collect(Collectors.toSet());
if (!removedBaseKeys.isEmpty() || !addedTargetKeys.isEmpty()) {
String keyChangeTarget;
// Just pick the first schema field we can find for the change event
if (!removedBaseKeys.isEmpty()) {
keyChangeTarget = removedBaseKeys.stream().findFirst().get();
} else {
keyChangeTarget = addedTargetKeys.stream().findFirst().get();
}

StringBuilder description =
new StringBuilder(BACKWARDS_INCOMPATIBLE_DESC + " a primary key constraint change.");
if (!removedBaseKeys.isEmpty()) {
description.append(" The following fields were removed:");
removedBaseKeys.forEach(
removedBaseKey -> description.append(" '").append(removedBaseKey).append("'"));
description.append(".");
}
if (!addedTargetKeys.isEmpty()) {
description.append(" The following fields were added:");
addedTargetKeys.forEach(
addedTargetKey -> description.append(" '").append(addedTargetKey).append("'"));
description.append(".");
}
for (String addedTargetKeyField : addedTargetKeys) {
Urn schemaFieldUrn = getSchemaFieldUrn(datasetUrn.toString(), addedTargetKeyField);
primaryKeyChangeEvents.add(
DatasetSchemaFieldChangeEvent.schemaFieldChangeEventBuilder()
.category(ChangeCategory.TECHNICAL_SCHEMA)
.fieldUrn(getSchemaFieldUrn(datasetUrn, keyChangeTarget))
.fieldPath(keyChangeTarget)
.modifier(getSchemaFieldUrn(datasetUrn, keyChangeTarget).toString())
.modifier(getSchemaFieldUrn(datasetUrn, addedTargetKeyField).toString())
.fieldUrn(schemaFieldUrn)
.fieldPath(addedTargetKeyField)
.entityUrn(datasetUrn.toString())
.operation(ChangeOperation.MODIFY)
.semVerChange(SemanticChangeType.MAJOR)
.description(description.toString())
.modificationCategory(SchemaFieldModificationCategory.OTHER)
.description(
BACKWARDS_INCOMPATIBLE_DESC
+ " addition of the primary key field '"
+ addedTargetKeyField
+ "'")
.auditStamp(auditStamp)
.modificationCategory(SchemaFieldModificationCategory.OTHER)
.build());
return primaryKeyChangeEvents;
}
}
return Collections.emptyList();
return primaryKeyChangeEvents;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,10 +204,10 @@ public void testSchemaFieldPrimaryKeyChange() throws Exception {
List<ChangeEvent> actual = test.getChangeEvents(urn, entity, aspect, from, to3, auditStamp);
compareDescriptions(
Set.of(
"A backwards incompatible change due to a primary key constraint change. "
+ "The following fields were removed: 'ID'. The following fields were added: 'ID2'."),
"A backwards incompatible change due to addition of the primary key field 'ID2'",
"A backwards incompatible change due to removal of the primary key field 'ID'"),
actual);
assertEquals(1, actual.size());
assertEquals(actual.size(), 2);
compareModificationCategories(Set.of(SchemaFieldModificationCategory.OTHER.toString()), actual);
}

Expand Down Expand Up @@ -274,10 +274,10 @@ public void testSchemaFieldPrimaryKeyChangeRenameAdd() throws Exception {
List<ChangeEvent> actual = test.getChangeEvents(urn, entity, aspect, from, to3, auditStamp);
compareDescriptions(
Set.of(
"A backwards incompatible change due to a primary key constraint change. "
+ "The following fields were removed: 'ID'. The following fields were added: 'ID2'."),
"A backwards incompatible change due to addition of the primary key field 'ID2'",
"A backwards incompatible change due to removal of the primary key field 'ID'"),
actual);
assertEquals(1, actual.size());
assertEquals(actual.size(), 2);
compareModificationCategories(Set.of(SchemaFieldModificationCategory.OTHER.toString()), actual);

Aspect<SchemaMetadata> to4 =
Expand Down

0 comments on commit ca063dd

Please sign in to comment.