Increase bq avro stream size to 10 mil (#907)
Ran into an issue where syncRecordsViaAvro creates its record stream
with size 1 << 20. When the number of pulled records exceeds that, the
loop just blocks with no logs.
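For context, here is a minimal sketch of that failure mode, assuming (as the description implies) that the stream is backed by a fixed-size buffered channel; the recordStream type below is a hypothetical stand-in, not the real model.QRecordStream API:

package main

import (
	"fmt"
	"time"
)

// Hypothetical stand-in for model.QRecordStream: a stream whose records
// channel has a fixed buffer, as in NewQRecordStream(1 << 20).
type recordStream struct {
	records chan int
}

func newRecordStream(buffer int) *recordStream {
	return &recordStream{records: make(chan int, buffer)}
}

func main() {
	// Tiny buffer so the stall is visible immediately.
	stream := newRecordStream(4)

	done := make(chan struct{})
	go func() {
		// Producer: sends 0 through 3 fill the buffer; with no consumer
		// draining the channel, the fifth send blocks forever. No error,
		// no log line, just a silent stall.
		for i := 0; i < 8; i++ {
			stream.records <- i
			fmt.Println("pushed record", i)
		}
		close(done)
	}()

	select {
	case <-done:
		fmt.Println("all records pushed")
	case <-time.After(time.Second):
		fmt.Println("producer blocked: buffer full and nobody is reading")
	}
}

With a buffer of 4, the first four sends succeed and the fifth blocks; with 1 << 20 the same stall appears only once a sync pulls over a million records, which is presumably why it surfaced late and with nothing in the logs.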
Amogh-Bharadwaj authored Dec 27, 2023
1 parent e905eea commit 6f3eec2
Showing 1 changed file with 12 additions and 7 deletions.
19 changes: 12 additions & 7 deletions flow/connectors/bigquery/bigquery.go
@@ -254,7 +254,8 @@ func (c *BigQueryConnector) WaitForTableReady(tblName string) error {
 // ReplayTableSchemaDeltas changes a destination table to match the schema at source
 // This could involve adding or dropping multiple columns.
 func (c *BigQueryConnector) ReplayTableSchemaDeltas(flowJobName string,
-	schemaDeltas []*protos.TableSchemaDelta) error {
+	schemaDeltas []*protos.TableSchemaDelta,
+) error {
 	for _, schemaDelta := range schemaDeltas {
 		if schemaDelta == nil || len(schemaDelta.AddedColumns) == 0 {
 			continue
@@ -391,7 +392,8 @@ func (c *BigQueryConnector) GetLastNormalizeBatchID(jobName string) (int64, error) {
 }
 
 func (c *BigQueryConnector) getDistinctTableNamesInBatch(flowJobName string, syncBatchID int64,
-	normalizeBatchID int64) ([]string, error) {
+	normalizeBatchID int64,
+) ([]string, error) {
 	rawTableName := c.getRawTableName(flowJobName)
 
 	// Prepare the query to retrieve distinct tables in that batch
@@ -427,7 +429,8 @@ func (c *BigQueryConnector) getDistinctTableNamesInBatch(flowJobName string, syncBatchID int64,
 }
 
 func (c *BigQueryConnector) getTableNametoUnchangedCols(flowJobName string, syncBatchID int64,
-	normalizeBatchID int64) (map[string][]string, error) {
+	normalizeBatchID int64,
+) (map[string][]string, error) {
 	rawTableName := c.getRawTableName(flowJobName)
 
 	// Prepare the query to retrieve distinct tables in that batch
@@ -513,7 +516,7 @@ func (c *BigQueryConnector) syncRecordsViaAvro(
 	tableNameRowsMapping := make(map[string]uint32)
 	first := true
 	var firstCP int64 = 0
-	recordStream := model.NewQRecordStream(1 << 20)
+	recordStream := model.NewQRecordStream(10 << 20)
 	err := recordStream.SetSchema(&model.QRecordSchema{
 		Fields: []*model.QField{
 			{
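For scale: 1 << 20 is 1,048,576 buffered records, so any batch pulling more than about a million records hit the silent block described above; 10 << 20 is 10,485,760, the "10 mil" of the commit title. This raises the ceiling rather than removing it, so a batch of more than roughly 10 million pulled records would presumably stall the same way.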
@@ -803,7 +806,7 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
 		mergeStmts := mergeGen.generateMergeStmts()
 		stmts = append(stmts, mergeStmts...)
 	}
-	//update metadata to make the last normalized batch id to the recent last sync batch id.
+	// update metadata to make the last normalized batch id to the recent last sync batch id.
 	updateMetadataStmt := fmt.Sprintf(
 		"UPDATE %s.%s SET normalize_batch_id=%d WHERE mirror_job_name = '%s';",
 		c.datasetID, MirrorJobsTable, syncBatchID, req.FlowJobName)
@@ -902,7 +905,8 @@ func (c *BigQueryConnector) CreateRawTable(req *protos.CreateRawTableInput) (*protos.CreateRawTableOutput, error) {
 
 // getUpdateMetadataStmt updates the metadata tables for a given job.
 func (c *BigQueryConnector) getUpdateMetadataStmt(jobName string, lastSyncedCheckpointID int64,
-	batchID int64) (string, error) {
+	batchID int64,
+) (string, error) {
 	hasJob, err := c.metadataHasJob(jobName)
 	if err != nil {
 		return "", fmt.Errorf("failed to check if job exists: %w", err)
@@ -1101,7 +1105,8 @@ func (c *BigQueryConnector) RenameTables(req *protos.RenameTablesInput) (*protos.RenameTablesOutput, error) {
 }
 
 func (c *BigQueryConnector) CreateTablesFromExisting(req *protos.CreateTablesFromExistingInput) (
-	*protos.CreateTablesFromExistingOutput, error) {
+	*protos.CreateTablesFromExistingOutput, error,
+) {
 	for newTable, existingTable := range req.NewToExistingTableMapping {
 		log.WithFields(log.Fields{
 			"flowName": req.FlowJobName,
