Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move normalize into sync activity #2329

Merged
merged 13 commits into from
Dec 17, 2024
92 changes: 52 additions & 40 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,16 @@ type CheckConnectionResult struct {
NeedsSetupMetadataTables bool
}

type NormalizeBatchRequest struct {
Done chan struct{}
BatchID int64
}

type CdcCacheEntry struct {
connector connectors.CDCPullConnectorCore
done chan struct{}
connector connectors.CDCPullConnectorCore
syncDone chan struct{}
normalize chan NormalizeBatchRequest
normalizeDone chan struct{}
}

type FlowableActivity struct {
Expand Down Expand Up @@ -296,17 +303,26 @@ func (a *FlowableActivity) MaintainPull(
return err
}

done := make(chan struct{})
// syncDone will be closed by UnmaintainPull,
// whereas normalizeDone will be closed by the normalize goroutine
// Wait on normalizeDone at end to not interrupt final normalize
syncDone := make(chan struct{})
normalize := make(chan NormalizeBatchRequest)
normalizeDone := make(chan struct{})
a.CdcCacheRw.Lock()
a.CdcCache[sessionID] = CdcCacheEntry{
connector: srcConn,
done: done,
connector: srcConn,
syncDone: syncDone,
normalize: normalize,
normalizeDone: normalizeDone,
}
a.CdcCacheRw.Unlock()

ticker := time.NewTicker(15 * time.Second)
defer ticker.Stop()

go a.normalizeLoop(ctx, config, syncDone, normalize, normalizeDone)

for {
select {
case <-ticker.C:
Expand All @@ -318,7 +334,7 @@ func (a *FlowableActivity) MaintainPull(
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return temporal.NewNonRetryableApplicationError("connection to source down", "disconnect", err)
}
case <-done:
case <-syncDone:
return nil
case <-ctx.Done():
a.CdcCacheRw.Lock()
Expand All @@ -330,12 +346,15 @@ func (a *FlowableActivity) MaintainPull(
}

func (a *FlowableActivity) UnmaintainPull(ctx context.Context, sessionID string) error {
var normalizeDone chan struct{}
a.CdcCacheRw.Lock()
if entry, ok := a.CdcCache[sessionID]; ok {
close(entry.done)
close(entry.syncDone)
delete(a.CdcCache, sessionID)
normalizeDone = entry.normalizeDone
}
a.CdcCacheRw.Unlock()
<-normalizeDone
return nil
}

Expand All @@ -344,7 +363,7 @@ func (a *FlowableActivity) SyncRecords(
config *protos.FlowConnectionConfigs,
options *protos.SyncFlowOptions,
sessionID string,
) (*model.SyncCompositeResponse, error) {
) (int64, error) {
var adaptStream func(stream *model.CDCStream[model.RecordItems]) (*model.CDCStream[model.RecordItems], error)
if config.Script != "" {
var onErr context.CancelCauseFunc
Expand Down Expand Up @@ -385,30 +404,30 @@ func (a *FlowableActivity) SyncPg(
config *protos.FlowConnectionConfigs,
options *protos.SyncFlowOptions,
sessionID string,
) (*model.SyncCompositeResponse, error) {
) (int64, error) {
return syncCore(ctx, a, config, options, sessionID, nil,
connectors.CDCPullPgConnector.PullPg,
connectors.CDCSyncPgConnector.SyncPg)
}

func (a *FlowableActivity) StartNormalize(
ctx context.Context,
input *protos.StartNormalizeInput,
) (*model.NormalizeResponse, error) {
conn := input.FlowConnectionConfigs
ctx = context.WithValue(ctx, shared.FlowNameKey, conn.FlowJobName)
config *protos.FlowConnectionConfigs,
batchID int64,
) error {
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
logger := activity.GetLogger(ctx)

dstConn, err := connectors.GetByNameAs[connectors.CDCNormalizeConnector](
ctx,
input.FlowConnectionConfigs.Env,
config.Env,
a.CatalogPool,
conn.DestinationName,
config.DestinationName,
)
if errors.Is(err, errors.ErrUnsupported) {
return nil, monitoring.UpdateEndTimeForCDCBatch(ctx, a.CatalogPool, input.FlowConnectionConfigs.FlowJobName, input.SyncBatchID)
return monitoring.UpdateEndTimeForCDCBatch(ctx, a.CatalogPool, config.FlowJobName, batchID)
} else if err != nil {
return nil, fmt.Errorf("failed to get normalize connector: %w", err)
return fmt.Errorf("failed to get normalize connector: %w", err)
}
defer connectors.CloseConnector(ctx, dstConn)

Expand All @@ -417,41 +436,34 @@ func (a *FlowableActivity) StartNormalize(
})
defer shutdown()

tableNameSchemaMapping, err := a.getTableNameSchemaMapping(ctx, input.FlowConnectionConfigs.FlowJobName)
tableNameSchemaMapping, err := a.getTableNameSchemaMapping(ctx, config.FlowJobName)
if err != nil {
return nil, fmt.Errorf("failed to get table name schema mapping: %w", err)
return fmt.Errorf("failed to get table name schema mapping: %w", err)
}

logger.Info("Normalizing batch", slog.Int64("SyncBatchID", batchID))
res, err := dstConn.NormalizeRecords(ctx, &model.NormalizeRecordsRequest{
FlowJobName: input.FlowConnectionConfigs.FlowJobName,
Env: input.FlowConnectionConfigs.Env,
FlowJobName: config.FlowJobName,
Env: config.Env,
TableNameSchemaMapping: tableNameSchemaMapping,
TableMappings: input.FlowConnectionConfigs.TableMappings,
SyncBatchID: input.SyncBatchID,
SoftDeleteColName: input.FlowConnectionConfigs.SoftDeleteColName,
SyncedAtColName: input.FlowConnectionConfigs.SyncedAtColName,
TableMappings: config.TableMappings,
SoftDeleteColName: config.SoftDeleteColName,
SyncedAtColName: config.SyncedAtColName,
SyncBatchID: batchID,
})
if err != nil {
a.Alerter.LogFlowError(ctx, input.FlowConnectionConfigs.FlowJobName, err)
return nil, fmt.Errorf("failed to normalized records: %w", err)
}
dstType, err := connectors.LoadPeerType(ctx, a.CatalogPool, input.FlowConnectionConfigs.DestinationName)
if err != nil {
return nil, fmt.Errorf("failed to get peer type: %w", err)
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return fmt.Errorf("failed to normalized records: %w", err)
}
if dstType == protos.DBType_POSTGRES {
err = monitoring.UpdateEndTimeForCDCBatch(ctx, a.CatalogPool, input.FlowConnectionConfigs.FlowJobName,
input.SyncBatchID)
if err != nil {
return nil, fmt.Errorf("failed to update end time for cdc batch: %w", err)
if _, dstPg := dstConn.(*connpostgres.PostgresConnector); dstPg {
if err := monitoring.UpdateEndTimeForCDCBatch(ctx, a.CatalogPool, config.FlowJobName, batchID); err != nil {
return fmt.Errorf("failed to update end time for cdc batch: %w", err)
}
}

// log the number of batches normalized
logger.Info(fmt.Sprintf("normalized records from batch %d to batch %d",
res.StartBatchID, res.EndBatchID))
logger.Info("normalized batches", slog.Int64("StartBatchID", res.StartBatchID), slog.Int64("EndBatchID", res.EndBatchID))

return res, nil
return nil
}

// SetupQRepMetadataTables sets up the metadata tables for QReplication.
Expand Down
Loading
Loading