Skip to content

Commit

Permalink
workaround for getting isJSON, fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Nov 29, 2024
1 parent 1443bc4 commit 9bf6756
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 20 deletions.
25 changes: 9 additions & 16 deletions flow/connectors/clickhouse/normalize.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,10 +394,10 @@ func (c *ClickHouseConnector) NormalizeRecords(
jsonExtractData := "JSONExtract(_peerdb_data, '%s', '%s')"
jsonExtractMatchData := "JSONExtract(_peerdb_match_data, '%s', '%s')"
if useJSON {
jsonExtractStringData = "_peerdb_data.'%s'"
jsonExtractStringMatchData = "_peerdb_match_data.'%s'"
jsonExtractData = "_peerdb_data.'%s'.:'%s'"
jsonExtractMatchData = "_peerdb_match_data.'%s'.:'%s'"
jsonExtractStringData = "_peerdb_data.`%s`"
jsonExtractStringMatchData = "_peerdb_match_data.`%s`"
jsonExtractData = "_peerdb_data.`%s`::%s"
jsonExtractMatchData = "_peerdb_match_data.`%s`::%s"
}

switch clickHouseType {
Expand Down Expand Up @@ -486,9 +486,6 @@ func (c *ClickHouseConnector) NormalizeRecords(
insertIntoSelectQuery.WriteString("` ")
insertIntoSelectQuery.WriteString(colSelector.String())
insertIntoSelectQuery.WriteString(selectQuery.String())
if useJSON {
insertIntoSelectQuery.WriteString(" SETTINGS allow_experimental_json_type=1")
}

select {
case queries <- insertIntoSelectQuery.String():
Expand Down Expand Up @@ -560,18 +557,14 @@ func (c *ClickHouseConnector) peerdbDataTypeIsJSON(
flowJobName string,
) (bool, error) {
rawTable := c.getRawTableName(flowJobName)
query := fmt.Sprintf("SELECT _peerdb_data FROM %s LIMIT 0", rawTable)

rows, err := c.query(ctx, query)
if err != nil {
var isJSON bool
if err := c.queryRow(ctx,
`SELECT type='JSON' FROM system.columns
WHERE database=currentDatabase() AND table=? AND name='_peerdb_data'`, rawTable).Scan(&isJSON); err != nil {
return false, fmt.Errorf("error while querying raw table for data type: %w", err)
}
defer rows.Close()
if len(rows.ColumnTypes()) == 1 && rows.ColumnTypes()[0].DatabaseTypeName() == "JSON" {
return true, nil
}

return false, nil
return isJSON, nil
}

func (c *ClickHouseConnector) copyAvroStageToDestination(ctx context.Context, flowJobName string, syncBatchID int64) error {
Expand Down
6 changes: 2 additions & 4 deletions flow/connectors/clickhouse/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,9 @@ func (s *ClickHouseAvroSyncMethod) CopyStageToDestination(ctx context.Context, a
sessionTokenPart = fmt.Sprintf(", '%s'", creds.AWS.SessionToken)
}

// needed because Avro parser in CH won't allow Avro string into JSON
// needed because Avro parser in CH won't accept Avro string into JSON
query := fmt.Sprintf(
`INSERT INTO "%s" SELECT * FROM s3('%s','%s','%s'%s, 'Avro', '_peerdb_uid UUID,
_peerdb_timestamp Int64, _peerdb_destination_table_name String, _peerdb_data String,
_peerdb_record_type Int, _peerdb_match_data String, _peerdb_batch_id Int64, _peerdb_unchanged_toast_columns String')`,
`INSERT INTO "%s" SELECT * FROM s3('%s','%s','%s'%s, 'Avro') SETTINGS use_structure_from_insertion_table_in_table_functions=0`,
s.config.DestinationTableIdentifier, avroFileUrl,
creds.AWS.AccessKeyID, creds.AWS.SecretAccessKey, sessionTokenPart)

Expand Down

0 comments on commit 9bf6756

Please sign in to comment.