Skip to content

Commit

Permalink
Merge pull request #204 from liquibase/DAT-18897
Browse files Browse the repository at this point in the history
DAT-18897: added databricks specific configurations to the XSD element 'extendedTableProperties'
  • Loading branch information
SvampX authored Nov 14, 2024
2 parents ce73780 + 63c553b commit 71266d7
Show file tree
Hide file tree
Showing 25 changed files with 580 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import java.util.ArrayList;
import java.util.List;

import static liquibase.ext.databricks.parser.NamespaceDetailsDatabricks.DATABRICKS_NAMESPACE;

@DatabaseChange(name = "alterCluster", description = "Alter Cluster", priority = PrioritizedService.PRIORITY_DATABASE +500)
public class AlterClusterChangeDatabricks extends AbstractChange {

Expand Down Expand Up @@ -104,4 +106,9 @@ public List<NoneConfig> getClusterBy() {
public void setClusterBy(List<NoneConfig> clusterBy) {
this.clusterBy = clusterBy;
}

@Override
public String getSerializedObjectNamespace() {
return DATABRICKS_NAMESPACE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import liquibase.database.Database;
import liquibase.exception.ValidationErrors;
import liquibase.ext.databricks.database.DatabricksDatabase;
import liquibase.ext.databricks.parser.NamespaceDetailsDatabricks;
import liquibase.servicelocator.PrioritizedService;
import liquibase.statement.core.CreateTableStatement;
import lombok.Setter;
Expand All @@ -15,6 +14,10 @@
@DatabaseChange(name = "createTable", description = "Create Table", priority = PrioritizedService.PRIORITY_DATABASE)
@Setter
public class CreateTableChangeDatabricks extends CreateTableChange {
private static final String DOUBLE_INIT_ERROR = "Double initialization of extended table properties is not allowed. " +
"Please avoid using both EXT createTable attributes and Databricks specific extendedTableProperties element. " +
"Element databricks:extendedTableProperties is preferred way to set databricks specific configurations.";
private static final String PARTITION_CLUSTER_COLLISION_ERROR = "Databricks does not support CLUSTER columns AND PARTITION BY columns, please pick one.";
private String tableFormat;
private String tableLocation;
private String clusterColumns;
Expand All @@ -32,7 +35,16 @@ public ValidationErrors validate(Database database) {
validationErrors.addAll(super.validate(database));

if (partitionColumns != null && clusterColumns != null) {
validationErrors.addError("Databricks does not support CLUSTER columns AND PARTITION BY columns, please pick one. And do not supply the other");
validationErrors.addError(PARTITION_CLUSTER_COLLISION_ERROR);
}
if(extendedTableProperties != null) {
boolean anyPropertyDuplicated = tableFormat != null && extendedTableProperties.getTableFormat() != null
|| tableLocation != null && extendedTableProperties.getTableLocation() != null
|| clusterColumns != null && extendedTableProperties.getClusterColumns() != null
|| partitionColumns != null && extendedTableProperties.getPartitionColumns() !=null;
if(anyPropertyDuplicated) {
validationErrors.addError(DOUBLE_INIT_ERROR);
}
}
return validationErrors;
}
Expand All @@ -58,11 +70,18 @@ protected CreateTableStatement generateCreateTableStatement() {

CreateTableStatementDatabricks ctas = new CreateTableStatementDatabricks(getCatalogName(), getSchemaName(), getTableName());

ctas.setTableFormat(this.getTableFormat());
ctas.setTableLocation(this.getTableLocation());
ctas.setClusterColumns(this.getClusterColumns());
ctas.setPartitionColumns(this.getPartitionColumns());
ctas.setExtendedTableProperties(this.getExtendedTableProperties());
if(this.getExtendedTableProperties() != null) {
ctas.setTableLocation(getExtendedTableProperties().getTableLocation());
ctas.setTableFormat(getExtendedTableProperties().getTableFormat());
ctas.setClusterColumns(getExtendedTableProperties().getClusterColumns());
ctas.setPartitionColumns(getExtendedTableProperties().getPartitionColumns());
ctas.setExtendedTableProperties(this.getExtendedTableProperties());
} else {
ctas.setTableFormat(this.getTableFormat());
ctas.setTableLocation(this.getTableLocation());
ctas.setClusterColumns(this.getClusterColumns());
ctas.setPartitionColumns(this.getPartitionColumns());
}

return ctas;
}
Expand All @@ -71,13 +90,4 @@ protected CreateTableStatement generateCreateTableStatement() {
public ExtendedTableProperties getExtendedTableProperties() {
return extendedTableProperties;
}

@Override
public String getSerializableFieldNamespace(String field) {
if("clusterColumns".equalsIgnoreCase(field)) {
return NamespaceDetailsDatabricks.DATABRICKS_NAMESPACE;
}
return getSerializedObjectNamespace();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@
@Setter
@Getter
public class ExtendedTableProperties extends AbstractLiquibaseSerializable{
private String tableFormat;
private String tableLocation;
private String tblProperties;
private String clusterColumns;
private String partitionColumns;

@Override
public String getSerializedObjectName() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,18 @@
import liquibase.diff.output.changelog.ChangeGeneratorChain;
import liquibase.diff.output.changelog.core.ChangedTableChangeGenerator;
import liquibase.ext.databricks.change.AbstractAlterPropertiesChangeDatabricks;
import liquibase.ext.databricks.change.alterCluster.AlterClusterChangeDatabricks;
import liquibase.ext.databricks.change.alterCluster.ColumnConfig;
import liquibase.ext.databricks.change.alterCluster.NoneConfig;
import liquibase.ext.databricks.database.DatabricksDatabase;
import liquibase.structure.DatabaseObject;
import liquibase.structure.core.Table;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

import static liquibase.ext.databricks.diff.output.changelog.ChangedTblPropertiesUtil.getAlterTablePropertiesChangeDatabricks;

Expand Down Expand Up @@ -43,8 +50,50 @@ public Change[] fixChanged(DatabaseObject changedObject, ObjectDifferences diffe
System.arraycopy(change, 0, changes, changes.length - change.length, change.length);
}
}
if (difference.getField().equals("clusteringColumns")) {
AlterClusterChangeDatabricks[] change = getAlterClusterChangeDatabricks((Table) changedObject, control, difference);
if (changes == null || changes.length == 0) {
changes = change;
} else {
changes = Arrays.copyOf(changes, changes.length + change.length);
System.arraycopy(change, 0, changes, changes.length - change.length, change.length);
}
}
}
return changes;
}

private AlterClusterChangeDatabricks[] getAlterClusterChangeDatabricks(Table changedObject, DiffOutputControl control, Difference difference) {
AlterClusterChangeDatabricks[] changes = new AlterClusterChangeDatabricks[0];
List<String> referencedValues = difference.getReferenceValue() == null ? null :
Arrays.asList(((String)difference.getReferenceValue()).split(","));
List<String> comparedValues = difference.getComparedValue() == null ? null :
Arrays.asList(((String)difference.getComparedValue()).split(","));
if(!Objects.equals(referencedValues, comparedValues)) {
AlterClusterChangeDatabricks change = new AlterClusterChangeDatabricks();
change.setTableName(changedObject.getName());
if (control.getIncludeCatalog()) {
change.setCatalogName(changedObject.getSchema().getCatalogName());
}
if (control.getIncludeSchema()) {
change.setSchemaName(changedObject.getSchema().getName());
}
if (referencedValues == null) {
NoneConfig noneConfig = new NoneConfig();
noneConfig.setNone("true");
change.setClusterBy(Collections.singletonList(noneConfig));
} else {
List<ColumnConfig> columnConfigList = referencedValues.stream().map(colName -> {
ColumnConfig columnConfig = new ColumnConfig();
columnConfig.setName(colName);
return columnConfig;
}).collect(Collectors.toList());
change.setColumns(columnConfigList);
}
changes = new AlterClusterChangeDatabricks[]{change};
}

return changes;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ private static Map<String, String> convertToMapExcludingDeltaParameters(Object r
/**
* Get the extended properties excluding delta parameters
*/
public static String getExtendedProperties(String tblProperties) {
public static String getFilteredTblProperties(String tblProperties) {
Map<String, String> properties = convertToMapExcludingDeltaParameters(tblProperties);
return properties.entrySet().stream().map(e -> e.getKey() + "=" + e.getValue()).collect(Collectors.joining(","));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import liquibase.diff.output.changelog.ChangeGeneratorChain;
import liquibase.diff.output.changelog.core.ChangedViewChangeGenerator;
import liquibase.ext.databricks.change.AbstractAlterPropertiesChangeDatabricks;
import liquibase.ext.databricks.change.alterViewProperties.AlterViewPropertiesChangeDatabricks;
import liquibase.ext.databricks.database.DatabricksDatabase;
import liquibase.structure.DatabaseObject;
import liquibase.structure.core.View;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@
import liquibase.ext.databricks.database.DatabricksDatabase;
import liquibase.structure.DatabaseObject;
import liquibase.structure.core.Table;
import org.apache.commons.lang3.ObjectUtils;

import static liquibase.ext.databricks.diff.output.changelog.ChangedTblPropertiesUtil.getExtendedProperties;
import static liquibase.ext.databricks.diff.output.changelog.ChangedTblPropertiesUtil.getFilteredTblProperties;

public class MissingTableChangeGeneratorDatabricks extends MissingTableChangeGenerator {

Expand All @@ -32,18 +33,21 @@ public Change[] fixMissing(DatabaseObject missingObject, DiffOutputControl contr
if (changes == null || changes.length == 0) {
return changes;
}
//so far we intentionally omit tableLocation in generated changelog
ExtendedTableProperties extendedTableProperties = new ExtendedTableProperties(
null,
getExtendedProperties(missingObject.getAttribute("tblProperties", String.class)));
String clusterColumns = missingObject.getAttribute("clusteringColumns", "");
String tblProperties = getFilteredTblProperties(missingObject.getAttribute("tblProperties", String.class));
tblProperties = tblProperties.isEmpty() ? null : tblProperties;
String clusteringColumns = missingObject.getAttribute("clusteringColumns", String.class);
String partitionColumns = missingObject.getAttribute("partitionColumns", String.class);
ExtendedTableProperties extendedTableProperties = null;
//so far we intentionally omit tableLocation and tableFormat in generated changelog
if(ObjectUtils.anyNotNull(clusteringColumns, partitionColumns, tblProperties)) {
extendedTableProperties = new ExtendedTableProperties(null, null, tblProperties, clusteringColumns, partitionColumns);
}

changes[0] = getCreateTableChangeDatabricks(extendedTableProperties, changes, clusterColumns);
changes[0] = getCreateTableChangeDatabricks(extendedTableProperties, changes);
return changes;
}

private CreateTableChangeDatabricks getCreateTableChangeDatabricks(ExtendedTableProperties extendedTableProperties,
Change[] changes, String clusterColumns) {
private CreateTableChangeDatabricks getCreateTableChangeDatabricks(ExtendedTableProperties extendedTableProperties, Change[] changes) {
CreateTableChange temp = (CreateTableChange) changes[0];
CreateTableChangeDatabricks createTableChangeDatabricks = new CreateTableChangeDatabricks();
createTableChangeDatabricks.setColumns(temp.getColumns());
Expand All @@ -55,10 +59,7 @@ private CreateTableChangeDatabricks getCreateTableChangeDatabricks(ExtendedTable
createTableChangeDatabricks.setRemarks(temp.getRemarks());
createTableChangeDatabricks.setIfNotExists(temp.getIfNotExists());
createTableChangeDatabricks.setRowDependencies(temp.getRowDependencies());
if (!clusterColumns.isEmpty()) {
createTableChangeDatabricks.setClusterColumns(clusterColumns);
}

//All not null properties should be attached in the CreateTableChangeDatabricks::generateCreateTableStatement
createTableChangeDatabricks.setExtendedTableProperties(extendedTableProperties);
return createTableChangeDatabricks;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import liquibase.structure.DatabaseObject;
import liquibase.structure.core.View;

import static liquibase.ext.databricks.diff.output.changelog.ChangedTblPropertiesUtil.getExtendedProperties;
import static liquibase.ext.databricks.diff.output.changelog.ChangedTblPropertiesUtil.getFilteredTblProperties;

/**
* Custom implementation of {@link MissingViewChangeGenerator} for Databricks.
Expand All @@ -33,7 +33,7 @@ public Change[] fixMissing(DatabaseObject missingObject, DiffOutputControl contr
if (changes == null || changes.length == 0) {
return changes;
}
changes[0] = getCreateViewChangeDatabricks(getExtendedProperties(missingObject.getAttribute("tblProperties", String.class)), changes);
changes[0] = getCreateViewChangeDatabricks(getFilteredTblProperties(missingObject.getAttribute("tblProperties", String.class)), changes);
return changes;
}

Expand Down
Loading

0 comments on commit 71266d7

Please sign in to comment.