From c674b2166c3a78d922c66a9d7d5ed6d3c814f227 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Mon, 16 Dec 2024 19:33:52 +0000 Subject: [PATCH] more logging --- flow/activities/flowable.go | 7 ++----- flow/connectors/clickhouse/normalize.go | 5 ++--- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 497598751..08aeaae9f 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -467,6 +467,7 @@ func (a *FlowableActivity) StartNormalize( return model.NormalizeResponse{}, fmt.Errorf("failed to get table name schema mapping: %w", err) } + logger.Info("Normalizing batch", slog.Int64("SyncBatchID", input.SyncBatchID)) res, err := dstConn.NormalizeRecords(ctx, &model.NormalizeRecordsRequest{ FlowJobName: input.FlowConnectionConfigs.FlowJobName, Env: input.FlowConnectionConfigs.Env, @@ -480,11 +481,7 @@ func (a *FlowableActivity) StartNormalize( a.Alerter.LogFlowError(ctx, input.FlowConnectionConfigs.FlowJobName, err) return model.NormalizeResponse{}, fmt.Errorf("failed to normalized records: %w", err) } - dstType, err := connectors.LoadPeerType(ctx, a.CatalogPool, input.FlowConnectionConfigs.DestinationName) - if err != nil { - return model.NormalizeResponse{}, fmt.Errorf("failed to get peer type: %w", err) - } - if dstType == protos.DBType_POSTGRES { + if _, dstPg := dstConn.(*connpostgres.PostgresConnector); dstPg { if err := monitoring.UpdateEndTimeForCDCBatch( ctx, a.CatalogPool, diff --git a/flow/connectors/clickhouse/normalize.go b/flow/connectors/clickhouse/normalize.go index 9166104a4..62ec9a2e4 100644 --- a/flow/connectors/clickhouse/normalize.go +++ b/flow/connectors/clickhouse/normalize.go @@ -274,9 +274,8 @@ func (c *ClickHouseConnector) NormalizeRecords( return model.NormalizeResponse{}, err } parallelNormalize = min(max(parallelNormalize, 1), len(destinationTableNames)) - if parallelNormalize > 1 { - c.logger.Info("normalizing in parallel", slog.Int("connections", parallelNormalize)) - } + c.logger.Info("[clickhouse] normalizing batch", + slog.Int64("StartBatchID", normBatchID), slog.Int64("EndBatchID", req.SyncBatchID), slog.Int("connections", parallelNormalize)) queries := make(chan string) rawTbl := c.getRawTableName(req.FlowJobName)