diff --git a/flow/connectors/clickhouse/normalize.go b/flow/connectors/clickhouse/normalize.go
index 2d72d8d693..cd551cfe9f 100644
--- a/flow/connectors/clickhouse/normalize.go
+++ b/flow/connectors/clickhouse/normalize.go
@@ -394,10 +394,10 @@ func (c *ClickHouseConnector) NormalizeRecords(
 	jsonExtractData := "JSONExtract(_peerdb_data, '%s', '%s')"
 	jsonExtractMatchData := "JSONExtract(_peerdb_match_data, '%s', '%s')"
 	if useJSON {
-		jsonExtractStringData = "_peerdb_data.'%s'"
-		jsonExtractStringMatchData = "_peerdb_match_data.'%s'"
-		jsonExtractData = "_peerdb_data.'%s'.:'%s'"
-		jsonExtractMatchData = "_peerdb_match_data.'%s'.:'%s'"
+		jsonExtractStringData = "_peerdb_data.`%s`"
+		jsonExtractStringMatchData = "_peerdb_match_data.`%s`"
+		jsonExtractData = "_peerdb_data.`%s`::%s"
+		jsonExtractMatchData = "_peerdb_match_data.`%s`::%s"
 	}
 
 	switch clickHouseType {
@@ -486,9 +486,6 @@ func (c *ClickHouseConnector) NormalizeRecords(
 		insertIntoSelectQuery.WriteString("` ")
 		insertIntoSelectQuery.WriteString(colSelector.String())
 		insertIntoSelectQuery.WriteString(selectQuery.String())
-		if useJSON {
-			insertIntoSelectQuery.WriteString(" SETTINGS allow_experimental_json_type=1")
-		}
 
 		select {
 		case queries <- insertIntoSelectQuery.String():
@@ -560,18 +557,14 @@ func (c *ClickHouseConnector) peerdbDataTypeIsJSON(
 	flowJobName string,
 ) (bool, error) {
 	rawTable := c.getRawTableName(flowJobName)
-	query := fmt.Sprintf("SELECT _peerdb_data FROM %s LIMIT 0", rawTable)
-	rows, err := c.query(ctx, query)
-	if err != nil {
+	var isJSON bool
+	if err := c.queryRow(ctx,
+		`SELECT type='JSON' FROM system.columns
+		WHERE database=currentDatabase() AND table=?
+		AND name='_peerdb_data'`, rawTable).Scan(&isJSON); err != nil {
 		return false, fmt.Errorf("error while querying raw table for data type: %w", err)
 	}
-	defer rows.Close()
-
-	if len(rows.ColumnTypes()) == 1 && rows.ColumnTypes()[0].DatabaseTypeName() == "JSON" {
-		return true, nil
-	}
-
-	return false, nil
+	return isJSON, nil
 }
 
 func (c *ClickHouseConnector) copyAvroStageToDestination(ctx context.Context, flowJobName string, syncBatchID int64) error {
diff --git a/flow/connectors/clickhouse/qrep_avro_sync.go b/flow/connectors/clickhouse/qrep_avro_sync.go
index f78a021be9..d0acb397e9 100644
--- a/flow/connectors/clickhouse/qrep_avro_sync.go
+++ b/flow/connectors/clickhouse/qrep_avro_sync.go
@@ -52,11 +52,9 @@ func (s *ClickHouseAvroSyncMethod) CopyStageToDestination(ctx context.Context, a
 		sessionTokenPart = fmt.Sprintf(", '%s'", creds.AWS.SessionToken)
 	}
 
-	// needed because Avro parser in CH won't allow Avro string into JSON
+	// needed because Avro parser in CH won't accept Avro string into JSON
 	query := fmt.Sprintf(
-		`INSERT INTO "%s" SELECT * FROM s3('%s','%s','%s'%s, 'Avro', '_peerdb_uid UUID,
-		_peerdb_timestamp Int64, _peerdb_destination_table_name String, _peerdb_data String,
-		_peerdb_record_type Int, _peerdb_match_data String, _peerdb_batch_id Int64, _peerdb_unchanged_toast_columns String')`,
+		`INSERT INTO "%s" SELECT * FROM s3('%s','%s','%s'%s, 'Avro') SETTINGS use_structure_from_insertion_table_in_table_functions=0`,
 		s.config.DestinationTableIdentifier, avroFileUrl, creds.AWS.AccessKeyID, creds.AWS.SecretAccessKey, sessionTokenPart)