Skip to content

Commit

Permalink
more logs
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Dec 17, 2024
1 parent cb55369 commit 55e967a
Showing 1 changed file with 5 additions and 6 deletions.
11 changes: 5 additions & 6 deletions flow/activities/flowable_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
return err
}
syncBatchID += 1
logger.Info("begin pulling records for batch", slog.Int64("SyncBatchID", syncBatchID))

if err := monitoring.AddCDCBatchForFlow(errCtx, a.CatalogPool, flowName, monitoring.CDCBatchInfo{
BatchID: syncBatchID,
Expand Down Expand Up @@ -303,6 +304,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
return fmt.Errorf("failed to push records: %w", err)
}

logger.Info("finished pulling records for batch", slog.Int64("SyncBatchID", syncBatchID))
return nil
})

Expand All @@ -319,16 +321,12 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
}
}

numRecords := res.NumRecordsSynced
syncDuration := time.Since(syncStartTime)

logger.Info(fmt.Sprintf("pushed %d records in %d seconds", numRecords, int(syncDuration.Seconds())))

lastCheckpoint := recordBatchSync.GetLastCheckpoint()
srcConn.UpdateReplStateLastOffset(lastCheckpoint)

if err := monitoring.UpdateNumRowsAndEndLSNForCDCBatch(
ctx, a.CatalogPool, flowName, res.CurrentSyncBatchID, uint32(numRecords), lastCheckpoint,
ctx, a.CatalogPool, flowName, res.CurrentSyncBatchID, uint32(res.NumRecordsSynced), lastCheckpoint,
); err != nil {
a.Alerter.LogFlowError(ctx, flowName, err)
return 0, err
Expand All @@ -346,7 +344,8 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
}
}

pushedRecordsWithCount := fmt.Sprintf("pushed %d records", numRecords)
pushedRecordsWithCount := fmt.Sprintf("pushed %d records for batch %d in %v",
res.NumRecordsSynced, res.CurrentSyncBatchID, syncDuration.Truncate(time.Second))
activity.RecordHeartbeat(ctx, pushedRecordsWithCount)
a.Alerter.LogFlowInfo(ctx, flowName, pushedRecordsWithCount)

Expand Down

0 comments on commit 55e967a

Please sign in to comment.