Skip to content

Commit

Permalink
storage: move some stuff from mz_storage_client::client to mz_storage…
Browse files Browse the repository at this point in the history
…_client::statistics
  • Loading branch information
guswynn committed Feb 15, 2024
1 parent d057eb3 commit 8d7e8da
Show file tree
Hide file tree
Showing 11 changed files with 232 additions and 149 deletions.
2 changes: 2 additions & 0 deletions src/buf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ breaking:
- persist-types/src/stats.proto
# reason: does currently not require backward-compatibility
- storage-client/src/client.proto
# reason: currently does not require backward-compatibility
- storage-client/src/statistics.proto
# reason: currently does not require backward-compatibility
- storage-types/src/connections/aws.proto
lint:
Expand Down
9 changes: 8 additions & 1 deletion src/storage-client/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,13 @@ fn main() {
.extern_path(".mz_tracing", "::mz_tracing")
.extern_path(".mz_service", "::mz_service")
.extern_path(".mz_storage_types", "::mz_storage_types")
.compile_with_config(config, &["storage-client/src/client.proto"], &[".."])
.compile_with_config(
config,
&[
"storage-client/src/client.proto",
"storage-client/src/statistics.proto",
],
&[".."],
)
.unwrap_or_else(|e| panic!("{e}"))
}
25 changes: 3 additions & 22 deletions src/storage-client/src/client.proto
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import "cluster-client/src/client.proto";
import "storage-types/src/parameters.proto";
import "storage-types/src/sources.proto";
import "storage-types/src/sinks.proto";
import "storage-client/src/statistics.proto";

import "google/protobuf/empty.proto";

Expand Down Expand Up @@ -87,29 +88,9 @@ message ProtoStorageCommand {
}

message ProtoStorageResponse {
message ProtoSourceStatisticsUpdate {
mz_repr.global_id.ProtoGlobalId id = 1;
uint64 worker_id = 2;
bool snapshot_committed = 3;
uint64 messages_received = 4;
uint64 updates_staged = 5;
uint64 updates_committed = 6;
uint64 bytes_received = 7;
uint64 envelope_state_bytes = 8;
uint64 envelope_state_records = 9;
optional int64 rehydration_latency_ms = 10;
}
message ProtoSinkStatisticsUpdate {
mz_repr.global_id.ProtoGlobalId id = 1;
uint64 worker_id = 2;
uint64 messages_staged = 3;
uint64 messages_committed = 5;
uint64 bytes_staged = 4;
uint64 bytes_committed = 6;
}
message ProtoStatisticsUpdates {
repeated ProtoSourceStatisticsUpdate source_updates = 1;
repeated ProtoSinkStatisticsUpdate sink_updates = 2;
repeated mz_storage_client.statistics.ProtoSourceStatisticsUpdate source_updates = 1;
repeated mz_storage_client.statistics.ProtoSinkStatisticsUpdate sink_updates = 2;
}

message ProtoStatus {
Expand Down
126 changes: 6 additions & 120 deletions src/storage-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use std::iter;
use async_trait::async_trait;
use differential_dataflow::lattice::Lattice;
use mz_cluster_client::client::{ClusterStartupEpoch, TimelyConfig, TryIntoTimelyConfig};
use mz_ore::cast::CastFrom;
use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
use mz_repr::{Diff, GlobalId, Row};
use mz_service::client::{GenericClient, Partitionable, PartitionedState};
Expand All @@ -40,6 +39,7 @@ use tonic::{Request, Status as TonicStatus, Streaming};

use crate::client::proto_storage_server::ProtoStorage;
use crate::metrics::RehydratingStorageClientMetrics;
use crate::statistics::{SinkStatisticsUpdate, SourceStatisticsUpdate};

include!(concat!(env!("OUT_DIR"), "/mz_storage_client.client.rs"));

Expand Down Expand Up @@ -287,73 +287,6 @@ impl Arbitrary for StorageCommand<mz_repr::Timestamp> {
}
}

// These structures represent a full set of updates for the `mz_source_statistics_per_worker`
// and `mz_sink_statistics_per_worker` tables for a specific source-worker/sink-worker pair.
// They are structured like this for simplicity and efficiency: each storage worker can
// individually collect and consolidate metrics, then control how much `StorageResponse`
// traffic is produced when sending updates back to the controller to be written.

/// A full set of statistics for the `mz_source_statistics_per_worker` table,
/// produced by a single source/worker pair.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct SourceStatisticsUpdate {
    /// The `GlobalId` of the source these statistics describe.
    pub id: GlobalId,
    /// The id of the timely worker that produced this update.
    pub worker_id: usize,
    /// Whether the source's initial snapshot has been committed
    /// (per the field name; the producing operator is not visible here).
    pub snapshot_committed: bool,
    /// Count of messages received from upstream.
    pub messages_received: u64,
    /// Count of bytes received from upstream.
    pub bytes_received: u64,
    /// Count of updates staged but not yet committed.
    pub updates_staged: u64,
    /// Count of updates durably committed.
    pub updates_committed: u64,
    /// Size, in bytes, of this worker's envelope state.
    pub envelope_state_bytes: u64,
    /// Number of records in this worker's envelope state.
    pub envelope_state_records: u64,
    /// Rehydration latency in milliseconds; `None` presumably until a
    /// latency has been measured — TODO confirm with the producing operator.
    pub rehydration_latency_ms: Option<i64>,
}

