support cross-project mode
Amogh-Bharadwaj committed Jan 19, 2024
1 parent 7d87cb3 commit 2e3e7e4
Showing 3 changed files with 92 additions and 36 deletions.
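In outline: the connector now carries a projectID alongside its datasetID, a dataset ID written as project.dataset overrides the project configured on the peer, dataset handles are built with DatasetInProject instead of Dataset, and each query gets explicit DefaultProjectID/DefaultDatasetID so bare dataset.table names in the SQL resolve against the target project rather than the service account's own.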
101 changes: 73 additions & 28 deletions flow/connectors/bigquery/bigquery.go
@@ -63,6 +63,7 @@ type BigQueryConnector struct {
client *bigquery.Client
storageClient *storage.Client
datasetID string
projectID string
catalogPool *pgxpool.Pool
logger slog.Logger
}
@@ -149,22 +150,23 @@ func NewBigQueryConnector(ctx context.Context, config *protos.BigqueryConfig) (*
}

datasetID := config.GetDatasetId()
datasetParts := strings.Split(datasetID, ".")
if len(datasetParts) > 2 {
projectID := config.GetProjectId()
projectPart, datasetPart, found := strings.Cut(datasetID, ".")
if found && strings.Contains(datasetPart, ".") {
return nil,
fmt.Errorf("invalid dataset ID: %s. Ensure that it is just a single string or string1.string2", datasetID)
}
if len(datasetParts) == 2 {
datasetID = datasetParts[1]
bqsa.ProjectID = datasetParts[0]
if projectPart != "" && datasetPart != "" {
datasetID = datasetPart
projectID = projectPart
}

client, err := bqsa.CreateBigQueryClient(ctx)
if err != nil {
return nil, fmt.Errorf("failed to create BigQuery client: %v", err)
}

_, checkErr := client.Dataset(datasetID).Metadata(ctx)
_, checkErr := client.DatasetInProject(projectID, datasetID).Metadata(ctx)
if checkErr != nil {
slog.ErrorContext(ctx, "failed to get dataset metadata", slog.Any("error", checkErr))
return nil, fmt.Errorf("failed to get dataset metadata: %v", checkErr)
@@ -187,6 +189,7 @@ func NewBigQueryConnector(ctx context.Context, config *protos.BigqueryConfig) (*
bqConfig: config,
client: client,
datasetID: datasetID,
projectID: projectID,
storageClient: storageClient,
catalogPool: catalogPool,
logger: *slog.With(slog.String(string(shared.FlowNameKey), flowName)),
@@ -203,7 +206,7 @@ func (c *BigQueryConnector) Close() error {

// ConnectionActive returns true if the connection is active.
func (c *BigQueryConnector) ConnectionActive() error {
_, err := c.client.Dataset(c.datasetID).Metadata(c.ctx)
_, err := c.client.DatasetInProject(c.projectID, c.datasetID).Metadata(c.ctx)
if err != nil {
return fmt.Errorf("failed to get dataset metadata: %v", err)
}
@@ -216,12 +219,12 @@ func (c *BigQueryConnector) ConnectionActive() error {

// NeedsSetupMetadataTables returns true if the metadata tables need to be set up.
func (c *BigQueryConnector) NeedsSetupMetadataTables() bool {
_, err := c.client.Dataset(c.datasetID).Table(MirrorJobsTable).Metadata(c.ctx)
_, err := c.client.DatasetInProject(c.projectID, c.datasetID).Table(MirrorJobsTable).Metadata(c.ctx)
return err != nil
}

func (c *BigQueryConnector) waitForTableReady(datasetTable *datasetTable) error {
table := c.client.Dataset(datasetTable.dataset).Table(datasetTable.table)
table := c.client.DatasetInProject(c.projectID, datasetTable.dataset).Table(datasetTable.table)
maxDuration := 5 * time.Minute
deadline := time.Now().Add(maxDuration)
sleepInterval := 5 * time.Second
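The pattern repeated through this file swaps client.Dataset(id) for client.DatasetInProject(projectID, id). In cloud.google.com/go/bigquery, Dataset(id) binds to the project the client was created with, so only the explicit form reaches a dataset in another project. A minimal sketch, assuming a service account in my-project that has been granted access to other-project (both names hypothetical):

package main

import (
    "context"
    "log"

    "cloud.google.com/go/bigquery"
)

func main() {
    ctx := context.Background()
    // The client is created against (and billed to) my-project.
    client, err := bigquery.NewClient(ctx, "my-project")
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // Equivalent handles while the dataset lives in the client's own project:
    _ = client.Dataset("peerdb_dataset")
    _ = client.DatasetInProject("my-project", "peerdb_dataset")

    // Cross-project: only the explicit form resolves against other-project.
    meta, err := client.DatasetInProject("other-project", "peerdb_dataset").Metadata(ctx)
    if err != nil {
        log.Fatal(err)
    }
    log.Println("dataset location:", meta.Location)
}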
@@ -256,10 +259,13 @@ func (c *BigQueryConnector) ReplayTableSchemaDeltas(flowJobName string,

for _, addedColumn := range schemaDelta.AddedColumns {
dstDatasetTable, _ := c.convertToDatasetTable(schemaDelta.DstTableName)
_, err := c.client.Query(fmt.Sprintf(
query := c.client.Query(fmt.Sprintf(
"ALTER TABLE %s.%s ADD COLUMN IF NOT EXISTS `%s` %s", dstDatasetTable.dataset,
dstDatasetTable.table, addedColumn.ColumnName,
qValueKindToBigQueryType(addedColumn.ColumnType))).Read(c.ctx)
qValueKindToBigQueryType(addedColumn.ColumnType)))
query.DefaultProjectID = c.projectID
query.DefaultDatasetID = c.datasetID
_, err := query.Read(c.ctx)
if err != nil {
return fmt.Errorf("failed to add column %s for table %s: %w", addedColumn.ColumnName,
schemaDelta.DstTableName, err)
@@ -275,7 +281,7 @@
// SetupMetadataTables sets up the metadata tables.
func (c *BigQueryConnector) SetupMetadataTables() error {
// check if the dataset exists
dataset := c.client.Dataset(c.datasetID)
dataset := c.client.DatasetInProject(c.projectID, c.datasetID)
if _, err := dataset.Metadata(c.ctx); err != nil {
// create the dataset as it doesn't exist
if err := dataset.Create(c.ctx, nil); err != nil {
@@ -308,6 +314,8 @@ func (c *BigQueryConnector) SetupMetadataTables() error {
func (c *BigQueryConnector) GetLastOffset(jobName string) (int64, error) {
query := fmt.Sprintf("SELECT offset FROM %s.%s WHERE mirror_job_name = '%s'", c.datasetID, MirrorJobsTable, jobName)
q := c.client.Query(query)
q.DefaultProjectID = c.projectID
q.DefaultDatasetID = c.datasetID
it, err := q.Read(c.ctx)
if err != nil {
err = fmt.Errorf("failed to run query %s on BigQuery:\n %w", query, err)
@@ -338,6 +346,8 @@ func (c *BigQueryConnector) SetLastOffset(jobName string, lastOffset int64) erro
jobName,
)
q := c.client.Query(query)
q.DefaultProjectID = c.projectID
q.DefaultDatasetID = c.datasetID
_, err := q.Read(c.ctx)
if err != nil {
return fmt.Errorf("failed to run query %s on BigQuery:\n %w", query, err)
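The companion pattern pairs every client.Query(...) with DefaultProjectID and DefaultDatasetID, fields on the embedded bigquery.QueryConfig that tell BigQuery how to qualify bare dataset.table names inside the SQL; without them the query text would resolve against the client's own project. A sketch of the GetLastOffset shape under that assumption — the metadata table name is hypothetical, and a query parameter stands in for the interpolated job name:

package sketch

import (
    "context"
    "fmt"

    "cloud.google.com/go/bigquery"
    "google.golang.org/api/iterator"
)

// lastOffset sketches the connector's GetLastOffset pattern; the two
// Default* assignments are what make the bare dataset.table reference
// resolve in the target project rather than the client's own.
func lastOffset(ctx context.Context, client *bigquery.Client, projectID, datasetID, jobName string) (int64, error) {
    q := client.Query(fmt.Sprintf(
        "SELECT offset FROM %s.peerdb_mirror_jobs WHERE mirror_job_name = @job", datasetID))
    q.DefaultProjectID = projectID
    q.DefaultDatasetID = datasetID
    q.Parameters = []bigquery.QueryParameter{{Name: "job", Value: jobName}}

    it, err := q.Read(ctx)
    if err != nil {
        return 0, err
    }
    var row struct{ Offset int64 }
    if err := it.Next(&row); err != nil {
        if err == iterator.Done {
            return 0, nil // no row recorded for this mirror job yet
        }
        return 0, err
    }
    return row.Offset, nil
}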
@@ -350,6 +360,8 @@ func (c *BigQueryConnector) GetLastSyncBatchID(jobName string) (int64, error) {
query := fmt.Sprintf("SELECT sync_batch_id FROM %s.%s WHERE mirror_job_name = '%s'",
c.datasetID, MirrorJobsTable, jobName)
q := c.client.Query(query)
q.DefaultProjectID = c.projectID
q.DefaultDatasetID = c.datasetID
it, err := q.Read(c.ctx)
if err != nil {
err = fmt.Errorf("failed to run query %s on BigQuery:\n %w", query, err)
@@ -375,6 +387,8 @@ func (c *BigQueryConnector) GetLastSyncAndNormalizeBatchID(jobName string) (mode
query := fmt.Sprintf("SELECT sync_batch_id, normalize_batch_id FROM %s.%s WHERE mirror_job_name = '%s'",
c.datasetID, MirrorJobsTable, jobName)
q := c.client.Query(query)
q.DefaultProjectID = c.projectID
q.DefaultDatasetID = c.datasetID
it, err := q.Read(c.ctx)
if err != nil {
err = fmt.Errorf("failed to run query %s on BigQuery:\n %w", query, err)
@@ -413,6 +427,8 @@ func (c *BigQueryConnector) getDistinctTableNamesInBatch(flowJobName string, syn
c.datasetID, rawTableName, normalizeBatchID, syncBatchID)
// Run the query
q := c.client.Query(query)
q.DefaultProjectID = c.projectID
q.DefaultDatasetID = c.datasetID
it, err := q.Read(c.ctx)
if err != nil {
err = fmt.Errorf("failed to run query %s on BigQuery:\n %w", query, err)
@@ -455,6 +471,8 @@ func (c *BigQueryConnector) getTableNametoUnchangedCols(flowJobName string, sync
c.datasetID, rawTableName, normalizeBatchID, syncBatchID)
// Run the query
q := c.client.Query(query)
q.DefaultDatasetID = c.datasetID
q.DefaultProjectID = c.projectID
it, err := q.Read(c.ctx)
if err != nil {
err = fmt.Errorf("failed to run query %s on BigQuery:\n %w", query, err)
@@ -519,7 +537,7 @@ func (c *BigQueryConnector) syncRecordsViaAvro(
}

avroSync := NewQRepAvroSyncMethod(c, req.StagingPath, req.FlowJobName)
rawTableMetadata, err := c.client.Dataset(c.datasetID).Table(rawTableName).Metadata(c.ctx)
rawTableMetadata, err := c.client.DatasetInProject(c.projectID, c.datasetID).Table(rawTableName).Metadata(c.ctx)
if err != nil {
return nil, fmt.Errorf("failed to get metadata of destination table: %w", err)
}
@@ -604,6 +622,8 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
c.logger.Info(fmt.Sprintf("running merge statement [%d/%d] for table %s..",
i+1, len(mergeStmts), tableName))
q := c.client.Query(mergeStmt)
q.DefaultProjectID = c.projectID
q.DefaultDatasetID = c.datasetID
_, err = q.Read(c.ctx)
if err != nil {
return nil, fmt.Errorf("failed to execute merge statement %s: %v", mergeStmt, err)
@@ -615,7 +635,10 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
"UPDATE %s.%s SET normalize_batch_id=%d WHERE mirror_job_name='%s';",
c.datasetID, MirrorJobsTable, batchIDs.SyncBatchID, req.FlowJobName)

_, err = c.client.Query(updateMetadataStmt).Read(c.ctx)
query := c.client.Query(updateMetadataStmt)
query.DefaultProjectID = c.projectID
query.DefaultDatasetID = c.datasetID
_, err = query.Read(c.ctx)
if err != nil {
return nil, fmt.Errorf("failed to execute update metadata statements %s: %v", updateMetadataStmt, err)
}
@@ -649,7 +672,7 @@ func (c *BigQueryConnector) CreateRawTable(req *protos.CreateRawTableInput) (*pr
}

// create the table
table := c.client.Dataset(c.datasetID).Table(rawTableName)
table := c.client.DatasetInProject(c.projectID, c.datasetID).Table(rawTableName)

// check if the table exists
tableRef, err := table.Metadata(c.ctx)
@@ -728,6 +751,8 @@ func (c *BigQueryConnector) metadataHasJob(jobName string) (bool, error) {
c.datasetID, MirrorJobsTable, jobName)

q := c.client.Query(checkStmt)
q.DefaultProjectID = c.projectID
q.DefaultDatasetID = c.datasetID
it, err := q.Read(c.ctx)
if err != nil {
return false, fmt.Errorf("failed to check if job exists: %w", err)
@@ -765,7 +790,7 @@ func (c *BigQueryConnector) SetupNormalizedTables(
return nil, fmt.Errorf("invalid mirror: two tables mirror to the same BigQuery table %s",
datasetTable.string())
}
dataset := c.client.Dataset(datasetTable.dataset)
dataset := c.client.DatasetInProject(c.projectID, datasetTable.dataset)
_, err = dataset.Metadata(c.ctx)
// just assume this means dataset don't exist, and create it
if err != nil {
@@ -852,7 +877,7 @@ func (c *BigQueryConnector) SetupNormalizedTables(
}

func (c *BigQueryConnector) SyncFlowCleanup(jobName string) error {
dataset := c.client.Dataset(c.datasetID)
dataset := c.client.DatasetInProject(c.projectID, c.datasetID)
// deleting PeerDB specific tables
err := dataset.Table(c.getRawTableName(jobName)).Delete(c.ctx)
if err != nil {
@@ -861,7 +886,10 @@ func (c *BigQueryConnector) SyncFlowCleanup(jobName string) error {

// deleting job from metadata table
query := fmt.Sprintf("DELETE FROM %s.%s WHERE mirror_job_name = '%s'", c.datasetID, MirrorJobsTable, jobName)
_, err = c.client.Query(query).Read(c.ctx)
queryHandler := c.client.Query(query)
queryHandler.DefaultProjectID = c.projectID
queryHandler.DefaultDatasetID = c.datasetID
_, err = queryHandler.Read(c.ctx)
if err != nil {
return fmt.Errorf("failed to delete job from metadata table: %w", err)
}
@@ -898,11 +926,15 @@ func (c *BigQueryConnector) RenameTables(req *protos.RenameTablesInput) (*protos
srcDatasetTable.string(), fmt.Sprintf("%s,%s", allCols, *req.SoftDeleteColName),
allCols, *req.SoftDeleteColName, dstDatasetTable.string(),
pkeyCols, pkeyCols, srcDatasetTable.string()))
_, err := c.client.Query(
query := c.client.Query(
fmt.Sprintf("INSERT INTO %s(%s) SELECT %s,true AS %s FROM %s WHERE (%s) NOT IN (SELECT %s FROM %s)",
srcDatasetTable.string(), fmt.Sprintf("%s,%s", allCols, *req.SoftDeleteColName),
allCols, *req.SoftDeleteColName, dstDatasetTable.string(),
pkeyCols, pkeyCols, srcDatasetTable.string())).Read(c.ctx)
pkeyCols, pkeyCols, srcDatasetTable.string()))

query.DefaultProjectID = c.projectID
query.DefaultDatasetID = c.datasetID
_, err := query.Read(c.ctx)
if err != nil {
return nil, fmt.Errorf("unable to handle soft-deletes for table %s: %w", dstDatasetTable.string(), err)
}
@@ -917,9 +949,13 @@ func (c *BigQueryConnector) RenameTables(req *protos.RenameTablesInput) (*protos
c.logger.InfoContext(c.ctx,
fmt.Sprintf("UPDATE %s SET %s = CURRENT_TIMESTAMP WHERE %s IS NULL", srcDatasetTable.string(),
*req.SyncedAtColName, *req.SyncedAtColName))
_, err := c.client.Query(
query := c.client.Query(
fmt.Sprintf("UPDATE %s SET %s = CURRENT_TIMESTAMP WHERE %s IS NULL", srcDatasetTable.string(),
*req.SyncedAtColName, *req.SyncedAtColName)).Read(c.ctx)
*req.SyncedAtColName, *req.SyncedAtColName))

query.DefaultProjectID = c.projectID
query.DefaultDatasetID = c.datasetID
_, err := query.Read(c.ctx)
if err != nil {
return nil, fmt.Errorf("unable to set synced at column for table %s: %w", srcDatasetTable.string(), err)
}
@@ -928,17 +964,23 @@ func (c *BigQueryConnector) RenameTables(req *protos.RenameTablesInput) (*protos
c.logger.InfoContext(c.ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s",
dstDatasetTable.string()))
// drop the dst table if exists
_, err := c.client.Query(fmt.Sprintf("DROP TABLE IF EXISTS %s",
dstDatasetTable.string())).Read(c.ctx)
dropQuery := c.client.Query(fmt.Sprintf("DROP TABLE IF EXISTS %s",
dstDatasetTable.string()))
dropQuery.DefaultProjectID = c.projectID
dropQuery.DefaultDatasetID = c.datasetID
_, err := dropQuery.Read(c.ctx)
if err != nil {
return nil, fmt.Errorf("unable to drop table %s: %w", dstDatasetTable.string(), err)
}

c.logger.InfoContext(c.ctx, fmt.Sprintf("ALTER TABLE %s RENAME TO %s",
srcDatasetTable.string(), dstDatasetTable.table))
// rename the src table to dst
_, err = c.client.Query(fmt.Sprintf("ALTER TABLE %s RENAME TO %s",
srcDatasetTable.string(), dstDatasetTable.table)).Read(c.ctx)
query := c.client.Query(fmt.Sprintf("ALTER TABLE %s RENAME TO %s",
srcDatasetTable.string(), dstDatasetTable.table))
query.DefaultProjectID = c.projectID
query.DefaultDatasetID = c.datasetID
_, err = query.Read(c.ctx)
if err != nil {
return nil, fmt.Errorf("unable to rename table %s to %s: %w", srcDatasetTable.string(),
dstDatasetTable.string(), err)
@@ -964,8 +1006,11 @@ func (c *BigQueryConnector) CreateTablesFromExisting(req *protos.CreateTablesFro
activity.RecordHeartbeat(c.ctx, fmt.Sprintf("creating table '%s' similar to '%s'", newTable, existingTable))

// rename the src table to dst
_, err := c.client.Query(fmt.Sprintf("CREATE TABLE IF NOT EXISTS `%s` LIKE `%s`",
newDatasetTable.string(), existingDatasetTable.string())).Read(c.ctx)
query := c.client.Query(fmt.Sprintf("CREATE TABLE IF NOT EXISTS `%s` LIKE `%s`",
newDatasetTable.string(), existingDatasetTable.string()))
query.DefaultProjectID = c.projectID
query.DefaultDatasetID = c.datasetID
_, err := query.Read(c.ctx)
if err != nil {
return nil, fmt.Errorf("unable to create table %s: %w", newTable, err)
}
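Since the same two assignments now follow every client.Query call in this file, a tiny helper along these lines (hypothetical, not part of this commit) would keep the default-setting in one place:

// Hypothetical refactor, not in the commit: centralize the cross-project defaults.
func (c *BigQueryConnector) queryWithDefaults(sql string) *bigquery.Query {
    q := c.client.Query(sql)
    q.DefaultProjectID = c.projectID
    q.DefaultDatasetID = c.datasetID
    return q
}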
11 changes: 8 additions & 3 deletions flow/connectors/bigquery/qrep.go
@@ -54,7 +54,7 @@ func (c *BigQueryConnector) replayTableSchemaDeltasQRep(config *protos.QRepConfi
srcSchema *model.QRecordSchema,
) (*bigquery.TableMetadata, error) {
destDatasetTable, _ := c.convertToDatasetTable(config.DestinationTableIdentifier)
bqTable := c.client.Dataset(destDatasetTable.dataset).Table(destDatasetTable.table)
bqTable := c.client.DatasetInProject(c.projectID, destDatasetTable.dataset).Table(destDatasetTable.table)
dstTableMetadata, err := bqTable.Metadata(c.ctx)
if err != nil {
return nil, fmt.Errorf("failed to get metadata of table %s: %w", destDatasetTable, err)
@@ -134,7 +134,7 @@ func (c *BigQueryConnector) SetupQRepMetadataTables(config *protos.QRepConfig) e
}

// reference the table
table := c.client.Dataset(c.datasetID).Table(qRepMetadataTableName)
table := c.client.DatasetInProject(c.projectID, c.datasetID).Table(qRepMetadataTableName)

// check if the table exists
meta, err := table.Metadata(c.ctx)
@@ -156,7 +156,7 @@ func (c *BigQueryConnector) SetupQRepMetadataTables(config *protos.QRepConfig) e
}

if config.WriteMode.WriteType == protos.QRepWriteType_QREP_WRITE_MODE_OVERWRITE {
_, err = c.client.Query(fmt.Sprintf("TRUNCATE TABLE %s", config.DestinationTableIdentifier)).Read(c.ctx)
query := c.client.Query(fmt.Sprintf("TRUNCATE TABLE %s", config.DestinationTableIdentifier))
query.DefaultDatasetID = c.datasetID
query.DefaultProjectID = c.projectID
_, err = query.Read(c.ctx)
if err != nil {
return fmt.Errorf("failed to TRUNCATE table before query replication: %w", err)
}
Expand All @@ -172,6 +175,8 @@ func (c *BigQueryConnector) isPartitionSynced(partitionID string) (bool, error)
)

query := c.client.Query(queryString)
query.DefaultDatasetID = c.datasetID
query.DefaultProjectID = c.projectID
it, err := query.Read(c.ctx)
if err != nil {
return false, fmt.Errorf("failed to execute query: %w", err)
16 changes: 11 additions & 5 deletions flow/connectors/bigquery/qrep_avro_sync.go
@@ -95,13 +95,16 @@ func (s *QRepAvroSyncMethod) SyncRecords(
updateMetadataStmt,
"COMMIT TRANSACTION;",
}
_, err = bqClient.Query(strings.Join(stmts, "\n")).Read(s.connector.ctx)
query := bqClient.Query(strings.Join(stmts, "\n"))
query.DefaultDatasetID = s.connector.datasetID
query.DefaultProjectID = s.connector.projectID
_, err = query.Read(s.connector.ctx)
if err != nil {
return nil, fmt.Errorf("failed to execute statements in a transaction: %v", err)
}

// drop the staging table
if err := bqClient.Dataset(datasetID).Table(stagingTable).Delete(s.connector.ctx); err != nil {
if err := bqClient.DatasetInProject(s.connector.projectID, datasetID).Table(stagingTable).Delete(s.connector.ctx); err != nil {
// just log the error this isn't fatal.
slog.Error("failed to delete staging table "+stagingTable,
slog.Any("error", err),
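SyncRecords joins its statements into one BEGIN TRANSACTION ... COMMIT TRANSACTION script and submits it as a single multi-statement query, so one pair of Default* assignments covers every statement in the transaction. A compilable sketch of that shape (names hypothetical):

package sketch

import (
    "context"
    "strings"

    "cloud.google.com/go/bigquery"
)

// runInTransaction sketches the SyncRecords shape: statements run as one
// multi-statement script, so a single pair of defaults covers all of them.
func runInTransaction(ctx context.Context, client *bigquery.Client, projectID, datasetID string, stmts []string) error {
    script := strings.Join(
        append(append([]string{"BEGIN TRANSACTION;"}, stmts...), "COMMIT TRANSACTION;"), "\n")
    q := client.Query(script)
    q.DefaultProjectID = projectID
    q.DefaultDatasetID = datasetID
    _, err := q.Read(ctx)
    return err
}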
@@ -209,13 +212,16 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords(
insertMetadataStmt,
"COMMIT TRANSACTION;",
}
_, err = bqClient.Query(strings.Join(stmts, "\n")).Read(s.connector.ctx)
query := bqClient.Query(strings.Join(stmts, "\n"))
query.DefaultDatasetID = s.connector.datasetID
query.DefaultProjectID = s.connector.projectID
_, err = query.Read(s.connector.ctx)
if err != nil {
return -1, fmt.Errorf("failed to execute statements in a transaction: %v", err)
}

// drop the staging table
if err := bqClient.Dataset(stagingDatasetTable.dataset).
if err := bqClient.DatasetInProject(s.connector.projectID, stagingDatasetTable.dataset).
Table(stagingDatasetTable.table).Delete(s.connector.ctx); err != nil {
// just log the error this isn't fatal.
slog.Error("failed to delete staging table "+stagingDatasetTable.string(),
@@ -460,7 +466,7 @@ func (s *QRepAvroSyncMethod) writeToStage(
avroRef = localRef
}

loader := bqClient.Dataset(stagingTable.dataset).Table(stagingTable.table).LoaderFrom(avroRef)
loader := bqClient.DatasetInProject(s.connector.projectID, stagingTable.dataset).Table(stagingTable.table).LoaderFrom(avroRef)
loader.UseAvroLogicalTypes = true
loader.WriteDisposition = bigquery.WriteTruncate
job, err := loader.Run(s.connector.ctx)
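writeToStage stages Avro through a load job, and the table handle again has to come from DatasetInProject so the staging table lands in the destination project. A sketch of an equivalent standalone GCS Avro load (bucket and names hypothetical):

package sketch

import (
    "context"
    "fmt"

    "cloud.google.com/go/bigquery"
)

// loadAvro loads Avro files from GCS into a table addressed through an
// explicit project, mirroring writeToStage's loader setup.
func loadAvro(ctx context.Context, client *bigquery.Client, projectID, datasetID, table, gcsURI string) error {
    gcsRef := bigquery.NewGCSReference(gcsURI) // e.g. "gs://staging-bucket/flow/*.avro"
    gcsRef.SourceFormat = bigquery.Avro

    loader := client.DatasetInProject(projectID, datasetID).Table(table).LoaderFrom(gcsRef)
    loader.UseAvroLogicalTypes = true                // as in writeToStage
    loader.WriteDisposition = bigquery.WriteTruncate // staging table is replaced each run

    job, err := loader.Run(ctx)
    if err != nil {
        return err
    }
    status, err := job.Wait(ctx)
    if err != nil {
        return err
    }
    if err := status.Err(); err != nil {
        return fmt.Errorf("load job failed: %w", err)
    }
    return nil
}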