Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DAT-18897: added databricks specific configurations to the XSD element 'extendedTableProperties' #204

Merged
merged 16 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import java.util.ArrayList;
import java.util.List;

import static liquibase.ext.databricks.parser.NamespaceDetailsDatabricks.DATABRICKS_NAMESPACE;

@DatabaseChange(name = "alterCluster", description = "Alter Cluster", priority = PrioritizedService.PRIORITY_DATABASE +500)
public class AlterClusterChangeDatabricks extends AbstractChange {

Expand Down Expand Up @@ -104,4 +106,9 @@ public List<NoneConfig> getClusterBy() {
public void setClusterBy(List<NoneConfig> clusterBy) {
this.clusterBy = clusterBy;
}

@Override
public String getSerializedObjectNamespace() {
return DATABRICKS_NAMESPACE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import liquibase.database.Database;
import liquibase.exception.ValidationErrors;
import liquibase.ext.databricks.database.DatabricksDatabase;
import liquibase.ext.databricks.parser.NamespaceDetailsDatabricks;
import liquibase.servicelocator.PrioritizedService;
import liquibase.statement.core.CreateTableStatement;
import lombok.Setter;
Expand All @@ -15,6 +14,10 @@
@DatabaseChange(name = "createTable", description = "Create Table", priority = PrioritizedService.PRIORITY_DATABASE)
@Setter
public class CreateTableChangeDatabricks extends CreateTableChange {
private static final String DOUBLE_INIT_ERROR = "Double initialization of extended table properties is not allowed. " +
"Please avoid using both EXT createTable attributes and Databricks specific extendedTableProperties element. " +
"Element databricks:extendedTableProperties is preferred way to set databricks specific configurations.";
private static final String PARTITION_CLUSTER_COLLISION_ERROR = "Databricks does not support CLUSTER columns AND PARTITION BY columns, please pick one.";
private String tableFormat;
private String tableLocation;
private String clusterColumns;
Expand All @@ -32,7 +35,16 @@ public ValidationErrors validate(Database database) {
validationErrors.addAll(super.validate(database));

if (partitionColumns != null && clusterColumns != null) {
validationErrors.addError("Databricks does not support CLUSTER columns AND PARTITION BY columns, please pick one. And do not supply the other");
validationErrors.addError(PARTITION_CLUSTER_COLLISION_ERROR);
}
if(extendedTableProperties != null) {
boolean anyPropertyDuplicated = tableFormat != null && extendedTableProperties.getTableFormat() != null
|| tableLocation != null && extendedTableProperties.getTableLocation() != null
|| clusterColumns != null && extendedTableProperties.getClusterColumns() != null
|| partitionColumns != null && extendedTableProperties.getPartitionColumns() !=null;
if(anyPropertyDuplicated) {
validationErrors.addError(DOUBLE_INIT_ERROR);
}
}
return validationErrors;
}
Expand All @@ -58,11 +70,18 @@ protected CreateTableStatement generateCreateTableStatement() {

CreateTableStatementDatabricks ctas = new CreateTableStatementDatabricks(getCatalogName(), getSchemaName(), getTableName());

ctas.setTableFormat(this.getTableFormat());
ctas.setTableLocation(this.getTableLocation());
ctas.setClusterColumns(this.getClusterColumns());
ctas.setPartitionColumns(this.getPartitionColumns());
ctas.setExtendedTableProperties(this.getExtendedTableProperties());
if(this.getExtendedTableProperties() != null) {
ctas.setTableLocation(getExtendedTableProperties().getTableLocation());
ctas.setTableFormat(getExtendedTableProperties().getTableFormat());
ctas.setClusterColumns(getExtendedTableProperties().getClusterColumns());
ctas.setPartitionColumns(getExtendedTableProperties().getPartitionColumns());
ctas.setExtendedTableProperties(this.getExtendedTableProperties());
} else {
ctas.setTableFormat(this.getTableFormat());
ctas.setTableLocation(this.getTableLocation());
ctas.setClusterColumns(this.getClusterColumns());
ctas.setPartitionColumns(this.getPartitionColumns());
}

return ctas;
}
Expand All @@ -71,13 +90,4 @@ protected CreateTableStatement generateCreateTableStatement() {
public ExtendedTableProperties getExtendedTableProperties() {
return extendedTableProperties;
}

@Override
public String getSerializableFieldNamespace(String field) {
if("clusterColumns".equalsIgnoreCase(field)) {
filipelautert marked this conversation as resolved.
Show resolved Hide resolved
return NamespaceDetailsDatabricks.DATABRICKS_NAMESPACE;
}
return getSerializedObjectNamespace();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@
@Setter
@Getter
public class ExtendedTableProperties extends AbstractLiquibaseSerializable{
private String tableFormat;
private String tableLocation;
private String tblProperties;
private String clusterColumns;
private String partitionColumns;

@Override
public String getSerializedObjectName() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,18 @@
import liquibase.diff.output.changelog.ChangeGeneratorChain;
import liquibase.diff.output.changelog.core.ChangedTableChangeGenerator;
import liquibase.ext.databricks.change.AbstractAlterPropertiesChangeDatabricks;
import liquibase.ext.databricks.change.alterCluster.AlterClusterChangeDatabricks;
import liquibase.ext.databricks.change.alterCluster.ColumnConfig;
import liquibase.ext.databricks.change.alterCluster.NoneConfig;
import liquibase.ext.databricks.database.DatabricksDatabase;
import liquibase.structure.DatabaseObject;
import liquibase.structure.core.Table;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

import static liquibase.ext.databricks.diff.output.changelog.ChangedTblPropertiesUtil.getAlterTablePropertiesChangeDatabricks;

Expand Down Expand Up @@ -43,8 +50,50 @@ public Change[] fixChanged(DatabaseObject changedObject, ObjectDifferences diffe
System.arraycopy(change, 0, changes, changes.length - change.length, change.length);
}
}
if (difference.getField().equals("clusteringColumns")) {
AlterClusterChangeDatabricks[] change = getAlterClusterChangeDatabricks((Table) changedObject, control, difference);
if (changes == null || changes.length == 0) {
changes = change;
} else {
changes = Arrays.copyOf(changes, changes.length + change.length);
System.arraycopy(change, 0, changes, changes.length - change.length, change.length);
}
}
}
return changes;
}

private AlterClusterChangeDatabricks[] getAlterClusterChangeDatabricks(Table changedObject, DiffOutputControl control, Difference difference) {
AlterClusterChangeDatabricks[] changes = new AlterClusterChangeDatabricks[0];
List<String> referencedValues = difference.getReferenceValue() == null ? null :
Arrays.asList(((String)difference.getReferenceValue()).split(","));
List<String> comparedValues = difference.getComparedValue() == null ? null :
Arrays.asList(((String)difference.getComparedValue()).split(","));
if(!Objects.equals(referencedValues, comparedValues)) {
AlterClusterChangeDatabricks change = new AlterClusterChangeDatabricks();
change.setTableName(changedObject.getName());
if (control.getIncludeCatalog()) {
change.setCatalogName(changedObject.getSchema().getCatalogName());
}
if (control.getIncludeSchema()) {
change.setSchemaName(changedObject.getSchema().getName());
}
if (referencedValues == null) {
NoneConfig noneConfig = new NoneConfig();
noneConfig.setNone("true");
change.setClusterBy(Collections.singletonList(noneConfig));
} else {
List<ColumnConfig> columnConfigList = referencedValues.stream().map(colName -> {
ColumnConfig columnConfig = new ColumnConfig();
columnConfig.setName(colName);
return columnConfig;
}).collect(Collectors.toList());
change.setColumns(columnConfigList);
}
changes = new AlterClusterChangeDatabricks[]{change};
}

return changes;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ private static Map<String, String> convertToMapExcludingDeltaParameters(Object r
/**
* Get the extended properties excluding delta parameters
*/
public static String getExtendedProperties(String tblProperties) {
public static String getFilteredTblProperties(String tblProperties) {
Map<String, String> properties = convertToMapExcludingDeltaParameters(tblProperties);
return properties.entrySet().stream().map(e -> e.getKey() + "=" + e.getValue()).collect(Collectors.joining(","));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import liquibase.diff.output.changelog.ChangeGeneratorChain;
import liquibase.diff.output.changelog.core.ChangedViewChangeGenerator;
import liquibase.ext.databricks.change.AbstractAlterPropertiesChangeDatabricks;
import liquibase.ext.databricks.change.alterViewProperties.AlterViewPropertiesChangeDatabricks;
import liquibase.ext.databricks.database.DatabricksDatabase;
import liquibase.structure.DatabaseObject;
import liquibase.structure.core.View;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@
import liquibase.ext.databricks.database.DatabricksDatabase;
import liquibase.structure.DatabaseObject;
import liquibase.structure.core.Table;
import org.apache.commons.lang3.ObjectUtils;

import static liquibase.ext.databricks.diff.output.changelog.ChangedTblPropertiesUtil.getExtendedProperties;
import static liquibase.ext.databricks.diff.output.changelog.ChangedTblPropertiesUtil.getFilteredTblProperties;

public class MissingTableChangeGeneratorDatabricks extends MissingTableChangeGenerator {

Expand All @@ -32,18 +33,21 @@ public Change[] fixMissing(DatabaseObject missingObject, DiffOutputControl contr
if (changes == null || changes.length == 0) {
return changes;
}
//so far we intentionally omit tableLocation in generated changelog
ExtendedTableProperties extendedTableProperties = new ExtendedTableProperties(
null,
getExtendedProperties(missingObject.getAttribute("tblProperties", String.class)));
String clusterColumns = missingObject.getAttribute("clusteringColumns", "");
String tblProperties = getFilteredTblProperties(missingObject.getAttribute("tblProperties", String.class));
tblProperties = tblProperties.isEmpty() ? null : tblProperties;
String clusteringColumns = missingObject.getAttribute("clusteringColumns", String.class);
String partitionColumns = missingObject.getAttribute("partitionColumns", String.class);
ExtendedTableProperties extendedTableProperties = null;
//so far we intentionally omit tableLocation and tableFormat in generated changelog
if(ObjectUtils.anyNotNull(clusteringColumns, partitionColumns, tblProperties)) {
extendedTableProperties = new ExtendedTableProperties(null, null, tblProperties, clusteringColumns, partitionColumns);
}

changes[0] = getCreateTableChangeDatabricks(extendedTableProperties, changes, clusterColumns);
changes[0] = getCreateTableChangeDatabricks(extendedTableProperties, changes);
return changes;
}

private CreateTableChangeDatabricks getCreateTableChangeDatabricks(ExtendedTableProperties extendedTableProperties,
Change[] changes, String clusterColumns) {
private CreateTableChangeDatabricks getCreateTableChangeDatabricks(ExtendedTableProperties extendedTableProperties, Change[] changes) {
CreateTableChange temp = (CreateTableChange) changes[0];
CreateTableChangeDatabricks createTableChangeDatabricks = new CreateTableChangeDatabricks();
createTableChangeDatabricks.setColumns(temp.getColumns());
Expand All @@ -55,10 +59,7 @@ private CreateTableChangeDatabricks getCreateTableChangeDatabricks(ExtendedTable
createTableChangeDatabricks.setRemarks(temp.getRemarks());
createTableChangeDatabricks.setIfNotExists(temp.getIfNotExists());
createTableChangeDatabricks.setRowDependencies(temp.getRowDependencies());
if (!clusterColumns.isEmpty()) {
createTableChangeDatabricks.setClusterColumns(clusterColumns);
}

//All not null properties should be attached in the CreateTableChangeDatabricks::generateCreateTableStatement
createTableChangeDatabricks.setExtendedTableProperties(extendedTableProperties);
return createTableChangeDatabricks;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import liquibase.structure.DatabaseObject;
import liquibase.structure.core.View;

import static liquibase.ext.databricks.diff.output.changelog.ChangedTblPropertiesUtil.getExtendedProperties;
import static liquibase.ext.databricks.diff.output.changelog.ChangedTblPropertiesUtil.getFilteredTblProperties;

/**
* Custom implementation of {@link MissingViewChangeGenerator} for Databricks.
Expand All @@ -33,7 +33,7 @@ public Change[] fixMissing(DatabaseObject missingObject, DiffOutputControl contr
if (changes == null || changes.length == 0) {
return changes;
}
changes[0] = getCreateViewChangeDatabricks(getExtendedProperties(missingObject.getAttribute("tblProperties", String.class)), changes);
changes[0] = getCreateViewChangeDatabricks(getFilteredTblProperties(missingObject.getAttribute("tblProperties", String.class)), changes);
return changes;
}

Expand Down
Loading
Loading