Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
130897: changefeedccl: add changefeed.total_ranges metric r=rharding6373,asg0451 a=andyyang890

This patch adds a new `changefeed.total_ranges` metric that can be used to monitor the number of ranges that are watched by changefeed aggregators. It uses the existing `changefeed.lagging_ranges` polling mechanism and thus its polling interval is also controlled by the existing `lagging_ranges_polling_interval` option.

Fixes cockroachdb#124705

Release note (enterprise change): A new `changefeed.total_ranges` metric has been added and can be used to monitor the number of ranges that are watched by changefeed aggregators. It shares the same polling interval as `changefeed.lagging_ranges`, which is controlled by the existing `lagging_ranges_polling_interval` option.

Co-authored-by: Andy Yang <[email protected]>
  • Loading branch information
craig[bot] and andyyang890 committed Sep 19, 2024
2 parents a818801 + a0421e1 commit fbed78a
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 20 deletions.
1 change: 1 addition & 0 deletions docs/generated/metrics/metrics.html
Original file line number Diff line number Diff line change
Expand Up @@ -889,6 +889,7 @@
<tr><td>APPLICATION</td><td>changefeed.sink_batch_hist_nanos</td><td>Time spent batched in the sink buffer before being flushed and acknowledged</td><td>Changefeeds</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.sink_io_inflight</td><td>The number of keys currently inflight as IO requests being sent to the sink</td><td>Messages</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.size_based_flushes</td><td>Total size based flushes across all feeds</td><td>Flushes</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.total_ranges</td><td>The total number of ranges being watched by changefeed aggregators</td><td>Ranges</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.usage.error_count</td><td>Count of errors encountered while generating usage metrics for changefeeds</td><td>Errors</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.usage.query_duration</td><td>Time taken by the queries used to generate usage metrics for changefeeds</td><td>Nanoseconds</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.usage.table_bytes</td><td>Aggregated number of bytes of data per table watched by changefeeds</td><td>Storage</td><td>GAUGE</td><td>BYTES</td><td>AVG</td><td>NONE</td></tr>
Expand Down
54 changes: 54 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1392,6 +1392,60 @@ func TestChangefeedLaggingRangesMetrics(t *testing.T) {
cdcTest(t, testFn, feedTestNoTenants, feedTestEnterpriseSinks)
}

func TestChangefeedTotalRangesMetric(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
registry := s.Server.JobRegistry().(*jobs.Registry)
metrics := registry.MetricsStruct().Changefeed.(*Metrics)
defaultSLI, err := metrics.getSLIMetrics(defaultSLIScope)
require.NoError(t, err)
totalRanges := defaultSLI.TotalRanges

// Total ranges should start at zero.
require.Zero(t, totalRanges.Value())

assertTotalRanges := func(expected int64) {
testutils.SucceedsSoon(t, func() error {
if actual := totalRanges.Value(); expected != actual {
return errors.Newf("expected total ranges to be %d, but got %d", expected, actual)
}
return nil
})
}

sqlDB := sqlutils.MakeSQLRunner(s.DB)
sqlDB.Exec(t, "CREATE TABLE foo (x int)")

// We expect one range after creating a changefeed on a single table.
fooFeed := feed(t, f, "CREATE CHANGEFEED FOR foo WITH lagging_ranges_polling_interval='1s'")
assertTotalRanges(1)

// We expect total ranges to be zero again after pausing the changefeed.
require.NoError(t, fooFeed.(cdctest.EnterpriseTestFeed).Pause())
assertTotalRanges(0)

// We once again expect one range after resuming the changefeed.
require.NoError(t, fooFeed.(cdctest.EnterpriseTestFeed).Resume())
assertTotalRanges(1)

// We expect two ranges after starting another changefeed on a single table.
barFeed := feed(t, f, "CREATE CHANGEFEED FOR foo WITH lagging_ranges_polling_interval='1s'")
assertTotalRanges(2)

// We expect there to still be one range after cancelling one of the changefeeds.
require.NoError(t, fooFeed.Close())
assertTotalRanges(1)

// We expect there to be no ranges left after cancelling the other changefeed.
require.NoError(t, barFeed.Close())
assertTotalRanges(0)
}

cdcTest(t, testFn, feedTestEnterpriseSinks)
}

