Skip to content

Commit

Permalink
move normalize into sync activity
Browse files Browse the repository at this point in the history
can still run in parallel,
in fact now sync batches won't sync ahead if normalize is slow
  • Loading branch information
serprex committed Dec 7, 2024
1 parent e868b3b commit f9c2355
Show file tree
Hide file tree
Showing 12 changed files with 123 additions and 283 deletions.
57 changes: 45 additions & 12 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,15 @@ type CheckConnectionResult struct {
NeedsSetupMetadataTables bool
}

type NormalizeBatchRequest struct {
Done chan model.NormalizeResponse
BatchID int64
}

type CdcCacheEntry struct {
connector connectors.CDCPullConnectorCore
done chan struct{}
normalize chan NormalizeBatchRequest
}

type FlowableActivity struct {
Expand Down Expand Up @@ -297,16 +303,39 @@ func (a *FlowableActivity) MaintainPull(
}

done := make(chan struct{})
normalize := make(chan NormalizeBatchRequest)
a.CdcCacheRw.Lock()
a.CdcCache[sessionID] = CdcCacheEntry{
connector: srcConn,
done: done,
normalize: normalize,
}
a.CdcCacheRw.Unlock()

ticker := time.NewTicker(15 * time.Second)
defer ticker.Stop()

normDone := make(chan struct{})
go func() {
for req := range normalize {
res, err := a.StartNormalize(ctx, &protos.StartNormalizeInput{
FlowConnectionConfigs: config,
SyncBatchID: req.BatchID,
})
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
} else if req.Done != nil {
req.Done <- res
}
if req.Done != nil {
close(req.Done)
}
}
close(normDone)
}()
defer func() { <-normDone }()
defer close(normalize) // TODO race, this will cause sync to panic if it tries to send to normalize after maintainpull ends

