diff --git a/flow/connectors/postgres/qvalue_convert.go b/flow/connectors/postgres/qvalue_convert.go index 4bb6fa2a5b..fe2489ed30 100644 --- a/flow/connectors/postgres/qvalue_convert.go +++ b/flow/connectors/postgres/qvalue_convert.go @@ -297,12 +297,12 @@ func parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) ( upperBoundType := tstzrangeObject.UpperType lowerTime, err := convertTimeRangeBound(tstzrangeObject.Lower) if err != nil { - return nil, fmt.Errorf("[tstzrange]error for lower time bound: %v", err) + return nil, fmt.Errorf("[tstzrange]error for lower time bound: %w", err) } upperTime, err := convertTimeRangeBound(tstzrangeObject.Upper) if err != nil { - return nil, fmt.Errorf("[tstzrange]error for upper time bound: %v", err) + return nil, fmt.Errorf("[tstzrange]error for upper time bound: %w", err) } lowerBracket := "[" diff --git a/flow/model/qvalue/avro_converter.go b/flow/model/qvalue/avro_converter.go index c162f33c4c..97d9641b6b 100644 --- a/flow/model/qvalue/avro_converter.go +++ b/flow/model/qvalue/avro_converter.go @@ -138,6 +138,8 @@ func GetAvroSchemaFromQValueKind(kind QValueKind, targetDWH protos.DBType, preci }, nil } return "string", nil + case QValueKindTSTZRange: + return "string", nil case QValueKindHStore, QValueKindJSON, QValueKindJSONB, QValueKindStruct: return "string", nil case QValueKindArrayFloat32: @@ -193,6 +195,8 @@ func GetAvroSchemaFromQValueKind(kind QValueKind, targetDWH protos.DBType, preci Type: "array", Items: "string", }, nil + case QValueKindArrayJSON, QValueKindArrayJSONB: + return "string", nil case QValueKindArrayString: return AvroSchemaArray{ Type: "array", @@ -315,7 +319,7 @@ func QValueToAvro(value QValue, field *QField, targetDWH protos.DBType, logger l return t, nil case QValueQChar: return c.processNullableUnion("string", string(v.Val)) - case QValueString, QValueCIDR, QValueINET, QValueMacaddr, QValueInterval: + case QValueString, QValueCIDR, QValueINET, QValueMacaddr, QValueInterval, QValueTSTZRange: if c.TargetDWH == protos.DBType_SNOWFLAKE && v.Value() != nil && (len(v.Value().(string)) > 15*1024*1024) { slog.Warn("Clearing TEXT value > 15MB for Snowflake!") diff --git a/flow/model/qvalue/kind.go b/flow/model/qvalue/kind.go index 9cfe849d97..91ab867a0e 100644 --- a/flow/model/qvalue/kind.go +++ b/flow/model/qvalue/kind.go @@ -99,6 +99,8 @@ var QValueKindToSnowflakeTypeMap = map[QValueKind]string{ QValueKindArrayTimestamp: "VARIANT", QValueKindArrayTimestampTZ: "VARIANT", QValueKindArrayBoolean: "VARIANT", + QValueKindArrayJSON: "VARIANT", + QValueKindArrayJSONB: "VARIANT", } var QValueKindToClickHouseTypeMap = map[QValueKind]string{ @@ -114,6 +116,7 @@ var QValueKindToClickHouseTypeMap = map[QValueKind]string{ QValueKindJSON: "String", QValueKindTimestamp: "DateTime64(6)", QValueKindTimestampTZ: "DateTime64(6)", + QValueKindTSTZRange: "String", QValueKindTime: "DateTime64(6)", QValueKindTimeTZ: "DateTime64(6)", QValueKindDate: "Date32", @@ -123,7 +126,6 @@ var QValueKindToClickHouseTypeMap = map[QValueKind]string{ QValueKindInvalid: "String", QValueKindHStore: "String", - // array types will be mapped to VARIANT QValueKindArrayFloat32: "Array(Float32)", QValueKindArrayFloat64: "Array(Float64)", QValueKindArrayInt32: "Array(Int32)", @@ -134,6 +136,8 @@ var QValueKindToClickHouseTypeMap = map[QValueKind]string{ QValueKindArrayDate: "Array(Date)", QValueKindArrayTimestamp: "Array(DateTime64(6))", QValueKindArrayTimestampTZ: "Array(DateTime64(6))", + QValueKindArrayJSON: "String", + QValueKindArrayJSONB: "String", } func (kind QValueKind) ToDWHColumnType(dwhType protos.DBType) (string, error) {