Skip to content

Commit

Permalink
[clickhouse] change raw table definition to use int64 for _peerdb_bat…
Browse files Browse the repository at this point in the history
…ch_id (#2248)
  • Loading branch information
heavycrystal authored Nov 13, 2024
1 parent 1f969d1 commit 0fbd7f7
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 7 deletions.
12 changes: 6 additions & 6 deletions flow/connectors/clickhouse/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,13 @@ func (c *ClickHouseConnector) CreateRawTable(ctx context.Context, req *protos.Cr
rawTableName := c.getRawTableName(req.FlowJobName)

createRawTableSQL := `CREATE TABLE IF NOT EXISTS %s (
_peerdb_uid UUID NOT NULL,
_peerdb_timestamp Int64 NOT NULL,
_peerdb_destination_table_name String NOT NULL,
_peerdb_data String NOT NULL,
_peerdb_record_type Int NOT NULL,
_peerdb_uid UUID,
_peerdb_timestamp Int64,
_peerdb_destination_table_name String,
_peerdb_data String,
_peerdb_record_type Int,
_peerdb_match_data String,
_peerdb_batch_id Int,
_peerdb_batch_id Int64,
_peerdb_unchanged_toast_columns String
) ENGINE = MergeTree() ORDER BY (_peerdb_batch_id, _peerdb_destination_table_name);`

Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/clickhouse/normalize.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ func (c *ClickHouseConnector) getDistinctTableNamesInBatch(
rawTbl := c.getRawTableName(flowJobName)

q := fmt.Sprintf(
`SELECT DISTINCT _peerdb_destination_table_name FROM %s WHERE _peerdb_batch_id > %d AND _peerdb_batch_id <= %d`,
`SELECT DISTINCT _peerdb_destination_table_name FROM %s WHERE _peerdb_batch_id>%d AND _peerdb_batch_id<=%d`,
rawTbl, normalizeBatchID, syncBatchID)

rows, err := c.query(ctx, q)
Expand Down

0 comments on commit 0fbd7f7

Please sign in to comment.