Skip to content

Commit

Permalink
gauges for current batch id & normalized batch id
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Dec 19, 2024
1 parent c0e5fba commit a3b2465
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 5 deletions.
19 changes: 19 additions & 0 deletions flow/activities/flowable_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,16 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
activity.RecordHeartbeat(ctx, pushedRecordsWithCount)
a.Alerter.LogFlowInfo(ctx, flowName, pushedRecordsWithCount)

if a.OtelManager != nil {
currentBatchID, err := a.OtelManager.GetOrInitInt64Gauge(
otel_metrics.BuildMetricName(otel_metrics.CurrentBatchIdGaugeName))
if err != nil {
logger.Error("Failed to get current batch id gauge", slog.Any("error", err))
} else {
currentBatchID.Record(ctx, res.CurrentSyncBatchID)
}
}

if err := a.applySchemaDeltas(ctx, config, options, res.TableSchemaDeltas); err != nil {
return 0, err
}
Expand Down Expand Up @@ -666,6 +676,15 @@ func (a *FlowableActivity) normalizeLoop(
} else if req.Done != nil {
close(req.Done)
}
if a.OtelManager != nil {
lastNormalizedBatchID, err := a.OtelManager.GetOrInitInt64Gauge(
otel_metrics.BuildMetricName(otel_metrics.LastNormalizedBatchIdGaugeName))
if err != nil {
logger.Error("Failed to get normalized batch id gauge", slog.Any("error", err))
} else {
lastNormalizedBatchID.Record(ctx, req.BatchID)
}
}
break
}
case <-syncDone:
Expand Down
14 changes: 9 additions & 5 deletions flow/otel_metrics/otel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@ import (
)

const (
SlotLagGaugeName string = "cdc_slot_lag"
OpenConnectionsGaugeName string = "open_connections"
OpenReplicationConnectionsGaugeName string = "open_replication_connections"
IntervalSinceLastNormalizeGaugeName string = "interval_since_last_normalize"
FetchedBytesCounterName string = "fetched_bytes"
SlotLagGaugeName = "cdc_slot_lag"
CurrentBatchIdGaugeName = "current_batch_id"
LastNormalizedBatchIdGaugeName = "last_normalized_batch_id"
OpenConnectionsGaugeName = "open_connections"
OpenReplicationConnectionsGaugeName = "open_replication_connections"
IntervalSinceLastNormalizeGaugeName = "interval_since_last_normalize"
FetchedBytesCounterName = "fetched_bytes"
)

type SlotMetricGauges struct {
SlotLagGauge metric.Float64Gauge
CurrentBatchIdGauge metric.Int64Gauge
LastNormalizedBatchIdGauge metric.Int64Gauge
OpenConnectionsGauge metric.Int64Gauge
OpenReplicationConnectionsGauge metric.Int64Gauge
IntervalSinceLastNormalizeGauge metric.Float64Gauge
Expand Down

0 comments on commit a3b2465

Please sign in to comment.