Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move normalize into sync activity #2329

Merged
merged 13 commits into from
Dec 17, 2024
118 changes: 81 additions & 37 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,16 @@ type CheckConnectionResult struct {
NeedsSetupMetadataTables bool
}

// NormalizeBatchRequest asks the background normalize loop to normalize
// records up to BatchID. If Done is non-nil, the loop closes it after the
// requested batch has been normalized successfully.
type NormalizeBatchRequest struct {
Done chan struct{}
BatchID int64
}

type CdcCacheEntry struct {
connector connectors.CDCPullConnectorCore
done chan struct{}
connector connectors.CDCPullConnectorCore
done chan struct{}
serprex marked this conversation as resolved.
Show resolved Hide resolved
normalize chan NormalizeBatchRequest
normalizeDone chan struct{}
}

type FlowableActivity struct {
Expand Down Expand Up @@ -297,16 +304,56 @@ func (a *FlowableActivity) MaintainPull(
}

done := make(chan struct{})
normalize := make(chan NormalizeBatchRequest)
normalizeDone := make(chan struct{})
a.CdcCacheRw.Lock()
a.CdcCache[sessionID] = CdcCacheEntry{
connector: srcConn,
done: done,
connector: srcConn,
done: done,
normalize: normalize,
normalizeDone: normalizeDone,
}
a.CdcCacheRw.Unlock()

ticker := time.NewTicker(15 * time.Second)
defer ticker.Stop()

go func() {
serprex marked this conversation as resolved.
Show resolved Hide resolved
loop:
for {
select {
case req, ok := <-normalize:
if !ok {
break loop
}
retry:
serprex marked this conversation as resolved.
Show resolved Hide resolved
if err := a.StartNormalize(ctx, config, req.BatchID); err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
for {
// update req to latest normalize request & retry
select {
case req = <-normalize:
case <-done:
serprex marked this conversation as resolved.
Show resolved Hide resolved
break loop
case <-ctx.Done():
break loop
default:
time.Sleep(time.Second)
serprex marked this conversation as resolved.
Show resolved Hide resolved
goto retry
}
}
} else if req.Done != nil {
close(req.Done)
}
case <-done:
break loop
case <-ctx.Done():
break loop
}
}
close(normalizeDone)
serprex marked this conversation as resolved.
Show resolved Hide resolved
}()