func TestChangefeedBackfillObservability(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
22 changes: 12 additions & 10 deletions pkg/ccl/changefeedccl/kvfeed/kv_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ import (
// the caller with information about the state of the kvfeed.
type MonitoringConfig struct {
// LaggingRangesCallback is called periodically with the number of lagging ranges
// in the kvfeed.
LaggingRangesCallback func(int64)
// and total ranges watched by the kvfeed.
LaggingRangesCallback func(lagging int64, total int64)
// LaggingRangesPollingInterval is how often the kv feed will poll for
// lagging ranges.
// lagging ranges and total ranges.
LaggingRangesPollingInterval time.Duration
// LaggingRangesThreshold is how far behind a range must be to be considered
// lagging.
Expand Down Expand Up @@ -176,15 +176,15 @@ func Run(ctx context.Context, cfg Config) error {

func startLaggingRangesObserver(
g ctxgroup.Group,
updateLaggingRanges func(int64),
updateLaggingRanges func(lagging int64, total int64),
pollingInterval time.Duration,
threshold time.Duration,
) func(fn kvcoord.ForEachRangeFn) {
) kvcoord.RangeObserver {
return func(fn kvcoord.ForEachRangeFn) {
g.GoCtx(func(ctx context.Context) error {
// Reset metrics on shutdown.
defer func() {
updateLaggingRanges(0)
updateLaggingRanges(0 /* lagging */, 0 /* total */)
}()

var timer timeutil.Timer
Expand All @@ -198,9 +198,11 @@ func startLaggingRangesObserver(
case <-timer.C:
timer.Read = true

count := int64(0)
var laggingCount, totalCount int64
thresholdTS := timeutil.Now().Add(-1 * threshold)
err := fn(func(rfCtx kvcoord.RangeFeedContext, feed kvcoord.PartialRangeFeed) error {
totalCount += 1

// The resolved timestamp of a range determines the timestamp which is caught up to.
// However, during catchup scans, this is not set. For catchup scans, we consider the
// time the partial rangefeed was created to be its resolved ts. Note that a range can
Expand All @@ -212,14 +214,14 @@ func startLaggingRangesObserver(
}

if ts.Less(hlc.Timestamp{WallTime: thresholdTS.UnixNano()}) {
count += 1
laggingCount += 1
}
return nil
})
if err != nil {
return err
}
updateLaggingRanges(count)
updateLaggingRanges(laggingCount, totalCount)
timer.Reset(pollingInterval)
}
}
Expand Down Expand Up @@ -251,7 +253,7 @@ type kvFeed struct {
codec keys.SQLCodec

onBackfillCallback func() func()
rangeObserver func(fn kvcoord.ForEachRangeFn)
rangeObserver kvcoord.RangeObserver
schemaChangeEvents changefeedbase.SchemaChangeEventClass
schemaChangePolicy changefeedbase.SchemaChangePolicy

Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type rangeFeedConfig struct {
Spans []kvcoord.SpanTimePair
WithDiff bool
WithFiltering bool
RangeObserver func(fn kvcoord.ForEachRangeFn)
RangeObserver kvcoord.RangeObserver
Knobs TestingKnobs
}

Expand Down
29 changes: 22 additions & 7 deletions pkg/ccl/changefeedccl/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ type AggMetrics struct {
AggregatorProgress *aggmetric.AggGauge
CheckpointProgress *aggmetric.AggGauge
LaggingRanges *aggmetric.AggGauge
TotalRanges *aggmetric.AggGauge
CloudstorageBufferedBytes *aggmetric.AggGauge
KafkaThrottlingNanos *aggmetric.AggHistogram

Expand Down Expand Up @@ -158,6 +159,7 @@ type sliMetrics struct {
AggregatorProgress *aggmetric.Gauge
CheckpointProgress *aggmetric.Gauge
LaggingRanges *aggmetric.Gauge
TotalRanges *aggmetric.Gauge
CloudstorageBufferedBytes *aggmetric.Gauge
KafkaThrottlingNanos *aggmetric.Histogram

Expand Down Expand Up @@ -937,12 +939,18 @@ func newAggregateMetrics(histogramWindow time.Duration, lookup *cidr.Lookup) *Ag
Measurement: "Unix Timestamp Nanoseconds",
Unit: metric.Unit_TIMESTAMP_NS,
}
metaLaggingRangePercentage := metric.Metadata{
metaLaggingRanges := metric.Metadata{
Name: "changefeed.lagging_ranges",
Help: "The number of ranges considered to be lagging behind",
Measurement: "Ranges",
Unit: metric.Unit_COUNT,
}
metaTotalRanges := metric.Metadata{
Name: "changefeed.total_ranges",
Help: "The total number of ranges being watched by changefeed aggregators",
Measurement: "Ranges",
Unit: metric.Unit_COUNT,
}
metaCloudstorageBufferedBytes := metric.Metadata{
Name: "changefeed.cloudstorage_buffered_bytes",
Help: "The number of bytes buffered in cloudstorage sink files which have not been emitted yet",
Expand Down Expand Up @@ -1046,7 +1054,8 @@ func newAggregateMetrics(histogramWindow time.Duration, lookup *cidr.Lookup) *Ag
SchemaRegistrations: b.Counter(metaSchemaRegistryRegistrations),
AggregatorProgress: b.FunctionalGauge(metaAggregatorProgress, functionalGaugeMinFn),
CheckpointProgress: b.FunctionalGauge(metaCheckpointProgress, functionalGaugeMinFn),
LaggingRanges: b.Gauge(metaLaggingRangePercentage),
LaggingRanges: b.Gauge(metaLaggingRanges),
TotalRanges: b.Gauge(metaTotalRanges),
CloudstorageBufferedBytes: b.Gauge(metaCloudstorageBufferedBytes),
KafkaThrottlingNanos: b.Histogram(metric.HistogramOptions{
Metadata: metaChangefeedKafkaThrottlingNanos,
Expand Down Expand Up @@ -1121,6 +1130,7 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) {
SchemaRegistryRetries: a.SchemaRegistryRetries.AddChild(scope),
SchemaRegistrations: a.SchemaRegistrations.AddChild(scope),
LaggingRanges: a.LaggingRanges.AddChild(scope),
TotalRanges: a.TotalRanges.AddChild(scope),
CloudstorageBufferedBytes: a.CloudstorageBufferedBytes.AddChild(scope),
KafkaThrottlingNanos: a.KafkaThrottlingNanos.AddChild(scope),
// TODO(#130358): Again, this doesn't belong here, but it's the most
Expand Down Expand Up @@ -1158,7 +1168,7 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) {
// getLaggingRangesCallback returns a function which can be called to update the
// lagging ranges metric. It should be called with the current number of lagging
// ranges.
func (s *sliMetrics) getLaggingRangesCallback() func(int64) {
func (s *sliMetrics) getLaggingRangesCallback() func(lagging int64, total int64) {
// Because this gauge is shared between changefeeds in the same metrics scope,
// we must instead modify it using `Inc` and `Dec` (as opposed to `Update`) to
// ensure values written by others are not overwritten. The code below is used
Expand All @@ -1175,13 +1185,18 @@ func (s *sliMetrics) getLaggingRangesCallback() func(int64) {
// If 1 lagging range is deleted, last=7,i=10: X.Dec(11-10) = X.Dec(1)
last := struct {
syncutil.Mutex
v int64
lagging int64
total int64
}{}
return func(i int64) {
return func(lagging int64, total int64) {
last.Lock()
defer last.Unlock()
s.LaggingRanges.Dec(last.v - i)
last.v = i

s.LaggingRanges.Dec(last.lagging - lagging)
last.lagging = lagging

s.TotalRanges.Dec(last.total - total)
last.total = total
}
}

Expand Down
8 changes: 6 additions & 2 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,17 @@ var catchupStartupRate = settings.RegisterIntSetting(
// ForEachRangeFn is used to execute `fn` over each range in a rangefeed.
type ForEachRangeFn func(fn ActiveRangeFeedIterFn) error

// A RangeObserver is a function that observes the ranges in a rangefeed
// by polling fn.
type RangeObserver func(fn ForEachRangeFn)

type rangeFeedConfig struct {
overSystemTable bool
withDiff bool
withFiltering bool
withMetadata bool
withMatchingOriginIDs []uint32
rangeObserver func(ForEachRangeFn)
rangeObserver RangeObserver

knobs struct {
// onRangefeedEvent invoked on each rangefeed event.
Expand Down Expand Up @@ -128,7 +132,7 @@ func WithMatchingOriginIDs(originIDs ...uint32) RangeFeedOption {

// WithRangeObserver is called when the rangefeed starts with a function that
// can be used to iterate over all the ranges.
func WithRangeObserver(observer func(ForEachRangeFn)) RangeFeedOption {
func WithRangeObserver(observer RangeObserver) RangeFeedOption {
return optionFunc(func(c *rangeFeedConfig) {
c.rangeObserver = observer
})
Expand Down

0 comments on commit fbed78a

Please sign in to comment.