Skip to content

Commit

Permalink
Fix issues with CH qrep (#2064)
Browse files Browse the repository at this point in the history
1. Fix NPE in tableMappigns.len
2. Default to ReplacingMergeTree to support upsert key columns
  • Loading branch information
iskakaushik authored Sep 10, 2024
1 parent 97c66a2 commit a5a4707
Showing 1 changed file with 10 additions and 5 deletions.
15 changes: 10 additions & 5 deletions flow/connectors/clickhouse/normalize.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,9 @@ func generateCreateTableSQLForNormalizedTable(
}

var engine string
if tableMapping == nil || tableMapping.Engine == protos.TableEngine_CH_ENGINE_MERGE_TREE {
if tableMapping == nil {
engine = fmt.Sprintf("ReplacingMergeTree(`%s`)", versionColName)
} else if tableMapping.Engine == protos.TableEngine_CH_ENGINE_MERGE_TREE {
engine = "MergeTree()"
} else {
engine = fmt.Sprintf("ReplacingMergeTree(`%s`)", versionColName)
Expand All @@ -180,10 +182,13 @@ func generateCreateTableSQLForNormalizedTable(
stmtBuilder.WriteString(") ")
}

orderby := make([]*protos.ColumnSetting, 0, len(tableMapping.Columns))
for _, col := range tableMapping.Columns {
if col.Ordering > 0 && !slices.Contains(pkeys, getColName(colNameMap, col.SourceName)) {
orderby = append(orderby, col)
orderby := make([]*protos.ColumnSetting, 0)
if tableMapping != nil {
orderby = slices.Clone(tableMapping.Columns)
for _, col := range tableMapping.Columns {
if col.Ordering > 0 && !slices.Contains(pkeys, getColName(colNameMap, col.SourceName)) {
orderby = append(orderby, col)
}
}
}
slices.SortStableFunc(orderby, func(a *protos.ColumnSetting, b *protos.ColumnSetting) int {
Expand Down

0 comments on commit a5a4707

Please sign in to comment.