From 55e967a7090a9d10b71d58aa2b8bf9c654da549d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Tue, 17 Dec 2024 01:42:51 +0000 Subject: [PATCH] more logs --- flow/activities/flowable_core.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/flow/activities/flowable_core.go b/flow/activities/flowable_core.go index 6eea134832..e9a28adbb9 100644 --- a/flow/activities/flowable_core.go +++ b/flow/activities/flowable_core.go @@ -276,6 +276,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon return err } syncBatchID += 1 + logger.Info("begin pulling records for batch", slog.Int64("SyncBatchID", syncBatchID)) if err := monitoring.AddCDCBatchForFlow(errCtx, a.CatalogPool, flowName, monitoring.CDCBatchInfo{ BatchID: syncBatchID, @@ -303,6 +304,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon return fmt.Errorf("failed to push records: %w", err) } + logger.Info("finished pulling records for batch", slog.Int64("SyncBatchID", syncBatchID)) return nil }) @@ -319,16 +321,12 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon } } - numRecords := res.NumRecordsSynced syncDuration := time.Since(syncStartTime) - - logger.Info(fmt.Sprintf("pushed %d records in %d seconds", numRecords, int(syncDuration.Seconds()))) - lastCheckpoint := recordBatchSync.GetLastCheckpoint() srcConn.UpdateReplStateLastOffset(lastCheckpoint) if err := monitoring.UpdateNumRowsAndEndLSNForCDCBatch( - ctx, a.CatalogPool, flowName, res.CurrentSyncBatchID, uint32(numRecords), lastCheckpoint, + ctx, a.CatalogPool, flowName, res.CurrentSyncBatchID, uint32(res.NumRecordsSynced), lastCheckpoint, ); err != nil { a.Alerter.LogFlowError(ctx, flowName, err) return 0, err @@ -346,7 +344,8 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon } } - pushedRecordsWithCount := fmt.Sprintf("pushed %d records", numRecords) + pushedRecordsWithCount := fmt.Sprintf("pushed %d records for batch %d in %v", + res.NumRecordsSynced, res.CurrentSyncBatchID, syncDuration.Truncate(time.Second)) activity.RecordHeartbeat(ctx, pushedRecordsWithCount) a.Alerter.LogFlowInfo(ctx, flowName, pushedRecordsWithCount)