Skip to content

Commit

Permalink
use qvalue array, generic helper
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Jan 17, 2024
1 parent baf2616 commit 2866aaa
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 71 deletions.
12 changes: 6 additions & 6 deletions flow/connectors/bigquery/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,9 +313,9 @@ func GetAvroType(bqField *bigquery.FieldSchema) (interface{}, error) {
"logicalType": "timestamp-micros",
}
if bqField.Repeated {
return map[string]interface{}{
"type": "array",
"items": timestampSchema,
return qvalue.AvroSchemaArray{
Type: "array",
Items: timestampSchema,
}, nil
}
return timestampSchema, nil
Expand All @@ -325,9 +325,9 @@ func GetAvroType(bqField *bigquery.FieldSchema) (interface{}, error) {
"logicalType": "date",
}
if bqField.Repeated {
return map[string]interface{}{
"type": "array",
"items": dateSchema,
return qvalue.AvroSchemaArray{
Type: "array",
Items: dateSchema,
}, nil
}
return dateSchema, nil
Expand Down
108 changes: 44 additions & 64 deletions flow/model/qrecord_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,18 @@ func (q *QRecordBatch) ToQRecordStream(buffer int) (*QRecordStream, error) {
return stream, nil
}

func constructArray[T any](qValue qvalue.QValue, typeName string) (*pgtype.Array[T], error) {
v, ok := qValue.Value.([]T)
if !ok {
return nil, fmt.Errorf("invalid %s value", typeName)
}
return &pgtype.Array[T]{
Elements: v,
Dims: []pgtype.ArrayDimension{{Length: int32(len(v)), LowerBound: 1}},
Valid: true,
}, nil
}

type QRecordBatchCopyFromSource struct {
numRecords int
stream *QRecordStream
Expand Down Expand Up @@ -215,99 +227,67 @@ func (src *QRecordBatchCopyFromSource) Values() ([]interface{}, error) {

values[i] = wkb
case qvalue.QValueKindArrayString:
v, ok := qValue.Value.([]string)
if !ok {
src.err = fmt.Errorf("invalid ArrayString value")
v, err := constructArray[string](qValue, "ArrayString")
if err != nil {
src.err = err
return nil, src.err
}
values[i] = pgtype.Array[string]{
Elements: v,
Dims: []pgtype.ArrayDimension{{Length: int32(len(v)), LowerBound: 1}},
Valid: true,
}
values[i] = v

case qvalue.QValueKindArrayDate, qvalue.QValueKindArrayTimestamp, qvalue.QValueKindArrayTimestampTZ:
v, ok := qValue.Value.([]time.Time)
if !ok {
src.err = fmt.Errorf("invalid ArrayDate value")
v, err := constructArray[time.Time](qValue, "ArrayTime")
if err != nil {
src.err = err
return nil, src.err
}
values[i] = pgtype.Array[time.Time]{
Elements: v,
Dims: []pgtype.ArrayDimension{{Length: int32(len(v)), LowerBound: 1}},
Valid: true,
}
values[i] = v

case qvalue.QValueKindArrayInt16:
v, ok := qValue.Value.([]int16)
if !ok {
src.err = fmt.Errorf("invalid ArrayInt16 value")
v, err := constructArray[int16](qValue, "ArrayInt16")
if err != nil {
src.err = err
return nil, src.err
}
values[i] = pgtype.Array[int16]{
Elements: v,
Dims: []pgtype.ArrayDimension{{Length: int32(len(v)), LowerBound: 1}},
Valid: true,
}
values[i] = v

case qvalue.QValueKindArrayInt32:
v, ok := qValue.Value.([]int32)
if !ok {
src.err = fmt.Errorf("invalid ArrayInt32 value")
v, err := constructArray[int32](qValue, "ArrayInt32")
if err != nil {
src.err = err
return nil, src.err
}
values[i] = pgtype.Array[int32]{
Elements: v,
Dims: []pgtype.ArrayDimension{{Length: int32(len(v)), LowerBound: 1}},
Valid: true,
}
values[i] = v

case qvalue.QValueKindArrayInt64:
v, ok := qValue.Value.([]int64)
if !ok {
src.err = fmt.Errorf("invalid ArrayInt64 value")
v, err := constructArray[int64](qValue, "ArrayInt64")
if err != nil {
src.err = err
return nil, src.err
}
values[i] = pgtype.Array[int64]{
Elements: v,
Dims: []pgtype.ArrayDimension{{Length: int32(len(v)), LowerBound: 1}},
Valid: true,
}
values[i] = v

case qvalue.QValueKindArrayFloat32:
v, ok := qValue.Value.([]float32)
if !ok {
src.err = fmt.Errorf("invalid ArrayFloat32 value")
v, err := constructArray[float32](qValue, "ArrayFloat32")
if err != nil {
src.err = err
return nil, src.err
}
values[i] = pgtype.Array[float32]{
Elements: v,
Dims: []pgtype.ArrayDimension{{Length: int32(len(v)), LowerBound: 1}},
Valid: true,
}
values[i] = v

case qvalue.QValueKindArrayFloat64:
v, ok := qValue.Value.([]float64)
if !ok {
src.err = fmt.Errorf("invalid ArrayFloat64 value")
v, err := constructArray[float64](qValue, "ArrayFloat64")
if err != nil {
src.err = err
return nil, src.err
}
values[i] = pgtype.Array[float64]{
Elements: v,
Dims: []pgtype.ArrayDimension{{Length: int32(len(v)), LowerBound: 1}},
Valid: true,
}
values[i] = v
case qvalue.QValueKindArrayBoolean:
v, ok := qValue.Value.([]bool)
if !ok {
src.err = fmt.Errorf("invalid ArrayBoolean value")
v, err := constructArray[bool](qValue, "ArrayBool")
if err != nil {
src.err = err
return nil, src.err
}
values[i] = pgtype.Array[bool]{
Elements: v,
Dims: []pgtype.ArrayDimension{{Length: int32(len(v)), LowerBound: 1}},
Valid: true,
}
values[i] = v
case qvalue.QValueKindJSON:
v, ok := qValue.Value.(string)
if !ok {
Expand Down
2 changes: 1 addition & 1 deletion flow/model/qvalue/avro_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
// https://avro.apache.org/docs/1.11.0/spec.html
type AvroSchemaArray struct {
Type string `json:"type"`
Items string `json:"items"`
Items interface{}
}

type AvroSchemaNumeric struct {
Expand Down

0 comments on commit 2866aaa

Please sign in to comment.