Skip to content

Commit

Permalink
more logging
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Dec 16, 2024
1 parent e397820 commit dbc8325
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 8 deletions.
7 changes: 2 additions & 5 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,7 @@ func (a *FlowableActivity) StartNormalize(
return model.NormalizeResponse{}, fmt.Errorf("failed to get table name schema mapping: %w", err)
}

logger.Info("Normalizing batch", slog.Int64("SyncBatchID", input.SyncBatchID))
res, err := dstConn.NormalizeRecords(ctx, &model.NormalizeRecordsRequest{
FlowJobName: input.FlowConnectionConfigs.FlowJobName,
Env: input.FlowConnectionConfigs.Env,
Expand All @@ -480,11 +481,7 @@ func (a *FlowableActivity) StartNormalize(
a.Alerter.LogFlowError(ctx, input.FlowConnectionConfigs.FlowJobName, err)
return model.NormalizeResponse{}, fmt.Errorf("failed to normalized records: %w", err)
}
dstType, err := connectors.LoadPeerType(ctx, a.CatalogPool, input.FlowConnectionConfigs.DestinationName)
if err != nil {
return model.NormalizeResponse{}, fmt.Errorf("failed to get peer type: %w", err)
}
if dstType == protos.DBType_POSTGRES {
if _, dstPg := dstConn.(*connpostgres.PostgresConnector); dstPg {
if err := monitoring.UpdateEndTimeForCDCBatch(
ctx,
a.CatalogPool,
Expand Down
5 changes: 2 additions & 3 deletions flow/connectors/clickhouse/normalize.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,9 +276,8 @@ func (c *ClickHouseConnector) NormalizeRecords(
return model.NormalizeResponse{}, err
}
parallelNormalize = min(max(parallelNormalize, 1), len(destinationTableNames))
if parallelNormalize > 1 {
c.logger.Info("normalizing in parallel", slog.Int("connections", parallelNormalize))
}
c.logger.Info("[clickhouse] normalizing batch",
slog.Int64("StartBatchID", normBatchID), slog.Int64("EndBatchID", req.SyncBatchID), slog.Int("connections", parallelNormalize))

queries := make(chan string)
rawTbl := c.getRawTableName(req.FlowJobName)
Expand Down

0 comments on commit dbc8325

Please sign in to comment.