Skip to content

Commit

Permalink
adds more qvalue types for bq avro
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Jan 17, 2024
1 parent 2866aaa commit dbdf432
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 36 deletions.
70 changes: 35 additions & 35 deletions flow/connectors/bigquery/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,77 +308,77 @@ func GetAvroType(bqField *bigquery.FieldSchema) (interface{}, error) {
case bigquery.BooleanFieldType:
return considerRepeated("boolean", bqField.Repeated), nil
case bigquery.TimestampFieldType:
timestampSchema := map[string]string{
"type": "long",
"logicalType": "timestamp-micros",
timestampSchema := qvalue.AvroSchemaField{
Type: "long",
LogicalType: "timestamp-micros",
}
if bqField.Repeated {
return qvalue.AvroSchemaArray{
return qvalue.AvroSchemaComplexArray{
Type: "array",
Items: timestampSchema,
}, nil
}
return timestampSchema, nil
case bigquery.DateFieldType:
dateSchema := map[string]string{
"type": "int",
"logicalType": "date",
dateSchema := qvalue.AvroSchemaField{
Type: "int",
LogicalType: "date",
}
if bqField.Repeated {
return qvalue.AvroSchemaArray{
return qvalue.AvroSchemaComplexArray{
Type: "array",
Items: dateSchema,
}, nil
}
return dateSchema, nil

case bigquery.TimeFieldType:
return map[string]string{
"type": "long",
"logicalType": "timestamp-micros",
return qvalue.AvroSchemaField{
Type: "long",
LogicalType: "timestamp-micros",
}, nil
case bigquery.DateTimeFieldType:
return map[string]interface{}{
"type": "record",
"name": "datetime",
"fields": []map[string]string{
return qvalue.AvroSchemaRecord{
Type: "record",
Name: "datetime",
Fields: []qvalue.AvroSchemaField{
{
"name": "date",
"type": "int",
"logicalType": "date",
Name: "date",
Type: "int",
LogicalType: "date",
},
{
"name": "time",
"type": "long",
"logicalType": "time-micros",
Name: "time",
Type: "long",
LogicalType: "time-micros",
},
},
}, nil
case bigquery.NumericFieldType:
return map[string]interface{}{
"type": "bytes",
"logicalType": "decimal",
"precision": 38,
"scale": 9,
return qvalue.AvroSchemaNumeric{
Type: "bytes",
LogicalType: "decimal",
Precision: 38,
Scale: 9,
}, nil
case bigquery.RecordFieldType:
avroFields := []map[string]interface{}{}
avroFields := []qvalue.AvroSchemaField{}
for _, bqSubField := range bqField.Schema {
avroType, err := GetAvroType(bqSubField)
if err != nil {
return nil, err
}
avroFields = append(avroFields, map[string]interface{}{
"name": bqSubField.Name,
"type": avroType,
avroFields = append(avroFields, qvalue.AvroSchemaField{
Name: bqSubField.Name,
Type: avroType,
})
}
return map[string]interface{}{
"type": "record",
"name": bqField.Name,
"fields": avroFields,
return qvalue.AvroSchemaRecord{
Type: "record",
Name: bqField.Name,
Fields: avroFields,
}, nil
// TODO(kaushik/sai): Add other field types as needed

default:
return nil, fmt.Errorf("unsupported BigQuery field type: %s", bqField.Type)
}
Expand Down
19 changes: 18 additions & 1 deletion flow/model/qvalue/avro_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,12 @@ import (
// https://avro.apache.org/docs/1.11.0/spec.html
type AvroSchemaArray struct {
Type string `json:"type"`
Items interface{}
Items string `json:"items"`
}

type AvroSchemaComplexArray struct {
Type string `json:"type"`
Items AvroSchemaField `json:"items"`
}

type AvroSchemaNumeric struct {
Expand All @@ -24,6 +29,18 @@ type AvroSchemaNumeric struct {
Scale int `json:"scale"`
}

type AvroSchemaRecord struct {
Type string `json:"type"`
Name string `json:"name"`
Fields []AvroSchemaField `json:"fields"`
}

type AvroSchemaField struct {
Name string `json:"name"`
Type interface{} `json:"type"`
LogicalType string `json:"logicalType,omitempty"`
}

// GetAvroSchemaFromQValueKind returns the Avro schema for a given QValueKind.
// The function takes in two parameters, a QValueKind and a boolean indicating if the
// Avro schema should respect null values. It returns a QValueKindAvroSchema object
Expand Down

0 comments on commit dbdf432

Please sign in to comment.