From 3eaac061d57cb158e4ff6e6e0842c5d2e4b52425 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Tue, 27 Aug 2024 03:35:45 +0530 Subject: [PATCH 1/2] add column params for replacingmergetree --- flow/connectors/clickhouse/cdc.go | 4 ++-- flow/connectors/clickhouse/normalize.go | 17 +++++++++-------- flow/connectors/clickhouse/qrep_avro_sync.go | 2 +- 3 files changed, 12 insertions(+), 11 deletions(-) diff --git a/flow/connectors/clickhouse/cdc.go b/flow/connectors/clickhouse/cdc.go index bffc1c5eef..c4539a93a1 100644 --- a/flow/connectors/clickhouse/cdc.go +++ b/flow/connectors/clickhouse/cdc.go @@ -169,8 +169,8 @@ func (c *ClickhouseConnector) RenameTables(ctx context.Context, req *protos.Rena c.logger.Info(fmt.Sprintf("handling soft-deletes for table '%s'...", renameRequest.NewName)) err := c.execWithLogging(ctx, fmt.Sprintf("INSERT INTO %s(%s) SELECT %s,true AS %s FROM %s WHERE (%s) NOT IN (SELECT %s FROM %s)", - renameRequest.CurrentName, fmt.Sprintf("%s,%s", allCols, signColName), allCols, - signColName, + renameRequest.CurrentName, fmt.Sprintf("%s,%s", allCols, isDeletedColName), allCols, + isDeletedColName, renameRequest.NewName, pkeyCols, pkeyCols, renameRequest.CurrentName)) if err != nil { return nil, fmt.Errorf("unable to handle soft-deletes for table %s: %w", renameRequest.NewName, err) diff --git a/flow/connectors/clickhouse/normalize.go b/flow/connectors/clickhouse/normalize.go index 2896041612..b64fa8cf48 100644 --- a/flow/connectors/clickhouse/normalize.go +++ b/flow/connectors/clickhouse/normalize.go @@ -16,10 +16,10 @@ import ( ) const ( - signColName = "_peerdb_is_deleted" - signColType = "Int8" - versionColName = "_peerdb_version" - versionColType = "Int64" + isDeletedColName = "_peerdb_is_deleted" + isDeletedColType = "Int8" + versionColName = "_peerdb_version" + versionColType = "Int64" ) func (c *ClickhouseConnector) StartSetupNormalizedTables(_ context.Context) (interface{}, error) { @@ -106,8 +106,9 @@ func generateCreateTableSQLForNormalizedTable( // add sign and version columns stmtBuilder.WriteString(fmt.Sprintf( - "`%s` %s, `%s` %s) ENGINE = ReplacingMergeTree(`%s`)", - signColName, signColType, versionColName, versionColType, versionColName)) + "`%s` %s, `%s` %s) ENGINE = ReplacingMergeTree(`%s`, `%s`, `%s`)", + isDeletedColName, isDeletedColType, versionColName, versionColType, versionColName, + versionColName, isDeletedColName)) pkeys := tableSchema.PrimaryKeyColumns if len(pkeys) > 0 { @@ -207,8 +208,8 @@ func (c *ClickhouseConnector) NormalizeRecords(ctx context.Context, } // add _peerdb_sign as _peerdb_record_type / 2 - projection.WriteString(fmt.Sprintf("intDiv(_peerdb_record_type, 2) AS `%s`,", signColName)) - colSelector.WriteString(fmt.Sprintf("`%s`,", signColName)) + projection.WriteString(fmt.Sprintf("intDiv(_peerdb_record_type, 2) AS `%s`,", isDeletedColName)) + colSelector.WriteString(fmt.Sprintf("`%s`,", isDeletedColName)) // add _peerdb_timestamp as _peerdb_version projection.WriteString(fmt.Sprintf("_peerdb_timestamp AS `%s`", versionColName)) diff --git a/flow/connectors/clickhouse/qrep_avro_sync.go b/flow/connectors/clickhouse/qrep_avro_sync.go index b7288bf566..8038c98a47 100644 --- a/flow/connectors/clickhouse/qrep_avro_sync.go +++ b/flow/connectors/clickhouse/qrep_avro_sync.go @@ -133,7 +133,7 @@ func (s *ClickhouseAvroSyncMethod) SyncQRepRecords( for _, col := range dstTableSchema { colName := col.Name() if strings.EqualFold(colName, config.SoftDeleteColName) || - strings.EqualFold(colName, signColName) || + strings.EqualFold(colName, isDeletedColName) || strings.EqualFold(colName, config.SyncedAtColName) || strings.EqualFold(colName, versionColName) { continue From d8dc178480b73d4d0c5380294e1dd19aee8342c1 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Tue, 27 Aug 2024 03:49:58 +0530 Subject: [PATCH 2/2] fix syntax --- flow/connectors/clickhouse/normalize.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/flow/connectors/clickhouse/normalize.go b/flow/connectors/clickhouse/normalize.go index b64fa8cf48..8cc9e4bba6 100644 --- a/flow/connectors/clickhouse/normalize.go +++ b/flow/connectors/clickhouse/normalize.go @@ -17,9 +17,9 @@ import ( const ( isDeletedColName = "_peerdb_is_deleted" - isDeletedColType = "Int8" + isDeletedColType = "UInt8" versionColName = "_peerdb_version" - versionColType = "Int64" + versionColType = "UInt64" ) func (c *ClickhouseConnector) StartSetupNormalizedTables(_ context.Context) (interface{}, error) { @@ -106,9 +106,9 @@ func generateCreateTableSQLForNormalizedTable( // add sign and version columns stmtBuilder.WriteString(fmt.Sprintf( - "`%s` %s, `%s` %s) ENGINE = ReplacingMergeTree(`%s`, `%s`, `%s`)", + "`%s` %s, `%s` %s) ENGINE = ReplacingMergeTree(`%s`, `%s`)", isDeletedColName, isDeletedColType, versionColName, versionColType, versionColName, - versionColName, isDeletedColName)) + isDeletedColName)) pkeys := tableSchema.PrimaryKeyColumns if len(pkeys) > 0 {