Skip to content

Commit

Permalink
DAT-18896: [createTable] added tableFormat snapshot and generate changelog functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
Mykhailo Savchenko committed Oct 31, 2024
1 parent 85efece commit a031ee9
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,8 @@ public Change[] fixMissing(DatabaseObject missingObject, DiffOutputControl contr
return changes;
}
//so far we intentionally omit tableLocation in generated changelog
//TODO: add tableFormat extended property if needed in scope of DAT-18896
ExtendedTableProperties extendedTableProperties = new ExtendedTableProperties(
null,
missingObject.getAttribute("tableFormat", String.class),
null,
missingObject.getAttribute("tblProperties", String.class),
missingObject.getAttribute("clusteringColumns", String.class),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,29 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class TableSnapshotGeneratorDatabricks extends TableSnapshotGenerator {

    // Row labels / attribute keys used when parsing DESCRIBE TABLE EXTENDED output.
    private static final String LOCATION = "Location";
    private static final String PROVIDER = "Provider";
    private static final String STORAGE_PROPERTIES = "Storage Properties";
    // Snapshot attribute names (also read by the missing-table changelog generator).
    private static final String TABLE_FORMAT = "tableFormat";
    private static final String TBL_PROPERTIES = "tblProperties";
    private static final String CLUSTER_COLUMNS = "clusteringColumns";
    private static final String PARTITION_COLUMNS = "partitionColumns";
    // Section markers in DESCRIBE TABLE EXTENDED: rows after these are metadata, not table columns.
    private static final String DETAILED_TABLE_INFORMATION_NODE = "# Detailed Table Information";
    private static final String TABLE_PARTITION_INFORMATION_NODE = "# Partition Information";
    private static final String DATA_TYPE = "DATA_TYPE";
    // tblProperties that are engine-managed and excluded from the generated snapshot
    // (presumably because they cannot be set by user DDL — TODO confirm).
    private static final List<String> TBL_PROPERTIES_STOP_LIST = Arrays.asList(
            "delta.columnMapping.maxColumnId",
            "delta.rowTracking.materializedRowCommitVersionColumnName",
            "delta.rowTracking.materializedRowIdColumnName",
            "delta.feature.clustering"
    );
    // File-based providers; compared case-insensitively against the "Provider" row
    // to decide whether a tableFormat attribute should be emitted.
    private static final List<String> FILE_TYPE_PROVIDERS = Arrays.asList("AVRO", "BINARYFILE", "CSV", "JSON", "ORC", "PARQUET", "TEXT");

@Override
public int getPriority(Class<? extends DatabaseObject> objectType, Database database) {
Expand All @@ -49,32 +55,46 @@ protected DatabaseObject snapshotObject(DatabaseObject example, DatabaseSnapshot
example.getName());
List<Map<String, ?>> tablePropertiesResponse = Scope.getCurrentScope().getSingleton(ExecutorService.class)
.getExecutor("jdbc", database).queryForList(new RawParameterizedSqlStatement(query));
StringBuilder tableFormat = new StringBuilder();
// DESCRIBE TABLE EXTENDED returns both columns and additional information.
// We need to make sure "Location" is not column in the table, but table location in s3
boolean detailedInformationNode = false;
boolean partitionInformationNode = false;
StringBuilder partitionColumns = new StringBuilder();
for (Map<String, ?> tableProperty : tablePropertiesResponse) {
if (tableProperty.get("COL_NAME").equals(DETAILED_TABLE_INFORMATION_NODE)) {
String currentColName = (String) tableProperty.get("COL_NAME");
if (currentColName.equals(DETAILED_TABLE_INFORMATION_NODE)) {
detailedInformationNode = true;
continue;
}
if (tableProperty.get("COL_NAME").equals(TABLE_PARTITION_INFORMATION_NODE)) {
if (detailedInformationNode) {
if (currentColName.equals(LOCATION)) {
table.setAttribute(LOCATION, tableProperty.get(DATA_TYPE));
}
if (currentColName.equals(PROVIDER) && FILE_TYPE_PROVIDERS.contains(tableProperty.get(DATA_TYPE).toString().toUpperCase())) {
tableFormat.append(tableProperty.get(DATA_TYPE));
}
if (!tableFormat.toString().isEmpty() && currentColName.equals(STORAGE_PROPERTIES)) {
if(table.getAttribute(LOCATION, String.class) != null) {
tableFormat.append(" ").append(LOCATION.toUpperCase()).append("'").append(table.getAttribute(LOCATION, String.class)).append("' ");
}
tableFormat.append(extractOptionsFromStorageProperties(tableProperty.get(DATA_TYPE)));
table.setAttribute(TABLE_FORMAT, tableFormat.toString());
}
}
if (currentColName.equals(TABLE_PARTITION_INFORMATION_NODE)) {
partitionInformationNode = true;
continue;
}
if (detailedInformationNode && tableProperty.get("COL_NAME").equals(LOCATION)) {
table.setAttribute(LOCATION, tableProperty.get("DATA_TYPE"));
}
if(partitionInformationNode && !tableProperty.get("COL_NAME").equals("# col_name")) {
if(tableProperty.get("COL_NAME").equals("")) {
if (partitionInformationNode && !currentColName.equals("# col_name")) {
if (currentColName.equals("")) {
partitionInformationNode = false;
continue;
}
if (partitionColumns.toString().isEmpty()) {
partitionColumns.append(tableProperty.get("COL_NAME"));
partitionColumns.append(currentColName);
} else {
partitionColumns.append(',').append(tableProperty.get("COL_NAME"));
partitionColumns.append(',').append(currentColName);
}
}
}
Expand All @@ -84,13 +104,28 @@ protected DatabaseObject snapshotObject(DatabaseObject example, DatabaseSnapshot
if (tblProperties.containsKey(CLUSTER_COLUMNS)) {
table.setAttribute(CLUSTER_COLUMNS, sanitizeClusterColumns(tblProperties.remove(CLUSTER_COLUMNS)));
}
if(!partitionColumns.toString().isEmpty()) {
if (!partitionColumns.toString().isEmpty()) {
table.setAttribute(PARTITION_COLUMNS, partitionColumns.toString());
}
table.setAttribute(TBL_PROPERTIES, getTblPropertiesString(tblProperties));
}
return table;
}

private String extractOptionsFromStorageProperties(Object storageProperties) {
StringBuilder options = new StringBuilder();
if (storageProperties instanceof String) {
Matcher matcher = Pattern.compile("(\\b\\w+\\b)=(.*?)(,|\\])").matcher((String) storageProperties);
if (matcher.find()) {
options.append(" OPTIONS (").append(matcher.group(1)).append(" '").append(matcher.group(2)).append("'");
while (matcher.find()) {
options.append(", ").append(matcher.group(1)).append(" '").append(matcher.group(2)).append("'");
}
options.append(")");
}
}
return options.toString();
}
//TODO another way of getting Location is query like
// select * from `system`.`information_schema`.`tables` where table_name = 'test_table_properties' AND table_schema='liquibase_harness_test_ds';
// get column 'table_type', if 'EXTERNAL' then
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<?xml version="1.0" encoding="UTF-8"?>
<databaseChangeLog
xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:databricks="http://www.liquibase.org/xml/ns/databricks"
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog
http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-latest.xsd">
<changeSet id="1" author="mykhailo">
<createTable tableName="test_external_csv_table">
<column name="id" type="BIGINT"/>
<column name="first_name" type="STRING"/>
<column name="last_name" type="STRING"/>
<column name="email" type="STRING"/>
<column name="birthdate" type="DATE"/>
<column name="added" type="TIMESTAMP"/>
<databricks:extendedTableProperties tableFormat="CSV OPTIONS (header 'true', inferSchema 'false', path 's3://databricks-th/files/csv_table.csv')"/>
</createTable>
</changeSet>
</databaseChangeLog>
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
{
"snapshot": {
"objects": {
"liquibase.structure.core.Table": [
{
"table": {
"name": "test_external_csv_table"
}
}
],
"liquibase.structure.core.Column": [
{
"column": {
"name": "id"
}
},
{
"column": {
"name": "first_name"
}
},
{
"column": {
"name": "last_name"
}
},
{
"column": {
"name": "email"
}
},
{
"column": {
"name": "birthdate"
}
},
{
"column": {
"name": "added"
}
}
]
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
CREATE TABLE main.liquibase_harness_test_ds.test_external_csv_table (id BIGINT, first_name STRING, last_name STRING, email STRING, birthdate date, added TIMESTAMP) USING CSV OPTIONS (header 'true', inferSchema 'false', path 's3://databricks-th/files/csv_table.csv')

0 comments on commit a031ee9

Please sign in to comment.