for {
select {
case <-ticker.C:
Expand Down Expand Up @@ -344,7 +373,7 @@ func (a *FlowableActivity) SyncRecords(
config *protos.FlowConnectionConfigs,
options *protos.SyncFlowOptions,
sessionID string,
) (*model.SyncCompositeResponse, error) {
) (*model.SyncResponse, error) {
var adaptStream func(stream *model.CDCStream[model.RecordItems]) (*model.CDCStream[model.RecordItems], error)
if config.Script != "" {
var onErr context.CancelCauseFunc
Expand Down Expand Up @@ -385,7 +414,7 @@ func (a *FlowableActivity) SyncPg(
config *protos.FlowConnectionConfigs,
options *protos.SyncFlowOptions,
sessionID string,
) (*model.SyncCompositeResponse, error) {
) (*model.SyncResponse, error) {
return syncCore(ctx, a, config, options, sessionID, nil,
connectors.CDCPullPgConnector.PullPg,
connectors.CDCSyncPgConnector.SyncPg)
Expand All @@ -394,7 +423,7 @@ func (a *FlowableActivity) SyncPg(
func (a *FlowableActivity) StartNormalize(
ctx context.Context,
input *protos.StartNormalizeInput,
) (*model.NormalizeResponse, error) {
) (model.NormalizeResponse, error) {
conn := input.FlowConnectionConfigs
ctx = context.WithValue(ctx, shared.FlowNameKey, conn.FlowJobName)
logger := activity.GetLogger(ctx)
Expand All @@ -406,9 +435,10 @@ func (a *FlowableActivity) StartNormalize(
conn.DestinationName,
)
if errors.Is(err, errors.ErrUnsupported) {
return nil, monitoring.UpdateEndTimeForCDCBatch(ctx, a.CatalogPool, input.FlowConnectionConfigs.FlowJobName, input.SyncBatchID)
return model.NormalizeResponse{},
monitoring.UpdateEndTimeForCDCBatch(ctx, a.CatalogPool, input.FlowConnectionConfigs.FlowJobName, input.SyncBatchID)
} else if err != nil {
return nil, fmt.Errorf("failed to get normalize connector: %w", err)
return model.NormalizeResponse{}, fmt.Errorf("failed to get normalize connector: %w", err)
}
defer connectors.CloseConnector(ctx, dstConn)

Expand All @@ -419,7 +449,7 @@ func (a *FlowableActivity) StartNormalize(

tableNameSchemaMapping, err := a.getTableNameSchemaMapping(ctx, input.FlowConnectionConfigs.FlowJobName)
if err != nil {
return nil, fmt.Errorf("failed to get table name schema mapping: %w", err)
return model.NormalizeResponse{}, fmt.Errorf("failed to get table name schema mapping: %w", err)
}

res, err := dstConn.NormalizeRecords(ctx, &model.NormalizeRecordsRequest{
Expand All @@ -433,17 +463,20 @@ func (a *FlowableActivity) StartNormalize(
})
if err != nil {
a.Alerter.LogFlowError(ctx, input.FlowConnectionConfigs.FlowJobName, err)
return nil, fmt.Errorf("failed to normalized records: %w", err)
return model.NormalizeResponse{}, fmt.Errorf("failed to normalized records: %w", err)
}
dstType, err := connectors.LoadPeerType(ctx, a.CatalogPool, input.FlowConnectionConfigs.DestinationName)
if err != nil {
return nil, fmt.Errorf("failed to get peer type: %w", err)
return model.NormalizeResponse{}, fmt.Errorf("failed to get peer type: %w", err)
}
if dstType == protos.DBType_POSTGRES {
err = monitoring.UpdateEndTimeForCDCBatch(ctx, a.CatalogPool, input.FlowConnectionConfigs.FlowJobName,
input.SyncBatchID)
if err != nil {
return nil, fmt.Errorf("failed to update end time for cdc batch: %w", err)
if err := monitoring.UpdateEndTimeForCDCBatch(
ctx,
a.CatalogPool,
input.FlowConnectionConfigs.FlowJobName,
input.SyncBatchID,
); err != nil {
return model.NormalizeResponse{}, fmt.Errorf("failed to update end time for cdc batch: %w", err)
}
}

Expand Down
51 changes: 35 additions & 16 deletions flow/activities/flowable_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package activities

import (
"context"
"errors"
"fmt"
"log/slog"
"reflect"
Expand Down Expand Up @@ -50,7 +51,9 @@ func heartbeatRoutine(
)
}

func waitForCdcCache[TPull connectors.CDCPullConnectorCore](ctx context.Context, a *FlowableActivity, sessionID string) (TPull, error) {
func waitForCdcCache[TPull connectors.CDCPullConnectorCore](
ctx context.Context, a *FlowableActivity, sessionID string,
) (TPull, chan NormalizeBatchRequest, error) {
var none TPull
logger := activity.GetLogger(ctx)
attempt := 0
Expand All @@ -63,9 +66,9 @@ func waitForCdcCache[TPull connectors.CDCPullConnectorCore](ctx context.Context,
a.CdcCacheRw.RUnlock()
if ok {
if conn, ok := entry.connector.(TPull); ok {
return conn, nil
return conn, entry.normalize, nil
}
return none, fmt.Errorf("expected %s, cache held %T", reflect.TypeFor[TPull]().Name(), entry.connector)
return none, nil, fmt.Errorf("expected %s, cache held %T", reflect.TypeFor[TPull]().Name(), entry.connector)
}
activity.RecordHeartbeat(ctx, fmt.Sprintf("wait %s for source connector", waitInterval))
attempt += 1
Expand All @@ -74,7 +77,7 @@ func waitForCdcCache[TPull connectors.CDCPullConnectorCore](ctx context.Context,
slog.Int("attempt", attempt), slog.String("sessionID", sessionID))
}
if err := ctx.Err(); err != nil {
return none, err
return none, nil, err
}
time.Sleep(waitInterval)
if attempt == 300 {
Expand Down Expand Up @@ -116,7 +119,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
adaptStream func(*model.CDCStream[Items]) (*model.CDCStream[Items], error),
pull func(TPull, context.Context, *pgxpool.Pool, *otel_metrics.OtelManager, *model.PullRecordsRequest[Items]) error,
sync func(TSync, context.Context, *model.SyncRecordsRequest[Items]) (*model.SyncResponse, error),
) (*model.SyncCompositeResponse, error) {
) (*model.SyncResponse, error) {
flowName := config.FlowJobName
ctx = context.WithValue(ctx, shared.FlowNameKey, flowName)
logger := activity.GetLogger(ctx)
Expand All @@ -130,7 +133,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
tblNameMapping[v.SourceTableIdentifier] = model.NewNameAndExclude(v.DestinationTableIdentifier, v.Exclude)
}

srcConn, err := waitForCdcCache[TPull](ctx, a, sessionID)
srcConn, normChan, err := waitForCdcCache[TPull](ctx, a, sessionID)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -229,12 +232,9 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
return nil, fmt.Errorf("failed to sync schema: %w", err)
}

return &model.SyncCompositeResponse{
SyncResponse: &model.SyncResponse{
CurrentSyncBatchID: -1,
TableSchemaDeltas: recordBatchSync.SchemaDeltas,
},
NeedsNormalize: false,
return &model.SyncResponse{
CurrentSyncBatchID: -1,
TableSchemaDeltas: recordBatchSync.SchemaDeltas,
}, nil
}

Expand Down Expand Up @@ -326,10 +326,29 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
activity.RecordHeartbeat(ctx, pushedRecordsWithCount)
a.Alerter.LogFlowInfo(ctx, flowName, pushedRecordsWithCount)

return &model.SyncCompositeResponse{
SyncResponse: res,
NeedsNormalize: recordBatchSync.NeedsNormalize(),
}, nil
if recordBatchSync.NeedsNormalize() {
parallel, err := peerdbenv.PeerDBEnableParallelSyncNormalize(ctx, config.Env)
if err != nil {
return nil, err
}
var done chan model.NormalizeResponse
if !parallel {
done = make(chan model.NormalizeResponse)
}
normChan <- NormalizeBatchRequest{
BatchID: res.CurrentSyncBatchID,
Done: done,
}
if done != nil {
if normRes, ok := <-done; !ok {
return nil, errors.New("failed to normalize")
} else {
a.Alerter.LogFlowInfo(ctx, flowName, fmt.Sprintf("normalized from %d to %d", normRes.StartBatchID, normRes.EndBatchID))
}
}
}

return res, nil
}

func (a *FlowableActivity) getPostgresPeerConfigs(ctx context.Context) ([]*protos.Peer, error) {
Expand Down
12 changes: 6 additions & 6 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,17 +383,17 @@ func (c *BigQueryConnector) syncRecordsViaAvro(

// NormalizeRecords normalizes raw table to destination table,
// one batch at a time from the previous normalized batch to the currently synced batch.
func (c *BigQueryConnector) NormalizeRecords(ctx context.Context, req *model.NormalizeRecordsRequest) (*model.NormalizeResponse, error) {
func (c *BigQueryConnector) NormalizeRecords(ctx context.Context, req *model.NormalizeRecordsRequest) (model.NormalizeResponse, error) {
rawTableName := c.getRawTableName(req.FlowJobName)

normBatchID, err := c.GetLastNormalizeBatchID(ctx, req.FlowJobName)
if err != nil {
return nil, fmt.Errorf("failed to get batch for the current mirror: %v", err)
return model.NormalizeResponse{}, fmt.Errorf("failed to get batch for the current mirror: %v", err)
}

// normalize has caught up with sync, chill until more records are loaded.
if normBatchID >= req.SyncBatchID {
return &model.NormalizeResponse{
return model.NormalizeResponse{
Done: false,
StartBatchID: normBatchID,
EndBatchID: req.SyncBatchID,
Expand All @@ -408,16 +408,16 @@ func (c *BigQueryConnector) NormalizeRecords(ctx context.Context, req *model.Nor
SyncedAtColName: req.SyncedAtColName,
})
if mergeErr != nil {
return nil, mergeErr
return model.NormalizeResponse{}, mergeErr
}

err = c.UpdateNormalizeBatchID(ctx, req.FlowJobName, batchId)
if err != nil {
return nil, err
return model.NormalizeResponse{}, err
}
}

return &model.NormalizeResponse{
return model.NormalizeResponse{
Done: true,
StartBatchID: normBatchID + 1,
EndBatchID: req.SyncBatchID,
Expand Down
24 changes: 12 additions & 12 deletions flow/connectors/clickhouse/normalize.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,27 +231,27 @@ func getOrderedOrderByColumns(
func (c *ClickHouseConnector) NormalizeRecords(
ctx context.Context,
req *model.NormalizeRecordsRequest,
) (*model.NormalizeResponse, error) {
) (model.NormalizeResponse, error) {
// fix for potential consistency issues
time.Sleep(3 * time.Second)

normBatchID, err := c.GetLastNormalizeBatchID(ctx, req.FlowJobName)
if err != nil {
c.logger.Error("[clickhouse] error while getting last sync and normalize batch id", "error", err)
return nil, err
return model.NormalizeResponse{}, err
}

// normalize has caught up with sync, chill until more records are loaded.
if normBatchID >= req.SyncBatchID {
return &model.NormalizeResponse{
return model.NormalizeResponse{
Done: false,
StartBatchID: normBatchID,
EndBatchID: req.SyncBatchID,
}, nil
}

if err := c.copyAvroStagesToDestination(ctx, req.FlowJobName, normBatchID, req.SyncBatchID); err != nil {
return nil, fmt.Errorf("failed to copy avro stages to destination: %w", err)
return model.NormalizeResponse{}, fmt.Errorf("failed to copy avro stages to destination: %w", err)
}

destinationTableNames, err := c.getDistinctTableNamesInBatch(
Expand All @@ -262,17 +262,17 @@ func (c *ClickHouseConnector) NormalizeRecords(
)
if err != nil {
c.logger.Error("[clickhouse] error while getting distinct table names in batch", "error", err)
return nil, err
return model.NormalizeResponse{}, err
}

enablePrimaryUpdate, err := peerdbenv.PeerDBEnableClickHousePrimaryUpdate(ctx, req.Env)
if err != nil {
return nil, err
return model.NormalizeResponse{}, err
}

parallelNormalize, err := peerdbenv.PeerDBClickHouseParallelNormalize(ctx, req.Env)
if err != nil {
return nil, err
return model.NormalizeResponse{}, err
}
parallelNormalize = min(max(parallelNormalize, 1), len(destinationTableNames))
if parallelNormalize > 1 {
Expand Down Expand Up @@ -357,7 +357,7 @@ func (c *ClickHouseConnector) NormalizeRecords(
clickHouseType, err = colType.ToDWHColumnType(ctx, req.Env, protos.DBType_CLICKHOUSE, column)
if err != nil {
close(queries)
return nil, fmt.Errorf("error while converting column type to clickhouse type: %w", err)
return model.NormalizeResponse{}, fmt.Errorf("error while converting column type to clickhouse type: %w", err)
}

if (schema.NullableEnabled || columnNullableEnabled) && column.Nullable && !colType.IsArray() {
Expand Down Expand Up @@ -459,20 +459,20 @@ func (c *ClickHouseConnector) NormalizeRecords(
c.logger.Error("[clickhouse] context canceled while normalizing",
slog.Any("error", errCtx.Err()),
slog.Any("cause", context.Cause(errCtx)))
return nil, context.Cause(errCtx)
return model.NormalizeResponse{}, context.Cause(errCtx)
}
}
close(queries)
if err := group.Wait(); err != nil {
return nil, err
return model.NormalizeResponse{}, err
}

if err := c.UpdateNormalizeBatchID(ctx, req.FlowJobName, req.SyncBatchID); err != nil {
c.logger.Error("[clickhouse] error while updating normalize batch id", slog.Int64("BatchID", req.SyncBatchID), slog.Any("error", err))
return nil, err
return model.NormalizeResponse{}, err
}

return &model.NormalizeResponse{
return model.NormalizeResponse{
Done: true,
StartBatchID: normBatchID + 1,
EndBatchID: req.SyncBatchID,
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ type CDCNormalizeConnector interface {

// NormalizeRecords merges records pushed earlier into the destination table.
// This method should be idempotent, and should be able to be called multiple times with the same request.
NormalizeRecords(ctx context.Context, req *model.NormalizeRecordsRequest) (*model.NormalizeResponse, error)
NormalizeRecords(ctx context.Context, req *model.NormalizeRecordsRequest) (model.NormalizeResponse, error)
}

type CreateTablesFromExistingConnector interface {
Expand Down
Loading

0 comments on commit f9c2355

Please sign in to comment.