From a031ee9207bdd34cbba86efae141a8678ba3f853 Mon Sep 17 00:00:00 2001 From: Mykhailo Savchenko Date: Thu, 31 Oct 2024 17:52:15 +0200 Subject: [PATCH] DAT-18896: [createTable] added tableFormat snapshot and generate changelog functionality --- ...MissingTableChangeGeneratorDatabricks.java | 3 +- .../jvm/TableSnapshotGeneratorDatabricks.java | 55 +++++++++++++++---- .../databricks/createExternalCsvTable.xml | 19 +++++++ .../databricks/createExternalCsvTable.json | 45 +++++++++++++++ .../databricks/createExternalCsvTable.sql | 1 + 5 files changed, 111 insertions(+), 12 deletions(-) create mode 100644 src/test/resources/liquibase/harness/change/changelogs/databricks/createExternalCsvTable.xml create mode 100644 src/test/resources/liquibase/harness/change/expectedSnapshot/databricks/createExternalCsvTable.json create mode 100644 src/test/resources/liquibase/harness/change/expectedSql/databricks/createExternalCsvTable.sql diff --git a/src/main/java/liquibase/ext/databricks/diff/output/changelog/MissingTableChangeGeneratorDatabricks.java b/src/main/java/liquibase/ext/databricks/diff/output/changelog/MissingTableChangeGeneratorDatabricks.java index ccbea35d..3dc56e2a 100644 --- a/src/main/java/liquibase/ext/databricks/diff/output/changelog/MissingTableChangeGeneratorDatabricks.java +++ b/src/main/java/liquibase/ext/databricks/diff/output/changelog/MissingTableChangeGeneratorDatabricks.java @@ -31,9 +31,8 @@ public Change[] fixMissing(DatabaseObject missingObject, DiffOutputControl contr return changes; } //so far we intentionally omit tableLocation in generated changelog - //TODO: add tableFormat extended property if needed in scope of DAT-18896 ExtendedTableProperties extendedTableProperties = new ExtendedTableProperties( - null, + missingObject.getAttribute("tableFormat", String.class), null, missingObject.getAttribute("tblProperties", String.class), missingObject.getAttribute("clusteringColumns", String.class), diff --git a/src/main/java/liquibase/ext/databricks/snapshot/jvm/TableSnapshotGeneratorDatabricks.java b/src/main/java/liquibase/ext/databricks/snapshot/jvm/TableSnapshotGeneratorDatabricks.java index f996d793..7852bf15 100644 --- a/src/main/java/liquibase/ext/databricks/snapshot/jvm/TableSnapshotGeneratorDatabricks.java +++ b/src/main/java/liquibase/ext/databricks/snapshot/jvm/TableSnapshotGeneratorDatabricks.java @@ -14,23 +14,29 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; public class TableSnapshotGeneratorDatabricks extends TableSnapshotGenerator { private static final String LOCATION = "Location"; + private static final String PROVIDER = "Provider"; + private static final String STORAGE_PROPERTIES = "Storage Properties"; + private static final String TABLE_FORMAT = "tableFormat"; private static final String TBL_PROPERTIES = "tblProperties"; private static final String CLUSTER_COLUMNS = "clusteringColumns"; private static final String PARTITION_COLUMNS = "partitionColumns"; private static final String DETAILED_TABLE_INFORMATION_NODE = "# Detailed Table Information"; private static final String TABLE_PARTITION_INFORMATION_NODE = "# Partition Information"; + private static final String DATA_TYPE = "DATA_TYPE"; private static final List TBL_PROPERTIES_STOP_LIST = Arrays.asList( "delta.columnMapping.maxColumnId", "delta.rowTracking.materializedRowCommitVersionColumnName", "delta.rowTracking.materializedRowIdColumnName", "delta.feature.clustering" ); + private static final List FILE_TYPE_PROVIDERS = Arrays.asList("AVRO", "BINARYFILE", "CSV", "JSON", "ORC", "PARQUET", "TEXT"); @Override public int getPriority(Class objectType, Database database) { @@ -49,32 +55,46 @@ protected DatabaseObject snapshotObject(DatabaseObject example, DatabaseSnapshot example.getName()); List> tablePropertiesResponse = Scope.getCurrentScope().getSingleton(ExecutorService.class) .getExecutor("jdbc", database).queryForList(new RawParameterizedSqlStatement(query)); + StringBuilder tableFormat = new StringBuilder(); // DESCRIBE TABLE EXTENDED returns both columns and additional information. // We need to make sure "Location" is not column in the table, but table location in s3 boolean detailedInformationNode = false; boolean partitionInformationNode = false; StringBuilder partitionColumns = new StringBuilder(); for (Map tableProperty : tablePropertiesResponse) { - if (tableProperty.get("COL_NAME").equals(DETAILED_TABLE_INFORMATION_NODE)) { + String currentColName = (String) tableProperty.get("COL_NAME"); + if (currentColName.equals(DETAILED_TABLE_INFORMATION_NODE)) { detailedInformationNode = true; continue; } - if (tableProperty.get("COL_NAME").equals(TABLE_PARTITION_INFORMATION_NODE)) { + if (detailedInformationNode) { + if (currentColName.equals(LOCATION)) { + table.setAttribute(LOCATION, tableProperty.get(DATA_TYPE)); + } + if (currentColName.equals(PROVIDER) && FILE_TYPE_PROVIDERS.contains(tableProperty.get(DATA_TYPE).toString().toUpperCase())) { + tableFormat.append(tableProperty.get(DATA_TYPE)); + } + if (!tableFormat.toString().isEmpty() && currentColName.equals(STORAGE_PROPERTIES)) { + if(table.getAttribute(LOCATION, String.class) != null) { + tableFormat.append(" ").append(LOCATION.toUpperCase()).append("'").append(table.getAttribute(LOCATION, String.class)).append("' "); + } + tableFormat.append(extractOptionsFromStorageProperties(tableProperty.get(DATA_TYPE))); + table.setAttribute(TABLE_FORMAT, tableFormat.toString()); + } + } + if (currentColName.equals(TABLE_PARTITION_INFORMATION_NODE)) { partitionInformationNode = true; continue; } - if (detailedInformationNode && tableProperty.get("COL_NAME").equals(LOCATION)) { - table.setAttribute(LOCATION, tableProperty.get("DATA_TYPE")); - } - if(partitionInformationNode && !tableProperty.get("COL_NAME").equals("# col_name")) { - if(tableProperty.get("COL_NAME").equals("")) { + if (partitionInformationNode && !currentColName.equals("# col_name")) { + if (currentColName.equals("")) { partitionInformationNode = false; continue; } if (partitionColumns.toString().isEmpty()) { - partitionColumns.append(tableProperty.get("COL_NAME")); + partitionColumns.append(currentColName); } else { - partitionColumns.append(',').append(tableProperty.get("COL_NAME")); + partitionColumns.append(',').append(currentColName); } } } @@ -84,13 +104,28 @@ protected DatabaseObject snapshotObject(DatabaseObject example, DatabaseSnapshot if (tblProperties.containsKey(CLUSTER_COLUMNS)) { table.setAttribute(CLUSTER_COLUMNS, sanitizeClusterColumns(tblProperties.remove(CLUSTER_COLUMNS))); } - if(!partitionColumns.toString().isEmpty()) { + if (!partitionColumns.toString().isEmpty()) { table.setAttribute(PARTITION_COLUMNS, partitionColumns.toString()); } table.setAttribute(TBL_PROPERTIES, getTblPropertiesString(tblProperties)); } return table; } + + private String extractOptionsFromStorageProperties(Object storageProperties) { + StringBuilder options = new StringBuilder(); + if (storageProperties instanceof String) { + Matcher matcher = Pattern.compile("(\\b\\w+\\b)=(.*?)(,|\\])").matcher((String) storageProperties); + if (matcher.find()) { + options.append(" OPTIONS (").append(matcher.group(1)).append(" '").append(matcher.group(2)).append("'"); + while (matcher.find()) { + options.append(", ").append(matcher.group(1)).append(" '").append(matcher.group(2)).append("'"); + } + options.append(")"); + } + } + return options.toString(); + } //TODO another way of getting Location is query like // select * from `system`.`information_schema`.`tables` where table_name = 'test_table_properties' AND table_schema='liquibase_harness_test_ds'; // get column 'table_type', if 'EXTERNAL' then diff --git a/src/test/resources/liquibase/harness/change/changelogs/databricks/createExternalCsvTable.xml b/src/test/resources/liquibase/harness/change/changelogs/databricks/createExternalCsvTable.xml new file mode 100644 index 00000000..64b578c4 --- /dev/null +++ b/src/test/resources/liquibase/harness/change/changelogs/databricks/createExternalCsvTable.xml @@ -0,0 +1,19 @@ + + + + + + + + + + + + + + \ No newline at end of file diff --git a/src/test/resources/liquibase/harness/change/expectedSnapshot/databricks/createExternalCsvTable.json b/src/test/resources/liquibase/harness/change/expectedSnapshot/databricks/createExternalCsvTable.json new file mode 100644 index 00000000..30997834 --- /dev/null +++ b/src/test/resources/liquibase/harness/change/expectedSnapshot/databricks/createExternalCsvTable.json @@ -0,0 +1,45 @@ +{ + "snapshot": { + "objects": { + "liquibase.structure.core.Table": [ + { + "table": { + "name": "test_external_csv_table" + } + } + ], + "liquibase.structure.core.Column": [ + { + "column": { + "name": "id" + } + }, + { + "column": { + "name": "first_name" + } + }, + { + "column": { + "name": "last_name" + } + }, + { + "column": { + "name": "email" + } + }, + { + "column": { + "name": "birthdate" + } + }, + { + "column": { + "name": "added" + } + } + ] + } + } +} \ No newline at end of file diff --git a/src/test/resources/liquibase/harness/change/expectedSql/databricks/createExternalCsvTable.sql b/src/test/resources/liquibase/harness/change/expectedSql/databricks/createExternalCsvTable.sql new file mode 100644 index 00000000..46c44b51 --- /dev/null +++ b/src/test/resources/liquibase/harness/change/expectedSql/databricks/createExternalCsvTable.sql @@ -0,0 +1 @@ +CREATE TABLE main.liquibase_harness_test_ds.test_external_csv_table (id BIGINT, first_name STRING, last_name STRING, email STRING, birthdate date, added TIMESTAMP) USING CSV OPTIONS (header 'true', inferSchema 'false', path 's3://databricks-th/files/csv_table.csv')