Commit

Merge remote-tracking branch 'origin/main' into stable
iskakaushik committed Sep 26, 2024
2 parents 11fd8cf + ebda937 commit 3a8e4a8
Showing 62 changed files with 1,468 additions and 943 deletions.
4 changes: 2 additions & 2 deletions docker-compose-dev.yml
@@ -73,7 +73,7 @@ services:
- POSTGRES_PWD=postgres
- POSTGRES_SEEDS=catalog
- DYNAMIC_CONFIG_FILE_PATH=config/dynamicconfig/development-sql.yaml
image: temporalio/auto-setup:1.24
image: temporalio/auto-setup:1.25
ports:
- 7233:7233
volumes:
@@ -95,7 +95,7 @@ services:
- TEMPORAL_ADDRESS=temporal:7233
- TEMPORAL_CLI_ADDRESS=temporal:7233
- TEMPORAL_CLI_SHOW_STACKS=1
image: temporalio/admin-tools:1.24
image: temporalio/admin-tools:1.25
stdin_open: true
tty: true
entrypoint: /etc/temporal/entrypoint.sh
4 changes: 2 additions & 2 deletions docker-compose.yml
@@ -68,7 +68,7 @@ services:
- POSTGRES_PWD=postgres
- POSTGRES_SEEDS=catalog
- DYNAMIC_CONFIG_FILE_PATH=config/dynamicconfig/development-sql.yaml
image: temporalio/auto-setup:1.24
image: temporalio/auto-setup:1.25
ports:
- 7233:7233
volumes:
@@ -85,7 +85,7 @@ services:
- TEMPORAL_ADDRESS=temporal:7233
- TEMPORAL_CLI_ADDRESS=temporal:7233
- TEMPORAL_CLI_SHOW_STACKS=1
image: temporalio/admin-tools:1.24
image: temporalio/admin-tools:1.25
stdin_open: true
tty: true
entrypoint: /etc/temporal/entrypoint.sh
67 changes: 67 additions & 0 deletions flow/activities/flowable.go
@@ -20,6 +20,7 @@ import (

"github.com/PeerDB-io/peer-flow/alerting"
"github.com/PeerDB-io/peer-flow/connectors"
connmetadata "github.com/PeerDB-io/peer-flow/connectors/external_metadata"
connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/connectors/utils/monitoring"
@@ -831,3 +832,69 @@ func (a *FlowableActivity) AddTablesToPublication(ctx context.Context, cfg *prot
}
return err
}

func (a *FlowableActivity) RemoveTablesFromPublication(ctx context.Context, cfg *protos.FlowConnectionConfigs,
removedTablesMapping []*protos.TableMapping,
) error {
ctx = context.WithValue(ctx, shared.FlowNameKey, cfg.FlowJobName)
srcConn, err := connectors.GetByNameAs[connectors.CDCPullConnector](ctx, cfg.Env, a.CatalogPool, cfg.SourceName)
if err != nil {
return fmt.Errorf("failed to get source connector: %w", err)
}
defer connectors.CloseConnector(ctx, srcConn)

err = srcConn.RemoveTablesFromPublication(ctx, &protos.RemoveTablesFromPublicationInput{
FlowJobName: cfg.FlowJobName,
PublicationName: cfg.PublicationName,
TablesToRemove: removedTablesMapping,
})
if err != nil {
a.Alerter.LogFlowError(ctx, cfg.FlowJobName, err)
}
return err
}
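
For orientation, the sketch below shows how a workflow would typically invoke this new activity with the Temporal Go SDK (go.temporal.io/sdk/workflow). It is not part of this commit; the function name, timeout value, and the way the activity is referenced are assumptions rather than code from the repository.

// Illustrative workflow-side call; assumes the standard Temporal pattern of
// referencing an activity method through a typed nil receiver.
func removeTablesFromPublicationExample(ctx workflow.Context,
	cfg *protos.FlowConnectionConfigs, removedTables []*protos.TableMapping,
) error {
	ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
		StartToCloseTimeout: 5 * time.Minute, // assumed timeout
	})
	var a *FlowableActivity // receiver is only used to resolve the activity name
	return workflow.ExecuteActivity(ctx, a.RemoveTablesFromPublication,
		cfg, removedTables).Get(ctx, nil)
}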

func (a *FlowableActivity) RemoveTablesFromRawTable(ctx context.Context, cfg *protos.FlowConnectionConfigs,
tablesToRemove []*protos.TableMapping,
) error {
ctx = context.WithValue(ctx, shared.FlowNameKey, cfg.FlowJobName)
logger := log.With(activity.GetLogger(ctx), slog.String(string(shared.FlowNameKey), cfg.FlowJobName))
pgMetadata := connmetadata.NewPostgresMetadataFromCatalog(logger, a.CatalogPool)
normBatchID, err := pgMetadata.GetLastNormalizeBatchID(ctx, cfg.FlowJobName)
if err != nil {
logger.Error("[RemoveTablesFromRawTable] failed to get last normalize batch id", slog.Any("error", err))
return err
}

syncBatchID, err := pgMetadata.GetLastSyncBatchID(ctx, cfg.FlowJobName)
if err != nil {
logger.Error("[RemoveTablesFromRawTable] failed to get last sync batch id", slog.Any("error", err))
return err
}

dstConn, err := connectors.GetByNameAs[connectors.RawTableConnector](ctx, cfg.Env, a.CatalogPool, cfg.DestinationName)
if err != nil {
if errors.Is(err, errors.ErrUnsupported) {
// For connectors where raw table is not a concept,
// we can ignore the error
return nil
}
return fmt.Errorf("[RemoveTablesFromRawTable]:failed to get destination connector: %w", err)
}
defer connectors.CloseConnector(ctx, dstConn)

tableNames := make([]string, 0, len(tablesToRemove))
for _, table := range tablesToRemove {
tableNames = append(tableNames, table.DestinationTableIdentifier)
}
err = dstConn.RemoveTableEntriesFromRawTable(ctx, &protos.RemoveTablesFromRawTableInput{
FlowJobName: cfg.FlowJobName,
DestinationTableNames: tableNames,
SyncBatchId: syncBatchID,
NormalizeBatchId: normBatchID,
})
if err != nil {
a.Alerter.LogFlowError(ctx, cfg.FlowJobName, err)
}
return err
}
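
The two batch IDs fetched above bound the raw-table rows that can still be deleted safely: anything at or below the last normalized batch has already been merged into the destination tables, and anything above the last synced batch does not exist yet. The illustrative helper below (not part of the commit) spells out the predicate that the connector-side deletes apply:

// inRemovalWindow reports whether a raw-table row is targeted for removal:
// already synced (batchID <= syncBatchID) but not yet normalized
// (batchID > normalizeBatchID). Illustrative only.
func inRemovalWindow(batchID, normalizeBatchID, syncBatchID int64) bool {
	return batchID > normalizeBatchID && batchID <= syncBatchID
}
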
2 changes: 1 addition & 1 deletion flow/alerting/alerting.go
@@ -276,7 +276,7 @@ func (a *Alerter) checkAndAddAlertToCatalog(ctx context.Context, alertConfigId i
var createdTimestamp time.Time
err = row.Scan(&createdTimestamp)
if err != nil && err != pgx.ErrNoRows {
logger.LoggerFromCtx(ctx).Warn("failed to send alert: ", slog.String("err", err.Error()))
logger.LoggerFromCtx(ctx).Warn("failed to send alert", slog.Any("err", err))
return false
}

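The one-line alerting change above attaches the error value itself instead of a pre-formatted string, and drops the stray trailing colon from the log message. A small standalone sketch of the difference (not part of the commit):

package main

import (
	"errors"
	"log/slog"
)

func main() {
	err := errors.New("example failure") // hypothetical error
	// Before: the message carries a trailing ": " and the error is flattened to a string.
	slog.Warn("failed to send alert: ", slog.String("err", err.Error()))
	// After: the error value is attached as an attribute, so handlers can
	// format or inspect it and the message stays clean.
	slog.Warn("failed to send alert", slog.Any("err", err))
}
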
53 changes: 37 additions & 16 deletions flow/cmd/mirror_status.go
@@ -199,13 +199,18 @@ func (h *FlowRequestHandler) cdcFlowStatus(
return nil, err
}

cloneStatuses, err := h.cloneTableSummary(ctx, req.FlowJobName)
initialLoadResponse, err := h.InitialLoadSummary(ctx, &protos.InitialLoadSummaryRequest{
ParentMirrorName: req.FlowJobName,
})
if err != nil {
slog.Error("unable to query clone table summary", slog.Any("error", err))
return nil, err
}

cdcBatches, err := h.getCdcBatches(ctx, req.FlowJobName)
cdcBatchesResponse, err := h.GetCDCBatches(ctx, &protos.GetCDCBatchesRequest{
FlowJobName: req.FlowJobName,
Limit: 0,
})
if err != nil {
return nil, err
}
@@ -215,16 +220,17 @@ func (h *FlowRequestHandler) cdcFlowStatus(
SourceType: srcType,
DestinationType: dstType,
SnapshotStatus: &protos.SnapshotStatus{
Clones: cloneStatuses,
Clones: initialLoadResponse.TableSummaries,
},
CdcBatches: cdcBatches,
CdcBatches: cdcBatchesResponse.CdcBatches,
}, nil
}

func (h *FlowRequestHandler) cloneTableSummary(
func (h *FlowRequestHandler) InitialLoadSummary(
ctx context.Context,
parentMirrorName string,
) ([]*protos.CloneTableSummary, error) {
req *protos.InitialLoadSummaryRequest,
) (*protos.InitialLoadSummaryResponse, error) {
parentMirrorName := req.ParentMirrorName
q := `
SELECT
distinct qr.flow_name,
@@ -325,7 +331,9 @@ func (h *FlowRequestHandler) cloneTableSummary(

cloneStatuses = append(cloneStatuses, &res)
}
return cloneStatuses, nil
return &protos.InitialLoadSummaryResponse{
TableSummaries: cloneStatuses,
}, nil
}

func (h *FlowRequestHandler) qrepFlowStatus(
@@ -478,25 +486,31 @@ func (h *FlowRequestHandler) getMirrorCreatedAt(ctx context.Context, flowJobName
return &createdAt.Time, nil
}

func (h *FlowRequestHandler) getCdcBatches(ctx context.Context, flowJobName string) ([]*protos.CDCBatch, error) {
func (h *FlowRequestHandler) GetCDCBatches(ctx context.Context, req *protos.GetCDCBatchesRequest) (*protos.GetCDCBatchesResponse, error) {
mirrorName := req.FlowJobName
limit := req.Limit
limitClause := ""
if limit > 0 {
limitClause = fmt.Sprintf(" LIMIT %d", limit)
}
q := `SELECT DISTINCT ON(batch_id) batch_id,start_time,end_time,rows_in_batch,batch_start_lsn,batch_end_lsn FROM peerdb_stats.cdc_batches
WHERE flow_name=$1 AND start_time IS NOT NULL ORDER BY batch_id DESC, start_time DESC`
rows, err := h.pool.Query(ctx, q, flowJobName)
WHERE flow_name=$1 AND start_time IS NOT NULL ORDER BY batch_id DESC, start_time DESC` + limitClause
rows, err := h.pool.Query(ctx, q, mirrorName)
if err != nil {
slog.Error(fmt.Sprintf("unable to query cdc batches - %s: %s", flowJobName, err.Error()))
return nil, fmt.Errorf("unable to query cdc batches - %s: %w", flowJobName, err)
slog.Error(fmt.Sprintf("unable to query cdc batches - %s: %s", mirrorName, err.Error()))
return nil, fmt.Errorf("unable to query cdc batches - %s: %w", mirrorName, err)
}

return pgx.CollectRows(rows, func(row pgx.CollectableRow) (*protos.CDCBatch, error) {
batches, err := pgx.CollectRows(rows, func(row pgx.CollectableRow) (*protos.CDCBatch, error) {
var batchID pgtype.Int8
var startTime pgtype.Timestamp
var endTime pgtype.Timestamp
var numRows pgtype.Int8
var startLSN pgtype.Numeric
var endLSN pgtype.Numeric
if err := rows.Scan(&batchID, &startTime, &endTime, &numRows, &startLSN, &endLSN); err != nil {
slog.Error(fmt.Sprintf("unable to scan cdc batches - %s: %s", flowJobName, err.Error()))
return nil, fmt.Errorf("unable to scan cdc batches - %s: %w", flowJobName, err)
slog.Error(fmt.Sprintf("unable to scan cdc batches - %s: %s", mirrorName, err.Error()))
return nil, fmt.Errorf("unable to scan cdc batches - %s: %w", mirrorName, err)
}

var batch protos.CDCBatch
Expand All @@ -522,6 +536,13 @@ func (h *FlowRequestHandler) getCdcBatches(ctx context.Context, flowJobName stri

return &batch, nil
})
if err != nil {
return nil, err
}

return &protos.GetCDCBatchesResponse{
CdcBatches: batches,
}, nil
}
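
GetCDCBatches is now an exported request/response handler rather than a private helper, and the new Limit field is optional: the zero value that cdcFlowStatus passes keeps the old behaviour of returning every batch. A hypothetical caller (not part of the commit), using only fields visible in this diff:

func logRecentBatches(ctx context.Context, h *FlowRequestHandler, mirror string) error {
	// Fetch only the five most recent CDC batches for the given mirror.
	resp, err := h.GetCDCBatches(ctx, &protos.GetCDCBatchesRequest{
		FlowJobName: mirror,
		Limit:       5, // 0 would mean "no LIMIT clause"
	})
	if err != nil {
		return err
	}
	slog.Info("fetched recent CDC batches", slog.Int("count", len(resp.CdcBatches)))
	return nil
}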

func (h *FlowRequestHandler) CDCTableTotalCounts(
2 changes: 1 addition & 1 deletion flow/cmd/validate_mirror.go
@@ -178,7 +178,7 @@ func (h *FlowRequestHandler) ValidateCDCMirror(
}, err
}
if dstPeer.GetClickhouseConfig() != nil {
chPeer, err := connclickhouse.NewClickhouseConnector(ctx, nil, dstPeer.GetClickhouseConfig())
chPeer, err := connclickhouse.NewClickHouseConnector(ctx, nil, dstPeer.GetClickhouseConfig())
if err != nil {
displayErr := fmt.Errorf("failed to create clickhouse connector: %v", err)
h.alerter.LogNonFlowWarning(ctx, telemetry.CreateMirror, req.ConnectionConfigs.FlowJobName,
23 changes: 23 additions & 0 deletions flow/connectors/bigquery/bigquery.go
@@ -931,6 +931,29 @@ func (c *BigQueryConnector) CreateTablesFromExisting(
}, nil
}

func (c *BigQueryConnector) RemoveTableEntriesFromRawTable(
ctx context.Context,
req *protos.RemoveTablesFromRawTableInput,
) error {
rawTableIdentifier := c.getRawTableName(req.FlowJobName)
for _, tableName := range req.DestinationTableNames {
c.logger.Info(fmt.Sprintf("removing entries for table '%s' from raw table...", tableName))
deleteCmd := c.queryWithLogging(fmt.Sprintf("DELETE FROM `%s` WHERE _peerdb_destination_table_name = '%s'"+
" AND _peerdb_batch_id > %d AND _peerdb_batch_id <= %d",
rawTableIdentifier, tableName, req.NormalizeBatchId, req.SyncBatchId))
deleteCmd.DefaultProjectID = c.projectID
deleteCmd.DefaultDatasetID = c.datasetID
_, err := deleteCmd.Read(ctx)
if err != nil {
c.logger.Error("failed to remove entries from raw table", "error", err)
}

c.logger.Info(fmt.Sprintf("successfully removed entries for table '%s' from raw table", tableName))
}

return nil
}
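
The DELETE above interpolates the destination table name and batch IDs into the SQL text with fmt.Sprintf. For comparison only, the same statement can be written with BigQuery query parameters via the cloud.google.com/go/bigquery client; the raw-table identifier still has to be spliced into the text because BigQuery does not parameterize identifiers. This is a hypothetical sketch, not the connector's committed code:

func deleteRawEntries(ctx context.Context, client *bigquery.Client,
	project, dataset, rawTable, destTable string, normalizeBatchID, syncBatchID int64,
) error {
	q := client.Query(fmt.Sprintf(
		"DELETE FROM `%s` WHERE _peerdb_destination_table_name = @table_name"+
			" AND _peerdb_batch_id > @normalize_batch_id AND _peerdb_batch_id <= @sync_batch_id",
		rawTable))
	q.DefaultProjectID = project
	q.DefaultDatasetID = dataset
	q.Parameters = []bigquery.QueryParameter{
		{Name: "table_name", Value: destTable},
		{Name: "normalize_batch_id", Value: normalizeBatchID},
		{Name: "sync_batch_id", Value: syncBatchID},
	}
	// Read runs the DML statement and waits for the (empty) result iterator.
	_, err := q.Read(ctx)
	return err
}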

type datasetTable struct {
project string
dataset string
4 changes: 2 additions & 2 deletions flow/connectors/bigquery/qrep_avro_sync.go
@@ -408,12 +408,12 @@ func (s *QRepAvroSyncMethod) writeToStage(
obj := bucket.Object(avroFilePath)
w := obj.NewWriter(ctx)

numRecords, err := ocfWriter.WriteOCF(ctx, w, -1)
numRecords, err := ocfWriter.WriteOCF(ctx, w)
if err != nil {
return 0, fmt.Errorf("failed to write records to Avro file on GCS: %w", err)
}
avroFile = &avro.AvroFile{
NumRecords: int(numRecords),
NumRecords: numRecords,
StorageLocation: avro.AvroGCSStorage,
FilePath: avroFilePath,
}
(diff truncated; the remaining changed files are not shown here)
