diff --git a/src/buf.yaml b/src/buf.yaml index a898b4d629aa5..cc2ec8ef5242f 100644 --- a/src/buf.yaml +++ b/src/buf.yaml @@ -47,6 +47,8 @@ breaking: - persist-types/src/stats.proto # reason: does currently not require backward-compatibility - storage-client/src/client.proto + # reason: does currently not require backward-compatibility + - storage-client/src/statistics.proto # reason: currently does not require backward-compatibility - storage-types/src/connections/aws.proto lint: diff --git a/src/storage-client/build.rs b/src/storage-client/build.rs index 831c7a750d535..cfb131aa7fece 100644 --- a/src/storage-client/build.rs +++ b/src/storage-client/build.rs @@ -43,6 +43,13 @@ fn main() { .extern_path(".mz_tracing", "::mz_tracing") .extern_path(".mz_service", "::mz_service") .extern_path(".mz_storage_types", "::mz_storage_types") - .compile_with_config(config, &["storage-client/src/client.proto"], &[".."]) + .compile_with_config( + config, + &[ + "storage-client/src/client.proto", + "storage-client/src/statistics.proto", + ], + &[".."], + ) .unwrap_or_else(|e| panic!("{e}")) } diff --git a/src/storage-client/src/client.proto b/src/storage-client/src/client.proto index 14fc03e85f685..d741765f74bf0 100644 --- a/src/storage-client/src/client.proto +++ b/src/storage-client/src/client.proto @@ -19,6 +19,7 @@ import "cluster-client/src/client.proto"; import "storage-types/src/parameters.proto"; import "storage-types/src/sources.proto"; import "storage-types/src/sinks.proto"; +import "storage-client/src/statistics.proto"; import "google/protobuf/empty.proto"; @@ -87,29 +88,9 @@ message ProtoStorageCommand { } message ProtoStorageResponse { - message ProtoSourceStatisticsUpdate { - mz_repr.global_id.ProtoGlobalId id = 1; - uint64 worker_id = 2; - bool snapshot_committed = 3; - uint64 messages_received = 4; - uint64 updates_staged = 5; - uint64 updates_committed = 6; - uint64 bytes_received = 7; - uint64 envelope_state_bytes = 8; - uint64 envelope_state_records = 9; - optional int64 rehydration_latency_ms = 10; - } - message ProtoSinkStatisticsUpdate { - mz_repr.global_id.ProtoGlobalId id = 1; - uint64 worker_id = 2; - uint64 messages_staged = 3; - uint64 messages_committed = 5; - uint64 bytes_staged = 4; - uint64 bytes_committed = 6; - } message ProtoStatisticsUpdates { - repeated ProtoSourceStatisticsUpdate source_updates = 1; - repeated ProtoSinkStatisticsUpdate sink_updates = 2; + repeated mz_storage_client.statistics.ProtoSourceStatisticsUpdate source_updates = 1; + repeated mz_storage_client.statistics.ProtoSinkStatisticsUpdate sink_updates = 2; } message ProtoStatus { diff --git a/src/storage-client/src/client.rs b/src/storage-client/src/client.rs index 469ad6504507d..3f0c5e62d2417 100644 --- a/src/storage-client/src/client.rs +++ b/src/storage-client/src/client.rs @@ -21,7 +21,6 @@ use std::iter; use async_trait::async_trait; use differential_dataflow::lattice::Lattice; use mz_cluster_client::client::{ClusterStartupEpoch, TimelyConfig, TryIntoTimelyConfig}; -use mz_ore::cast::CastFrom; use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError}; use mz_repr::{Diff, GlobalId, Row}; use mz_service::client::{GenericClient, Partitionable, PartitionedState}; @@ -40,6 +39,7 @@ use tonic::{Request, Status as TonicStatus, Streaming}; use crate::client::proto_storage_server::ProtoStorage; use crate::metrics::RehydratingStorageClientMetrics; +use crate::statistics::{SinkStatisticsUpdate, SourceStatisticsUpdate}; include!(concat!(env!("OUT_DIR"), "/mz_storage_client.client.rs")); @@ -287,73 +287,6 @@ impl Arbitrary for StorageCommand { } } -// These structure represents a full set up updates for the `mz_source_statistics_per_worker` -// and `mz_sink_statistics_per_worker` tables for a specific source-worker/sink-worker pair. -// They are structured like this for simplicity -// and efficiency: Each storage worker can individually collect and consolidate metrics, -// then control how much `StorageResponse` traffic is produced when sending updates -// back to the controller to be written. - -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] -pub struct SourceStatisticsUpdate { - pub id: GlobalId, - pub worker_id: usize, - pub snapshot_committed: bool, - pub messages_received: u64, - pub bytes_received: u64, - pub updates_staged: u64, - pub updates_committed: u64, - pub envelope_state_bytes: u64, - pub envelope_state_records: u64, - pub rehydration_latency_ms: Option, -} - -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] -pub struct SinkStatisticsUpdate { - pub id: GlobalId, - pub worker_id: usize, - pub messages_staged: u64, - pub messages_committed: u64, - pub bytes_staged: u64, - pub bytes_committed: u64, -} - -/// A trait that abstracts over user-facing statistics objects, used -/// by `spawn_statistics_scraper`. -pub trait PackableStats { - /// Pack `self` into the `Row`. - fn pack(&self, packer: mz_repr::RowPacker<'_>); -} -impl PackableStats for SourceStatisticsUpdate { - fn pack(&self, mut packer: mz_repr::RowPacker<'_>) { - use mz_repr::Datum; - packer.push(Datum::from(self.id.to_string().as_str())); - packer.push(Datum::from(u64::cast_from(self.worker_id))); - packer.push(Datum::from(self.snapshot_committed)); - packer.push(Datum::from(self.messages_received)); - packer.push(Datum::from(self.bytes_received)); - packer.push(Datum::from(self.updates_staged)); - packer.push(Datum::from(self.updates_committed)); - packer.push(Datum::from(self.envelope_state_bytes)); - packer.push(Datum::from(self.envelope_state_records)); - packer.push(Datum::from( - self.rehydration_latency_ms - .map(chrono::Duration::milliseconds), - )); - } -} -impl PackableStats for SinkStatisticsUpdate { - fn pack(&self, mut packer: mz_repr::RowPacker<'_>) { - use mz_repr::Datum; - packer.push(Datum::from(self.id.to_string().as_str())); - packer.push(Datum::from(u64::cast_from(self.worker_id))); - packer.push(Datum::from(self.messages_staged)); - packer.push(Datum::from(self.messages_committed)); - packer.push(Datum::from(self.bytes_staged)); - packer.push(Datum::from(self.bytes_committed)); - } -} - /// A "kind" enum for statuses tracked by the health operator #[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] pub enum Status { @@ -567,10 +500,7 @@ pub enum StorageResponse { impl RustType for StorageResponse { fn into_proto(&self) -> ProtoStorageResponse { use proto_storage_response::Kind::*; - use proto_storage_response::{ - ProtoDroppedIds, ProtoSinkStatisticsUpdate, ProtoSourceStatisticsUpdate, - ProtoStatisticsUpdates, ProtoStatusUpdates, - }; + use proto_storage_response::{ProtoDroppedIds, ProtoStatisticsUpdates, ProtoStatusUpdates}; ProtoStorageResponse { kind: Some(match self { StorageResponse::FrontierUppers(traces) => FrontierUppers(traces.into_proto()), @@ -581,29 +511,11 @@ impl RustType for StorageResponse { Stats(ProtoStatisticsUpdates { source_updates: source_stats .iter() - .map(|update| ProtoSourceStatisticsUpdate { - id: Some(update.id.into_proto()), - worker_id: u64::cast_from(update.worker_id), - snapshot_committed: update.snapshot_committed, - messages_received: update.messages_received, - bytes_received: update.bytes_received, - updates_staged: update.updates_staged, - updates_committed: update.updates_committed, - envelope_state_bytes: update.envelope_state_bytes, - envelope_state_records: update.envelope_state_records, - rehydration_latency_ms: update.rehydration_latency_ms, - }) + .map(|update| update.into_proto()) .collect(), sink_updates: sink_stats .iter() - .map(|update| ProtoSinkStatisticsUpdate { - id: Some(update.id.into_proto()), - worker_id: u64::cast_from(update.worker_id), - messages_staged: update.messages_staged, - messages_committed: update.messages_committed, - bytes_staged: update.bytes_staged, - bytes_committed: update.bytes_committed, - }) + .map(|update| update.into_proto()) .collect(), }) } @@ -628,38 +540,12 @@ impl RustType for StorageResponse { stats .source_updates .into_iter() - .map(|update| { - Ok(SourceStatisticsUpdate { - id: update.id.into_rust_if_some( - "ProtoStorageResponse::stats::source_updates::id", - )?, - worker_id: usize::cast_from(update.worker_id), - snapshot_committed: update.snapshot_committed, - messages_received: update.messages_received, - updates_staged: update.updates_staged, - updates_committed: update.updates_committed, - bytes_received: update.bytes_received, - envelope_state_bytes: update.envelope_state_bytes, - envelope_state_records: update.envelope_state_records, - rehydration_latency_ms: update.rehydration_latency_ms, - }) - }) + .map(|update| update.into_rust()) .collect::, TryFromProtoError>>()?, stats .sink_updates .into_iter() - .map(|update| { - Ok(SinkStatisticsUpdate { - id: update.id.into_rust_if_some( - "ProtoStorageResponse::stats::sink_updates::id", - )?, - worker_id: usize::cast_from(update.worker_id), - messages_staged: update.messages_staged, - messages_committed: update.messages_committed, - bytes_staged: update.bytes_staged, - bytes_committed: update.bytes_committed, - }) - }) + .map(|update| update.into_rust()) .collect::, TryFromProtoError>>()?, )), Some(StatusUpdates(ProtoStatusUpdates { updates })) => { diff --git a/src/storage-client/src/lib.rs b/src/storage-client/src/lib.rs index cb5cb4eb06551..89ad989f1192b 100644 --- a/src/storage-client/src/lib.rs +++ b/src/storage-client/src/lib.rs @@ -14,4 +14,5 @@ pub mod controller; pub mod healthcheck; pub mod metrics; pub mod sink; +pub mod statistics; pub mod util; diff --git a/src/storage-client/src/statistics.proto b/src/storage-client/src/statistics.proto new file mode 100644 index 0000000000000..a27bbd17c0ad0 --- /dev/null +++ b/src/storage-client/src/statistics.proto @@ -0,0 +1,46 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +// buf breaking: ignore (does currently not require backward-compatibility) + +syntax = "proto3"; + +import "repr/src/global_id.proto"; + +import "google/protobuf/empty.proto"; + +package mz_storage_client.statistics; + +message ProtoSourceStatisticsUpdate { + mz_repr.global_id.ProtoGlobalId id = 1; + + uint64 worker_id = 100; + + uint64 messages_received = 2; + uint64 updates_staged = 3; + uint64 updates_committed = 4; + uint64 bytes_received = 5; + + uint64 envelope_state_records = 7; + uint64 envelope_state_bytes = 6; + optional int64 rehydration_latency_ms = 8; + + bool snapshot_committed = 11; +} + +message ProtoSinkStatisticsUpdate { + mz_repr.global_id.ProtoGlobalId id = 1; + + uint64 worker_id = 100; + + uint64 messages_staged = 2; + uint64 messages_committed = 3; + uint64 bytes_staged = 4; + uint64 bytes_committed = 5; +} diff --git a/src/storage-client/src/statistics.rs b/src/storage-client/src/statistics.rs new file mode 100644 index 0000000000000..40dd06e8a5871 --- /dev/null +++ b/src/storage-client/src/statistics.rs @@ -0,0 +1,160 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +//! These structure represents a full set up updates for the `mz_source_statistics_per_worker` +//! and `mz_sink_statistics_per_worker` tables for a specific source-worker/sink-worker pair. +//! They are structured like this for simplicity +//! and efficiency: Each storage worker can individually collect and consolidate metrics, +//! then control how much `StorageResponse` traffic is produced when sending updates +//! back to the controller to be written. +//! +//! The proto conversions for this types are in the `client` module, for now. + +use serde::{Deserialize, Serialize}; + +use mz_ore::cast::CastFrom; +use mz_proto::{IntoRustIfSome, RustType, TryFromProtoError}; +use mz_repr::GlobalId; + +include!(concat!(env!("OUT_DIR"), "/mz_storage_client.statistics.rs")); + +/// A trait that abstracts over user-facing statistics objects, used +/// by `spawn_statistics_scraper`. +pub trait PackableStats { + /// Pack `self` into the `Row`. + fn pack(&self, packer: mz_repr::RowPacker<'_>); +} + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +pub struct SourceStatisticsUpdate { + pub id: GlobalId, + pub worker_id: usize, + pub snapshot_committed: bool, + pub messages_received: u64, + pub bytes_received: u64, + pub updates_staged: u64, + pub updates_committed: u64, + pub envelope_state_bytes: u64, + pub envelope_state_records: u64, + pub rehydration_latency_ms: Option, +} + +impl PackableStats for SourceStatisticsUpdate { + fn pack(&self, mut packer: mz_repr::RowPacker<'_>) { + use mz_repr::Datum; + // id + packer.push(Datum::from(self.id.to_string().as_str())); + packer.push(Datum::from(u64::cast_from(self.worker_id))); + packer.push(Datum::from(self.snapshot_committed)); + packer.push(Datum::from(self.messages_received)); + packer.push(Datum::from(self.bytes_received)); + packer.push(Datum::from(self.updates_staged)); + packer.push(Datum::from(self.updates_committed)); + packer.push(Datum::from(self.envelope_state_bytes)); + packer.push(Datum::from(self.envelope_state_records)); + packer.push(Datum::from( + self.rehydration_latency_ms + .map(chrono::Duration::milliseconds), + )); + } +} + +impl RustType for SourceStatisticsUpdate { + fn into_proto(&self) -> ProtoSourceStatisticsUpdate { + ProtoSourceStatisticsUpdate { + id: Some(self.id.into_proto()), + + worker_id: u64::cast_from(self.worker_id), + + messages_received: self.messages_received, + bytes_received: self.bytes_received, + updates_staged: self.updates_staged, + updates_committed: self.updates_committed, + + envelope_state_records: self.envelope_state_records, + envelope_state_bytes: self.envelope_state_bytes, + rehydration_latency_ms: self.rehydration_latency_ms, + + snapshot_committed: self.snapshot_committed, + } + } + + fn from_proto(proto: ProtoSourceStatisticsUpdate) -> Result { + Ok(SourceStatisticsUpdate { + id: proto + .id + .into_rust_if_some("ProtoSourceStatisticsUpdate::id")?, + + worker_id: usize::cast_from(proto.worker_id), + + messages_received: proto.messages_received, + bytes_received: proto.bytes_received, + updates_staged: proto.updates_staged, + updates_committed: proto.updates_committed, + + envelope_state_records: proto.envelope_state_records, + envelope_state_bytes: proto.envelope_state_bytes, + rehydration_latency_ms: proto.rehydration_latency_ms, + + snapshot_committed: proto.snapshot_committed, + }) + } +} + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +pub struct SinkStatisticsUpdate { + pub id: GlobalId, + pub worker_id: usize, + pub messages_staged: u64, + pub messages_committed: u64, + pub bytes_staged: u64, + pub bytes_committed: u64, +} + +impl PackableStats for SinkStatisticsUpdate { + fn pack(&self, mut packer: mz_repr::RowPacker<'_>) { + use mz_repr::Datum; + packer.push(Datum::from(self.id.to_string().as_str())); + packer.push(Datum::from(u64::cast_from(self.worker_id))); + packer.push(Datum::from(self.messages_staged)); + packer.push(Datum::from(self.messages_committed)); + packer.push(Datum::from(self.bytes_staged)); + packer.push(Datum::from(self.bytes_committed)); + } +} + +impl RustType for SinkStatisticsUpdate { + fn into_proto(&self) -> ProtoSinkStatisticsUpdate { + ProtoSinkStatisticsUpdate { + id: Some(self.id.into_proto()), + + worker_id: u64::cast_from(self.worker_id), + + messages_staged: self.messages_staged, + messages_committed: self.messages_committed, + bytes_staged: self.bytes_staged, + bytes_committed: self.bytes_committed, + } + } + + fn from_proto(proto: ProtoSinkStatisticsUpdate) -> Result { + Ok(SinkStatisticsUpdate { + id: proto + .id + .into_rust_if_some("ProtoSinkStatisticsUpdate::id")?, + + worker_id: usize::cast_from(proto.worker_id), + + messages_staged: proto.messages_staged, + messages_committed: proto.messages_committed, + bytes_staged: proto.bytes_staged, + bytes_committed: proto.bytes_committed, + }) + } +} diff --git a/src/storage-controller/src/lib.rs b/src/storage-controller/src/lib.rs index a8d7caf38fba2..e766a6c84951a 100644 --- a/src/storage-controller/src/lib.rs +++ b/src/storage-controller/src/lib.rs @@ -45,15 +45,15 @@ use mz_repr::{ColumnName, Datum, Diff, GlobalId, RelationDesc, Row, TimestampMan use mz_stash::{self, AppendBatch, StashFactory, TypedCollection}; use mz_stash_types::metrics::Metrics as StashMetrics; use mz_storage_client::client::{ - ProtoStorageCommand, ProtoStorageResponse, RunIngestionCommand, RunSinkCommand, - SinkStatisticsUpdate, SourceStatisticsUpdate, Status, StatusUpdate, StorageCommand, - StorageResponse, TimestamplessUpdate, + ProtoStorageCommand, ProtoStorageResponse, RunIngestionCommand, RunSinkCommand, Status, + StatusUpdate, StorageCommand, StorageResponse, TimestamplessUpdate, }; use mz_storage_client::controller::{ CollectionDescription, CollectionState, DataSource, DataSourceOther, ExportDescription, ExportState, IntrospectionType, MonotonicAppender, Response, SnapshotCursor, StorageController, }; use mz_storage_client::metrics::StorageControllerMetrics; +use mz_storage_client::statistics::{SinkStatisticsUpdate, SourceStatisticsUpdate}; use mz_storage_types::collections as proto; use mz_storage_types::configuration::StorageConfiguration; use mz_storage_types::connections::ConnectionContext; diff --git a/src/storage-controller/src/statistics.rs b/src/storage-controller/src/statistics.rs index 5e87be2fccb50..d904b5435b70f 100644 --- a/src/storage-controller/src/statistics.rs +++ b/src/storage-controller/src/statistics.rs @@ -21,7 +21,7 @@ use mz_ore::now::EpochMillis; use mz_persist_types::Codec64; use mz_repr::TimestampManipulation; use mz_repr::{GlobalId, Row}; -use mz_storage_client::client::PackableStats; +use mz_storage_client::statistics::PackableStats; use timely::progress::ChangeBatch; use timely::progress::Timestamp; use tokio::sync::oneshot; diff --git a/src/storage/src/internal_control.rs b/src/storage/src/internal_control.rs index 0b438587ccbcb..665ec07742e5d 100644 --- a/src/storage/src/internal_control.rs +++ b/src/storage/src/internal_control.rs @@ -11,7 +11,7 @@ use std::time::Instant; use mz_repr::{GlobalId, Row}; use mz_rocksdb::config::SharedWriteBufferManager; -use mz_storage_client::client::{SinkStatisticsUpdate, SourceStatisticsUpdate}; +use mz_storage_client::statistics::{SinkStatisticsUpdate, SourceStatisticsUpdate}; use mz_storage_types::controller::CollectionMetadata; use mz_storage_types::parameters::StorageParameters; use mz_storage_types::sinks::{MetadataFilled, StorageSinkDesc}; diff --git a/src/storage/src/statistics.rs b/src/storage/src/statistics.rs index bd9c1bb342c60..1160d70645c37 100644 --- a/src/storage/src/statistics.rs +++ b/src/storage/src/statistics.rs @@ -20,7 +20,7 @@ use mz_ore::metrics::{ MetricsRegistry, UIntGaugeVec, }; use mz_repr::{GlobalId, Timestamp}; -use mz_storage_client::client::{SinkStatisticsUpdate, SourceStatisticsUpdate}; +use mz_storage_client::statistics::{SinkStatisticsUpdate, SourceStatisticsUpdate}; use mz_storage_types::sources::SourceEnvelope; use prometheus::core::{AtomicI64, AtomicU64}; use timely::progress::frontier::Antichain;