/// A full set of statistics for the `mz_sink_statistics_per_worker` table,
/// produced by a single sink/worker pair.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct SinkStatisticsUpdate {
    /// The `GlobalId` of the sink these statistics describe.
    pub id: GlobalId,
    /// The id of the timely worker that produced this update.
    pub worker_id: usize,
    /// Count of messages staged but not yet committed.
    pub messages_staged: u64,
    /// Count of messages durably committed.
    pub messages_committed: u64,
    /// Count of bytes staged but not yet committed.
    pub bytes_staged: u64,
    /// Count of bytes durably committed.
    pub bytes_committed: u64,
}

/// A trait that abstracts over user-facing statistics objects, used
/// by `spawn_statistics_scraper`.
pub trait PackableStats {
    /// Pack `self` into the `Row` behind the given packer, pushing one
    /// `Datum` per statistics column.
    fn pack(&self, packer: mz_repr::RowPacker<'_>);
}
impl PackableStats for SourceStatisticsUpdate {
    fn pack(&self, mut packer: mz_repr::RowPacker<'_>) {
        use mz_repr::Datum;
        // NOTE(review): the push order here must line up with the column
        // order of `mz_source_statistics_per_worker` — confirm against the
        // table's relation description before reordering.
        packer.push(Datum::from(self.id.to_string().as_str()));
        // `worker_id` is a `usize`; widen it losslessly to `u64`.
        packer.push(Datum::from(u64::cast_from(self.worker_id)));
        packer.push(Datum::from(self.snapshot_committed));
        packer.push(Datum::from(self.messages_received));
        packer.push(Datum::from(self.bytes_received));
        packer.push(Datum::from(self.updates_staged));
        packer.push(Datum::from(self.updates_committed));
        packer.push(Datum::from(self.envelope_state_bytes));
        packer.push(Datum::from(self.envelope_state_records));
        // The latency is stored as a duration datum; a `None` latency
        // presumably maps to a null datum via the `Option` conversion.
        packer.push(Datum::from(
            self.rehydration_latency_ms
                .map(chrono::Duration::milliseconds),
        ));
    }
}
impl PackableStats for SinkStatisticsUpdate {
    fn pack(&self, mut packer: mz_repr::RowPacker<'_>) {
        use mz_repr::Datum;
        // NOTE(review): the push order here must line up with the column
        // order of `mz_sink_statistics_per_worker` — confirm against the
        // table's relation description before reordering.
        packer.push(Datum::from(self.id.to_string().as_str()));
        // `worker_id` is a `usize`; widen it losslessly to `u64`.
        packer.push(Datum::from(u64::cast_from(self.worker_id)));
        packer.push(Datum::from(self.messages_staged));
        packer.push(Datum::from(self.messages_committed));
        packer.push(Datum::from(self.bytes_staged));
        packer.push(Datum::from(self.bytes_committed));
    }
}

/// A "kind" enum for statuses tracked by the health operator
#[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum Status {
Expand Down Expand Up @@ -567,10 +500,7 @@ pub enum StorageResponse<T = mz_repr::Timestamp> {
impl RustType<ProtoStorageResponse> for StorageResponse<mz_repr::Timestamp> {
fn into_proto(&self) -> ProtoStorageResponse {
use proto_storage_response::Kind::*;
use proto_storage_response::{
ProtoDroppedIds, ProtoSinkStatisticsUpdate, ProtoSourceStatisticsUpdate,
ProtoStatisticsUpdates, ProtoStatusUpdates,
};
use proto_storage_response::{ProtoDroppedIds, ProtoStatisticsUpdates, ProtoStatusUpdates};
ProtoStorageResponse {
kind: Some(match self {
StorageResponse::FrontierUppers(traces) => FrontierUppers(traces.into_proto()),
Expand All @@ -581,29 +511,11 @@ impl RustType<ProtoStorageResponse> for StorageResponse<mz_repr::Timestamp> {
Stats(ProtoStatisticsUpdates {
source_updates: source_stats
.iter()
.map(|update| ProtoSourceStatisticsUpdate {
id: Some(update.id.into_proto()),
worker_id: u64::cast_from(update.worker_id),
snapshot_committed: update.snapshot_committed,
messages_received: update.messages_received,
bytes_received: update.bytes_received,
updates_staged: update.updates_staged,
updates_committed: update.updates_committed,
envelope_state_bytes: update.envelope_state_bytes,
envelope_state_records: update.envelope_state_records,
rehydration_latency_ms: update.rehydration_latency_ms,
})
.map(|update| update.into_proto())
.collect(),
sink_updates: sink_stats
.iter()
.map(|update| ProtoSinkStatisticsUpdate {
id: Some(update.id.into_proto()),
worker_id: u64::cast_from(update.worker_id),
messages_staged: update.messages_staged,
messages_committed: update.messages_committed,
bytes_staged: update.bytes_staged,
bytes_committed: update.bytes_committed,
})
.map(|update| update.into_proto())
.collect(),
})
}
Expand All @@ -628,38 +540,12 @@ impl RustType<ProtoStorageResponse> for StorageResponse<mz_repr::Timestamp> {
stats
.source_updates
.into_iter()
.map(|update| {
Ok(SourceStatisticsUpdate {
id: update.id.into_rust_if_some(
"ProtoStorageResponse::stats::source_updates::id",
)?,
worker_id: usize::cast_from(update.worker_id),
snapshot_committed: update.snapshot_committed,
messages_received: update.messages_received,
updates_staged: update.updates_staged,
updates_committed: update.updates_committed,
bytes_received: update.bytes_received,
envelope_state_bytes: update.envelope_state_bytes,
envelope_state_records: update.envelope_state_records,
rehydration_latency_ms: update.rehydration_latency_ms,
})
})
.map(|update| update.into_rust())
.collect::<Result<Vec<_>, TryFromProtoError>>()?,
stats
.sink_updates
.into_iter()
.map(|update| {
Ok(SinkStatisticsUpdate {
id: update.id.into_rust_if_some(
"ProtoStorageResponse::stats::sink_updates::id",
)?,
worker_id: usize::cast_from(update.worker_id),
messages_staged: update.messages_staged,
messages_committed: update.messages_committed,
bytes_staged: update.bytes_staged,
bytes_committed: update.bytes_committed,
})
})
.map(|update| update.into_rust())
.collect::<Result<Vec<_>, TryFromProtoError>>()?,
)),
Some(StatusUpdates(ProtoStatusUpdates { updates })) => {
Expand Down
1 change: 1 addition & 0 deletions src/storage-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@ pub mod controller;
pub mod healthcheck;
pub mod metrics;
pub mod sink;
pub mod statistics;
pub mod util;
46 changes: 46 additions & 0 deletions src/storage-client/src/statistics.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

// buf breaking: ignore (currently does not require backward-compatibility)

syntax = "proto3";

import "repr/src/global_id.proto";

import "google/protobuf/empty.proto";

package mz_storage_client.statistics;

// Per-worker statistics for a single source, mirroring the Rust
// `SourceStatisticsUpdate` struct.
//
// NOTE(review): the tag numbers differ from the old nested message in
// `client.proto`; this file is explicitly exempt from buf's breaking
// checks (see header), so the renumbering is acceptable.
message ProtoSourceStatisticsUpdate {
  // The `GlobalId` of the source this update describes.
  mz_repr.global_id.ProtoGlobalId id = 1;

  // The id of the timely worker that produced this update. Tag 100
  // appears chosen to keep worker metadata apart from the statistics
  // counters below — confirm intent before renumbering.
  uint64 worker_id = 100;

  uint64 messages_received = 2;
  uint64 updates_staged = 3;
  uint64 updates_committed = 4;
  uint64 bytes_received = 5;

  uint64 envelope_state_records = 7;
  uint64 envelope_state_bytes = 6;
  // Optional: presumably absent until a rehydration latency has been
  // measured (maps to `Option<i64>` in Rust).
  optional int64 rehydration_latency_ms = 8;

  bool snapshot_committed = 11;
}

// Per-worker statistics for a single sink, mirroring the Rust
// `SinkStatisticsUpdate` struct.
message ProtoSinkStatisticsUpdate {
  // The `GlobalId` of the sink this update describes.
  mz_repr.global_id.ProtoGlobalId id = 1;

  // The id of the timely worker that produced this update. Tag 100
  // matches the convention used by `ProtoSourceStatisticsUpdate`.
  uint64 worker_id = 100;

  uint64 messages_staged = 2;
  uint64 messages_committed = 3;
  uint64 bytes_staged = 4;
  uint64 bytes_committed = 5;
}
Loading

0 comments on commit 8d7e8da

Please sign in to comment.