From 217fe99cd8e51c9363a84e7d9eeeeac9a48370d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Mon, 16 Dec 2024 19:33:52 +0000 Subject: [PATCH] more logging --- flow/activities/flowable.go | 11 +++------- flow/connectors/clickhouse/normalize.go | 5 ++--- flow/workflows/cdc_flow.go | 28 ++++++++++--------------- 3 files changed, 16 insertions(+), 28 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 4975987512..6e2232b905 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -467,6 +467,7 @@ func (a *FlowableActivity) StartNormalize( return model.NormalizeResponse{}, fmt.Errorf("failed to get table name schema mapping: %w", err) } + logger.Info("Normalizing batch", slog.Int64("SyncBatchID", input.SyncBatchID)) res, err := dstConn.NormalizeRecords(ctx, &model.NormalizeRecordsRequest{ FlowJobName: input.FlowConnectionConfigs.FlowJobName, Env: input.FlowConnectionConfigs.Env, @@ -480,11 +481,7 @@ func (a *FlowableActivity) StartNormalize( a.Alerter.LogFlowError(ctx, input.FlowConnectionConfigs.FlowJobName, err) return model.NormalizeResponse{}, fmt.Errorf("failed to normalized records: %w", err) } - dstType, err := connectors.LoadPeerType(ctx, a.CatalogPool, input.FlowConnectionConfigs.DestinationName) - if err != nil { - return model.NormalizeResponse{}, fmt.Errorf("failed to get peer type: %w", err) - } - if dstType == protos.DBType_POSTGRES { + if _, dstPg := dstConn.(*connpostgres.PostgresConnector); dstPg { if err := monitoring.UpdateEndTimeForCDCBatch( ctx, a.CatalogPool, @@ -495,9 +492,7 @@ func (a *FlowableActivity) StartNormalize( } } - // log the number of batches normalized - logger.Info(fmt.Sprintf("normalized records from batch %d to batch %d", - res.StartBatchID, res.EndBatchID)) + logger.Info("normalized batches", slog.Int64("StartBatchID", res.StartBatchID), slog.Int64("EndBatchID", res.EndBatchID)) return res, nil } diff --git a/flow/connectors/clickhouse/normalize.go b/flow/connectors/clickhouse/normalize.go index 9166104a45..62ec9a2e4c 100644 --- a/flow/connectors/clickhouse/normalize.go +++ b/flow/connectors/clickhouse/normalize.go @@ -274,9 +274,8 @@ func (c *ClickHouseConnector) NormalizeRecords( return model.NormalizeResponse{}, err } parallelNormalize = min(max(parallelNormalize, 1), len(destinationTableNames)) - if parallelNormalize > 1 { - c.logger.Info("normalizing in parallel", slog.Int("connections", parallelNormalize)) - } + c.logger.Info("[clickhouse] normalizing batch", + slog.Int64("StartBatchID", normBatchID), slog.Int64("EndBatchID", req.SyncBatchID), slog.Int("connections", parallelNormalize)) queries := make(chan string) rawTbl := c.getRawTableName(req.FlowJobName) diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 9358093b9d..a3764a6777 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -503,27 +503,22 @@ func CDCFlowWorkflow( } syncCtx := workflow.WithChildOptions(ctx, syncFlowOpts) - handleError := func(name string, err error) { - var panicErr *temporal.PanicError - if errors.As(err, &panicErr) { - logger.Error( - "panic in flow", - slog.String("name", name), - slog.Any("error", panicErr.Error()), - slog.String("stack", panicErr.StackTrace()), - ) - } else { - logger.Error("error in flow", slog.String("name", name), slog.Any("error", err)) - } - } - syncFlowFuture := workflow.ExecuteChildWorkflow(syncCtx, SyncFlowWorkflow, cfg, state.SyncFlowOptions) mainLoopSelector := workflow.NewNamedSelector(ctx, "MainLoop") mainLoopSelector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {}) mainLoopSelector.AddFuture(syncFlowFuture, func(f workflow.Future) { if err := f.Get(ctx, nil); err != nil { - handleError("sync", err) + var panicErr *temporal.PanicError + if errors.As(err, &panicErr) { + logger.Error( + "panic in sync flow", + slog.Any("error", panicErr.Error()), + slog.String("stack", panicErr.StackTrace()), + ) + } else { + logger.Error("error in sync flow", slog.Any("error", err)) + } } logger.Info("sync finished") @@ -554,8 +549,7 @@ func CDCFlowWorkflow( if state.ActiveSignal == model.PauseSignal || workflow.GetInfo(ctx).GetContinueAsNewSuggested() { restart = true if syncFlowFuture != nil { - err := model.SyncStopSignal.SignalChildWorkflow(ctx, syncFlowFuture, struct{}{}).Get(ctx, nil) - if err != nil { + if err := model.SyncStopSignal.SignalChildWorkflow(ctx, syncFlowFuture, struct{}{}).Get(ctx, nil); err != nil { logger.Warn("failed to send sync-stop, finishing", slog.Any("error", err)) finished = true }