Skip to content

Commit

Permalink
storage: make it so snapshot_total and snapshot_total can start as NULL
Browse files Browse the repository at this point in the history
  • Loading branch information
guswynn committed Feb 14, 2024
1 parent da7a355 commit 5b291dd
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 22 deletions.
66 changes: 52 additions & 14 deletions src/storage-client/src/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,16 @@ pub static MZ_SOURCE_STATISTICS_RAW_DESC: Lazy<RelationDesc> = Lazy::new(|| {
// Sometimes resetted when the source can snapshot new pieces of upstream (like Postgres and
// MySql).
// (like pg and mysql) may repopulate this column when tables are added.
.with_column("snapshot_total", ScalarType::UInt64.nullable(false))
//
// `NULL` while we discover the snapshot size.
.with_column("snapshot_total", ScalarType::UInt64.nullable(true))
// A gauge of the number of _values_ (source defined unit) we have read of the _snapshot_
// of this source.
// Sometimes resetted when the source can snapshot new pieces of upstream (like Postgres and
// MySql).
.with_column("snapshot_read", ScalarType::UInt64.nullable(false))
//
// `NULL` while we discover the snapshot size.
.with_column("snapshot_read", ScalarType::UInt64.nullable(true))
//
// Non-resetting gauges
//
Expand Down Expand Up @@ -171,6 +175,40 @@ impl GaugeSemantics for ResettingLatency {
}
}

/// A numerical gauge that is always resets, but can start out as `NULL`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Default)]
pub struct ResettingNullableTotal(Option<u64>);

impl GaugeSemantics for ResettingNullableTotal {
fn merge(&mut self, other: Self) {
match (&mut self.0, other.0) {
(None, _) | (_, None) => {
// If any of the values are `NULL`, then we merge to `NULL`
self.0 = None
}
// Sum across workers.
(Some(this), Some(other)) => *this += other,
}
}
fn incorporate(&mut self, other: Self, _field_name: &'static str) {
match (&mut self.0, other.0) {
(None, other) => {
self.0 = other;
}
// Sum across workers.
(Some(this), Some(other)) => *this = other,
(Some(_), None) => {
// `NULL`'s don't reset the value.
}
}
}
}

impl From<Option<u64>> for ResettingNullableTotal {
fn from(f: Option<u64>) -> Self {
ResettingNullableTotal(f)
}
}
/// A numerical gauge that is always resets.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Default)]
pub struct ResettingTotal(u64);
Expand Down Expand Up @@ -340,8 +378,8 @@ pub struct SourceStatisticsUpdate {
pub envelope_state_records: Gauge<ResettingTotal>,
pub envelope_state_bytes: Gauge<ResettingTotal>,
pub rehydration_latency_ms: Gauge<ResettingLatency>,
pub snapshot_total: SkippableGauge<ResettingTotal>,
pub snapshot_read: SkippableGauge<ResettingTotal>,
pub snapshot_total: Gauge<ResettingNullableTotal>,
pub snapshot_read: Gauge<ResettingNullableTotal>,

pub snapshot_committed: Gauge<Boolean>,
pub upstream_values: SkippableGauge<Total>,
Expand All @@ -366,8 +404,8 @@ impl SourceStatisticsUpdate {
// These are `Some(0)` and not `None` so `merge`-ing them
// does not produce `None` if all workers produced values.
rehydration_latency_ms: Gauge::gauge(Some(0)),
snapshot_total: SkippableGauge::gauge(Some(0)),
snapshot_read: SkippableGauge::gauge(Some(0)),
snapshot_total: Gauge::gauge(Some(0)),
snapshot_read: Gauge::gauge(Some(0)),

upstream_values: SkippableGauge::gauge(Some(0)),
committed_values: SkippableGauge::gauge(Some(0)),
Expand Down Expand Up @@ -472,8 +510,8 @@ impl PackableStats for SourceStatisticsUpdate {
.0
.map(|ms| mz_repr::adt::interval::Interval::new(0, 0, ms * 1000));
packer.push(Datum::from(rehydration_latency));
packer.push(Datum::from(self.snapshot_total.pack().0));
packer.push(Datum::from(self.snapshot_read.pack().0));
packer.push(Datum::from(self.snapshot_total.0 .0));
packer.push(Datum::from(self.snapshot_read.0 .0));
// Gauges
packer.push(Datum::from(self.snapshot_committed.0 .0));
packer.push(Datum::from(self.upstream_values.pack().0));
Expand All @@ -497,8 +535,8 @@ impl PackableStats for SourceStatisticsUpdate {
.unwrap()
.map(|int| int.micros),
),
snapshot_total: SkippableGauge::gauge(Some(iter.next().unwrap().unwrap_uint64())),
snapshot_read: SkippableGauge::gauge(Some(iter.next().unwrap().unwrap_uint64())),
snapshot_total: Gauge::gauge(<Option<u64>>::try_from(iter.next().unwrap()).unwrap()),
snapshot_read: Gauge::gauge(<Option<u64>>::try_from(iter.next().unwrap()).unwrap()),

snapshot_committed: Gauge::gauge(iter.next().unwrap().unwrap_bool()),
upstream_values: SkippableGauge::gauge(Some(iter.next().unwrap().unwrap_uint64())),
Expand All @@ -522,8 +560,8 @@ impl RustType<ProtoSourceStatisticsUpdate> for SourceStatisticsUpdate {
envelope_state_records: self.envelope_state_records.0 .0,
envelope_state_bytes: self.envelope_state_bytes.0 .0,
rehydration_latency_ms: self.rehydration_latency_ms.0 .0,
snapshot_total: self.snapshot_total.0.clone().map(|i| i.0),
snapshot_read: self.snapshot_read.0.clone().map(|i| i.0),
snapshot_total: self.snapshot_total.0 .0,
snapshot_read: self.snapshot_read.0 .0,

snapshot_committed: self.snapshot_committed.0 .0,
upstream_values: self.upstream_values.0.clone().map(|i| i.0),
Expand All @@ -545,8 +583,8 @@ impl RustType<ProtoSourceStatisticsUpdate> for SourceStatisticsUpdate {
envelope_state_records: Gauge::gauge(proto.envelope_state_records),
envelope_state_bytes: Gauge::gauge(proto.envelope_state_bytes),
rehydration_latency_ms: Gauge::gauge(proto.rehydration_latency_ms),
snapshot_total: SkippableGauge::gauge(proto.snapshot_total),
snapshot_read: SkippableGauge::gauge(proto.snapshot_read),
snapshot_total: Gauge::gauge(proto.snapshot_total),
snapshot_read: Gauge::gauge(proto.snapshot_read),

snapshot_committed: Gauge::gauge(proto.snapshot_committed),
upstream_values: SkippableGauge::gauge(proto.upstream_values),
Expand Down
4 changes: 2 additions & 2 deletions src/storage/src/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -429,8 +429,8 @@ impl SourceStatisticsRecord {
envelope_state_records: Gauge::gauge(envelope_state_records.unwrap()),
envelope_state_bytes: Gauge::gauge(envelope_state_bytes.unwrap()),
rehydration_latency_ms: Gauge::gauge(rehydration_latency_ms.unwrap()),
snapshot_total: SkippableGauge::gauge(snapshot_total.unwrap()),
snapshot_read: SkippableGauge::gauge(snapshot_read.unwrap()),
snapshot_total: Gauge::gauge(snapshot_total.unwrap()),
snapshot_read: Gauge::gauge(snapshot_read.unwrap()),
snapshot_committed: Gauge::gauge(snapshot_committed.unwrap()),
upstream_values: SkippableGauge::gauge(upstream_values.unwrap()),
committed_values: SkippableGauge::gauge(committed_values.unwrap()),
Expand Down
15 changes: 9 additions & 6 deletions test/testdrive/statistics-maintenance.td
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,13 @@ sink1 1 1 true true

> SELECT s.name,
SUM(u.updates_committed) > 0,
SUM(u.messages_received)
SUM(u.messages_received),
SUM(snapshot_read) IS NULL
FROM mz_sources s
JOIN mz_internal.mz_source_statistics_raw u ON s.id = u.id
WHERE s.name IN ('upsert1')
GROUP BY s.id, s.name
upsert1 true 1
upsert1 true 1 true

# Shut down the cluster
> ALTER CLUSTER cluster1 SET (REPLICATION FACTOR = 0)
Expand All @@ -74,12 +75,13 @@ sink1 1 1 true true

> SELECT s.name,
SUM(u.updates_committed) > 0,
SUM(u.messages_received)
SUM(u.messages_received),
SUM(snapshot_read) IS NULL
FROM mz_sources s
JOIN mz_internal.mz_source_statistics_raw u ON s.id = u.id
WHERE s.name IN ('upsert1')
GROUP BY s.id, s.name
upsert1 true 1
upsert1 true 1 true

# Ingest some more data, and ensure counters are maintained

Expand All @@ -98,9 +100,10 @@ sink1 2 2 true true

> SELECT s.name,
SUM(u.updates_committed) > 0,
SUM(u.messages_received)
SUM(u.messages_received),
SUM(snapshot_read) IS NULL
FROM mz_sources s
JOIN mz_internal.mz_source_statistics_raw u ON s.id = u.id
WHERE s.name IN ('upsert1')
GROUP BY s.id, s.name
upsert1 true 2
upsert1 true 2 true

0 comments on commit 5b291dd

Please sign in to comment.