diff --git a/src/main/java/liquibase/ext/databricks/change/alterCluster/AlterClusterChangeDatabricks.java b/src/main/java/liquibase/ext/databricks/change/alterCluster/AlterClusterChangeDatabricks.java index 65f64084..b2b832d7 100644 --- a/src/main/java/liquibase/ext/databricks/change/alterCluster/AlterClusterChangeDatabricks.java +++ b/src/main/java/liquibase/ext/databricks/change/alterCluster/AlterClusterChangeDatabricks.java @@ -12,6 +12,8 @@ import java.util.ArrayList; import java.util.List; +import static liquibase.ext.databricks.parser.NamespaceDetailsDatabricks.DATABRICKS_NAMESPACE; + @DatabaseChange(name = "alterCluster", description = "Alter Cluster", priority = PrioritizedService.PRIORITY_DATABASE +500) public class AlterClusterChangeDatabricks extends AbstractChange { @@ -104,4 +106,9 @@ public List getClusterBy() { public void setClusterBy(List clusterBy) { this.clusterBy = clusterBy; } + + @Override + public String getSerializedObjectNamespace() { + return DATABRICKS_NAMESPACE; + } } diff --git a/src/main/java/liquibase/ext/databricks/change/createTable/CreateTableChangeDatabricks.java b/src/main/java/liquibase/ext/databricks/change/createTable/CreateTableChangeDatabricks.java index 5cb21a5c..67a33a1c 100644 --- a/src/main/java/liquibase/ext/databricks/change/createTable/CreateTableChangeDatabricks.java +++ b/src/main/java/liquibase/ext/databricks/change/createTable/CreateTableChangeDatabricks.java @@ -6,7 +6,6 @@ import liquibase.database.Database; import liquibase.exception.ValidationErrors; import liquibase.ext.databricks.database.DatabricksDatabase; -import liquibase.ext.databricks.parser.NamespaceDetailsDatabricks; import liquibase.servicelocator.PrioritizedService; import liquibase.statement.core.CreateTableStatement; import lombok.Setter; @@ -15,6 +14,10 @@ @DatabaseChange(name = "createTable", description = "Create Table", priority = PrioritizedService.PRIORITY_DATABASE) @Setter public class CreateTableChangeDatabricks extends CreateTableChange { + private static final String DOUBLE_INIT_ERROR = "Double initialization of extended table properties is not allowed. " + + "Please avoid using both EXT createTable attributes and Databricks specific extendedTableProperties element. " + + "Element databricks:extendedTableProperties is preferred way to set databricks specific configurations."; + private static final String PARTITION_CLUSTER_COLLISION_ERROR = "Databricks does not support CLUSTER columns AND PARTITION BY columns, please pick one."; private String tableFormat; private String tableLocation; private String clusterColumns; @@ -32,7 +35,16 @@ public ValidationErrors validate(Database database) { validationErrors.addAll(super.validate(database)); if (partitionColumns != null && clusterColumns != null) { - validationErrors.addError("Databricks does not support CLUSTER columns AND PARTITION BY columns, please pick one. And do not supply the other"); + validationErrors.addError(PARTITION_CLUSTER_COLLISION_ERROR); + } + if(extendedTableProperties != null) { + boolean anyPropertyDuplicated = tableFormat != null && extendedTableProperties.getTableFormat() != null + || tableLocation != null && extendedTableProperties.getTableLocation() != null + || clusterColumns != null && extendedTableProperties.getClusterColumns() != null + || partitionColumns != null && extendedTableProperties.getPartitionColumns() !=null; + if(anyPropertyDuplicated) { + validationErrors.addError(DOUBLE_INIT_ERROR); + } } return validationErrors; } @@ -58,11 +70,18 @@ protected CreateTableStatement generateCreateTableStatement() { CreateTableStatementDatabricks ctas = new CreateTableStatementDatabricks(getCatalogName(), getSchemaName(), getTableName()); - ctas.setTableFormat(this.getTableFormat()); - ctas.setTableLocation(this.getTableLocation()); - ctas.setClusterColumns(this.getClusterColumns()); - ctas.setPartitionColumns(this.getPartitionColumns()); - ctas.setExtendedTableProperties(this.getExtendedTableProperties()); + if(this.getExtendedTableProperties() != null) { + ctas.setTableLocation(getExtendedTableProperties().getTableLocation()); + ctas.setTableFormat(getExtendedTableProperties().getTableFormat()); + ctas.setClusterColumns(getExtendedTableProperties().getClusterColumns()); + ctas.setPartitionColumns(getExtendedTableProperties().getPartitionColumns()); + ctas.setExtendedTableProperties(this.getExtendedTableProperties()); + } else { + ctas.setTableFormat(this.getTableFormat()); + ctas.setTableLocation(this.getTableLocation()); + ctas.setClusterColumns(this.getClusterColumns()); + ctas.setPartitionColumns(this.getPartitionColumns()); + } return ctas; } @@ -71,13 +90,4 @@ protected CreateTableStatement generateCreateTableStatement() { public ExtendedTableProperties getExtendedTableProperties() { return extendedTableProperties; } - - @Override - public String getSerializableFieldNamespace(String field) { - if("clusterColumns".equalsIgnoreCase(field)) { - return NamespaceDetailsDatabricks.DATABRICKS_NAMESPACE; - } - return getSerializedObjectNamespace(); - } - } diff --git a/src/main/java/liquibase/ext/databricks/change/createTable/ExtendedTableProperties.java b/src/main/java/liquibase/ext/databricks/change/createTable/ExtendedTableProperties.java index 4dddfc5b..b4f8398e 100644 --- a/src/main/java/liquibase/ext/databricks/change/createTable/ExtendedTableProperties.java +++ b/src/main/java/liquibase/ext/databricks/change/createTable/ExtendedTableProperties.java @@ -12,8 +12,11 @@ @Setter @Getter public class ExtendedTableProperties extends AbstractLiquibaseSerializable{ + private String tableFormat; private String tableLocation; private String tblProperties; + private String clusterColumns; + private String partitionColumns; @Override public String getSerializedObjectName() { diff --git a/src/main/java/liquibase/ext/databricks/diff/output/changelog/ChangedTableChangeGeneratorDatabricks.java b/src/main/java/liquibase/ext/databricks/diff/output/changelog/ChangedTableChangeGeneratorDatabricks.java index 22390e15..fbb57b80 100644 --- a/src/main/java/liquibase/ext/databricks/diff/output/changelog/ChangedTableChangeGeneratorDatabricks.java +++ b/src/main/java/liquibase/ext/databricks/diff/output/changelog/ChangedTableChangeGeneratorDatabricks.java @@ -8,11 +8,18 @@ import liquibase.diff.output.changelog.ChangeGeneratorChain; import liquibase.diff.output.changelog.core.ChangedTableChangeGenerator; import liquibase.ext.databricks.change.AbstractAlterPropertiesChangeDatabricks; +import liquibase.ext.databricks.change.alterCluster.AlterClusterChangeDatabricks; +import liquibase.ext.databricks.change.alterCluster.ColumnConfig; +import liquibase.ext.databricks.change.alterCluster.NoneConfig; import liquibase.ext.databricks.database.DatabricksDatabase; import liquibase.structure.DatabaseObject; import liquibase.structure.core.Table; import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; import static liquibase.ext.databricks.diff.output.changelog.ChangedTblPropertiesUtil.getAlterTablePropertiesChangeDatabricks; @@ -43,8 +50,50 @@ public Change[] fixChanged(DatabaseObject changedObject, ObjectDifferences diffe System.arraycopy(change, 0, changes, changes.length - change.length, change.length); } } + if (difference.getField().equals("clusteringColumns")) { + AlterClusterChangeDatabricks[] change = getAlterClusterChangeDatabricks((Table) changedObject, control, difference); + if (changes == null || changes.length == 0) { + changes = change; + } else { + changes = Arrays.copyOf(changes, changes.length + change.length); + System.arraycopy(change, 0, changes, changes.length - change.length, change.length); + } + } } return changes; } + private AlterClusterChangeDatabricks[] getAlterClusterChangeDatabricks(Table changedObject, DiffOutputControl control, Difference difference) { + AlterClusterChangeDatabricks[] changes = new AlterClusterChangeDatabricks[0]; + List referencedValues = difference.getReferenceValue() == null ? null : + Arrays.asList(((String)difference.getReferenceValue()).split(",")); + List comparedValues = difference.getComparedValue() == null ? null : + Arrays.asList(((String)difference.getComparedValue()).split(",")); + if(!Objects.equals(referencedValues, comparedValues)) { + AlterClusterChangeDatabricks change = new AlterClusterChangeDatabricks(); + change.setTableName(changedObject.getName()); + if (control.getIncludeCatalog()) { + change.setCatalogName(changedObject.getSchema().getCatalogName()); + } + if (control.getIncludeSchema()) { + change.setSchemaName(changedObject.getSchema().getName()); + } + if (referencedValues == null) { + NoneConfig noneConfig = new NoneConfig(); + noneConfig.setNone("true"); + change.setClusterBy(Collections.singletonList(noneConfig)); + } else { + List columnConfigList = referencedValues.stream().map(colName -> { + ColumnConfig columnConfig = new ColumnConfig(); + columnConfig.setName(colName); + return columnConfig; + }).collect(Collectors.toList()); + change.setColumns(columnConfigList); + } + changes = new AlterClusterChangeDatabricks[]{change}; + } + + return changes; + } + } diff --git a/src/main/java/liquibase/ext/databricks/diff/output/changelog/ChangedTblPropertiesUtil.java b/src/main/java/liquibase/ext/databricks/diff/output/changelog/ChangedTblPropertiesUtil.java index c402dac7..99fbdab0 100644 --- a/src/main/java/liquibase/ext/databricks/diff/output/changelog/ChangedTblPropertiesUtil.java +++ b/src/main/java/liquibase/ext/databricks/diff/output/changelog/ChangedTblPropertiesUtil.java @@ -105,7 +105,7 @@ private static Map convertToMapExcludingDeltaParameters(Object r /** * Get the extended properties excluding delta parameters */ - public static String getExtendedProperties(String tblProperties) { + public static String getFilteredTblProperties(String tblProperties) { Map properties = convertToMapExcludingDeltaParameters(tblProperties); return properties.entrySet().stream().map(e -> e.getKey() + "=" + e.getValue()).collect(Collectors.joining(",")); } diff --git a/src/main/java/liquibase/ext/databricks/diff/output/changelog/ChangedViewChangeGeneratorDatabricks.java b/src/main/java/liquibase/ext/databricks/diff/output/changelog/ChangedViewChangeGeneratorDatabricks.java index bf14a4a9..00413caa 100644 --- a/src/main/java/liquibase/ext/databricks/diff/output/changelog/ChangedViewChangeGeneratorDatabricks.java +++ b/src/main/java/liquibase/ext/databricks/diff/output/changelog/ChangedViewChangeGeneratorDatabricks.java @@ -8,7 +8,6 @@ import liquibase.diff.output.changelog.ChangeGeneratorChain; import liquibase.diff.output.changelog.core.ChangedViewChangeGenerator; import liquibase.ext.databricks.change.AbstractAlterPropertiesChangeDatabricks; -import liquibase.ext.databricks.change.alterViewProperties.AlterViewPropertiesChangeDatabricks; import liquibase.ext.databricks.database.DatabricksDatabase; import liquibase.structure.DatabaseObject; import liquibase.structure.core.View; diff --git a/src/main/java/liquibase/ext/databricks/diff/output/changelog/MissingTableChangeGeneratorDatabricks.java b/src/main/java/liquibase/ext/databricks/diff/output/changelog/MissingTableChangeGeneratorDatabricks.java index ab5cf2ce..f144becb 100644 --- a/src/main/java/liquibase/ext/databricks/diff/output/changelog/MissingTableChangeGeneratorDatabricks.java +++ b/src/main/java/liquibase/ext/databricks/diff/output/changelog/MissingTableChangeGeneratorDatabricks.java @@ -11,8 +11,9 @@ import liquibase.ext.databricks.database.DatabricksDatabase; import liquibase.structure.DatabaseObject; import liquibase.structure.core.Table; +import org.apache.commons.lang3.ObjectUtils; -import static liquibase.ext.databricks.diff.output.changelog.ChangedTblPropertiesUtil.getExtendedProperties; +import static liquibase.ext.databricks.diff.output.changelog.ChangedTblPropertiesUtil.getFilteredTblProperties; public class MissingTableChangeGeneratorDatabricks extends MissingTableChangeGenerator { @@ -32,18 +33,21 @@ public Change[] fixMissing(DatabaseObject missingObject, DiffOutputControl contr if (changes == null || changes.length == 0) { return changes; } - //so far we intentionally omit tableLocation in generated changelog - ExtendedTableProperties extendedTableProperties = new ExtendedTableProperties( - null, - getExtendedProperties(missingObject.getAttribute("tblProperties", String.class))); - String clusterColumns = missingObject.getAttribute("clusteringColumns", ""); + String tblProperties = getFilteredTblProperties(missingObject.getAttribute("tblProperties", String.class)); + tblProperties = tblProperties.isEmpty() ? null : tblProperties; + String clusteringColumns = missingObject.getAttribute("clusteringColumns", String.class); + String partitionColumns = missingObject.getAttribute("partitionColumns", String.class); + ExtendedTableProperties extendedTableProperties = null; + //so far we intentionally omit tableLocation and tableFormat in generated changelog + if(ObjectUtils.anyNotNull(clusteringColumns, partitionColumns, tblProperties)) { + extendedTableProperties = new ExtendedTableProperties(null, null, tblProperties, clusteringColumns, partitionColumns); + } - changes[0] = getCreateTableChangeDatabricks(extendedTableProperties, changes, clusterColumns); + changes[0] = getCreateTableChangeDatabricks(extendedTableProperties, changes); return changes; } - private CreateTableChangeDatabricks getCreateTableChangeDatabricks(ExtendedTableProperties extendedTableProperties, - Change[] changes, String clusterColumns) { + private CreateTableChangeDatabricks getCreateTableChangeDatabricks(ExtendedTableProperties extendedTableProperties, Change[] changes) { CreateTableChange temp = (CreateTableChange) changes[0]; CreateTableChangeDatabricks createTableChangeDatabricks = new CreateTableChangeDatabricks(); createTableChangeDatabricks.setColumns(temp.getColumns()); @@ -55,10 +59,7 @@ private CreateTableChangeDatabricks getCreateTableChangeDatabricks(ExtendedTable createTableChangeDatabricks.setRemarks(temp.getRemarks()); createTableChangeDatabricks.setIfNotExists(temp.getIfNotExists()); createTableChangeDatabricks.setRowDependencies(temp.getRowDependencies()); - if (!clusterColumns.isEmpty()) { - createTableChangeDatabricks.setClusterColumns(clusterColumns); - } - + //All not null properties should be attached in the CreateTableChangeDatabricks::generateCreateTableStatement createTableChangeDatabricks.setExtendedTableProperties(extendedTableProperties); return createTableChangeDatabricks; } diff --git a/src/main/java/liquibase/ext/databricks/diff/output/changelog/MissingViewChangeGeneratorDatabricks.java b/src/main/java/liquibase/ext/databricks/diff/output/changelog/MissingViewChangeGeneratorDatabricks.java index f94275f2..b303fff2 100644 --- a/src/main/java/liquibase/ext/databricks/diff/output/changelog/MissingViewChangeGeneratorDatabricks.java +++ b/src/main/java/liquibase/ext/databricks/diff/output/changelog/MissingViewChangeGeneratorDatabricks.java @@ -11,7 +11,7 @@ import liquibase.structure.DatabaseObject; import liquibase.structure.core.View; -import static liquibase.ext.databricks.diff.output.changelog.ChangedTblPropertiesUtil.getExtendedProperties; +import static liquibase.ext.databricks.diff.output.changelog.ChangedTblPropertiesUtil.getFilteredTblProperties; /** * Custom implementation of {@link MissingViewChangeGenerator} for Databricks. @@ -33,7 +33,7 @@ public Change[] fixMissing(DatabaseObject missingObject, DiffOutputControl contr if (changes == null || changes.length == 0) { return changes; } - changes[0] = getCreateViewChangeDatabricks(getExtendedProperties(missingObject.getAttribute("tblProperties", String.class)), changes); + changes[0] = getCreateViewChangeDatabricks(getFilteredTblProperties(missingObject.getAttribute("tblProperties", String.class)), changes); return changes; } diff --git a/src/main/java/liquibase/ext/databricks/snapshot/jvm/TableSnapshotGeneratorDatabricks.java b/src/main/java/liquibase/ext/databricks/snapshot/jvm/TableSnapshotGeneratorDatabricks.java index 2fc3e798..c4041108 100644 --- a/src/main/java/liquibase/ext/databricks/snapshot/jvm/TableSnapshotGeneratorDatabricks.java +++ b/src/main/java/liquibase/ext/databricks/snapshot/jvm/TableSnapshotGeneratorDatabricks.java @@ -14,21 +14,23 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; public class TableSnapshotGeneratorDatabricks extends TableSnapshotGenerator { private static final String LOCATION = "Location"; + private static final String PROVIDER = "Provider"; + private static final String STORAGE_PROPERTIES = "Storage Properties"; + private static final String TABLE_FORMAT = "tableFormat"; private static final String TBL_PROPERTIES = "tblProperties"; private static final String CLUSTER_COLUMNS = "clusteringColumns"; + private static final String PARTITION_COLUMNS = "partitionColumns"; private static final String DETAILED_TABLE_INFORMATION_NODE = "# Detailed Table Information"; - private static final List PROPERTIES_STOP_LIST = Arrays.asList( - "delta.columnMapping.maxColumnId", - "delta.rowTracking.materializedRowCommitVersionColumnName", - "delta.rowTracking.materializedRowIdColumnName", - "delta.feature.clustering" - ); + private static final String TABLE_PARTITION_INFORMATION_NODE = "# Partition Information"; + private static final String DATA_TYPE = "DATA_TYPE"; + private static final List FILE_TYPE_PROVIDERS = Arrays.asList("AVRO", "BINARYFILE", "CSV", "JSON", "ORC", "PARQUET", "TEXT"); @Override public int getPriority(Class objectType, Database database) { @@ -47,28 +49,75 @@ protected DatabaseObject snapshotObject(DatabaseObject example, DatabaseSnapshot example.getName()); List> tablePropertiesResponse = Scope.getCurrentScope().getSingleton(ExecutorService.class) .getExecutor("jdbc", database).queryForList(new RawParameterizedSqlStatement(query)); + StringBuilder tableFormat = new StringBuilder(); // DESCRIBE TABLE EXTENDED returns both columns and additional information. // We need to make sure "Location" is not column in the table, but table location in s3 boolean detailedInformationNode = false; + boolean partitionInformationNode = false; + StringBuilder partitionColumns = new StringBuilder(); for (Map tableProperty : tablePropertiesResponse) { - if (tableProperty.get("COL_NAME").equals(DETAILED_TABLE_INFORMATION_NODE)) { + String currentColName = (String) tableProperty.get("COL_NAME"); + if (currentColName.equals(DETAILED_TABLE_INFORMATION_NODE)) { detailedInformationNode = true; continue; } - if (detailedInformationNode && tableProperty.get("COL_NAME").equals(LOCATION)) { - table.setAttribute(LOCATION, tableProperty.get("DATA_TYPE")); + if (detailedInformationNode) { + if (currentColName.equals(LOCATION)) { + table.setAttribute(LOCATION, tableProperty.get(DATA_TYPE)); + } + if (currentColName.equals(PROVIDER) && FILE_TYPE_PROVIDERS.contains(tableProperty.get(DATA_TYPE).toString().toUpperCase())) { + tableFormat.append(tableProperty.get(DATA_TYPE)); + } + if (!tableFormat.toString().isEmpty() && currentColName.equals(STORAGE_PROPERTIES)) { + if(table.getAttribute(LOCATION, String.class) != null) { + tableFormat.append(" ").append(LOCATION.toUpperCase()).append("'").append(table.getAttribute(LOCATION, String.class)).append("' "); + } + tableFormat.append(extractOptionsFromStorageProperties(tableProperty.get(DATA_TYPE))); + table.setAttribute(TABLE_FORMAT, tableFormat.toString()); + } + } + if (currentColName.equals(TABLE_PARTITION_INFORMATION_NODE)) { + partitionInformationNode = true; + continue; + } + if (partitionInformationNode && !currentColName.equals("# col_name")) { + if (currentColName.equals("")) { + partitionInformationNode = false; + continue; + } + if (partitionColumns.toString().isEmpty()) { + partitionColumns.append(currentColName); + } else { + partitionColumns.append(',').append(currentColName); + } } } Map tblProperties = getTblPropertiesMap(database, example.getName()); if (tblProperties.containsKey(CLUSTER_COLUMNS)) { - // removing clusterColumns and other properties which are not allowed in create/alter table statements - PROPERTIES_STOP_LIST.forEach(tblProperties::remove); table.setAttribute(CLUSTER_COLUMNS, sanitizeClusterColumns(tblProperties.remove(CLUSTER_COLUMNS))); } + if (!partitionColumns.toString().isEmpty()) { + table.setAttribute(PARTITION_COLUMNS, partitionColumns.toString()); + } table.setAttribute(TBL_PROPERTIES, getTblPropertiesString(tblProperties)); } return table; } + + private String extractOptionsFromStorageProperties(Object storageProperties) { + StringBuilder options = new StringBuilder(); + if (storageProperties instanceof String) { + Matcher matcher = Pattern.compile("(\\b\\w+\\b)=(.*?)(,|\\])").matcher((String) storageProperties); + if (matcher.find()) { + options.append(" OPTIONS (").append(matcher.group(1)).append(" '").append(matcher.group(2)).append("'"); + while (matcher.find()) { + options.append(", ").append(matcher.group(1)).append(" '").append(matcher.group(2)).append("'"); + } + options.append(")"); + } + } + return options.toString(); + } //TODO another way of getting Location is query like // select * from `system`.`information_schema`.`tables` where table_name = 'test_table_properties' AND table_schema='liquibase_harness_test_ds'; // get column 'table_type', if 'EXTERNAL' then diff --git a/src/main/java/liquibase/ext/databricks/sqlgenerator/CreateTableGeneratorDatabricks.java b/src/main/java/liquibase/ext/databricks/sqlgenerator/CreateTableGeneratorDatabricks.java index 2377f643..e820cddb 100644 --- a/src/main/java/liquibase/ext/databricks/sqlgenerator/CreateTableGeneratorDatabricks.java +++ b/src/main/java/liquibase/ext/databricks/sqlgenerator/CreateTableGeneratorDatabricks.java @@ -47,10 +47,13 @@ public Sql[] generateSql(CreateTableStatement statement, Database database, SqlG if ((!StringUtils.isEmpty(thisStatement.getTableFormat()))) { finalsql.append(" USING ").append(thisStatement.getTableFormat()); - } else if (thisStatement.getExtendedTableProperties() != null && StringUtils.isNoneEmpty(thisStatement.getExtendedTableProperties().getTblProperties())) { + } else { + finalsql.append(" USING delta"); + } + if (thisStatement.getExtendedTableProperties() != null && StringUtils.isNotEmpty(thisStatement.getExtendedTableProperties().getTblProperties())) { finalsql.append(" TBLPROPERTIES (").append(thisStatement.getExtendedTableProperties().getTblProperties()).append(")"); } else { - finalsql.append(" USING delta TBLPROPERTIES('delta.feature.allowColumnDefaults' = 'supported', 'delta.columnMapping.mode' = 'name', 'delta.enableDeletionVectors' = true)"); + finalsql.append(" TBLPROPERTIES('delta.feature.allowColumnDefaults' = 'supported', 'delta.columnMapping.mode' = 'name', 'delta.enableDeletionVectors' = true)"); } // Databricks can decide to have tables live in a particular location. If null, Databricks will handle the location automatically in DBFS diff --git a/src/main/resources/www.liquibase.org/xml/ns/databricks/liquibase-databricks-1.0.xsd b/src/main/resources/www.liquibase.org/xml/ns/databricks/liquibase-databricks-1.0.xsd index efb88df5..41e5d22b 100644 --- a/src/main/resources/www.liquibase.org/xml/ns/databricks/liquibase-databricks-1.0.xsd +++ b/src/main/resources/www.liquibase.org/xml/ns/databricks/liquibase-databricks-1.0.xsd @@ -7,8 +7,11 @@ + + + diff --git a/src/main/resources/www.liquibase.org/xml/ns/databricks/liquibase-databricks-latest.xsd b/src/main/resources/www.liquibase.org/xml/ns/databricks/liquibase-databricks-latest.xsd index d28157a9..c10df712 100644 --- a/src/main/resources/www.liquibase.org/xml/ns/databricks/liquibase-databricks-latest.xsd +++ b/src/main/resources/www.liquibase.org/xml/ns/databricks/liquibase-databricks-latest.xsd @@ -7,8 +7,11 @@ + + + diff --git a/src/test/java/liquibase/ext/databricks/change/createTable/CreateTableChangeDatabricksTest.java b/src/test/java/liquibase/ext/databricks/change/createTable/CreateTableChangeDatabricksTest.java new file mode 100644 index 00000000..397ec682 --- /dev/null +++ b/src/test/java/liquibase/ext/databricks/change/createTable/CreateTableChangeDatabricksTest.java @@ -0,0 +1,110 @@ +package liquibase.ext.databricks.change.createTable; + +import liquibase.change.ColumnConfig; +import liquibase.database.Database; +import liquibase.exception.ValidationErrors; +import liquibase.ext.databricks.database.DatabricksDatabase; +import liquibase.statement.core.CreateTableStatement; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Collections; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class CreateTableChangeDatabricksTest { + + private static final String EXPECTED_TABLE_FORMAT = "delta"; + private static final String EXPECTED_TABLE_LOCATION = "Location"; + private static final String EXPECTED_PARTITION_COLUMNS = "col1,col2"; + private static final String EXPECTED_CLUSTER_COLUMNS = "col2,col3"; + private static final String EXPECTED_TABLE_NAME = "test_table"; + private static final String CLUSTER_AND_PARTITION_ERROR_MESSAGE = "Databricks does not support CLUSTER columns AND " + + "PARTITION BY columns, please pick one."; + private static final String DOUBLE_INITIALIZATION_ERROR_MESSAGE = "Double initialization of extended table properties is not allowed. " + + "Please avoid using both EXT createTable attributes and Databricks specific extendedTableProperties element. " + + "Element databricks:extendedTableProperties is preferred way to set databricks specific configurations."; + + private CreateTableChangeDatabricks createTableChangeDatabricks; + private Database database = new DatabricksDatabase(); + + @BeforeEach + void setup() { + createTableChangeDatabricks = new CreateTableChangeDatabricks(); + createTableChangeDatabricks.setTableFormat(EXPECTED_TABLE_FORMAT); + createTableChangeDatabricks.setTableLocation(EXPECTED_TABLE_LOCATION); + createTableChangeDatabricks.setPartitionColumns(EXPECTED_PARTITION_COLUMNS); + createTableChangeDatabricks.setTableName(EXPECTED_TABLE_NAME); + ColumnConfig columnConfig = new ColumnConfig(); + columnConfig.setType("INT"); + columnConfig.setName("id"); + columnConfig.setComputed(false); + createTableChangeDatabricks.setColumns(Collections.singletonList(columnConfig)); + } + + @Test + void testValidate_shouldFailWithAmbiguousClusterOrPartition() { + createTableChangeDatabricks.setClusterColumns(EXPECTED_CLUSTER_COLUMNS); + + ValidationErrors validationErrors = createTableChangeDatabricks.validate(database); + assertNotNull(validationErrors); + assertTrue(validationErrors.hasErrors()); + assertFalse(validationErrors.getErrorMessages().isEmpty()); + assertEquals(CLUSTER_AND_PARTITION_ERROR_MESSAGE, validationErrors.getErrorMessages().get(0)); + } + + @Test + void testValidate_shouldFailWithDoublePropertyInitialization() { + ExtendedTableProperties extendedTableProperties = new ExtendedTableProperties(); + extendedTableProperties.setPartitionColumns(EXPECTED_PARTITION_COLUMNS); + createTableChangeDatabricks.setExtendedTableProperties(extendedTableProperties); + + ValidationErrors validationErrors = createTableChangeDatabricks.validate(database); + assertNotNull(validationErrors); + assertTrue(validationErrors.hasErrors()); + assertFalse(validationErrors.getErrorMessages().isEmpty()); + assertEquals(DOUBLE_INITIALIZATION_ERROR_MESSAGE, validationErrors.getErrorMessages().get(0)); + } + + @Test + void testGenerateCreateTableStatement_noExtendedTableProperties() { + CreateTableStatement createTableStatement = createTableChangeDatabricks.generateCreateTableStatement(); + assertTrue(createTableStatement instanceof CreateTableStatementDatabricks); + CreateTableStatementDatabricks databricksStatement = (CreateTableStatementDatabricks) createTableStatement; + assertNull(databricksStatement.getExtendedTableProperties()); + assertEquals(EXPECTED_TABLE_FORMAT, databricksStatement.getTableFormat()); + assertEquals(EXPECTED_TABLE_LOCATION, databricksStatement.getTableLocation()); + assertEquals(new ArrayList<>(), databricksStatement.getClusterColumns()); + String actualPartitionColumns = databricksStatement.getPartitionColumns() + .stream().reduce("", (col1, col2) -> col1 + ',' + col2).substring(1); + assertEquals(EXPECTED_PARTITION_COLUMNS, actualPartitionColumns); + } + + @Test + void testGenerateCreateTableStatement_withExtendedTableProperties() { + createTableChangeDatabricks.setTableFormat(null); + createTableChangeDatabricks.setTableLocation(null); + createTableChangeDatabricks.setPartitionColumns(null); + ExtendedTableProperties extendedTableProperties = new ExtendedTableProperties(); + extendedTableProperties.setTableFormat(EXPECTED_TABLE_FORMAT); + extendedTableProperties.setTableLocation(EXPECTED_TABLE_LOCATION); + extendedTableProperties.setClusterColumns(EXPECTED_CLUSTER_COLUMNS); + createTableChangeDatabricks.setExtendedTableProperties(extendedTableProperties); + + CreateTableStatement createTableStatement = createTableChangeDatabricks.generateCreateTableStatement(); + assertTrue(createTableStatement instanceof CreateTableStatementDatabricks); + CreateTableStatementDatabricks databricksStatement = (CreateTableStatementDatabricks) createTableStatement; + assertNotNull(databricksStatement.getExtendedTableProperties()); + assertEquals(EXPECTED_TABLE_FORMAT, databricksStatement.getTableFormat()); + assertEquals(EXPECTED_TABLE_LOCATION, databricksStatement.getTableLocation()); + String actualClusterColumns = databricksStatement.getClusterColumns() + .stream().reduce("", (col1, col2) -> col1 + ',' + col2).substring(1); + assertEquals(EXPECTED_CLUSTER_COLUMNS, actualClusterColumns); + assertEquals(new ArrayList<>(), databricksStatement.getPartitionColumns()); + } +} \ No newline at end of file diff --git a/src/test/resources/liquibase/harness/change/changelogs/databricks/createClusteredTableNew.json b/src/test/resources/liquibase/harness/change/changelogs/databricks/createClusteredTableNew.json index 6b822b60..4a8cebf7 100644 --- a/src/test/resources/liquibase/harness/change/changelogs/databricks/createClusteredTableNew.json +++ b/src/test/resources/liquibase/harness/change/changelogs/databricks/createClusteredTableNew.json @@ -42,10 +42,10 @@ } }, { - "changeSet":{ + "changeSet": { "id": 2, - "author":"your.name", - "changes":[ + "author": "your.name", + "changes": [ { "alterCluster": { "tableName": "test_table_clustered_new", @@ -91,6 +91,48 @@ } ] } + }, + { + "changeSet": { + "id": "4", + "author": "your.name", + "changes": [ + { + "createTable": { + "tableName": "clustered_delta_table", + "tableFormat": "delta", + "clusterColumns": "id,test_column", + "columns": [ + { + "column": { + "name": "id", + "type": "int" + } + }, + { + "column": { + "name": "name", + "type": "varchar(20)" + } + }, + { + "column": { + "name": "test_column", + "type": "varchar(20)" + } + } + ] + } + } + ], + "rollback": [ + { + "dropTable": { + "tableName": "clustered_delta_table" + } + } + ] + } } ] } diff --git a/src/test/resources/liquibase/harness/change/changelogs/databricks/createClusteredTableNew.xml b/src/test/resources/liquibase/harness/change/changelogs/databricks/createClusteredTableNew.xml index d5eb617f..c6cb17a3 100644 --- a/src/test/resources/liquibase/harness/change/changelogs/databricks/createClusteredTableNew.xml +++ b/src/test/resources/liquibase/harness/change/changelogs/databricks/createClusteredTableNew.xml @@ -3,6 +3,7 @@ xmlns="http://www.liquibase.org/xml/ns/dbchangelog" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:databricks="http://www.liquibase.org/xml/ns/databricks" + xmlns:ext="http://www.liquibase.org/xml/ns/dbchangelog-ext" xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-latest.xsd @@ -14,7 +15,7 @@ - test_id,test_new,test_present_new + @@ -33,4 +34,11 @@ + + + + + + + \ No newline at end of file diff --git a/src/test/resources/liquibase/harness/change/changelogs/databricks/createClusteredTableNew.yaml b/src/test/resources/liquibase/harness/change/changelogs/databricks/createClusteredTableNew.yaml index f719fbc4..12c7d685 100644 --- a/src/test/resources/liquibase/harness/change/changelogs/databricks/createClusteredTableNew.yaml +++ b/src/test/resources/liquibase/harness/change/changelogs/databricks/createClusteredTableNew.yaml @@ -15,7 +15,8 @@ databaseChangeLog: - column: name: test_present_new type: int - clusterColumns: test_id, test_new, test_present_new + extendedTableProperties: + clusterColumns: test_id, test_new, test_present_new rollback: dropTable: tableName: test_table_clustered_new @@ -41,3 +42,24 @@ databaseChangeLog: tableName: test_table_clustered_new rollback: empty + - changeSet: + id: 4 + author: your.name + changes: + - createTable: + tableName: clustered_delta_table + tableFormat: delta + clusterColumns: id,test_column + columns: + - column: + name: id + type: int + - column: + name: name + type: varchar(20) + - column: + name: test_column + type: varchar(20) + rollback: + dropTable: + tableName: clustered_delta_table diff --git a/src/test/resources/liquibase/harness/change/changelogs/databricks/createExternalCsvTable.xml b/src/test/resources/liquibase/harness/change/changelogs/databricks/createExternalCsvTable.xml new file mode 100644 index 00000000..f04cb99b --- /dev/null +++ b/src/test/resources/liquibase/harness/change/changelogs/databricks/createExternalCsvTable.xml @@ -0,0 +1,19 @@ + + + + + + + + + + + + + + \ No newline at end of file diff --git a/src/test/resources/liquibase/harness/change/changelogs/databricks/createPartitionedTable.json b/src/test/resources/liquibase/harness/change/changelogs/databricks/createPartitionedTable.json new file mode 100644 index 00000000..46b4a4cf --- /dev/null +++ b/src/test/resources/liquibase/harness/change/changelogs/databricks/createPartitionedTable.json @@ -0,0 +1,91 @@ +{ + "databaseChangeLog": [ + { + "changeSet": { + "id": "1", + "author": "your.name", + "changes": [ + { + "createTable": { + "tableName": "test_table_partitioned", + "partitionColumns": "partition_column", + "columns": [ + { + "column": { + "name": "test_id", + "type": "int" + } + }, + { + "column": { + "name": "test_column", + "type": "varchar(50)" + } + }, + { + "column": { + "name": "partition_column", + "type": "string" + } + } + ] + } + } + ], + "rollback": [ + { + "dropTable": { + "tableName": "test_table_partitioned" + } + } + ] + } + }, + { + "changeSet": { + "id": "2", + "author": "your.name", + "changes": [ + { + "createTable": { + "tableName": "partitioned_delta_table", + "columns": [ + { + "column": { + "name": "id", + "type": "int" + } + }, + { + "column": { + "name": "name", + "type": "varchar(20)" + } + }, + { + "column": { + "name": "some_column", + "type": "bigint" + } + } + ], + "extendedTableProperties": { + "tableFormat": "delta", + "partitionColumns": "id, some_column", + "tblProperties": "'this.is.my.key'=12,'this.is.my.key2'=true", + "tableLocation": "s3://bucket/partitioned_delta_table_new" + } + } + } + ], + "rollback": [ + { + "dropTable": { + "tableName": "partitioned_delta_table" + } + } + ] + } + } + ] +} diff --git a/src/test/resources/liquibase/harness/change/changelogs/databricks/createPartitionedTable.xml b/src/test/resources/liquibase/harness/change/changelogs/databricks/createPartitionedTable.xml index 66ecf107..8ba274bc 100644 --- a/src/test/resources/liquibase/harness/change/changelogs/databricks/createPartitionedTable.xml +++ b/src/test/resources/liquibase/harness/change/changelogs/databricks/createPartitionedTable.xml @@ -2,6 +2,7 @@ @@ -17,4 +18,16 @@ + + + + + + + + \ No newline at end of file diff --git a/src/test/resources/liquibase/harness/change/changelogs/databricks/createPartitionedTable.yaml b/src/test/resources/liquibase/harness/change/changelogs/databricks/createPartitionedTable.yaml new file mode 100644 index 00000000..6f237836 --- /dev/null +++ b/src/test/resources/liquibase/harness/change/changelogs/databricks/createPartitionedTable.yaml @@ -0,0 +1,46 @@ +--- +databaseChangeLog: + - changeSet: + id: '1' + author: your.name + changes: + - createTable: + tableName: test_table_partitioned + partitionColumns: partition_column + columns: + - column: + name: test_id + type: int + - column: + name: test_column + type: varchar(50) + - column: + name: partition_column + type: string + rollback: + - dropTable: + tableName: test_table_partitioned + - changeSet: + id: '2' + author: your.name + changes: + - createTable: + tableName: partitioned_delta_table + columns: + - column: + name: id + type: int + - column: + name: name + type: varchar(20) + - column: + name: some_column + type: bigint + extendedTableProperties: + tableFormat: delta + partitionColumns: id, some_column + tblProperties: "'this.is.my.key'=12,'this.is.my.key2'=true" + tableLocation: s3://bucket/partitioned_delta_table_new + rollback: + - dropTable: + tableName: partitioned_delta_table diff --git a/src/test/resources/liquibase/harness/change/expectedSnapshot/databricks/createExternalCsvTable.json b/src/test/resources/liquibase/harness/change/expectedSnapshot/databricks/createExternalCsvTable.json new file mode 100644 index 00000000..30997834 --- /dev/null +++ b/src/test/resources/liquibase/harness/change/expectedSnapshot/databricks/createExternalCsvTable.json @@ -0,0 +1,45 @@ +{ + "snapshot": { + "objects": { + "liquibase.structure.core.Table": [ + { + "table": { + "name": "test_external_csv_table" + } + } + ], + "liquibase.structure.core.Column": [ + { + "column": { + "name": "id" + } + }, + { + "column": { + "name": "first_name" + } + }, + { + "column": { + "name": "last_name" + } + }, + { + "column": { + "name": "email" + } + }, + { + "column": { + "name": "birthdate" + } + }, + { + "column": { + "name": "added" + } + } + ] + } + } +} \ No newline at end of file diff --git a/src/test/resources/liquibase/harness/change/expectedSql/databricks/createClusteredTableNew.sql b/src/test/resources/liquibase/harness/change/expectedSql/databricks/createClusteredTableNew.sql index c1737f53..f0f2ba1d 100644 --- a/src/test/resources/liquibase/harness/change/expectedSql/databricks/createClusteredTableNew.sql +++ b/src/test/resources/liquibase/harness/change/expectedSql/databricks/createClusteredTableNew.sql @@ -1,3 +1,4 @@ CREATE TABLE main.liquibase_harness_test_ds.test_table_clustered_new (test_id INT, test_new INT, test_present_new INT) USING delta TBLPROPERTIES('delta.feature.allowColumnDefaults' = 'supported', 'delta.columnMapping.mode' = 'name', 'delta.enableDeletionVectors' = true) CLUSTER BY (test_id, test_new, test_present_new) ALTER TABLE main.liquibase_harness_test_ds.test_table_clustered_new CLUSTER BY (test_id,test_present_new) -ALTER TABLE main.liquibase_harness_test_ds.test_table_clustered_new DROP COLUMN test_new \ No newline at end of file +ALTER TABLE main.liquibase_harness_test_ds.test_table_clustered_new DROP COLUMN test_new +CREATE TABLE main.liquibase_harness_test_ds.clustered_delta_table (id INT, name VARCHAR(20), test_column VARCHAR(20)) USING delta TBLPROPERTIES('delta.feature.allowColumnDefaults' = 'supported', 'delta.columnMapping.mode' = 'name', 'delta.enableDeletionVectors' = true) CLUSTER BY (id, test_column) \ No newline at end of file diff --git a/src/test/resources/liquibase/harness/change/expectedSql/databricks/createExternalCsvTable.sql b/src/test/resources/liquibase/harness/change/expectedSql/databricks/createExternalCsvTable.sql new file mode 100644 index 00000000..fd993e5b --- /dev/null +++ b/src/test/resources/liquibase/harness/change/expectedSql/databricks/createExternalCsvTable.sql @@ -0,0 +1 @@ +CREATE TABLE main.liquibase_harness_test_ds.test_external_csv_table (id BIGINT, first_name STRING, last_name STRING, email STRING, birthdate date, added TIMESTAMP) USING CSV OPTIONS (header 'true', inferSchema 'false', path 's3://databricks-th/integration-tests/files/csv_table.csv') TBLPROPERTIES('delta.feature.allowColumnDefaults' = 'supported', 'delta.columnMapping.mode' = 'name', 'delta.enableDeletionVectors' = true) diff --git a/src/test/resources/liquibase/harness/change/expectedSql/databricks/createPartitionedTable.sql b/src/test/resources/liquibase/harness/change/expectedSql/databricks/createPartitionedTable.sql index e166036c..183854e0 100644 --- a/src/test/resources/liquibase/harness/change/expectedSql/databricks/createPartitionedTable.sql +++ b/src/test/resources/liquibase/harness/change/expectedSql/databricks/createPartitionedTable.sql @@ -1 +1,2 @@ -CREATE TABLE main.liquibase_harness_test_ds.test_table_partitioned (test_id INT NOT NULL, test_column VARCHAR(50) NOT NULL, partition_column STRING NOT NULL, CONSTRAINT PK_TEST_TABLE_PARTITIONED PRIMARY KEY (test_id)) USING delta TBLPROPERTIES('delta.feature.allowColumnDefaults' = 'supported', 'delta.columnMapping.mode' = 'name', 'delta.enableDeletionVectors' = true) PARTITIONED BY (partition_column) \ No newline at end of file +CREATE TABLE main.liquibase_harness_test_ds.test_table_partitioned (test_id INT NOT NULL, test_column VARCHAR(50) NOT NULL, partition_column STRING NOT NULL, CONSTRAINT PK_TEST_TABLE_PARTITIONED PRIMARY KEY (test_id)) USING delta TBLPROPERTIES('delta.feature.allowColumnDefaults' = 'supported', 'delta.columnMapping.mode' = 'name', 'delta.enableDeletionVectors' = true) PARTITIONED BY (partition_column) +CREATE TABLE main.liquibase_harness_test_ds.partitioned_delta_table (id INT, name VARCHAR(20), some_column BIGINT) USING delta TBLPROPERTIES ('this.is.my.key'=12,'this.is.my.key2'=true) LOCATION 's3://databricks-th/partitioned_delta_table' PARTITIONED BY (id, some_column) \ No newline at end of file diff --git a/src/test/resources/liquibase/harness/change/expectedSql/databricks/createTable.sql b/src/test/resources/liquibase/harness/change/expectedSql/databricks/createTable.sql index 775841e2..becfb20e 100644 --- a/src/test/resources/liquibase/harness/change/expectedSql/databricks/createTable.sql +++ b/src/test/resources/liquibase/harness/change/expectedSql/databricks/createTable.sql @@ -1,2 +1,2 @@ CREATE TABLE main.liquibase_harness_test_ds.test_table (test_id INT NOT NULL, test_column VARCHAR(50) NOT NULL, CONSTRAINT PK_TEST_TABLE PRIMARY KEY (test_id)) USING delta TBLPROPERTIES('delta.feature.allowColumnDefaults' = 'supported', 'delta.columnMapping.mode' = 'name', 'delta.enableDeletionVectors' = true) -CREATE TABLE main.liquibase_harness_test_ds.test_table_properties (test_id INT NOT NULL, CONSTRAINT PK_TEST_TABLE_PROPERTIES PRIMARY KEY (test_id)) TBLPROPERTIES ('this.is.my.key'=12,'this.is.my.key2'=true) LOCATION 's3://databricks-th/test_table_properties' \ No newline at end of file +CREATE TABLE main.liquibase_harness_test_ds.test_table_properties (test_id INT NOT NULL, CONSTRAINT PK_TEST_TABLE_PROPERTIES PRIMARY KEY (test_id)) USING delta TBLPROPERTIES ('this.is.my.key'=12,'this.is.my.key2'=true) LOCATION 's3://databricks-th/test_table_properties' \ No newline at end of file