[breaking] BQ SyncRecords now streams properly, code cleanup (#909)
### ⚠️ This change can break existing CDC mirrors from Postgres to BigQuery!

Fixes a bug in BigQuery CDC where a batch with more than `2 ** 20`
records caused Avro file generation to hang. All records were being
written to a bounded channel first and only then consumed by the Avro
writer, instead of the two operations happening in parallel. With a
large enough batch, the channel filled up and the producing loop
blocked before all records were written, while the consumer had not
yet started, so the sync deadlocked.
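
A minimal Go sketch of the failure mode (illustrative names and sizes,
not the actual PeerDB code): a producer that must finish filling a
bounded channel before the consumer ever starts will block forever once
the batch exceeds the buffer.

```go
package main

import "fmt"

// syncSequential mirrors the broken shape: every record is enqueued
// into a bounded channel first, and only afterwards does the drain
// loop (the stand-in for the Avro writer) run. Once the batch exceeds
// the channel capacity, the send blocks and the drain is never reached;
// the Go runtime reports "all goroutines are asleep - deadlock!".
func syncSequential(records []string) {
	ch := make(chan string, 4) // stand-in for the bounded record stream

	for _, r := range records {
		ch <- r // blocks here forever once the buffer is full
	}
	close(ch)

	for r := range ch {
		fmt.Println("write to Avro:", r) // never runs for large batches
	}
}

func main() {
	// Five records into a four-slot buffer: the fifth send deadlocks.
	syncSequential([]string{"r1", "r2", "r3", "r4", "r5"})
}
```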

Fixed by switching BigQuery record generation to the mechanism used by
Snowflake, where record generation happens in a separate goroutine so
the channel is consumed in parallel. As part of this change, some code
was cleaned up and the BigQuery raw table schema was changed in a
breaking manner to match the Snowflake/Postgres equivalent.
Specifically, the column `_peerdb_timestamp` of type `TIMESTAMP` was
removed and the column `_peerdb_timestamp_nanos` of type `INTEGER` was
renamed to `_peerdb_timestamp`. Existing raw tables need to be altered
to match this new, simpler schema:

```
ALTER TABLE <...> DROP COLUMN _peerdb_timestamp;
ALTER TABLE <...> RENAME COLUMN _peerdb_timestamp_nanos TO _peerdb_timestamp;
``` 
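
For reference, a minimal Go sketch of the streaming pattern the fix
adopts from the Snowflake path (again with illustrative names, not the
actual PeerDB code): producing in a separate goroutine lets the
consumer drain the channel concurrently, so the bounded buffer only
applies backpressure instead of deadlocking.

```go
package main

import "fmt"

// syncStreaming mirrors the fixed shape: the producer runs in its own
// goroutine while the main goroutine drains the channel, so a bounded
// buffer only throttles the producer and can never wedge the sync.
func syncStreaming(records []string) {
	ch := make(chan string, 4)

	go func() {
		defer close(ch)
		for _, r := range records {
			ch <- r // blocks only until the consumer catches up
		}
	}()

	for r := range ch {
		fmt.Println("write to Avro:", r) // consumed in parallel with production
	}
}

func main() {
	// Works for any batch size, including far more than the buffer holds.
	syncStreaming([]string{"r1", "r2", "r3", "r4", "r5", "r6", "r7", "r8"})
}
```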

Closes #908
heavycrystal authored Dec 27, 2023
1 parent e16a371 commit a3b2800
Showing 4 changed files with 23 additions and 230 deletions.
221 changes: 10 additions & 211 deletions flow/connectors/bigquery/bigquery.go
@@ -18,7 +18,6 @@ import (
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/google/uuid"
"github.com/jackc/pgx/v5/pgxpool"

"go.temporal.io/sdk/activity"
@@ -69,19 +68,6 @@ type BigQueryConnector struct {
logger slog.Logger
}

type StagingBQRecord struct {
uid string `bigquery:"_peerdb_uid"`
timestamp time.Time `bigquery:"_peerdb_timestamp"`
timestampNanos int64 `bigquery:"_peerdb_timestamp_nanos"`
destinationTableName string `bigquery:"_peerdb_destination_table_name"`
data string `bigquery:"_peerdb_data"`
recordType int `bigquery:"_peerdb_record_type"`
matchData string `bigquery:"_peerdb_match_data"`
batchID int64 `bigquery:"_peerdb_batch_id"`
stagingBatchID int64 `bigquery:"_peerdb_staging_batch_id"`
unchangedToastColumns string `bigquery:"_peerdb_unchanged_toast_columns"`
}

// Create BigQueryServiceAccount from BigqueryConfig
func NewBigQueryServiceAccount(bqConfig *protos.BigqueryConfig) (*BigQueryServiceAccount, error) {
var serviceAccount BigQueryServiceAccount
@@ -493,22 +479,6 @@ func (c *BigQueryConnector) getTableNametoUnchangedCols(flowJobName string, sync
return resultMap, nil
}

// ValueSaver interface for bqRecord
func (r StagingBQRecord) Save() (map[string]bigquery.Value, string, error) {
return map[string]bigquery.Value{
"_peerdb_uid": r.uid,
"_peerdb_timestamp": r.timestamp,
"_peerdb_timestamp_nanos": r.timestampNanos,
"_peerdb_destination_table_name": r.destinationTableName,
"_peerdb_data": r.data,
"_peerdb_record_type": r.recordType,
"_peerdb_match_data": r.matchData,
"_peerdb_batch_id": r.batchID,
"_peerdb_staging_batch_id": r.stagingBatchID,
"_peerdb_unchanged_toast_columns": r.unchangedToastColumns,
}, bigquery.NoDedupeID, nil
}

// SyncRecords pushes records to the destination.
// Currently only supports inserts, updates, and deletes.
// More record types will be added in the future.
@@ -539,201 +509,31 @@ func (c *BigQueryConnector) syncRecordsViaAvro(
syncBatchID int64,
) (*model.SyncResponse, error) {
tableNameRowsMapping := make(map[string]uint32)
recordStream := model.NewQRecordStream(1 << 20)
err := recordStream.SetSchema(&model.QRecordSchema{
Fields: []model.QField{
{
Name: "_peerdb_uid",
Type: qvalue.QValueKindString,
Nullable: false,
},
{
Name: "_peerdb_timestamp",
Type: qvalue.QValueKindTimestamp,
Nullable: false,
},
{
Name: "_peerdb_timestamp_nanos",
Type: qvalue.QValueKindInt64,
Nullable: false,
},
{
Name: "_peerdb_destination_table_name",
Type: qvalue.QValueKindString,
Nullable: false,
},
{
Name: "_peerdb_data",
Type: qvalue.QValueKindString,
Nullable: false,
},
{
Name: "_peerdb_record_type",
Type: qvalue.QValueKindInt64,
Nullable: true,
},
{
Name: "_peerdb_match_data",
Type: qvalue.QValueKindString,
Nullable: true,
},
{
Name: "_peerdb_staging_batch_id",
Type: qvalue.QValueKindInt64,
Nullable: true,
},
{
Name: "_peerdb_batch_id",
Type: qvalue.QValueKindInt64,
Nullable: true,
},
{
Name: "_peerdb_unchanged_toast_columns",
Type: qvalue.QValueKindString,
Nullable: true,
},
},
})
streamReq := model.NewRecordsToStreamRequest(req.Records.GetRecords(), tableNameRowsMapping, syncBatchID)
streamRes, err := utils.RecordsToRawTableStream(streamReq)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to convert records to raw table stream: %w", err)
}

// loop over req.Records
for record := range req.Records.GetRecords() {
var entries [10]qvalue.QValue
switch r := record.(type) {
case *model.InsertRecord:
itemsJSON, err := r.Items.ToJSON()
if err != nil {
return nil, fmt.Errorf("failed to create items to json: %v", err)
}

entries[4] = qvalue.QValue{
Kind: qvalue.QValueKindString,
Value: itemsJSON,
}
entries[5] = qvalue.QValue{
Kind: qvalue.QValueKindInt64,
Value: 0,
}
entries[6] = qvalue.QValue{
Kind: qvalue.QValueKindString,
Value: "",
}
entries[9] = qvalue.QValue{
Kind: qvalue.QValueKindString,
Value: "",
}

tableNameRowsMapping[r.DestinationTableName] += 1
case *model.UpdateRecord:
newItemsJSON, err := r.NewItems.ToJSON()
if err != nil {
return nil, fmt.Errorf("failed to create new items to json: %v", err)
}
oldItemsJSON, err := r.OldItems.ToJSON()
if err != nil {
return nil, fmt.Errorf("failed to create old items to json: %v", err)
}

entries[4] = qvalue.QValue{
Kind: qvalue.QValueKindString,
Value: newItemsJSON,
}
entries[5] = qvalue.QValue{
Kind: qvalue.QValueKindInt64,
Value: 1,
}
entries[6] = qvalue.QValue{
Kind: qvalue.QValueKindString,
Value: oldItemsJSON,
}
entries[9] = qvalue.QValue{
Kind: qvalue.QValueKindString,
Value: utils.KeysToString(r.UnchangedToastColumns),
}

tableNameRowsMapping[r.DestinationTableName] += 1
case *model.DeleteRecord:
itemsJSON, err := r.Items.ToJSON()
if err != nil {
return nil, fmt.Errorf("failed to create items to json: %v", err)
}

entries[4] = qvalue.QValue{
Kind: qvalue.QValueKindString,
Value: itemsJSON,
}
entries[5] = qvalue.QValue{
Kind: qvalue.QValueKindInt64,
Value: 2,
}
entries[6] = qvalue.QValue{
Kind: qvalue.QValueKindString,
Value: itemsJSON,
}
entries[9] = qvalue.QValue{
Kind: qvalue.QValueKindString,
Value: "",
}

tableNameRowsMapping[r.DestinationTableName] += 1
default:
return nil, fmt.Errorf("record type %T not supported", r)
}

entries[0] = qvalue.QValue{
Kind: qvalue.QValueKindString,
Value: uuid.New().String(),
}
entries[1] = qvalue.QValue{
Kind: qvalue.QValueKindTimestamp,
Value: time.Now(),
}
entries[2] = qvalue.QValue{
Kind: qvalue.QValueKindInt64,
Value: time.Now().UnixNano(),
}
entries[3] = qvalue.QValue{
Kind: qvalue.QValueKindString,
Value: record.GetDestinationTableName(),
}
entries[7] = qvalue.QValue{
Kind: qvalue.QValueKindInt64,
Value: syncBatchID,
}
entries[8] = qvalue.QValue{
Kind: qvalue.QValueKindInt64,
Value: syncBatchID,
}
recordStream.Records <- model.QRecordOrError{
Record: model.QRecord{
NumEntries: 10,
Entries: entries[:],
},
}
}

close(recordStream.Records)
avroSync := NewQRepAvroSyncMethod(c, req.StagingPath, req.FlowJobName)
rawTableMetadata, err := c.client.Dataset(c.datasetID).Table(rawTableName).Metadata(c.ctx)
if err != nil {
return nil, fmt.Errorf("failed to get metadata of destination table: %v", err)
}

lastCP, err := req.Records.GetLastCheckpoint()
if err != nil {
return nil, fmt.Errorf("failed to get last checkpoint: %v", err)
}

numRecords, err := avroSync.SyncRecords(rawTableName, req.FlowJobName,
lastCP, rawTableMetadata, syncBatchID, recordStream)
req.Records, rawTableMetadata, syncBatchID, streamRes.Stream)
if err != nil {
return nil, fmt.Errorf("failed to sync records via avro : %v", err)
}

c.logger.Info(fmt.Sprintf("pushed %d records to %s.%s", numRecords, c.datasetID, rawTableName))

lastCP, err := req.Records.GetLastCheckpoint()
if err != nil {
return nil, fmt.Errorf("failed to get last checkpoint: %v", err)
}

return &model.SyncResponse{
LastSyncedCheckPointID: lastCP,
NumRecordsSynced: int64(numRecords),
@@ -842,8 +642,7 @@ func (c *BigQueryConnector) CreateRawTable(req *protos.CreateRawTableInput) (*pr

schema := bigquery.Schema{
{Name: "_peerdb_uid", Type: bigquery.StringFieldType},
{Name: "_peerdb_timestamp", Type: bigquery.TimestampFieldType},
{Name: "_peerdb_timestamp_nanos", Type: bigquery.IntegerFieldType},
{Name: "_peerdb_timestamp", Type: bigquery.IntegerFieldType},
{Name: "_peerdb_destination_table_name", Type: bigquery.StringFieldType},
{Name: "_peerdb_data", Type: bigquery.StringFieldType},
{Name: "_peerdb_record_type", Type: bigquery.IntegerFieldType},
3 changes: 1 addition & 2 deletions flow/connectors/bigquery/merge_statement_generator.go
@@ -80,7 +80,6 @@ func (m *mergeStmtGenerator) generateFlattenedCTE() string {
flattenedProjs = append(
flattenedProjs,
"_peerdb_timestamp",
"_peerdb_timestamp_nanos",
"_peerdb_record_type",
"_peerdb_unchanged_toast_columns",
)
@@ -99,7 +98,7 @@ func (m *mergeStmtGenerator) generateDeDupedCTE() string {
SELECT _peerdb_ranked.*
FROM (
SELECT RANK() OVER (
PARTITION BY %s ORDER BY _peerdb_timestamp_nanos DESC
PARTITION BY %s ORDER BY _peerdb_timestamp DESC
) as _peerdb_rank, * FROM _peerdb_flattened
) _peerdb_ranked
WHERE _peerdb_rank = 1
9 changes: 7 additions & 2 deletions flow/connectors/bigquery/qrep_avro_sync.go
@@ -37,7 +37,7 @@ func NewQRepAvroSyncMethod(connector *BigQueryConnector, gcsBucket string,
func (s *QRepAvroSyncMethod) SyncRecords(
rawTableName string,
flowJobName string,
lastCP int64,
records *model.CDCRecordStream,
dstTableMetadata *bigquery.TableMetadata,
syncBatchID int64,
stream *model.QRecordStream,
@@ -67,6 +67,11 @@ func (s *QRepAvroSyncMethod) SyncRecords(
datasetID := s.connector.datasetID
insertStmt := fmt.Sprintf("INSERT INTO `%s.%s` SELECT * FROM `%s.%s`;",
datasetID, rawTableName, datasetID, stagingTable)

lastCP, err := records.GetLastCheckpoint()
if err != nil {
return -1, fmt.Errorf("failed to get last checkpoint: %v", err)
}
updateMetadataStmt, err := s.connector.getUpdateMetadataStmt(flowJobName, lastCP, syncBatchID)
if err != nil {
return -1, fmt.Errorf("failed to update metadata: %v", err)
@@ -421,7 +426,7 @@ func (s *QRepAvroSyncMethod) writeToStage(
if err := status.Err(); err != nil {
return 0, fmt.Errorf("failed to load Avro file into BigQuery table: %w", err)
}
slog.Info(fmt.Sprintf("Pushed into %s", avroFile.FilePath))
slog.Info(fmt.Sprintf("Pushed from %s to BigQuery", avroFile.FilePath))

err = s.connector.waitForTableReady(stagingTable)
if err != nil {
20 changes: 5 additions & 15 deletions flow/connectors/utils/stream.go
@@ -10,7 +10,7 @@ import (
)

func RecordsToRawTableStream(req *model.RecordsToStreamRequest) (*model.RecordsToStreamResponse, error) {
recordStream := model.NewQRecordStream(1 << 16)
recordStream := model.NewQRecordStream(1 << 17)
err := recordStream.SetSchema(&model.QRecordSchema{
Fields: []model.QField{
{
@@ -85,11 +85,6 @@ func recordToQRecordOrError(tableMapping map[string]uint32, batchID int64, recor
}
}

// add insert record to the raw table
entries[2] = qvalue.QValue{
Kind: qvalue.QValueKindString,
Value: typedRecord.DestinationTableName,
}
entries[3] = qvalue.QValue{
Kind: qvalue.QValueKindString,
Value: itemsJSON,
@@ -121,10 +116,6 @@ func recordToQRecordOrError(tableMapping map[string]uint32, batchID int64, recor
}
}

entries[2] = qvalue.QValue{
Kind: qvalue.QValueKindString,
Value: typedRecord.DestinationTableName,
}
entries[3] = qvalue.QValue{
Kind: qvalue.QValueKindString,
Value: newItemsJSON,
@@ -150,11 +141,6 @@ func recordToQRecordOrError(tableMapping map[string]uint32, batchID int64, recor
}
}

// append delete record to the raw table
entries[2] = qvalue.QValue{
Kind: qvalue.QValueKindString,
Value: typedRecord.DestinationTableName,
}
entries[3] = qvalue.QValue{
Kind: qvalue.QValueKindString,
Value: itemsJSON,
@@ -186,6 +172,10 @@ func recordToQRecordOrError(tableMapping map[string]uint32, batchID int64, recor
Kind: qvalue.QValueKindInt64,
Value: time.Now().UnixNano(),
}
entries[2] = qvalue.QValue{
Kind: qvalue.QValueKindString,
Value: record.GetDestinationTableName(),
}
entries[6] = qvalue.QValue{
Kind: qvalue.QValueKindInt64,
Value: batchID,