for {
select {
case <-ticker.C:
Expand All @@ -330,12 +377,16 @@ func (a *FlowableActivity) MaintainPull(
}

// UnmaintainPull tears down the cached CDC state for sessionID: it signals the
// MaintainPull loop to stop, closes the normalize request channel so the
// normalize goroutine drains and exits, removes the cache entry, and then
// waits for the normalize goroutine to finish before returning.
func (a *FlowableActivity) UnmaintainPull(ctx context.Context, sessionID string) error {
	var normalizeDone chan struct{}
	a.CdcCacheRw.Lock()
	if entry, ok := a.CdcCache[sessionID]; ok {
		close(entry.done)
		close(entry.normalize)
		delete(a.CdcCache, sessionID)
		normalizeDone = entry.normalizeDone
	}
	a.CdcCacheRw.Unlock()
	// Only wait when the entry existed: if sessionID was not in the cache,
	// normalizeDone is nil and receiving from a nil channel blocks forever.
	if normalizeDone != nil {
		<-normalizeDone
	}
	return nil
}

Expand All @@ -344,7 +395,7 @@ func (a *FlowableActivity) SyncRecords(
config *protos.FlowConnectionConfigs,
options *protos.SyncFlowOptions,
sessionID string,
) (*model.SyncCompositeResponse, error) {
) (int64, error) {
var adaptStream func(stream *model.CDCStream[model.RecordItems]) (*model.CDCStream[model.RecordItems], error)
if config.Script != "" {
var onErr context.CancelCauseFunc
Expand Down Expand Up @@ -385,30 +436,30 @@ func (a *FlowableActivity) SyncPg(
config *protos.FlowConnectionConfigs,
options *protos.SyncFlowOptions,
sessionID string,
) (*model.SyncCompositeResponse, error) {
) (int64, error) {
return syncCore(ctx, a, config, options, sessionID, nil,
connectors.CDCPullPgConnector.PullPg,
connectors.CDCSyncPgConnector.SyncPg)
}

func (a *FlowableActivity) StartNormalize(
ctx context.Context,
input *protos.StartNormalizeInput,
) (*model.NormalizeResponse, error) {
conn := input.FlowConnectionConfigs
ctx = context.WithValue(ctx, shared.FlowNameKey, conn.FlowJobName)
config *protos.FlowConnectionConfigs,
batchID int64,
) error {
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
logger := activity.GetLogger(ctx)

dstConn, err := connectors.GetByNameAs[connectors.CDCNormalizeConnector](
ctx,
input.FlowConnectionConfigs.Env,
config.Env,
a.CatalogPool,
conn.DestinationName,
config.DestinationName,
)
if errors.Is(err, errors.ErrUnsupported) {
return nil, monitoring.UpdateEndTimeForCDCBatch(ctx, a.CatalogPool, input.FlowConnectionConfigs.FlowJobName, input.SyncBatchID)
return monitoring.UpdateEndTimeForCDCBatch(ctx, a.CatalogPool, config.FlowJobName, batchID)
} else if err != nil {
return nil, fmt.Errorf("failed to get normalize connector: %w", err)
return fmt.Errorf("failed to get normalize connector: %w", err)
}
defer connectors.CloseConnector(ctx, dstConn)

Expand All @@ -417,41 +468,34 @@ func (a *FlowableActivity) StartNormalize(
})
defer shutdown()

tableNameSchemaMapping, err := a.getTableNameSchemaMapping(ctx, input.FlowConnectionConfigs.FlowJobName)
tableNameSchemaMapping, err := a.getTableNameSchemaMapping(ctx, config.FlowJobName)
if err != nil {
return nil, fmt.Errorf("failed to get table name schema mapping: %w", err)
return fmt.Errorf("failed to get table name schema mapping: %w", err)
}

logger.Info("Normalizing batch", slog.Int64("SyncBatchID", batchID))
res, err := dstConn.NormalizeRecords(ctx, &model.NormalizeRecordsRequest{
FlowJobName: input.FlowConnectionConfigs.FlowJobName,
Env: input.FlowConnectionConfigs.Env,
FlowJobName: config.FlowJobName,
Env: config.Env,
TableNameSchemaMapping: tableNameSchemaMapping,
TableMappings: input.FlowConnectionConfigs.TableMappings,
SyncBatchID: input.SyncBatchID,
SoftDeleteColName: input.FlowConnectionConfigs.SoftDeleteColName,
SyncedAtColName: input.FlowConnectionConfigs.SyncedAtColName,
TableMappings: config.TableMappings,
SoftDeleteColName: config.SoftDeleteColName,
SyncedAtColName: config.SyncedAtColName,
SyncBatchID: batchID,
})
if err != nil {
a.Alerter.LogFlowError(ctx, input.FlowConnectionConfigs.FlowJobName, err)
return nil, fmt.Errorf("failed to normalized records: %w", err)
}
dstType, err := connectors.LoadPeerType(ctx, a.CatalogPool, input.FlowConnectionConfigs.DestinationName)
if err != nil {
return nil, fmt.Errorf("failed to get peer type: %w", err)
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return fmt.Errorf("failed to normalized records: %w", err)
}
if dstType == protos.DBType_POSTGRES {
err = monitoring.UpdateEndTimeForCDCBatch(ctx, a.CatalogPool, input.FlowConnectionConfigs.FlowJobName,
input.SyncBatchID)
if err != nil {
return nil, fmt.Errorf("failed to update end time for cdc batch: %w", err)
if _, dstPg := dstConn.(*connpostgres.PostgresConnector); dstPg {
if err := monitoring.UpdateEndTimeForCDCBatch(ctx, a.CatalogPool, config.FlowJobName, batchID); err != nil {
return fmt.Errorf("failed to update end time for cdc batch: %w", err)
}
}

// log the number of batches normalized
logger.Info(fmt.Sprintf("normalized records from batch %d to batch %d",
res.StartBatchID, res.EndBatchID))
logger.Info("normalized batches", slog.Int64("StartBatchID", res.StartBatchID), slog.Int64("EndBatchID", res.EndBatchID))

return res, nil
return nil
}

// SetupQRepMetadataTables sets up the metadata tables for QReplication.
Expand Down
Loading
Loading