From a5a470769ffa4539dd305e866ef5fc7dd5a80bbb Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Tue, 10 Sep 2024 13:03:01 -0500 Subject: [PATCH] Fix issues with CH qrep (#2064) 1. Fix NPE in tableMappigns.len 2. Default to ReplacingMergeTree to support upsert key columns --- flow/connectors/clickhouse/normalize.go | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/flow/connectors/clickhouse/normalize.go b/flow/connectors/clickhouse/normalize.go index b6d2f7f9ac..39fb5c9bd4 100644 --- a/flow/connectors/clickhouse/normalize.go +++ b/flow/connectors/clickhouse/normalize.go @@ -153,7 +153,9 @@ func generateCreateTableSQLForNormalizedTable( } var engine string - if tableMapping == nil || tableMapping.Engine == protos.TableEngine_CH_ENGINE_MERGE_TREE { + if tableMapping == nil { + engine = fmt.Sprintf("ReplacingMergeTree(`%s`)", versionColName) + } else if tableMapping.Engine == protos.TableEngine_CH_ENGINE_MERGE_TREE { engine = "MergeTree()" } else { engine = fmt.Sprintf("ReplacingMergeTree(`%s`)", versionColName) @@ -180,10 +182,13 @@ func generateCreateTableSQLForNormalizedTable( stmtBuilder.WriteString(") ") } - orderby := make([]*protos.ColumnSetting, 0, len(tableMapping.Columns)) - for _, col := range tableMapping.Columns { - if col.Ordering > 0 && !slices.Contains(pkeys, getColName(colNameMap, col.SourceName)) { - orderby = append(orderby, col) + orderby := make([]*protos.ColumnSetting, 0) + if tableMapping != nil { + orderby = slices.Clone(tableMapping.Columns) + for _, col := range tableMapping.Columns { + if col.Ordering > 0 && !slices.Contains(pkeys, getColName(colNameMap, col.SourceName)) { + orderby = append(orderby, col) + } } } slices.SortStableFunc(orderby, func(a *protos.ColumnSetting, b *protos.ColumnSetting) int {