From fd0456cd2eb51c1edad70f0893e1cdc6e3e227bf Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Philip=20Dub=C3=A9?=
Date: Tue, 17 Dec 2024 00:17:20 +0000
Subject: [PATCH] retries, also avoid locking up on context cancelation

---
 flow/activities/flowable.go             | 37 +++++++++++++++----------
 flow/activities/flowable_core.go        | 20 ++++++-------
 flow/connectors/bigquery/bigquery.go    |  2 --
 flow/connectors/clickhouse/normalize.go |  2 --
 flow/connectors/postgres/postgres.go    |  6 +---
 flow/connectors/snowflake/snowflake.go  |  2 --
 flow/model/model.go                     |  8 ------
 7 files changed, 34 insertions(+), 43 deletions(-)

diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go
index 5907101954..f83efd8221 100644
--- a/flow/activities/flowable.go
+++ b/flow/activities/flowable.go
@@ -39,7 +39,7 @@ type CheckConnectionResult struct {
 }
 
 type NormalizeBatchRequest struct {
-	Done    chan model.NormalizeResponse
+	Done    chan struct{}
 	BatchID int64
 }
 
@@ -326,13 +326,23 @@ func (a *FlowableActivity) MaintainPull(
 			if !ok {
 				break loop
 			}
-			res, err := a.StartNormalize(ctx, config, req.BatchID)
-			if err != nil {
+		retry:
+			if err := a.StartNormalize(ctx, config, req.BatchID); err != nil {
 				a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
+				for {
+					// update req to latest normalize request & retry
+					select {
+					case req = <-normalize:
+					case <-done:
+						break loop
+					case <-ctx.Done():
+						break loop
+					default:
+						time.Sleep(time.Second)
+						goto retry
+					}
+				}
 			} else if req.Done != nil {
-				req.Done <- res
-			}
-			if req.Done != nil {
 				close(req.Done)
 			}
 		case <-done:
@@ -436,7 +446,7 @@ func (a *FlowableActivity) StartNormalize(
 	ctx context.Context,
 	config *protos.FlowConnectionConfigs,
 	batchID int64,
-) (model.NormalizeResponse, error) {
+) error {
 	ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
 	logger := activity.GetLogger(ctx)
 
@@ -447,10 +457,9 @@ func (a *FlowableActivity) StartNormalize(
 		config.DestinationName,
 	)
 	if errors.Is(err, errors.ErrUnsupported) {
-		return model.NormalizeResponse{},
-			monitoring.UpdateEndTimeForCDCBatch(ctx, a.CatalogPool, config.FlowJobName, batchID)
+		return monitoring.UpdateEndTimeForCDCBatch(ctx, a.CatalogPool, config.FlowJobName, batchID)
 	} else if err != nil {
-		return model.NormalizeResponse{}, fmt.Errorf("failed to get normalize connector: %w", err)
+		return fmt.Errorf("failed to get normalize connector: %w", err)
 	}
 	defer connectors.CloseConnector(ctx, dstConn)
 
@@ -461,7 +470,7 @@ func (a *FlowableActivity) StartNormalize(
 
 	tableNameSchemaMapping, err := a.getTableNameSchemaMapping(ctx, config.FlowJobName)
 	if err != nil {
-		return model.NormalizeResponse{}, fmt.Errorf("failed to get table name schema mapping: %w", err)
+		return fmt.Errorf("failed to get table name schema mapping: %w", err)
 	}
 
 	logger.Info("Normalizing batch", slog.Int64("SyncBatchID", batchID))
@@ -476,17 +485,17 @@ func (a *FlowableActivity) StartNormalize(
 	})
 	if err != nil {
 		a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
-		return model.NormalizeResponse{}, fmt.Errorf("failed to normalized records: %w", err)
+		return fmt.Errorf("failed to normalized records: %w", err)
 	}
 	if _, dstPg := dstConn.(*connpostgres.PostgresConnector); dstPg {
 		if err := monitoring.UpdateEndTimeForCDCBatch(ctx, a.CatalogPool, config.FlowJobName, batchID); err != nil {
-			return model.NormalizeResponse{}, fmt.Errorf("failed to update end time for cdc batch: %w", err)
+			return fmt.Errorf("failed to update end time for cdc batch: %w", err)
		}
 	}
 
 	logger.Info("normalized batches", slog.Int64("StartBatchID", res.StartBatchID),
 		slog.Int64("EndBatchID", res.EndBatchID))
-	return res, nil
+	return nil
 }
 
 // SetupQRepMetadataTables sets up the metadata tables for QReplication.
diff --git a/flow/activities/flowable_core.go b/flow/activities/flowable_core.go
index b6edd72486..6eea134832 100644
--- a/flow/activities/flowable_core.go
+++ b/flow/activities/flowable_core.go
@@ -3,7 +3,6 @@ package activities
 
 import (
 	"context"
-	"errors"
 	"fmt"
 	"log/slog"
 	"reflect"
@@ -360,19 +359,20 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
 		if err != nil {
 			return 0, err
 		}
-		var done chan model.NormalizeResponse
+		var done chan struct{}
 		if !parallel {
-			done = make(chan model.NormalizeResponse)
+			done = make(chan struct{})
 		}
-		normChan <- NormalizeBatchRequest{
-			BatchID: res.CurrentSyncBatchID,
-			Done:    done,
+		select {
+		case normChan <- NormalizeBatchRequest{BatchID: res.CurrentSyncBatchID, Done: done}:
+		case <-ctx.Done():
+			return 0, nil
 		}
 		if done != nil {
-			if normRes, ok := <-done; !ok {
-				return 0, errors.New("failed to normalize")
-			} else {
-				a.Alerter.LogFlowInfo(ctx, flowName, fmt.Sprintf("normalized from %d to %d", normRes.StartBatchID, normRes.EndBatchID))
+			select {
+			case <-done:
+			case <-ctx.Done():
+				return 0, nil
 			}
 		}
 	}
diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go
index 73203d50fe..ffe0016445 100644
--- a/flow/connectors/bigquery/bigquery.go
+++ b/flow/connectors/bigquery/bigquery.go
@@ -399,7 +399,6 @@ func (c *BigQueryConnector) NormalizeRecords(ctx context.Context, req *model.Nor
 	// normalize has caught up with sync, chill until more records are loaded.
 	if normBatchID >= req.SyncBatchID {
 		return model.NormalizeResponse{
-			Done:         false,
 			StartBatchID: normBatchID,
 			EndBatchID:   req.SyncBatchID,
 		}, nil
@@ -423,7 +422,6 @@ func (c *BigQueryConnector) NormalizeRecords(ctx context.Context, req *model.Nor
 	}
 
 	return model.NormalizeResponse{
-		Done:         true,
 		StartBatchID: normBatchID + 1,
 		EndBatchID:   req.SyncBatchID,
 	}, nil
diff --git a/flow/connectors/clickhouse/normalize.go b/flow/connectors/clickhouse/normalize.go
index 62ec9a2e4c..60a79a6d9f 100644
--- a/flow/connectors/clickhouse/normalize.go
+++ b/flow/connectors/clickhouse/normalize.go
@@ -242,7 +242,6 @@ func (c *ClickHouseConnector) NormalizeRecords(
 	// normalize has caught up with sync, chill until more records are loaded.
 	if normBatchID >= req.SyncBatchID {
 		return model.NormalizeResponse{
-			Done:         false,
 			StartBatchID: normBatchID,
 			EndBatchID:   req.SyncBatchID,
 		}, nil
@@ -471,7 +470,6 @@ func (c *ClickHouseConnector) NormalizeRecords(
 	}
 
 	return model.NormalizeResponse{
-		Done:         true,
 		StartBatchID: normBatchID + 1,
 		EndBatchID:   req.SyncBatchID,
 	}, nil
diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go
index 40a10b9c23..6cd3be11e1 100644
--- a/flow/connectors/postgres/postgres.go
+++ b/flow/connectors/postgres/postgres.go
@@ -621,9 +621,7 @@ func (c *PostgresConnector) NormalizeRecords(
 	// no SyncFlow has run, chill until more records are loaded.
 	if !jobMetadataExists {
 		c.logger.Info("no metadata found for mirror")
-		return model.NormalizeResponse{
-			Done: false,
-		}, nil
+		return model.NormalizeResponse{}, nil
 	}
 
 	normBatchID, err := c.GetLastNormalizeBatchID(ctx, req.FlowJobName)
@@ -636,7 +634,6 @@ func (c *PostgresConnector) NormalizeRecords(
 		c.logger.Info(fmt.Sprintf("no records to normalize: syncBatchID %d, normalizeBatchID %d",
 			req.SyncBatchID, normBatchID))
 		return model.NormalizeResponse{
-			Done:         false,
 			StartBatchID: normBatchID,
 			EndBatchID:   req.SyncBatchID,
 		}, nil
@@ -708,7 +705,6 @@ func (c *PostgresConnector) NormalizeRecords(
 	}
 
 	return model.NormalizeResponse{
-		Done:         true,
 		StartBatchID: normBatchID + 1,
 		EndBatchID:   req.SyncBatchID,
 	}, nil
diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go
index 3fadbff670..9bb66e5922 100644
--- a/flow/connectors/snowflake/snowflake.go
+++ b/flow/connectors/snowflake/snowflake.go
@@ -485,7 +485,6 @@ func (c *SnowflakeConnector) NormalizeRecords(ctx context.Context, req *model.No
 	// normalize has caught up with sync, chill until more records are loaded.
 	if normBatchID >= req.SyncBatchID {
 		return model.NormalizeResponse{
-			Done:         false,
 			StartBatchID: normBatchID,
 			EndBatchID:   req.SyncBatchID,
 		}, nil
@@ -511,7 +510,6 @@ func (c *SnowflakeConnector) NormalizeRecords(ctx context.Context, req *model.No
 	}
 
 	return model.NormalizeResponse{
-		Done:         true,
 		StartBatchID: normBatchID + 1,
 		EndBatchID:   req.SyncBatchID,
 	}, nil
diff --git a/flow/model/model.go b/flow/model/model.go
index d2f66693c0..5a9d7b76cc 100644
--- a/flow/model/model.go
+++ b/flow/model/model.go
@@ -169,15 +169,7 @@ type SyncResponse struct {
 	CurrentSyncBatchID int64
 }
 
-type NormalizePayload struct {
-	TableNameSchemaMapping map[string]*protos.TableSchema
-	Done                   bool
-	SyncBatchID            int64
-}
-
 type NormalizeResponse struct {
-	// Flag to depict if normalization is done
-	Done         bool
 	StartBatchID int64
 	EndBatchID   int64
 }
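
Reviewer note (not part of the patch): the change does two things. MaintainPull now retries a failed StartNormalize once per second, and while it waits it swaps in the newest request from the normalize channel, exiting promptly if the done channel or the activity context is closed. On the caller side, syncCore wraps both the send into the normalize channel and the wait on the request's Done channel in selects on ctx.Done(), so a cancelled context can no longer leave either goroutine blocked. Below is a minimal, self-contained Go sketch of those two patterns, not the patched code itself: worker, request, processBatch and enqueueAndWait are illustrative names rather than PeerDB identifiers, and the sketch returns ctx.Err() where the patch returns (0, nil).

package main

import (
	"context"
	"fmt"
	"time"
)

// request is an illustrative stand-in for NormalizeBatchRequest.
type request struct {
	batchID int64
	done    chan struct{} // closed by the worker when the batch is finished; nil when the caller does not wait
}

// worker mirrors the shape of the MaintainPull normalize loop: retry a failed
// batch once per second, prefer the newest request if one arrives while
// waiting, and stop promptly on cancellation. (The real loop also watches a
// separate shutdown channel; ctx stands in for both signals here.)
func worker(ctx context.Context, requests <-chan request, processBatch func(int64) error) {
loop:
	for {
		select {
		case req, ok := <-requests:
			if !ok {
				break loop // producer closed the channel: nothing left to do
			}
		retry:
			if err := processBatch(req.batchID); err != nil {
				fmt.Println("batch failed, will retry:", err)
				for {
					select {
					case newer, ok := <-requests:
						if !ok {
							break loop
						}
						req = newer // a newer request supersedes the failed one
					case <-ctx.Done():
						break loop
					default:
						time.Sleep(time.Second)
						goto retry // retry the current (possibly updated) request
					}
				}
			} else if req.done != nil {
				close(req.done) // completion is signalled by closing; no payload is carried
			}
		case <-ctx.Done():
			break loop
		}
	}
}

// enqueueAndWait mirrors the caller side in syncCore: hand a batch to the
// worker and, in non-parallel mode, wait for it to finish. Both the send and
// the wait also select on ctx.Done(), so cancellation can never leave this
// goroutine stuck on an unbuffered channel.
func enqueueAndWait(ctx context.Context, requests chan<- request, batchID int64, parallel bool) error {
	var done chan struct{}
	if !parallel {
		done = make(chan struct{})
	}
	select {
	case requests <- request{batchID: batchID, done: done}:
	case <-ctx.Done():
		return ctx.Err()
	}
	if done != nil {
		select {
		case <-done: // batch finished
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	requests := make(chan request)
	go worker(ctx, requests, func(id int64) error {
		fmt.Println("normalizing batch", id)
		return nil
	})

	if err := enqueueAndWait(ctx, requests, 1, false); err != nil {
		fmt.Println("cancelled before the batch completed:", err)
	}
}

Switching Done to a chan struct{} that the worker closes, instead of a chan model.NormalizeResponse it sends a value on, is what allows the unused Done flag and the NormalizePayload type to be deleted from flow/model/model.go: closing the channel signals completion to any waiter without carrying a payload.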