Skip to content

Commit

Permalink
more logging
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Dec 16, 2024
1 parent 989aefe commit 217fe99
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 28 deletions.
11 changes: 3 additions & 8 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,7 @@ func (a *FlowableActivity) StartNormalize(
return model.NormalizeResponse{}, fmt.Errorf("failed to get table name schema mapping: %w", err)
}

logger.Info("Normalizing batch", slog.Int64("SyncBatchID", input.SyncBatchID))
res, err := dstConn.NormalizeRecords(ctx, &model.NormalizeRecordsRequest{
FlowJobName: input.FlowConnectionConfigs.FlowJobName,
Env: input.FlowConnectionConfigs.Env,
Expand All @@ -480,11 +481,7 @@ func (a *FlowableActivity) StartNormalize(
a.Alerter.LogFlowError(ctx, input.FlowConnectionConfigs.FlowJobName, err)
return model.NormalizeResponse{}, fmt.Errorf("failed to normalized records: %w", err)
}
dstType, err := connectors.LoadPeerType(ctx, a.CatalogPool, input.FlowConnectionConfigs.DestinationName)
if err != nil {
return model.NormalizeResponse{}, fmt.Errorf("failed to get peer type: %w", err)
}
if dstType == protos.DBType_POSTGRES {
if _, dstPg := dstConn.(*connpostgres.PostgresConnector); dstPg {
if err := monitoring.UpdateEndTimeForCDCBatch(
ctx,
a.CatalogPool,
Expand All @@ -495,9 +492,7 @@ func (a *FlowableActivity) StartNormalize(
}
}

// log the number of batches normalized
logger.Info(fmt.Sprintf("normalized records from batch %d to batch %d",
res.StartBatchID, res.EndBatchID))
logger.Info("normalized batches", slog.Int64("StartBatchID", res.StartBatchID), slog.Int64("EndBatchID", res.EndBatchID))

return res, nil
}
Expand Down
5 changes: 2 additions & 3 deletions flow/connectors/clickhouse/normalize.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,9 +274,8 @@ func (c *ClickHouseConnector) NormalizeRecords(
return model.NormalizeResponse{}, err
}
parallelNormalize = min(max(parallelNormalize, 1), len(destinationTableNames))
if parallelNormalize > 1 {
c.logger.Info("normalizing in parallel", slog.Int("connections", parallelNormalize))
}
c.logger.Info("[clickhouse] normalizing batch",
slog.Int64("StartBatchID", normBatchID), slog.Int64("EndBatchID", req.SyncBatchID), slog.Int("connections", parallelNormalize))

queries := make(chan string)
rawTbl := c.getRawTableName(req.FlowJobName)
Expand Down
28 changes: 11 additions & 17 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,27 +503,22 @@ func CDCFlowWorkflow(
}
syncCtx := workflow.WithChildOptions(ctx, syncFlowOpts)

handleError := func(name string, err error) {
var panicErr *temporal.PanicError
if errors.As(err, &panicErr) {
logger.Error(
"panic in flow",
slog.String("name", name),
slog.Any("error", panicErr.Error()),
slog.String("stack", panicErr.StackTrace()),
)
} else {
logger.Error("error in flow", slog.String("name", name), slog.Any("error", err))
}
}

syncFlowFuture := workflow.ExecuteChildWorkflow(syncCtx, SyncFlowWorkflow, cfg, state.SyncFlowOptions)

mainLoopSelector := workflow.NewNamedSelector(ctx, "MainLoop")
mainLoopSelector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {})
mainLoopSelector.AddFuture(syncFlowFuture, func(f workflow.Future) {
if err := f.Get(ctx, nil); err != nil {
handleError("sync", err)
var panicErr *temporal.PanicError
if errors.As(err, &panicErr) {
logger.Error(
"panic in sync flow",
slog.Any("error", panicErr.Error()),
slog.String("stack", panicErr.StackTrace()),
)
} else {
logger.Error("error in sync flow", slog.Any("error", err))
}
}

logger.Info("sync finished")
Expand Down Expand Up @@ -554,8 +549,7 @@ func CDCFlowWorkflow(
if state.ActiveSignal == model.PauseSignal || workflow.GetInfo(ctx).GetContinueAsNewSuggested() {
restart = true
if syncFlowFuture != nil {
err := model.SyncStopSignal.SignalChildWorkflow(ctx, syncFlowFuture, struct{}{}).Get(ctx, nil)
if err != nil {
if err := model.SyncStopSignal.SignalChildWorkflow(ctx, syncFlowFuture, struct{}{}).Get(ctx, nil); err != nil {
logger.Warn("failed to send sync-stop, finishing", slog.Any("error", err))
finished = true
}
Expand Down

0 comments on commit 217fe99

Please sign in to comment.