Merge remote-tracking branch 'origin/main' into stable
iskakaushik committed Jul 6, 2023
2 parents 4d146ad + eae6bbc commit c9bd1a3
Showing 46 changed files with 1,004 additions and 1,085 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/flow.yml
@@ -57,7 +57,7 @@ jobs:

       - name: run tests
         run: |
-          gotestsum --format testname -- -p 1 ./...
+          gotestsum --format testname -- -p 1 ./... -timeout 1200s
         working-directory: ./flow
         env:
           AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
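(For context: `gotestsum` forwards everything after `--` to `go test`, so `-timeout 1200s` raises the per-test-binary timeout from Go's default of 10 minutes to 20.)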
13 changes: 7 additions & 6 deletions README.md
@@ -1,11 +1,12 @@

 <div align="center">
-<img class="img-fluid" src="images/logo-light-transparent_copy_2.png" alt="img-verification" width="120" height="120">
-<h3>PeerDB</h3>
-<h4>Modern ETL in minutes, with SQL<h4>
-<a href="https://github.com/Peerdb-io/peerdb/actions/workflows/ci.yml"><img src="https://github.com/PEerDB-io/peerdb/actions/workflows/ci.yml/badge.svg"/></a>
-<a href="https://github.com/PeerDB-io/peerdb/blob/main/LICENSE.md"><img src="https://badgen.net/badge/License/Elv2/green?icon=github"/></a>
-<a href="https://join.slack.com/t/peerdb-public/shared_invite/zt-1wo9jydev-EXInbMtCtpAKFFWdi7QvLQ"><img src="https://img.shields.io/badge/slack-peerdb-brightgreen.svg?logo=slack" /></a>
+<img class="img-fluid" src="images/banner.jpg" alt="PeerDB Banner" width="512" />
+
+#### Modern ETL in minutes, with SQL
+
+[![Workflow Status](https://github.com/PEerDB-io/peerdb/actions/workflows/ci.yml/badge.svg)](https://github.com/Peerdb-io/peerdb/actions/workflows/ci.yml) [![ElV2 License](https://badgen.net/badge/License/Elv2/green?icon=github)](https://github.com/PeerDB-io/peerdb/blob/main/LICENSE.md) [![Slack Community](https://img.shields.io/badge/slack-peerdb-brightgreen.svg?logo=slack)](https://join.slack.com/t/peerdb-public/shared_invite/zt-1wo9jydev-EXInbMtCtpAKFFWdi7QvLQ)
 
 </div>

## PeerDB
92 changes: 28 additions & 64 deletions flow/connectors/bigquery/bigquery.go
@@ -15,6 +15,7 @@ import (
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/google/uuid"
log "github.com/sirupsen/logrus"
"google.golang.org/api/iterator"
@@ -442,8 +443,8 @@ func (c *BigQueryConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
 		// 1. _peerdb_uid - uuid
 		// 2. _peerdb_timestamp - current timestamp
 		// 2. _peerdb_timestamp_nanos - current timestamp in nano seconds
-		// 3. _peerdb_data - json of `r.Items`
-		json, err := json.Marshal(r.Items)
+		// 3. _peerdb_data - itemsJSON of `r.Items`
+		itemsJSON, err := r.Items.ToJSON()
 		if err != nil {
 			return nil, fmt.Errorf("failed to create items to json: %v", err)
 		}
@@ -454,7 +455,7 @@ func (c *BigQueryConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
 			timestamp: time.Now(),
 			timestampNanos: time.Now().UnixNano(),
 			destinationTableName: r.DestinationTableName,
-			data: string(json),
+			data: itemsJSON,
 			recordType: 0,
 			matchData: "",
 			batchID: syncBatchID,
@@ -469,12 +470,12 @@ func (c *BigQueryConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
 		// 4. _peerdb_record_type - 1
 		// 5. _peerdb_match_data - json of `r.OldItems`
 
-		newItemsJSON, err := json.Marshal(r.NewItems)
+		newItemsJSON, err := r.NewItems.ToJSON()
 		if err != nil {
 			return nil, fmt.Errorf("failed to create new items to json: %v", err)
 		}
 
-		oldItemsJSON, err := json.Marshal(r.OldItems)
+		oldItemsJSON, err := r.OldItems.ToJSON()
 		if err != nil {
 			return nil, fmt.Errorf("failed to create old items to json: %v", err)
 		}
@@ -485,9 +486,9 @@ func (c *BigQueryConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
 			timestamp: time.Now(),
 			timestampNanos: time.Now().UnixNano(),
 			destinationTableName: r.DestinationTableName,
-			data: string(newItemsJSON),
+			data: newItemsJSON,
 			recordType: 1,
-			matchData: string(oldItemsJSON),
+			matchData: oldItemsJSON,
 			batchID: syncBatchID,
 			stagingBatchID: stagingBatchID,
 			unchangedToastColumns: utils.KeysToString(r.UnchangedToastColumns),
@@ -500,7 +501,7 @@ func (c *BigQueryConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
 		// 4. _peerdb_match_data - json of `r.Items`
 
 		// json.Marshal converts bytes in Hex automatically to BASE64 string.
-		itemsJSON, err := json.Marshal(r.Items)
+		itemsJSON, err := r.Items.ToJSON()
 		if err != nil {
 			return nil, fmt.Errorf("failed to create items to json: %v", err)
 		}
@@ -511,9 +512,9 @@ func (c *BigQueryConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
 			timestamp: time.Now(),
 			timestampNanos: time.Now().UnixNano(),
 			destinationTableName: r.DestinationTableName,
-			data: string(itemsJSON),
+			data: itemsJSON,
 			recordType: 2,
-			matchData: string(itemsJSON),
+			matchData: itemsJSON,
 			batchID: syncBatchID,
 			stagingBatchID: stagingBatchID,
 			unchangedToastColumns: utils.KeysToString(r.UnchangedToastColumns),
@@ -823,7 +824,7 @@ func (c *BigQueryConnector) SetupNormalizedTable(
 	for colName, genericColType := range sourceSchema.Columns {
 		columns[idx] = &bigquery.FieldSchema{
 			Name: colName,
-			Type: getBigQueryColumnTypeForGenericColType(genericColType),
+			Type: qValueKindToBigQueryType(genericColType),
 		}
 		idx++
 	}
@@ -923,45 +924,6 @@ func (c *BigQueryConnector) truncateTable(tableIdentifier string) error {
 	return nil
 }
 
-func getBigQueryColumnTypeForGenericColType(colType string) bigquery.FieldType {
-	switch colType {
-	// boolean
-	case model.ColumnTypeBoolean:
-		return bigquery.BooleanFieldType
-	// integer types
-	case model.ColumnTypeInt16, model.ColumnTypeInt32, model.ColumnTypeInt64:
-		return bigquery.IntegerFieldType
-	// decimal types
-	case model.ColumnTypeFloat16, model.ColumnTypeFloat32, model.ColumnTypeFloat64:
-		return bigquery.FloatFieldType
-	case model.ColumnTypeNumeric:
-		return bigquery.NumericFieldType
-	// string related
-	case model.ColumnTypeString:
-		return bigquery.StringFieldType
-	// json also is stored as string for now
-	case model.ColumnTypeJSON:
-		return bigquery.StringFieldType
-	// time related
-	case model.ColumnTypeTimestamp, model.ColumnTypeTimeStampWithTimeZone:
-		return bigquery.TimestampFieldType
-	case model.ColumnTypeTime:
-		return bigquery.TimeFieldType
-	case model.ColumnTypeTimeWithTimeZone:
-		return bigquery.StringFieldType
-	case model.ColumnTypeDate:
-		return bigquery.TimestampFieldType
-	case model.ColumnTypeInterval:
-		return bigquery.IntervalFieldType
-	// bytes
-	case model.ColumnHexBytes, model.ColumnHexBit:
-		return bigquery.BytesFieldType
-	// rest will be strings
-	default:
-		return bigquery.StringFieldType
-	}
-}
-
 type MergeStmtGenerator struct {
 	// dataset of all the tables
 	Dataset string
@@ -1003,40 +965,42 @@ func (m *MergeStmtGenerator) generateFlattenedCTE() string {
 	// statement.
 	flattenedProjs := make([]string, 0)
 	for colName, colType := range m.NormalizedTableSchema.Columns {
-		bqType := getBigQueryColumnTypeForGenericColType(colType)
+		bqType := qValueKindToBigQueryType(colType)
 		// CAST doesn't work for FLOAT, so rewrite it to FLOAT64.
 		if bqType == bigquery.FloatFieldType {
 			bqType = "FLOAT64"
 		}
 		var castStmt string
 
-		switch colType {
-		case model.ColumnTypeJSON:
+		switch qvalue.QValueKind(colType) {
+		case qvalue.QValueKindJSON:
 			//if the type is JSON, then just extract JSON
 			castStmt = fmt.Sprintf("CAST(JSON_EXTRACT(_peerdb_data, '$.%s') AS %s) AS %s",
 				colName, bqType, colName)
 		// expecting data in BASE64 format
-		case model.ColumnHexBytes:
+		case qvalue.QValueKindBytes:
 			castStmt = fmt.Sprintf("FROM_BASE64(JSON_EXTRACT_SCALAR(_peerdb_data, '$.%s')) AS %s",
 				colName, colName)
 		// MAKE_INTERVAL(years INT64, months INT64, days INT64, hours INT64, minutes INT64, seconds INT64)
 		// Expecting interval to be in the format of {"Microseconds":2000000,"Days":0,"Months":0,"Valid":true}
 		// json.Marshal in SyncRecords for Postgres already does this - once new data-stores are added,
 		// this needs to be handled again
-		case model.ColumnTypeInterval:
-			castStmt = fmt.Sprintf("MAKE_INTERVAL(0,CAST(JSON_EXTRACT_SCALAR(_peerdb_data, '$.%s.Months') AS INT64),"+
-				"CAST(JSON_EXTRACT_SCALAR(_peerdb_data, '$.%s.Days') AS INT64),0,0,"+
-				"CAST(CAST(JSON_EXTRACT_SCALAR(_peerdb_data, '$.%s.Microseconds') AS INT64)/1000000 AS INT64)) AS %s",
-				colName, colName, colName, colName)
-		case model.ColumnHexBit:
+		// TODO add interval types again
+		// case model.ColumnTypeInterval:
+		// 	castStmt = fmt.Sprintf("MAKE_INTERVAL(0,CAST(JSON_EXTRACT_SCALAR(_peerdb_data, '$.%s.Months') AS INT64),"+
+		// 		"CAST(JSON_EXTRACT_SCALAR(_peerdb_data, '$.%s.Days') AS INT64),0,0,"+
+		// 		"CAST(CAST(JSON_EXTRACT_SCALAR(_peerdb_data, '$.%s.Microseconds') AS INT64)/1000000 AS INT64)) AS %s",
+		// 		colName, colName, colName, colName)
+		case qvalue.QValueKindBit:
 			// sample raw data for BIT {"a":{"Bytes":"oA==","Len":3,"Valid":true},"id":1}
 			// need to check correctness TODO
 			castStmt = fmt.Sprintf("FROM_BASE64(JSON_EXTRACT_SCALAR(_peerdb_data, '$.%s.Bytes')) AS %s",
 				colName, colName)
-		case model.ColumnTypeTime:
-			castStmt = fmt.Sprintf("time(timestamp_micros(CAST(JSON_EXTRACT(_peerdb_data, '$.%s.Microseconds')"+
-				" AS int64))) AS %s",
-				colName, colName)
+		// TODO add proper granularity for time types, then restore this
+		// case model.ColumnTypeTime:
+		// 	castStmt = fmt.Sprintf("time(timestamp_micros(CAST(JSON_EXTRACT(_peerdb_data, '$.%s.Microseconds')"+
+		// 		" AS int64))) AS %s",
+		// 		colName, colName)
 		default:
 			castStmt = fmt.Sprintf("CAST(JSON_EXTRACT_SCALAR(_peerdb_data, '$.%s') AS %s) AS %s",
 				colName, bqType, colName)
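To make the switch above concrete, here is a small, self-contained sketch of the projection strings its three live branches build (the `_peerdb_data` layout and SQL shapes follow the diff; the column names and the simplified kind tags are illustrative):

```go
package main

import "fmt"

// castStmtFor reproduces the three live branches of the switch above for a
// hypothetical column; the interval and time branches are commented out in
// the diff, so they are omitted here too.
func castStmtFor(kind, colName, bqType string) string {
	switch kind {
	case "json":
		return fmt.Sprintf("CAST(JSON_EXTRACT(_peerdb_data, '$.%s') AS %s) AS %s", colName, bqType, colName)
	case "bytes":
		return fmt.Sprintf("FROM_BASE64(JSON_EXTRACT_SCALAR(_peerdb_data, '$.%s')) AS %s", colName, colName)
	default:
		return fmt.Sprintf("CAST(JSON_EXTRACT_SCALAR(_peerdb_data, '$.%s') AS %s) AS %s", colName, bqType, colName)
	}
}

func main() {
	fmt.Println(castStmtFor("json", "payload", "STRING"))
	// -> CAST(JSON_EXTRACT(_peerdb_data, '$.payload') AS STRING) AS payload
	fmt.Println(castStmtFor("bytes", "blob", "BYTES"))
	// -> FROM_BASE64(JSON_EXTRACT_SCALAR(_peerdb_data, '$.blob')) AS blob
	fmt.Println(castStmtFor("other", "age", "INT64"))
	// -> CAST(JSON_EXTRACT_SCALAR(_peerdb_data, '$.age') AS INT64) AS age
}
```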
19 changes: 13 additions & 6 deletions flow/connectors/bigquery/qrecord_value_saver.go
@@ -63,10 +63,17 @@ func (q QRecordValueSaver) Save() (map[string]bigquery.Value, string, error) {
 		}
 		bqValues[k] = val
 
+	case qvalue.QValueKindInt16:
+		val, ok := v.Value.(int16)
+		if !ok {
+			return nil, "", fmt.Errorf("failed to convert %v to int16", v.Value)
+		}
+		bqValues[k] = val
+
 	case qvalue.QValueKindInt32:
 		val, ok := v.Value.(int32)
 		if !ok {
-			return nil, "", fmt.Errorf("failed to convert %v to int64", v.Value)
+			return nil, "", fmt.Errorf("failed to convert %v to int32", v.Value)
 		}
 		bqValues[k] = val
 
@@ -91,12 +98,12 @@ func (q QRecordValueSaver) Save() (map[string]bigquery.Value, string, error) {
 		}
 		bqValues[k] = val
 
-	case qvalue.QValueKindETime:
-		val, ok := v.Value.(*qvalue.ExtendedTime)
-		if !ok {
-			return nil, "", fmt.Errorf("failed to convert %v to ExtendedTime", v.Value)
+	case qvalue.QValueKindTimestamp, qvalue.QValueKindDate, qvalue.QValueKindTime:
+		var err error
+		bqValues[k], err = v.GoTimeConvert()
+		if err != nil {
+			return nil, "", fmt.Errorf("failed to parse %v into time.Time", v)
 		}
-		bqValues[k] = val.Time
 
 	case qvalue.QValueKindNumeric:
 		val, ok := v.Value.(*big.Rat)
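The new time-kind branch delegates to `v.GoTimeConvert()`, whose body isn't shown in this diff. A plausible sketch of the normalization it performs, replacing the old single `*qvalue.ExtendedTime` assertion — `extendedTime` and `qValue` here are hypothetical stand-ins for the real qvalue types:

```go
package main

import (
	"fmt"
	"time"
)

// extendedTime stands in for qvalue.ExtendedTime from the removed code path;
// the real type wraps a time.Time plus kind metadata.
type extendedTime struct {
	Time time.Time
}

// qValue loosely models qvalue.QValue: a kind tag plus an untyped value.
type qValue struct {
	Kind  string
	Value interface{}
}

// goTimeConvert sketches what a GoTimeConvert-style helper plausibly does:
// normalize the several shapes a temporal value may arrive in to time.Time.
func (v qValue) goTimeConvert() (time.Time, error) {
	switch t := v.Value.(type) {
	case time.Time:
		return t, nil
	case *extendedTime:
		return t.Time, nil
	default:
		return time.Time{}, fmt.Errorf("cannot convert %T to time.Time", v.Value)
	}
}

func main() {
	v := qValue{Kind: "timestamp", Value: time.Now()}
	ts, err := v.goTimeConvert()
	if err != nil {
		panic(err)
	}
	fmt.Println(ts)
}
```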
6 changes: 3 additions & 3 deletions flow/connectors/bigquery/qrep_avro_sync.go
@@ -216,13 +216,13 @@ func GetAvroType(bqField *bigquery.FieldSchema) (interface{}, error) {
 		}, nil
 	case bigquery.DateFieldType:
 		return map[string]string{
-			"type": "int",
-			"logicalType": "date",
+			"type": "long",
+			"logicalType": "timestamp-micros",
 		}, nil
 	case bigquery.TimeFieldType:
 		return map[string]string{
 			"type": "long",
-			"logicalType": "time-micros",
+			"logicalType": "timestamp-micros",
 		}, nil
 	case bigquery.DateTimeFieldType:
 		return map[string]interface{}{
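After this change, DATE and TIME BigQuery fields both serialize to the same Avro logical type; previously DATE used `{"type":"int","logicalType":"date"}` and TIME used `{"type":"long","logicalType":"time-micros"}`. A tiny sketch of the resulting schema fragment (the surrounding Avro record schema is assumed):

```go
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// The one schema fragment now emitted for both DATE and TIME fields.
	avroForDateAndTime := map[string]string{
		"type":        "long",
		"logicalType": "timestamp-micros",
	}
	b, _ := json.Marshal(avroForDateAndTime)
	fmt.Println(string(b)) // {"logicalType":"timestamp-micros","type":"long"}
}
```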
6 changes: 5 additions & 1 deletion flow/connectors/bigquery/qrep_sync_method.go
@@ -94,7 +94,11 @@ func (s *QRepStagingTableSync) SyncQRepRecords(
 	// col names for the destination table joined by comma
 	colNames := []string{}
 	for _, col := range dstTableMetadata.Schema {
-		colNames = append(colNames, col.Name)
+		if strings.ToLower(col.Name) == "from" {
+			colNames = append(colNames, "`from`")
+		} else {
+			colNames = append(colNames, col.Name)
+		}
 	}
 	colNamesStr := strings.Join(colNames, ", ")
 
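The hunk above backtick-quotes only `from`, but BigQuery reserves many more keywords (SELECT, WHERE, ORDER, GROUP, ...). A sketch of how the check could generalize to a keyword set — this is a suggestion, not the committed implementation, and the keyword list here is a small sample:

```go
package main

import (
	"fmt"
	"strings"
)

// bigqueryReserved is a small sample of BigQuery reserved keywords; the full
// list in the BigQuery documentation is much longer.
var bigqueryReserved = map[string]bool{
	"from": true, "select": true, "where": true, "order": true, "group": true,
}

// quoteIfReserved backtick-quotes a column name when it collides with a
// reserved keyword, generalizing the diff's single "from" check.
func quoteIfReserved(col string) string {
	if bigqueryReserved[strings.ToLower(col)] {
		return "`" + col + "`"
	}
	return col
}

func main() {
	cols := []string{"id", "from", "amount"}
	quoted := make([]string, 0, len(cols))
	for _, c := range cols {
		quoted = append(quoted, quoteIfReserved(c))
	}
	fmt.Println(strings.Join(quoted, ", ")) // id, `from`, amount
}
```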
76 changes: 76 additions & 0 deletions flow/connectors/bigquery/qvalue_convert.go
@@ -0,0 +1,76 @@
+package connbigquery
+
+import (
+	"fmt"
+
+	"cloud.google.com/go/bigquery"
+	"github.com/PeerDB-io/peer-flow/model/qvalue"
+)
+
+func qValueKindToBigQueryType(colType string) bigquery.FieldType {
+	switch qvalue.QValueKind(colType) {
+	// boolean
+	case qvalue.QValueKindBoolean:
+		return bigquery.BooleanFieldType
+	// integer types
+	case qvalue.QValueKindInt16, qvalue.QValueKindInt32, qvalue.QValueKindInt64:
+		return bigquery.IntegerFieldType
+	// decimal types
+	case qvalue.QValueKindFloat32, qvalue.QValueKindFloat64:
+		return bigquery.FloatFieldType
+	case qvalue.QValueKindNumeric:
+		return bigquery.NumericFieldType
+	// string related
+	case qvalue.QValueKindString:
+		return bigquery.StringFieldType
+	// json also is stored as string for now
+	case qvalue.QValueKindJSON:
+		return bigquery.StringFieldType
+	// time related
+	case qvalue.QValueKindTimestamp, qvalue.QValueKindTimestampTZ:
+		return bigquery.TimestampFieldType
+	// TODO: https://github.com/PeerDB-io/peerdb/issues/189 - DATE support is incomplete
+	case qvalue.QValueKindDate:
+		return bigquery.DateFieldType
+	// TODO: https://github.com/PeerDB-io/peerdb/issues/189 - TIME/TIMETZ support is incomplete
+	case qvalue.QValueKindTime, qvalue.QValueKindTimeTZ:
+		return bigquery.TimeFieldType
+	// TODO: https://github.com/PeerDB-io/peerdb/issues/189 - handle INTERVAL types again
+	// bytes
+	case qvalue.QValueKindBit, qvalue.QValueKindBytes:
+		return bigquery.BytesFieldType
+	// rest will be strings
+	default:
+		return bigquery.StringFieldType
+	}
+}
+
+// BigQueryTypeToQValueKind converts a bigquery FieldType to a QValueKind.
+func BigQueryTypeToQValueKind(fieldType bigquery.FieldType) (qvalue.QValueKind, error) {
+	switch fieldType {
+	case bigquery.StringFieldType:
+		return qvalue.QValueKindString, nil
+	case bigquery.BytesFieldType:
+		return qvalue.QValueKindBytes, nil
+	case bigquery.IntegerFieldType:
+		return qvalue.QValueKindInt64, nil
+	case bigquery.FloatFieldType:
+		return qvalue.QValueKindFloat64, nil
+	case bigquery.BooleanFieldType:
+		return qvalue.QValueKindBoolean, nil
+	case bigquery.TimestampFieldType:
+		return qvalue.QValueKindTimestamp, nil
+	case bigquery.DateFieldType:
+		return qvalue.QValueKindDate, nil
+	case bigquery.TimeFieldType:
+		return qvalue.QValueKindTime, nil
+	case bigquery.RecordFieldType:
+		return qvalue.QValueKindStruct, nil
+	case bigquery.NumericFieldType:
+		return qvalue.QValueKindNumeric, nil
+	case bigquery.GeographyFieldType:
+		return qvalue.QValueKindString, nil
+	default:
+		return "", fmt.Errorf("unsupported bigquery field type: %v", fieldType)
+	}
+}
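One property of the new file worth noting: the two mappings are deliberately not inverses — every integer width maps to `IntegerFieldType` and comes back as `Int64`, and JSON maps to `STRING` and comes back as `String`. A sketch of a test that could sit next to this file (the test name and file are assumptions, but the identifiers are the ones added above):

```go
package connbigquery

import (
	"testing"

	"cloud.google.com/go/bigquery"
	"github.com/PeerDB-io/peer-flow/model/qvalue"
)

// TestQValueKindRoundTrip documents the (lossy) relationship between the two
// mappings: every kind maps forward to a BigQuery type, but mapping back can
// widen it (Int16 -> INTEGER -> Int64) or erase it (JSON -> STRING -> String).
func TestQValueKindRoundTrip(t *testing.T) {
	bqType := qValueKindToBigQueryType(string(qvalue.QValueKindInt16))
	if bqType != bigquery.IntegerFieldType {
		t.Fatalf("expected IntegerFieldType, got %v", bqType)
	}

	kind, err := BigQueryTypeToQValueKind(bqType)
	if err != nil {
		t.Fatal(err)
	}
	if kind != qvalue.QValueKindInt64 {
		t.Fatalf("expected Int64 after round trip, got %v", kind)
	}
}
```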
