From 90ca07bc3def9e9326e698a0c05d9d9abd0ad694 Mon Sep 17 00:00:00 2001
From: Daniel Harrison <52528+danhhz@users.noreply.github.com>
Date: Mon, 25 Mar 2024 14:25:09 -0700
Subject: [PATCH 1/2] persist: inline small writes directly in consensus State

Previously, a compare_and_append operation always required writing the data to
Blob (s3) and then only putting a pointer to it in State (stored in
Consensus/crdb). The latencies of s3 puts are considerably higher and have a
much fatter tail than crdb latencies. So, inline small batch parts directly
where we would have put the pointer.

- The threshold for small is made an LD param, so we can tune it.
- Skip keeping pushdown stats for these parts.
- Represent them using the new LazyProto abstraction developed for part stats.
  This keeps the inline parts as bytes::Bytes (which is in turn just a slice of
  the encoded ProtoStateRollup/ProtoStateDiff), only decoding them when they
  would have been "fetched". This considerably lessens the cost of
  deserializing these in places that aren't interested in the updates
  themselves (e.g. the controllers on envd).
---
 .../design/20240401_persist_inline_writes.md  |  64 ++++
 misc/python/materialize/mzcompose/__init__.py |   2 +
 src/persist-client/build.rs                   |   1 +
 src/persist-client/src/batch.rs               | 312 ++++++++++++------
 src/persist-client/src/cfg.rs                 |   2 +
 src/persist-client/src/cli/inspect.rs         |  13 +-
 src/persist-client/src/fetch.rs               |  81 ++++-
 src/persist-client/src/internal/compact.rs    |  16 +-
 src/persist-client/src/internal/encoding.rs   | 121 ++++++-
 src/persist-client/src/internal/gc.rs         |   1 +
 src/persist-client/src/internal/machine.rs    |  41 ++-
 src/persist-client/src/internal/metrics.rs    |  38 ++-
 src/persist-client/src/internal/restore.rs    |   2 +
 src/persist-client/src/internal/state.proto   |  14 +-
 src/persist-client/src/internal/state.rs      | 109 +++++-
 .../src/internal/state_serde.json             | 216 +++++++++---
 .../src/internal/state_versions.rs            |  35 +-
 src/persist-client/src/iter.rs                |  18 +
 .../src/operators/shard_source.rs             |  12 +-
 src/persist-client/src/usage.rs               |  27 +-
 src/persist-client/src/write.rs               |  58 +++-
 src/persist-client/tests/machine/gc           |   7 +
 src/persist-client/tests/machine/restore_blob |   6 +
 src/persist-proc/src/lib.rs                   |  14 +-
 src/persist/build.rs                          |   5 +
 src/persist/src/indexed/columnar.rs           |  40 ++-
 src/persist/src/persist.proto                 |  10 +
 test/sqllogictest/explain-pushdown.slt        |   6 +
 28 files changed, 1044 insertions(+), 227 deletions(-)
 create mode 100644 doc/developer/design/20240401_persist_inline_writes.md

diff --git a/doc/developer/design/20240401_persist_inline_writes.md b/doc/developer/design/20240401_persist_inline_writes.md
new file mode 100644
index 0000000000000..c23e115e8f4d5
--- /dev/null
+++ b/doc/developer/design/20240401_persist_inline_writes.md
@@ -0,0 +1,64 @@
+# Persist Inline Writes
+
+- Associated: [#24832](https://github.com/MaterializeInc/materialize/pull/24832)
+
+## The Problem
+
+A non-empty persist write currently always requires a write to S3 and a write to
+CockroachDB. S3 writes are much slower than CRDB writes and incur a per-PUT
+charge, so in the case of very small writes, this is wasteful of both latency
+and cost.
+
+Persist latencies directly impact the user experience of Materialize in a number
+of ways. The above waste is particularly egregious in DDL, which may serially
+perform a number of small persist writes.
+
+## Success Criteria
+
+Eligible persist writes have latency of `O(crdb write)` and not `O(s3 write)`.
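
To make the eligibility decision concrete, here is a rough, hypothetical sketch
(the names below are illustrative, not the actual persist types) of the write
path decision described under Solution Proposal below: a small encoded part is
stored inline in state, and anything too large, or anything that would exceed
the total inline budget, falls through to the existing S3 path. In the real
design the size threshold is applied when a part is written and the total
budget is enforced at compare-and-append time (as backpressure), but the
combined effect is as sketched.

```rust
// Hypothetical names for illustration only; see the Solution Proposal below
// for the real enum and the two dyncfg limits.
#[derive(Debug)]
enum PartRef {
    /// A pointer to a blob that was PUT to S3; only metadata lives in state.
    Hollow { key: String, encoded_size_bytes: usize },
    /// The encoded updates themselves, stored directly in consensus state.
    Inline { updates: Vec<u8> },
}

fn part_ref_for_write(
    encoded: Vec<u8>,
    inline_threshold_bytes: usize, // cf. persist_inline_update_threshold_bytes
    inline_bytes_already_in_state: usize,
    inline_max_bytes: usize, // cf. persist_inline_update_max_bytes
) -> PartRef {
    let small_enough = encoded.len() < inline_threshold_bytes;
    let within_budget = inline_bytes_already_in_state + encoded.len() <= inline_max_bytes;
    if small_enough && within_budget {
        PartRef::Inline { updates: encoded }
    } else {
        // Stand-in for the existing path: write the bytes to S3 and keep only
        // a pointer (plus stats and sizes) in state.
        let key = format!("part-{}-bytes", encoded.len());
        PartRef::Hollow { encoded_size_bytes: encoded.len(), key }
    }
}

fn main() {
    let small = part_ref_for_write(vec![0u8; 128], 4096, 0, 1 << 20);
    let large = part_ref_for_write(vec![0u8; 1 << 20], 4096, 0, 1 << 20);
    println!("small write -> {:?}", small);
    println!("large write -> {:?}", large);
}
```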

## Out of Scope

- Latency guarantees: In particular, to keep persist metadata small, inline
  writes are an optimization; they are not guaranteed.
- Read latencies.

## Solution Proposal

Persist internally has the concept of a _part_, which corresponds 1:1:1 with a
persist _blob_ and an _object_ in S3. Currently, all data in a persist shard is
stored in S3 and then a _hollow_ reference to it is stored in CRDB. This
reference also includes metadata, such as pushdown statistics and the encoded
size.

We instead make this reference a two-variant enum: `Hollow` and `Inline`. The
`Inline` variant stores the same data as would be written to s3, but as an
encoded protobuf. This protobuf is only decoded in data fetch paths, and is
otherwise passed around as opaque bytes to save allocations and cpu cycles.
Pushdown statistics and the encoded size are both unnecessary for inline parts.

The persist state stored in CRDB is a control plane concept, so there is both a
performance and a stability risk from mixing the data plane into it. We reduce
the number of inline parts over time by making compaction flush them out to S3;
compaction never writes inline parts. However, nothing prevents new writes from
arriving faster than compaction can pull them out. We protect the control plane
with the following two limits, which create a hard upper bound on how much data
can be inline:

- `persist_inline_update_threshold_bytes`: An (exclusive) maximum size of a
  write that persist will inline in metadata.
- `persist_inline_update_max_bytes`: An (inclusive) maximum total size of inline
  writes in metadata. Any attempted writes beyond this limit will instead
  fall through to the s3 path.

## Alternatives

- In addition to S3, also store _blobs_ in CRDB (or a third technology). CRDB is
  not tuned as a blob store and doesn't handle these workloads well. A third
  technology would not be worth the additional operational burden.
- Make S3 faster. This is not actionable in the short term.

## Open Questions

- How do we tune the two thresholds?
- Should every inline write result in a compaction request that immediately
  flushes the new batch out to s3?
diff --git a/misc/python/materialize/mzcompose/__init__.py b/misc/python/materialize/mzcompose/__init__.py
index 4a9b6f92c182b..b16a2f5d90c08 100644
--- a/misc/python/materialize/mzcompose/__init__.py
+++ b/misc/python/materialize/mzcompose/__init__.py
@@ -87,6 +87,8 @@
     "enable_worker_core_affinity": "true",
     "persist_batch_delete_enabled": "true",
     "persist_fast_path_limit": "1000",
+    "persist_inline_update_max_bytes": "1048576",
+    "persist_inline_update_threshold_bytes": "4096",
     "persist_pubsub_client_enabled": "true",
     "persist_pubsub_push_diff_enabled": "true",
     "persist_sink_minimum_batch_updates": "128",
diff --git a/src/persist-client/build.rs b/src/persist-client/build.rs
index 2971d76adb37c..73ec5e4ceeb10 100644
--- a/src/persist-client/build.rs
+++ b/src/persist-client/build.rs
@@ -66,6 +66,7 @@ fn main() {
     // is to re-run if any file in the crate changes; that's still a bit too
     // broad, but it's better.
.emit_rerun_if_changed(false) + .extern_path(".mz_persist", "::mz_persist") .extern_path(".mz_persist_types", "::mz_persist_types") .extern_path(".mz_proto", "::mz_proto") .compile_with_config( diff --git a/src/persist-client/src/batch.rs b/src/persist-client/src/batch.rs index 34fd78a21d3aa..011f54504de59 100644 --- a/src/persist-client/src/batch.rs +++ b/src/persist-client/src/batch.rs @@ -43,11 +43,11 @@ use tracing::{debug_span, error, trace_span, warn, Instrument}; use crate::async_runtime::IsolatedRuntime; use crate::cfg::MiB; use crate::error::InvalidUsage; -use crate::internal::encoding::{LazyPartStats, Schemas}; +use crate::internal::encoding::{LazyInlineBatchPart, LazyPartStats, Schemas}; use crate::internal::machine::retry_external; use crate::internal::metrics::{BatchWriteMetrics, Metrics, RetryMetrics, ShardMetrics}; use crate::internal::paths::{PartId, PartialBatchKey, WriterKey}; -use crate::internal::state::{BatchPart, HollowBatch, HollowBatchPart}; +use crate::internal::state::{BatchPart, HollowBatch, HollowBatchPart, ProtoInlineBatchPart}; use crate::stats::{ part_stats_for_legacy_part, untrimmable_columns, STATS_BUDGET_BYTES, STATS_COLLECTION_ENABLED, }; @@ -228,6 +228,56 @@ where self.mark_consumed(); ret } + + pub(crate) async fn flush_to_blob( + &mut self, + cfg: &BatchBuilderConfig, + batch_metrics: &BatchWriteMetrics, + isolated_runtime: &Arc, + stats_schemas: &Schemas, + ) { + // It's necessary for correctness to keep the parts in the same order. + // We could introduce concurrency here with FuturesOrdered, but it would + // be pretty unexpected to have inline writes in more than one part, so + // don't bother. + let mut parts = Vec::new(); + for part in self.batch.parts.drain(..) { + let (updates, key_lower, ts_rewrite) = match part { + BatchPart::Hollow(x) => { + parts.push(BatchPart::Hollow(x)); + continue; + } + BatchPart::Inline { + updates, + key_lower, + ts_rewrite, + } => (updates, key_lower, ts_rewrite), + }; + let updates = updates.decode::().expect("valid inline part"); + let write_span = + debug_span!("batch::flush_to_blob", shard = %self.shard_metrics.shard_id) + .or_current(); + let handle = mz_ore::task::spawn( + || "batch::flush_to_blob", + BatchParts::write_part( + cfg.clone(), + Arc::clone(&self.blob), + Arc::clone(&self.metrics), + Arc::clone(&self.shard_metrics), + batch_metrics.clone(), + Arc::clone(isolated_runtime), + updates, + key_lower, + ts_rewrite, + stats_schemas.clone(), + ) + .instrument(write_span), + ); + let part = handle.await.expect("part write task failed"); + parts.push(part); + } + self.batch.parts = parts; + } } /// Indicates what work was done in a call to [BatchBuilder::add] @@ -248,6 +298,7 @@ pub struct BatchBuilderConfig { pub(crate) blob_target_size: usize, pub(crate) batch_delete_enabled: bool, pub(crate) batch_builder_max_outstanding_parts: usize, + pub(crate) inline_update_threshold_bytes: usize, pub(crate) stats_collection_enabled: bool, pub(crate) stats_budget: usize, pub(crate) stats_untrimmable_columns: Arc, @@ -272,6 +323,20 @@ pub(crate) const BLOB_TARGET_SIZE: Config = Config::new( "A target maximum size of persist blob payloads in bytes (Materialize).", ); +pub(crate) const INLINE_UPDATE_THRESHOLD_BYTES: Config = Config::new( + "persist_inline_update_threshold_bytes", + 0, + "The (exclusive) maximum size of a write that persist will inline in metadata.", +); + +pub(crate) const INLINE_UPDATE_MAX_BYTES: Config = Config::new( + "persist_inline_update_max_bytes", + 0, + "\ + The (inclusive) maximum total size of 
inline writes in metadata before \ + persist will backpressure them by flushing out to s3.", +); + impl BatchBuilderConfig { /// Initialize a batch builder config based on a snapshot of the Persist config. pub fn new(value: &PersistConfig, _writer_id: &WriterId) -> Self { @@ -283,6 +348,7 @@ impl BatchBuilderConfig { batch_builder_max_outstanding_parts: value .dynamic .batch_builder_max_outstanding_parts(), + inline_update_threshold_bytes: INLINE_UPDATE_THRESHOLD_BYTES.get(value), stats_collection_enabled: STATS_COLLECTION_ENABLED.get(value), stats_budget: STATS_BUDGET_BYTES.get(value), stats_untrimmable_columns: Arc::new(untrimmable_columns(value)), @@ -837,107 +903,53 @@ impl BatchParts { since: Antichain, ) { let desc = Description::new(self.lower.clone(), upper, since); - let metrics = Arc::clone(&self.metrics); - let shard_metrics = Arc::clone(&self.shard_metrics); - let blob = Arc::clone(&self.blob); - let isolated_runtime = Arc::clone(&self.isolated_runtime); let batch_metrics = self.batch_metrics.clone(); - let partial_key = PartialBatchKey::new(&self.cfg.writer_key, &PartId::new()); - let key = partial_key.complete(&self.shard_id); let index = u64::cast_from(self.finished_parts.len() + self.writing_parts.len()); - let stats_collection_enabled = self.cfg.stats_collection_enabled; - let stats_budget = self.cfg.stats_budget; - let schemas = schemas.clone(); - let untrimmable_columns = Arc::clone(&self.cfg.stats_untrimmable_columns); - - let write_span = debug_span!("batch::write_part", shard = %self.shard_id).or_current(); - let handle = mz_ore::task::spawn( - || "batch::write_part", - async move { - let goodbytes = updates.goodbytes(); - let batch = BlobTraceBatchPart { - desc, - updates: vec![updates], - index, - }; - - let (stats, (buf, encode_time)) = isolated_runtime - .spawn_named(|| "batch::encode_part", async move { - let stats = if stats_collection_enabled { - let stats_start = Instant::now(); - match part_stats_for_legacy_part(&schemas, &batch.updates) { - Ok(x) => { - let mut trimmed_bytes = 0; - let x = LazyPartStats::encode(&x, |s| { - trimmed_bytes = trim_to_budget(s, stats_budget, |s| { - untrimmable_columns.should_retain(s) - }); - }); - Some((x, stats_start.elapsed(), trimmed_bytes)) - } - Err(err) => { - error!("failed to construct part stats: {}", err); - None - } - } - } else { - None - }; - - let encode_start = Instant::now(); - let mut buf = Vec::new(); - batch.encode(&mut buf); - - // Drop batch as soon as we can to reclaim its memory. - drop(batch); - (stats, (Bytes::from(buf), encode_start.elapsed())) - }) - .instrument(debug_span!("batch::encode_part")) - .await - .expect("part encode task failed"); - // Can't use the `CodecMetrics::encode` helper because of async. 
- metrics.codecs.batch.encode_count.inc(); - metrics - .codecs - .batch - .encode_seconds - .inc_by(encode_time.as_secs_f64()); - - let start = Instant::now(); - let payload_len = buf.len(); - let () = retry_external(&metrics.retries.external.batch_set, || async { - shard_metrics.blob_sets.inc(); - blob.set(&key, Bytes::clone(&buf)).await - }) - .instrument(trace_span!("batch::set", payload_len)) - .await; - batch_metrics.seconds.inc_by(start.elapsed().as_secs_f64()); - batch_metrics.bytes.inc_by(u64::cast_from(payload_len)); - batch_metrics.goodbytes.inc_by(u64::cast_from(goodbytes)); - let stats = stats.map(|(stats, stats_step_timing, trimmed_bytes)| { - batch_metrics - .step_stats - .inc_by(stats_step_timing.as_secs_f64()); - if trimmed_bytes > 0 { - metrics.pushdown.parts_stats_trimmed_count.inc(); - metrics - .pushdown - .parts_stats_trimmed_bytes - .inc_by(u64::cast_from(trimmed_bytes)); + let ts_rewrite = None; + + let handle = if updates.goodbytes() < self.cfg.inline_update_threshold_bytes { + let span = debug_span!("batch::inline_part", shard = %self.shard_id).or_current(); + mz_ore::task::spawn( + || "batch::inline_part", + async move { + let updates = LazyInlineBatchPart::from(&ProtoInlineBatchPart { + desc: Some(desc.into_proto()), + index: index.into_proto(), + updates: Some(updates.into_proto()), + }); + BatchPart::Inline { + updates, + key_lower, + ts_rewrite, } - stats - }); - - BatchPart::Hollow(HollowBatchPart { - key: partial_key, - encoded_size_bytes: payload_len, + } + .instrument(span), + ) + } else { + let part = BlobTraceBatchPart { + desc, + updates: vec![updates], + index, + }; + let write_span = + debug_span!("batch::write_part", shard = %self.shard_metrics.shard_id).or_current(); + mz_ore::task::spawn( + || "batch::write_part", + BatchParts::write_part( + self.cfg.clone(), + Arc::clone(&self.blob), + Arc::clone(&self.metrics), + Arc::clone(&self.shard_metrics), + batch_metrics.clone(), + Arc::clone(&self.isolated_runtime), + part, key_lower, - stats, - ts_rewrite: None, - }) - } - .instrument(write_span), - ); + ts_rewrite, + schemas.clone(), + ) + .instrument(write_span), + ) + }; self.writing_parts.push_back(handle); while self.writing_parts.len() > self.cfg.batch_builder_max_outstanding_parts { @@ -954,6 +966,98 @@ impl BatchParts { } } + async fn write_part( + cfg: BatchBuilderConfig, + blob: Arc, + metrics: Arc, + shard_metrics: Arc, + batch_metrics: BatchWriteMetrics, + isolated_runtime: Arc, + updates: BlobTraceBatchPart, + key_lower: Vec, + ts_rewrite: Option>, + schemas: Schemas, + ) -> BatchPart { + let partial_key = PartialBatchKey::new(&cfg.writer_key, &PartId::new()); + let key = partial_key.complete(&shard_metrics.shard_id); + let goodbytes = updates.updates.iter().map(|x| x.goodbytes()).sum::(); + + let (stats, (buf, encode_time)) = isolated_runtime + .spawn_named(|| "batch::encode_part", async move { + let stats = if cfg.stats_collection_enabled { + let stats_start = Instant::now(); + match part_stats_for_legacy_part(&schemas, &updates.updates) { + Ok(x) => { + let mut trimmed_bytes = 0; + let x = LazyPartStats::encode(&x, |s| { + trimmed_bytes = trim_to_budget(s, cfg.stats_budget, |s| { + cfg.stats_untrimmable_columns.should_retain(s) + }); + }); + Some((x, stats_start.elapsed(), trimmed_bytes)) + } + Err(err) => { + error!("failed to construct part stats: {}", err); + None + } + } + } else { + None + }; + + let encode_start = Instant::now(); + let mut buf = Vec::new(); + updates.encode(&mut buf); + + // Drop batch as soon as we can to reclaim 
its memory. + drop(updates); + (stats, (Bytes::from(buf), encode_start.elapsed())) + }) + .instrument(debug_span!("batch::encode_part")) + .await + .expect("part encode task failed"); + // Can't use the `CodecMetrics::encode` helper because of async. + metrics.codecs.batch.encode_count.inc(); + metrics + .codecs + .batch + .encode_seconds + .inc_by(encode_time.as_secs_f64()); + + let start = Instant::now(); + let payload_len = buf.len(); + let () = retry_external(&metrics.retries.external.batch_set, || async { + shard_metrics.blob_sets.inc(); + blob.set(&key, Bytes::clone(&buf)).await + }) + .instrument(trace_span!("batch::set", payload_len)) + .await; + batch_metrics.seconds.inc_by(start.elapsed().as_secs_f64()); + batch_metrics.bytes.inc_by(u64::cast_from(payload_len)); + batch_metrics.goodbytes.inc_by(u64::cast_from(goodbytes)); + let stats = stats.map(|(stats, stats_step_timing, trimmed_bytes)| { + batch_metrics + .step_stats + .inc_by(stats_step_timing.as_secs_f64()); + if trimmed_bytes > 0 { + metrics.pushdown.parts_stats_trimmed_count.inc(); + metrics + .pushdown + .parts_stats_trimmed_bytes + .inc_by(u64::cast_from(trimmed_bytes)); + } + stats + }); + + BatchPart::Hollow(HollowBatchPart { + key: partial_key, + encoded_size_bytes: payload_len, + key_lower, + stats, + ts_rewrite, + }) + } + #[instrument(level = "debug", name = "batch::finish_upload", fields(shard = %self.shard_id))] pub(crate) async fn finish(self) -> Vec> { let mut parts = self.finished_parts; @@ -1016,11 +1120,15 @@ pub(crate) fn validate_truncate_batch( pub(crate) struct PartDeletes(BTreeSet); impl PartDeletes { - // Adds the part to the set to be deleted and returns true if it was already - // present. + // Adds the part to the set to be deleted and returns true if it was newly + // inserted. pub fn add(&mut self, part: &BatchPart) -> bool { match part { BatchPart::Hollow(x) => self.0.insert(x.key.clone()), + BatchPart::Inline { .. } => { + // Nothing to delete. + true + } } } @@ -1176,6 +1284,7 @@ mod tests { for part in &batch.batch.parts { let part = match part { BatchPart::Hollow(x) => x, + BatchPart::Inline { .. } => panic!("batch unexpectedly used inline part"), }; match BlobKey::parse_ids(&part.key.complete(&shard_id)) { Ok((shard, PartialBlobKey::Batch(writer, _))) => { @@ -1226,6 +1335,7 @@ mod tests { for part in &batch.batch.parts { let part = match part { BatchPart::Hollow(x) => x, + BatchPart::Inline { .. 
} => panic!("batch unexpectedly used inline part"), }; match BlobKey::parse_ids(&part.key.complete(&shard_id)) { Ok((shard, PartialBlobKey::Batch(writer, _))) => { diff --git a/src/persist-client/src/cfg.rs b/src/persist-client/src/cfg.rs index 0cd3aa6301f8b..d9918f2a1903e 100644 --- a/src/persist-client/src/cfg.rs +++ b/src/persist-client/src/cfg.rs @@ -312,6 +312,8 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet { mz_persist::cfg::all_dyn_configs(configs) .add(&crate::batch::BATCH_DELETE_ENABLED) .add(&crate::batch::BLOB_TARGET_SIZE) + .add(&crate::batch::INLINE_UPDATE_MAX_BYTES) + .add(&crate::batch::INLINE_UPDATE_THRESHOLD_BYTES) .add(&crate::cfg::CONSENSUS_CONNECTION_POOL_TTL_STAGGER) .add(&crate::cfg::CONSENSUS_CONNECTION_POOL_TTL) .add(&crate::cfg::CRDB_CONNECT_TIMEOUT) diff --git a/src/persist-client/src/cli/inspect.rs b/src/persist-client/src/cli/inspect.rs index 551740ae86028..01d231208b0b1 100644 --- a/src/persist-client/src/cli/inspect.rs +++ b/src/persist-client/src/cli/inspect.rs @@ -39,7 +39,7 @@ use crate::internal::encoding::{Rollup, UntypedState}; use crate::internal::paths::{ BlobKey, BlobKeyPrefix, PartialBatchKey, PartialBlobKey, PartialRollupKey, WriterKey, }; -use crate::internal::state::{BatchPart, HollowBatchPart, ProtoRollup, ProtoStateDiff, State}; +use crate::internal::state::{BatchPart, ProtoRollup, ProtoStateDiff, State}; use crate::rpc::NoopPubSubSender; use crate::usage::{HumanBytes, StorageUsageClient}; use crate::{Metrics, PersistClient, PersistConfig, ShardId}; @@ -347,17 +347,11 @@ pub async fn blob_batch_part( let parsed = BlobTraceBatchPart::::decode(&buf, &metrics.columnar).expect("decodable"); let desc = parsed.desc.clone(); - let part = HollowBatchPart { - key, - encoded_size_bytes: 0, - key_lower: vec![], - stats: None, - ts_rewrite: None, - }; let encoded_part = EncodedPart::new( metrics.read.snapshot.clone(), parsed.desc.clone(), - &part, + &key, + None, parsed, ); let mut out = BatchPartOutput { @@ -605,6 +599,7 @@ pub async fn unreferenced_blobs(args: &StateArgs) -> Result known_parts.insert(x.key.clone()), + BatchPart::Inline { .. 
} => continue, }; } } diff --git a/src/persist-client/src/fetch.rs b/src/persist-client/src/fetch.rs index 6948c86fd2528..94fefddbdefc8 100644 --- a/src/persist-client/src/fetch.rs +++ b/src/persist-client/src/fetch.rs @@ -36,7 +36,7 @@ use crate::batch::{ ProtoLeasedBatchPart, }; use crate::error::InvalidUsage; -use crate::internal::encoding::{LazyPartStats, LazyProto, Schemas}; +use crate::internal::encoding::{LazyInlineBatchPart, LazyPartStats, LazyProto, Schemas}; use crate::internal::machine::retry_external; use crate::internal::metrics::{Metrics, ReadMetrics, ShardMetrics}; use crate::internal::paths::BlobKey; @@ -120,6 +120,15 @@ where part: x.clone(), } } + BatchPart::Inline { + updates, + key_lower: _, + ts_rewrite, + } => FetchedBlobBuf::Inline { + desc: part.desc.clone(), + updates: updates.clone(), + ts_rewrite: ts_rewrite.clone(), + }, }; let fetched_blob = FetchedBlob { metrics: Arc::clone(&self.metrics), @@ -324,7 +333,7 @@ where read_metrics.part_goodbytes.inc_by(u64::cast_from( parsed.updates.iter().map(|x| x.goodbytes()).sum::(), )); - EncodedPart::new(read_metrics.clone(), registered_desc, part, parsed) + EncodedPart::from_hollow(read_metrics.clone(), registered_desc, part, parsed) }) } @@ -475,6 +484,11 @@ enum FetchedBlobBuf { buf: SegmentedBytes, part: HollowBatchPart, }, + Inline { + desc: Description, + updates: LazyInlineBatchPart, + ts_rewrite: Option>, + }, } impl Clone for FetchedBlob { @@ -506,6 +520,19 @@ impl FetchedBlob { + let parsed = EncodedPart::from_inline( + self.read_metrics.clone(), + desc.clone(), + updates, + ts_rewrite.as_ref(), + ); + (parsed, None) + } }; FetchedPart::new( Arc::clone(&self.metrics), @@ -711,14 +738,50 @@ where ) .await } + BatchPart::Inline { + updates, + key_lower: _, + ts_rewrite, + } => Ok(EncodedPart::from_inline( + read_metrics.clone(), + registered_desc.clone(), + updates, + ts_rewrite.as_ref(), + )), } } - pub(crate) fn new( + pub(crate) fn from_inline( + metrics: ReadMetrics, + desc: Description, + x: &LazyInlineBatchPart, + ts_rewrite: Option<&Antichain>, + ) -> Self { + let parsed = x.decode().expect("valid inline part"); + Self::new(metrics, desc, "inline", ts_rewrite, parsed) + } + + pub(crate) fn from_hollow( metrics: ReadMetrics, registered_desc: Description, part: &HollowBatchPart, parsed: BlobTraceBatchPart, + ) -> Self { + Self::new( + metrics, + registered_desc, + &part.key, + part.ts_rewrite.as_ref(), + parsed, + ) + } + + pub(crate) fn new( + metrics: ReadMetrics, + registered_desc: Description, + key: &str, + ts_rewrite: Option<&Antichain>, + parsed: BlobTraceBatchPart, ) -> Self { // There are two types of batches in persist: // - Batches written by a persist user (either directly or indirectly @@ -738,11 +801,11 @@ where assert!( PartialOrder::less_equal(inline_desc.lower(), registered_desc.lower()), "key={} inline={:?} registered={:?}", - part.key, + key, inline_desc, registered_desc ); - if part.ts_rewrite.is_none() { + if ts_rewrite.is_none() { // The ts rewrite feature allows us to advance the registered // upper of a batch that's already been staged (the inline // upper), so if it's been used, then there's no useful @@ -750,7 +813,7 @@ where assert!( PartialOrder::less_equal(registered_desc.upper(), inline_desc.upper()), "key={} inline={:?} registered={:?}", - part.key, + key, inline_desc, registered_desc ); @@ -763,7 +826,7 @@ where inline_desc.since(), &Antichain::from_elem(T::minimum()), "key={} inline={:?} registered={:?}", - part.key, + key, inline_desc, registered_desc ); @@ -771,7 +834,7 @@ 
where assert_eq!( inline_desc, ®istered_desc, "key={} inline={:?} registered={:?}", - part.key, inline_desc, registered_desc + key, inline_desc, registered_desc ); } @@ -780,7 +843,7 @@ where registered_desc, part: Arc::new(parsed), needs_truncation, - ts_rewrite: part.ts_rewrite.clone(), + ts_rewrite: ts_rewrite.cloned(), } } diff --git a/src/persist-client/src/internal/compact.rs b/src/persist-client/src/internal/compact.rs index 9a48c224c3a9a..c037ebc91f928 100644 --- a/src/persist-client/src/internal/compact.rs +++ b/src/persist-client/src/internal/compact.rs @@ -685,7 +685,7 @@ where metrics.compaction.batch.clone(), desc.lower().clone(), Arc::clone(&blob), - isolated_runtime, + Arc::clone(&isolated_runtime), shard_id.clone(), cfg.version.clone(), desc.since().clone(), @@ -736,7 +736,17 @@ where } tokio::task::yield_now().await; } - let batch = batch.finish(&real_schemas, desc.upper().clone()).await?; + let mut batch = batch.finish(&real_schemas, desc.upper().clone()).await?; + // Use compaction as a method of getting inline writes out of state, to + // make room for more inline writes. + let () = batch + .flush_to_blob( + &cfg.batch, + &metrics.compaction.batch, + &isolated_runtime, + &real_schemas, + ) + .await; let hollow_batch = batch.into_hollow_batch(); timings.record(&metrics); @@ -873,6 +883,7 @@ mod tests { assert_eq!(res.output.parts.len(), 1); let part = match &res.output.parts[0] { BatchPart::Hollow(x) => x, + BatchPart::Inline { .. } => panic!("test outputs a hollow part"), }; let (part, updates) = expect_fetch_part( write.blob.as_ref(), @@ -958,6 +969,7 @@ mod tests { assert_eq!(res.output.parts.len(), 1); let part = match &res.output.parts[0] { BatchPart::Hollow(x) => x, + BatchPart::Inline { .. } => panic!("test outputs a hollow part"), }; let (part, updates) = expect_fetch_part( write.blob.as_ref(), diff --git a/src/persist-client/src/internal/encoding.rs b/src/persist-client/src/internal/encoding.rs index b5538fe41a1a8..6ae01748257e2 100644 --- a/src/persist-client/src/internal/encoding.rs +++ b/src/persist-client/src/internal/encoding.rs @@ -10,6 +10,7 @@ use std::cmp::Ordering; use std::collections::BTreeMap; use std::fmt::Debug; +use std::hash::{Hash, Hasher}; use std::marker::PhantomData; use std::sync::Arc; @@ -17,6 +18,7 @@ use bytes::{Buf, Bytes}; use differential_dataflow::lattice::Lattice; use differential_dataflow::trace::Description; use mz_ore::halt; +use mz_persist::indexed::encoding::BlobTraceBatchPart; use mz_persist::location::{SeqNo, VersionedData}; use mz_persist_types::stats::{PartStats, ProtoStructStats}; use mz_persist_types::{Codec, Codec64}; @@ -25,6 +27,7 @@ use proptest::prelude::Arbitrary; use proptest::strategy::Strategy; use prost::Message; use semver::Version; +use serde::ser::SerializeStruct; use serde::{Deserialize, Serialize}; use timely::progress::{Antichain, Timestamp}; use uuid::Uuid; @@ -38,10 +41,11 @@ use crate::internal::state::{ HollowBatchPart, HollowRollup, IdempotencyToken, LeasedReaderState, OpaqueState, ProtoCriticalReaderState, ProtoFuelingMerge, ProtoHandleDebugState, ProtoHollowBatch, ProtoHollowBatchPart, ProtoHollowRollup, ProtoIdFuelingMerge, ProtoIdHollowBatch, - ProtoIdSpineBatch, ProtoInlinedDiffs, ProtoLeasedReaderState, ProtoRollup, ProtoSpineBatch, - ProtoSpineId, ProtoStateDiff, ProtoStateField, ProtoStateFieldDiffType, ProtoStateFieldDiffs, - ProtoTrace, ProtoU64Antichain, ProtoU64Description, ProtoVersionedData, ProtoWriterState, - State, StateCollections, TypedState, WriterState, + ProtoIdSpineBatch, 
ProtoInlineBatchPart, ProtoInlinedDiffs, ProtoLeasedReaderState, + ProtoRollup, ProtoSpineBatch, ProtoSpineId, ProtoStateDiff, ProtoStateField, + ProtoStateFieldDiffType, ProtoStateFieldDiffs, ProtoTrace, ProtoU64Antichain, + ProtoU64Description, ProtoVersionedData, ProtoWriterState, State, StateCollections, TypedState, + WriterState, }; use crate::internal::state_diff::{ ProtoStateFieldDiff, ProtoStateFieldDiffsWriter, StateDiff, StateFieldDiff, StateFieldValDiff, @@ -133,6 +137,13 @@ impl Ord for LazyProto { } } +impl Hash for LazyProto { + fn hash(&self, state: &mut H) { + let LazyProto { buf, _phantom } = self; + buf.hash(state); + } +} + impl From<&T> for LazyProto { fn from(value: &T) -> Self { let buf = Bytes::from(value.encode_to_vec()); @@ -1240,6 +1251,17 @@ impl RustType for BatchPart { key_stats: x.stats.into_proto(), ts_rewrite: x.ts_rewrite.as_ref().map(|x| x.into_proto()), }, + BatchPart::Inline { + updates, + key_lower, + ts_rewrite, + } => ProtoHollowBatchPart { + kind: Some(proto_hollow_batch_part::Kind::Inline(updates.into_proto())), + encoded_size_bytes: 0, + key_lower: Bytes::copy_from_slice(key_lower), + key_stats: None, + ts_rewrite: ts_rewrite.as_ref().map(|x| x.into_proto()), + }, } } @@ -1248,17 +1270,27 @@ impl RustType for BatchPart { Some(ts_rewrite) => Some(ts_rewrite.into_rust()?), None => None, }; - let encoded_size_bytes = proto.encoded_size_bytes.into_rust()?; match proto.kind { Some(proto_hollow_batch_part::Kind::Key(key)) => { Ok(BatchPart::Hollow(HollowBatchPart { key: key.into_rust()?, - encoded_size_bytes, + encoded_size_bytes: proto.encoded_size_bytes.into_rust()?, key_lower: proto.key_lower.into(), stats: proto.key_stats.into_rust()?, ts_rewrite, })) } + Some(proto_hollow_batch_part::Kind::Inline(x)) => { + assert_eq!(proto.encoded_size_bytes, 0); + assert!(proto.key_stats.is_none()); + let key_lower = proto.key_lower.into(); + let updates = LazyInlineBatchPart(x.into_rust()?); + Ok(BatchPart::Inline { + updates, + key_lower, + ts_rewrite, + }) + } None => Err(TryFromProtoError::unknown_enum_variant( "ProtoHollowBatchPart::kind", )), @@ -1327,6 +1359,81 @@ impl Arbitrary for LazyPartStats { } } +impl RustType for BlobTraceBatchPart { + fn into_proto(&self) -> ProtoInlineBatchPart { + // BlobTraceBatchPart has a Vec. Inline writes only + // needs one and it's nice to only have to model one at the + // ProtoInlineBatchPart level. I'm _pretty_ sure that the actual + // BlobTraceBatchPart we've serialized into parquet always have exactly + // one (_maaaaaybe_ zero or one), but that's a scary thing to start + // enforcing, so separate it out (so we can e.g. use sentry errors! to + // confirm before rolling anything out). In the meantime, just construct + // the ProtoInlineBatchPart directly in BatchParts where it still knows + // that it has exactly one ColumnarRecords. + unreachable!("unused") + } + + fn from_proto(proto: ProtoInlineBatchPart) -> Result { + Ok(BlobTraceBatchPart { + desc: proto.desc.into_rust_if_some("ProtoInlineBatchPart::desc")?, + index: proto.index.into_rust()?, + updates: vec![proto + .updates + .into_rust_if_some("ProtoInlineBatchPart::updates")?], + }) + } +} + +/// A batch part stored inlined in State. 
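+///
+/// The updates are kept as encoded proto bytes and are only decoded when they
+/// would otherwise have been fetched, which keeps State cheap to decode in
+/// places that aren't interested in the data itself.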
+#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct LazyInlineBatchPart(LazyProto); + +impl From<&ProtoInlineBatchPart> for LazyInlineBatchPart { + fn from(value: &ProtoInlineBatchPart) -> Self { + LazyInlineBatchPart(value.into()) + } +} + +impl Serialize for LazyInlineBatchPart { + fn serialize(&self, s: S) -> Result { + // NB: This serialize impl is only used for QA and debugging, so emit a + // truncated version. + let proto = self.0.decode().expect("valid proto"); + let mut s = s.serialize_struct("InlineBatchPart", 3)?; + let () = s.serialize_field("desc", &proto.desc)?; + let () = s.serialize_field("index", &proto.index)?; + let () = s.serialize_field("updates[len]", &proto.updates.map_or(0, |x| x.len))?; + s.end() + } +} + +impl LazyInlineBatchPart { + pub(crate) fn encoded_size_bytes(&self) -> usize { + self.0.buf.len() + } + + /// Decodes and returns PartStats from the encoded representation. + /// + /// This does not cache the returned value, it decodes each time it's + /// called. + pub fn decode( + &self, + ) -> Result, TryFromProtoError> { + let proto = self.0.decode().expect("valid proto"); + proto.into_rust() + } +} + +impl RustType for LazyInlineBatchPart { + fn into_proto(&self) -> Bytes { + self.0.into_proto() + } + + fn from_proto(proto: Bytes) -> Result { + Ok(LazyInlineBatchPart(proto.into_rust()?)) + } +} + impl RustType for HollowRollup { fn into_proto(&self) -> ProtoHollowRollup { ProtoHollowRollup { @@ -1392,7 +1499,7 @@ mod tests { use crate::internal::paths::PartialRollupKey; use crate::internal::state::tests::any_state; - use crate::internal::state::HandleDebugState; + use crate::internal::state::{BatchPart, HandleDebugState}; use crate::internal::state_diff::StateDiff; use crate::tests::new_test_client_cache; use crate::ShardId; diff --git a/src/persist-client/src/internal/gc.rs b/src/persist-client/src/internal/gc.rs index d6b24bb624e31..dd62371ff6f7c 100644 --- a/src/persist-client/src/internal/gc.rs +++ b/src/persist-client/src/internal/gc.rs @@ -471,6 +471,7 @@ where BatchPart::Hollow(x) => { assert_eq!(batch_parts_to_delete.get(&x.key), None) } + BatchPart::Inline { .. 
} => {} } } } diff --git a/src/persist-client/src/internal/machine.rs b/src/persist-client/src/internal/machine.rs index 9c0d1cc7229b4..73d9843966a1c 100644 --- a/src/persist-client/src/internal/machine.rs +++ b/src/persist-client/src/internal/machine.rs @@ -32,6 +32,7 @@ use timely::PartialOrder; use tracing::{debug, info, trace_span, warn, Instrument}; use crate::async_runtime::IsolatedRuntime; +use crate::batch::INLINE_UPDATE_MAX_BYTES; use crate::cache::StateCache; use crate::cfg::RetryParameters; use crate::critical::CriticalReaderId; @@ -240,6 +241,9 @@ where CompareAndAppendRes::InvalidUsage(x) => { return CompareAndAppendRes::InvalidUsage(x) } + CompareAndAppendRes::InlineBackpressure => { + return CompareAndAppendRes::InlineBackpressure + } CompareAndAppendRes::UpperMismatch(seqno, _current_upper) => { // If the state machine thinks that the shard upper is not // far enough along, it could be because the caller of this @@ -376,7 +380,7 @@ where loop { let cmd_res = self .applier - .apply_unbatched_cmd(&metrics.cmds.compare_and_append, |_, _, state| { + .apply_unbatched_cmd(&metrics.cmds.compare_and_append, |_, cfg, state| { writer_was_present = state.writers.contains_key(writer_id); state.compare_and_append( batch, @@ -385,6 +389,7 @@ where lease_duration_ms, idempotency_token, debug_info, + INLINE_UPDATE_MAX_BYTES.get(cfg), ) }) .await; @@ -451,6 +456,11 @@ where assert!(indeterminate.is_none()); return CompareAndAppendRes::InvalidUsage(err); } + Err(CompareAndAppendBreak::InlineBackpressure) => { + // We tried to write an inline part, but there was already + // too much in state. Flush it out to s3 and try again. + return CompareAndAppendRes::InlineBackpressure; + } Err(CompareAndAppendBreak::Upper { shard_upper, writer_upper, @@ -1008,6 +1018,7 @@ pub(crate) enum CompareAndAppendRes { Success(SeqNo, WriterMaintenance), InvalidUsage(InvalidUsage), UpperMismatch(SeqNo, Antichain), + InlineBackpressure, } #[cfg(test)] @@ -1583,7 +1594,7 @@ pub mod datadriven { val: Arc::new(UnitSchema), }; let builder = BatchBuilderInternal::new( - cfg, + cfg.clone(), Arc::clone(&datadriven.client.metrics), Arc::clone(&datadriven.machine.applier.shard_metrics), schemas.clone(), @@ -1599,18 +1610,32 @@ pub mod datadriven { ); let mut builder = BatchBuilder { builder, - stats_schemas: schemas, + stats_schemas: schemas.clone(), }; for ((k, ()), t, d) in updates { builder.add(&k, &(), &t, &d).await.expect("invalid batch"); } - let batch = builder.finish(upper).await?.into_hollow_batch(); + let mut batch = builder.finish(upper).await?; + // We can only reasonably use parts_size_override with hollow batches, + // so if it's set, flush any inline batches out. + if parts_size_override.is_some() { + batch + .flush_to_blob( + &cfg, + &datadriven.client.metrics.user, + &datadriven.client.isolated_runtime, + &schemas, + ) + .await; + } + let batch = batch.into_hollow_batch(); if let Some(size) = parts_size_override { let mut batch = batch.clone(); for part in batch.parts.iter_mut() { match part { - BatchPart::Hollow(x) => x.encoded_size_bytes = size, + BatchPart::Hollow(part) => part.encoded_size_bytes = size, + BatchPart::Inline { .. } => unreachable!("flushed out above"), } } datadriven.batches.insert(output.to_owned(), batch); @@ -1651,6 +1676,7 @@ pub mod datadriven { } }; } + BatchPart::Inline { .. 
} => {} }; let part = EncodedPart::fetch( &datadriven.shard_id, @@ -1717,7 +1743,10 @@ pub mod datadriven { let batch = datadriven.batches.get_mut(input).expect("unknown batch"); for part in batch.parts.iter_mut() { match part { - BatchPart::Hollow(part) => part.encoded_size_bytes = size, + BatchPart::Hollow(x) => x.encoded_size_bytes = size, + BatchPart::Inline { .. } => { + panic!("set_batch_parts_size only supports hollow parts") + } } } Ok("ok\n".to_string()) diff --git a/src/persist-client/src/internal/metrics.rs b/src/persist-client/src/internal/metrics.rs index f5a08500451e0..4bf1494557ba6 100644 --- a/src/persist-client/src/internal/metrics.rs +++ b/src/persist-client/src/internal/metrics.rs @@ -96,6 +96,8 @@ pub struct Metrics { pub tasks: TasksMetrics, /// Metrics for columnar data encoding and decoding. pub columnar: ColumnarMetrics, + /// Metrics for inline writes. + pub inline: InlineMetrics, /// Metrics for the persist sink. pub sink: SinkMetrics, @@ -153,6 +155,7 @@ impl Metrics { blob_cache_mem: BlobMemCache::new(registry), tasks: TasksMetrics::new(registry), columnar, + inline: InlineMetrics::new(registry), sink: SinkMetrics::new(registry), s3_blob, postgres_consensus: PostgresClientMetrics::new(registry, "mz_persist"), @@ -1209,7 +1212,9 @@ pub struct ShardsMetrics { backpressure_emitted_bytes: IntCounterVec, backpressure_last_backpressured_bytes: UIntGaugeVec, backpressure_retired_bytes: IntCounterVec, - rewrite_part_count: mz_ore::metrics::UIntGaugeVec, + rewrite_part_count: UIntGaugeVec, + inline_part_count: UIntGaugeVec, + inline_part_bytes: UIntGaugeVec, // We hand out `Arc` to read and write handles, but store it // here as `Weak`. This allows us to discover if it's no longer in use and // so we can remove it from the map. @@ -1399,6 +1404,16 @@ impl ShardsMetrics { help: "count of batch parts with rewrites by shard", var_labels: ["shard", "name"], )), + inline_part_count: registry.register(metric!( + name: "mz_persist_shard_inline_part_count", + help: "count of parts inline in shard metadata", + var_labels: ["shard", "name"], + )), + inline_part_bytes: registry.register(metric!( + name: "mz_persist_shard_inline_part_bytes", + help: "total size of parts inline in shard metadata", + var_labels: ["shard", "name"], + )), shards, } } @@ -1477,6 +1492,8 @@ pub struct ShardMetrics { Arc>>, pub backpressure_retired_bytes: Arc>>, pub rewrite_part_count: DeleteOnDropGauge<'static, AtomicU64, Vec>, + pub inline_part_count: DeleteOnDropGauge<'static, AtomicU64, Vec>, + pub inline_part_bytes: DeleteOnDropGauge<'static, AtomicU64, Vec>, } impl ShardMetrics { @@ -1588,6 +1605,12 @@ impl ShardMetrics { ), rewrite_part_count: shards_metrics .rewrite_part_count + .get_delete_on_drop_gauge(vec![shard.clone(), name.to_string()]), + inline_part_count: shards_metrics + .inline_part_count + .get_delete_on_drop_gauge(vec![shard.clone(), name.to_string()]), + inline_part_bytes: shards_metrics + .inline_part_bytes .get_delete_on_drop_gauge(vec![shard, name.to_string()]), } } @@ -2763,6 +2786,19 @@ impl TasksMetrics { } } +#[derive(Debug)] +pub struct InlineMetrics { + pub(crate) backpressure: BatchWriteMetrics, +} + +impl InlineMetrics { + fn new(registry: &MetricsRegistry) -> Self { + InlineMetrics { + backpressure: BatchWriteMetrics::new(registry, "inline_backpressure"), + } + } +} + fn blob_key_shard_id(key: &str) -> Option { let (shard_id, _) = BlobKey::parse_ids(key).ok()?; Some(shard_id.to_string()) diff --git a/src/persist-client/src/internal/restore.rs 
b/src/persist-client/src/internal/restore.rs index bbe6c694f0c5d..0b58ec37f870f 100644 --- a/src/persist-client/src/internal/restore.rs +++ b/src/persist-client/src/internal/restore.rs @@ -88,6 +88,7 @@ pub(crate) async fn restore_blob( for part in &batch.parts { let key = match part { BatchPart::Hollow(x) => x.key.complete(&shard_id), + BatchPart::Inline { .. } => continue, }; check_restored(&key, blob.restore(&key).await); } @@ -98,6 +99,7 @@ pub(crate) async fn restore_blob( for part in &after.parts { let key = match part { BatchPart::Hollow(x) => x.key.complete(&shard_id), + BatchPart::Inline { .. } => continue, }; check_restored(&key, blob.restore(&key).await); } diff --git a/src/persist-client/src/internal/state.proto b/src/persist-client/src/internal/state.proto index 4f32742f48d72..d2c7b2a5c3b8c 100644 --- a/src/persist-client/src/internal/state.proto +++ b/src/persist-client/src/internal/state.proto @@ -13,6 +13,8 @@ syntax = "proto3"; package mz_persist_client.internal.state; +import "persist/src/persist.proto"; + message ProtoU64Antichain { repeated int64 elements = 1; } @@ -26,16 +28,24 @@ message ProtoU64Description { message ProtoHollowBatchPart { oneof kind { string key = 1; + bytes inline = 5; } - uint64 encoded_size_bytes = 2; - bytes key_lower = 3; ProtoU64Antichain ts_rewrite = 4; + // Only set when Kind is Inline + uint64 encoded_size_bytes = 2; optional bytes key_stats = 536870906; + reserved 536870907 to 536870911; } +message ProtoInlineBatchPart { + ProtoU64Description desc = 1; + uint64 index = 2; + mz_persist.gen.persist.ProtoColumnarRecords updates = 3; +} + message ProtoHollowBatch { ProtoU64Description desc = 1; repeated ProtoHollowBatchPart parts = 4; diff --git a/src/persist-client/src/internal/state.rs b/src/persist-client/src/internal/state.rs index de2684bd6bf80..e2286bce9791f 100644 --- a/src/persist-client/src/internal/state.rs +++ b/src/persist-client/src/internal/state.rs @@ -35,7 +35,7 @@ use uuid::Uuid; use crate::critical::CriticalReaderId; use crate::error::InvalidUsage; -use crate::internal::encoding::{parse_id, LazyPartStats}; +use crate::internal::encoding::{parse_id, LazyInlineBatchPart, LazyPartStats}; use crate::internal::gc::GcReq; use crate::internal::paths::{PartialBatchKey, PartialRollupKey}; use crate::internal::trace::{ApplyMergeResult, FueledMergeReq, FueledMergeRes, Trace}; @@ -176,12 +176,18 @@ pub struct HandleDebugState { #[serde(tag = "type")] pub enum BatchPart { Hollow(HollowBatchPart), + Inline { + updates: LazyInlineBatchPart, + key_lower: Vec, + ts_rewrite: Option>, + }, } impl BatchPart { pub fn encoded_size_bytes(&self) -> usize { match self { BatchPart::Hollow(x) => x.encoded_size_bytes, + BatchPart::Inline { updates, .. } => updates.encoded_size_bytes(), } } @@ -190,24 +196,28 @@ impl BatchPart { pub fn printable_name(&self) -> &str { match self { BatchPart::Hollow(x) => x.key.0.as_str(), + BatchPart::Inline { .. } => "", } } pub fn stats(&self) -> Option<&LazyPartStats> { match self { BatchPart::Hollow(x) => x.stats.as_ref(), + BatchPart::Inline { .. } => None, } } pub fn key_lower(&self) -> &[u8] { match self { BatchPart::Hollow(x) => x.key_lower.as_slice(), + BatchPart::Inline { key_lower, .. } => key_lower.as_slice(), } } pub fn ts_rewrite(&self) -> Option<&Antichain> { match self { BatchPart::Hollow(x) => x.ts_rewrite.as_ref(), + BatchPart::Inline { ts_rewrite, .. 
} => ts_rewrite.as_ref(), } } } @@ -222,6 +232,29 @@ impl Ord for BatchPart { fn cmp(&self, other: &Self) -> Ordering { match (self, other) { (BatchPart::Hollow(s), BatchPart::Hollow(o)) => s.cmp(o), + ( + BatchPart::Inline { + updates: s_updates, + key_lower: s_key_lower, + ts_rewrite: s_ts_rewrite, + }, + BatchPart::Inline { + updates: o_updates, + key_lower: o_key_lower, + ts_rewrite: o_ts_rewrite, + }, + ) => ( + s_updates, + s_key_lower, + s_ts_rewrite.as_ref().map(|x| x.elements()), + ) + .cmp(&( + o_updates, + o_key_lower, + o_ts_rewrite.as_ref().map(|x| x.elements()), + )), + (BatchPart::Hollow(_), BatchPart::Inline { .. }) => Ordering::Less, + (BatchPart::Inline { .. }, BatchPart::Hollow(_)) => Ordering::Greater, } } } @@ -363,6 +396,16 @@ impl HollowBatch { emitted_implicit: false, } } + + pub(crate) fn inline_bytes(&self) -> usize { + self.parts + .iter() + .map(|x| match x { + BatchPart::Inline { updates, .. } => updates.encoded_size_bytes(), + BatchPart::Hollow(_) => 0, + }) + .sum() + } } pub(crate) struct HollowBatchRunIter<'a, T> { @@ -456,6 +499,7 @@ impl HollowBatch { for part in &mut self.parts { match part { BatchPart::Hollow(part) => part.ts_rewrite = Some(frontier.clone()), + BatchPart::Inline { ts_rewrite, .. } => *ts_rewrite = Some(frontier.clone()), } } Ok(()) @@ -558,6 +602,7 @@ pub enum CompareAndAppendBreak { writer_upper: Antichain, }, InvalidUsage(InvalidUsage), + InlineBackpressure, } #[derive(Debug)] @@ -699,6 +744,7 @@ where lease_duration_ms: u64, idempotency_token: &IdempotencyToken, debug_info: &HandleDebugState, + inline_update_max_bytes: usize, ) -> ControlFlow, Vec>> { // We expire all writers if the upper and since both advance to the // empty antichain. Gracefully handle this. At the same time, @@ -775,6 +821,16 @@ where }); } + let new_inline_bytes = batch.inline_bytes(); + if new_inline_bytes > 0 { + let mut existing_inline_bytes = 0; + self.trace + .map_batches(|x| existing_inline_bytes += x.inline_bytes()); + if existing_inline_bytes + new_inline_bytes >= inline_update_max_bytes { + return Break(CompareAndAppendBreak::InlineBackpressure); + } + } + let merge_reqs = if batch.desc.upper() != batch.desc.lower() { self.trace.push_batch(batch.clone()) } else { @@ -1391,6 +1447,13 @@ where if x.ts_rewrite().is_some() { ret.rewrite_part_count += 1; } + match x { + BatchPart::Hollow(_) => {} + BatchPart::Inline { updates, .. 
} => { + ret.inline_part_count += 1; + ret.inline_part_bytes += updates.encoded_size_bytes(); + } + } } ret.largest_batch_bytes = std::cmp::max(ret.largest_batch_bytes, batch_size); ret.state_batches_bytes += batch_size; @@ -1655,6 +1718,8 @@ pub struct StateSizeMetrics { pub state_batches_bytes: usize, pub state_rollups_bytes: usize, pub state_rollup_count: usize, + pub inline_part_count: usize, + pub inline_part_bytes: usize, } #[derive(Default)] @@ -1675,9 +1740,11 @@ pub struct Upper(pub Antichain); pub(crate) mod tests { use std::ops::Range; + use bytes::Bytes; use mz_build_info::DUMMY_BUILD_INFO; use mz_dyncfg::ConfigUpdates; use mz_ore::now::SYSTEM_TIME; + use mz_proto::RustType; use proptest::prelude::*; use proptest::strategy::ValueTree; @@ -1705,7 +1772,7 @@ pub(crate) mod tests { any::(), any::(), any::(), - proptest::collection::vec(any_hollow_batch_part::(), 0..3), + proptest::collection::vec(any_batch_part::(), 0..3), any::(), any::(), ), @@ -1716,7 +1783,6 @@ pub(crate) mod tests { (Antichain::from_elem(t1), Antichain::from_elem(t0)) }; let since = Antichain::from_elem(since); - let parts = parts.into_iter().map(BatchPart::Hollow).collect::>(); let runs = if runs { vec![parts.len()] } else { vec![] }; HollowBatch { desc: Description::new(lower, upper, since), @@ -1728,6 +1794,30 @@ pub(crate) mod tests { ) } + pub fn any_batch_part() -> impl Strategy> { + Strategy::prop_map( + ( + any::(), + any_hollow_batch_part(), + proptest::collection::vec(any::(), 0..3), + any::>(), + ), + |(is_hollow, hollow, key_lower, ts_rewrite)| { + if is_hollow { + BatchPart::Hollow(hollow) + } else { + let updates = LazyInlineBatchPart::from_proto(Bytes::new()).unwrap(); + let ts_rewrite = ts_rewrite.map(Antichain::from_elem); + BatchPart::Inline { + updates, + key_lower, + ts_rewrite, + } + } + }, + ) + } + pub fn any_hollow_batch_part( ) -> impl Strategy> { Strategy::prop_map( @@ -2073,6 +2163,7 @@ pub(crate) mod tests { LEASE_DURATION_MS, &IdempotencyToken::new(), &debug_state(), + 0, ), Break(CompareAndAppendBreak::Upper { shard_upper: Antichain::from_elem(0), @@ -2089,6 +2180,7 @@ pub(crate) mod tests { LEASE_DURATION_MS, &IdempotencyToken::new(), &debug_state(), + 0, ) .is_continue()); @@ -2101,6 +2193,7 @@ pub(crate) mod tests { LEASE_DURATION_MS, &IdempotencyToken::new(), &debug_state(), + 0, ), Break(CompareAndAppendBreak::InvalidUsage(InvalidBounds { lower: Antichain::from_elem(5), @@ -2117,6 +2210,7 @@ pub(crate) mod tests { LEASE_DURATION_MS, &IdempotencyToken::new(), &debug_state(), + 0, ), Break(CompareAndAppendBreak::InvalidUsage( InvalidEmptyTimeInterval { @@ -2136,6 +2230,7 @@ pub(crate) mod tests { LEASE_DURATION_MS, &IdempotencyToken::new(), &debug_state(), + 0, ) .is_continue()); } @@ -2180,6 +2275,7 @@ pub(crate) mod tests { LEASE_DURATION_MS, &IdempotencyToken::new(), &debug_state(), + 0, ) .is_continue()); @@ -2250,6 +2346,7 @@ pub(crate) mod tests { LEASE_DURATION_MS, &IdempotencyToken::new(), &debug_state(), + 0, ) .is_continue()); @@ -2278,6 +2375,7 @@ pub(crate) mod tests { LEASE_DURATION_MS, &IdempotencyToken::new(), &debug_state(), + 0, ) .is_continue()); @@ -2338,6 +2436,7 @@ pub(crate) mod tests { LEASE_DURATION_MS, &IdempotencyToken::new(), &debug_state(), + 0, ) .is_continue()); assert!(state @@ -2349,6 +2448,7 @@ pub(crate) mod tests { LEASE_DURATION_MS, &IdempotencyToken::new(), &debug_state(), + 0, ) .is_continue()); @@ -2403,6 +2503,7 @@ pub(crate) mod tests { LEASE_DURATION_MS, &IdempotencyToken::new(), &debug_state(), + 0, ) .is_continue()); @@ -2421,6 
+2522,7 @@ pub(crate) mod tests { LEASE_DURATION_MS, &IdempotencyToken::new(), &debug_state(), + 0, ) .is_continue()); } @@ -2453,6 +2555,7 @@ pub(crate) mod tests { LEASE_DURATION_MS, &IdempotencyToken::new(), &debug_state(), + 0, ); assert_eq!(state.maybe_gc(false), None); diff --git a/src/persist-client/src/internal/state_serde.json b/src/persist-client/src/internal/state_serde.json index 2d396b10e888f..1c371e0f827a7 100644 --- a/src/persist-client/src/internal/state_serde.json +++ b/src/persist-client/src/internal/state_serde.json @@ -88,7 +88,7 @@ 16259358130896200968 ], "upper": [ - 18139951554495009546 + 14783176325274669853 ], "batches": [ { @@ -96,50 +96,40 @@ 0 ], "upper": [ - 6077620655969140312 + 3215901267041653792 ], "since": [ - 11695933775649927238 + 16107715624355499630 ], - "len": 7, - "part_runs": [] - }, - { - "lower": [ - 6077620655969140312 - ], - "upper": [ - 9048289724302440207 - ], - "since": [ - 16259358130896200968 - ], - "len": 8, + "len": 3, "part_runs": [ [ { "type": "Hollow", - "key": "𝕏`𑜈Ⱥ5:<", - "encoded_size_bytes": 16872256722443379164, - "key_lower": "009abac79278769a8279207fb05f71d6fa9de6df3a3175e60606a6f4b7e471b019f49aaf03c0ca34281ee4c943fb5497062565268d24444fd890cab724176dca822cc29101aef59d92d9093e", + "key": "𖭠 { @@ -1118,22 +1128,35 @@ impl ReferencedBlobValidator { assert_eq!(inc_lower, full_lower); assert_eq!(inc_upper, full_upper); + fn part_unique(x: &BatchPart) -> String { + match x { + BatchPart::Hollow(x) => x.key.to_string(), + BatchPart::Inline { + updates, + key_lower, + ts_rewrite, + } => { + let mut h = DefaultHasher::new(); + updates.hash(&mut h); + key_lower.hash(&mut h); + ts_rewrite.as_ref().map(|x| x.elements()).hash(&mut h); + h.finish().to_string() + } + } + } + // Check that the overall set of parts contained in both representations is the same. let inc_parts: HashSet<_> = self .inc_batches .iter() .flat_map(|x| x.parts.iter()) - .map(|x| match x { - BatchPart::Hollow(x) => &x.key, - }) + .map(part_unique) .collect(); let full_parts = self .full_batches .iter() .flat_map(|x| x.parts.iter()) - .map(|x| match x { - BatchPart::Hollow(x) => &x.key, - }) + .map(part_unique) .collect(); assert_eq!(inc_parts, full_parts); diff --git a/src/persist-client/src/iter.rs b/src/persist-client/src/iter.rs index 4f3a6c33d7f40..d3a45fb8b722d 100644 --- a/src/persist-client/src/iter.rs +++ b/src/persist-client/src/iter.rs @@ -85,6 +85,9 @@ impl FetchData { FetchData::Unleased { part, .. } => part.key.split().0 >= min_version, FetchData::Leased { part, .. } => match &part.part { BatchPart::Hollow(x) => x.key.split().0 >= min_version, + // Inline parts are only written directly by the user and so may + // be unconsolidated. + BatchPart::Inline { .. } => true, }, FetchData::AlreadyFetched => false, } @@ -308,6 +311,21 @@ impl Consolidator { + let read_metrics = read_metrics(&metrics.read).clone(); + let part = EncodedPart::from_inline( + read_metrics, + desc.clone(), + updates, + ts_rewrite.as_ref(), + ); + let c_part = ConsolidationPart::from_encoded(part, &self.filter, true); + (c_part, updates.encoded_size_bytes()) + } }) .collect(); self.runs.push(run); diff --git a/src/persist-client/src/operators/shard_source.rs b/src/persist-client/src/operators/shard_source.rs index ce29eeb4a4e7c..16f039ededd58 100644 --- a/src/persist-client/src/operators/shard_source.rs +++ b/src/persist-client/src/operators/shard_source.rs @@ -383,10 +383,13 @@ where for mut part_desc in parts { // TODO: Push the filter down into the Subscribe? 
if STATS_FILTER_ENABLED.get(&cfg) { - let should_fetch = part_desc.part.stats().map_or(true, |stats| { - should_fetch_part(&stats.decode(), current_frontier.borrow()) - }); - let bytes = u64::cast_from(part_desc.part.encoded_size_bytes()); + let should_fetch = match &part_desc.part { + BatchPart::Hollow(x) => x.stats.as_ref().map_or(true, |stats| { + should_fetch_part(&stats.decode(), current_frontier.borrow()) + }), + BatchPart::Inline { .. } => true, + }; + let bytes = u64::cast_from(part_desc.encoded_size_bytes()); if should_fetch { audit_budget_bytes = audit_budget_bytes.saturating_add(part_desc.part.encoded_size_bytes()); @@ -401,6 +404,7 @@ where x.key.hash(&mut h); usize::cast_from(h.finish()) % 100 < STATS_AUDIT_PERCENT.get(&cfg) } + BatchPart::Inline { .. } => false, }; if should_audit && part_desc.part.encoded_size_bytes() < audit_budget_bytes { diff --git a/src/persist-client/src/usage.rs b/src/persist-client/src/usage.rs index 25010c499fa1d..e21d3826232de 100644 --- a/src/persist-client/src/usage.rs +++ b/src/persist-client/src/usage.rs @@ -454,6 +454,7 @@ impl StorageUsageClient { for part in x.parts.iter() { let part = match part { BatchPart::Hollow(x) => x, + BatchPart::Inline { .. } => continue, }; let parsed = BlobKey::parse_ids(&part.key.complete(&shard_id)); if let Ok((_, PartialBlobKey::Batch(writer_id, _))) = parsed { @@ -481,6 +482,7 @@ impl StorageUsageClient { for part in x.parts.iter() { let part = match part { BatchPart::Hollow(x) => x, + BatchPart::Inline { .. } => continue, }; current_state_batches_bytes += u64::cast_from(part.encoded_size_bytes); } @@ -740,7 +742,7 @@ mod tests { use semver::Version; use timely::progress::Antichain; - use crate::batch::BLOB_TARGET_SIZE; + use crate::batch::{BLOB_TARGET_SIZE, INLINE_UPDATE_THRESHOLD_BYTES}; use crate::internal::paths::{PartialRollupKey, RollupId}; use crate::tests::new_test_client; use crate::ShardId; @@ -758,6 +760,7 @@ mod tests { ]; let client = new_test_client(&dyncfgs).await; + let inline_writes_enabled = INLINE_UPDATE_THRESHOLD_BYTES.get(&client.cfg) > 0; let build_version = client.cfg.build_version.clone(); let shard_id_one = ShardId::new(); let shard_id_two = ShardId::new(); @@ -818,7 +821,12 @@ mod tests { assert!(shard_one_size > 0); assert!(shard_two_size > 0); - assert!(shard_one_size < shard_two_size); + if inline_writes_enabled { + // Allow equality, but only if inline writes are enabled. + assert!(shard_one_size <= shard_two_size); + } else { + assert!(shard_one_size < shard_two_size); + } assert_eq!( shard_two_size, writer_one_size + writer_two_size + versioned_size + rollups_size @@ -866,6 +874,7 @@ mod tests { let shard_id = ShardId::new(); let mut client = new_test_client(&dyncfgs).await; + let inline_writes_enabled = INLINE_UPDATE_THRESHOLD_BYTES.get(&client.cfg) > 0; let (mut write0, _) = client .expect_open::(shard_id) @@ -901,9 +910,11 @@ mod tests { let usage = StorageUsageClient::open(client); let shard_usage_audit = usage.shard_usage_audit(shard_id).await; let shard_usage_referenced = usage.shard_usage_referenced(shard_id).await; - // We've written data. - assert!(shard_usage_audit.current_state_batches_bytes > 0); - assert!(shard_usage_referenced.batches_bytes > 0); + if !inline_writes_enabled { + // We've written data. + assert!(shard_usage_audit.current_state_batches_bytes > 0); + assert!(shard_usage_referenced.batches_bytes > 0); + } // There's always at least one rollup. 
assert!(shard_usage_audit.current_state_rollups_bytes > 0); assert!(shard_usage_referenced.rollup_bytes > 0); @@ -914,8 +925,10 @@ mod tests { // // write0 wrote a batch, but never linked it in, but is still active. assert!(shard_usage_audit.not_leaked_not_referenced_bytes > 0); - // write0 wrote a batch, but never linked it in, and is now expired. - assert!(shard_usage_audit.leaked_bytes > 0); + if !inline_writes_enabled { + // write0 wrote a batch, but never linked it in, and is now expired. + assert!(shard_usage_audit.leaked_bytes > 0); + } } #[mz_persist_proc::test(tokio::test)] diff --git a/src/persist-client/src/write.rs b/src/persist-client/src/write.rs index d4a37aff20dfe..d4167a85918e8 100644 --- a/src/persist-client/src/write.rs +++ b/src/persist-client/src/write.rs @@ -16,6 +16,8 @@ use std::sync::Arc; use differential_dataflow::difference::Semigroup; use differential_dataflow::lattice::Lattice; use differential_dataflow::trace::Description; +use futures::stream::FuturesUnordered; +use futures::StreamExt; use mz_ore::instrument; use mz_ore::task::RuntimeExt; use mz_persist::location::Blob; @@ -511,23 +513,47 @@ where ) .await; - let maintenance = match res { - CompareAndAppendRes::Success(_seqno, maintenance) => { - self.upper = desc.upper().clone(); - for batch in batches.iter_mut() { - batch.mark_consumed(); + let maintenance = loop { + match res { + CompareAndAppendRes::Success(_seqno, maintenance) => { + self.upper = desc.upper().clone(); + for batch in batches.iter_mut() { + batch.mark_consumed(); + } + break maintenance; + } + CompareAndAppendRes::InvalidUsage(invalid_usage) => return Err(invalid_usage), + CompareAndAppendRes::UpperMismatch(_seqno, current_upper) => { + // We tried to to a compare_and_append with the wrong expected upper, that + // won't work. Update the cached upper to the current upper. + self.upper = current_upper.clone(); + return Ok(Err(UpperMismatch { + current: current_upper, + expected: expected_upper, + })); + } + CompareAndAppendRes::InlineBackpressure => { + // We tried to write an inline part, but there was already + // too much in state. Flush it out to s3 and try again. + let cfg = BatchBuilderConfig::new(&self.cfg, &self.writer_id); + // We could have a large number of inline parts (imagine the + // sharded persist_sink), do this flushing concurrently. + let batches = batches + .iter_mut() + .map(|batch| async { + batch + .flush_to_blob( + &cfg, + &self.metrics.inline.backpressure, + &self.isolated_runtime, + &self.schemas, + ) + .await + }) + .collect::>(); + let () = batches.collect::<()>().await; + continue; } - maintenance - } - CompareAndAppendRes::InvalidUsage(invalid_usage) => return Err(invalid_usage), - CompareAndAppendRes::UpperMismatch(_seqno, current_upper) => { - // We tried to to a compare_and_append with the wrong expected upper, that - // won't work. Update the cached upper to the current upper. - self.upper = current_upper.clone(); - return Ok(Err(UpperMismatch { - current: current_upper, - expected: expected_upper, - })); } }; diff --git a/src/persist-client/tests/machine/gc b/src/persist-client/tests/machine/gc index b14540012bda8..329783ef307ca 100644 --- a/src/persist-client/tests/machine/gc +++ b/src/persist-client/tests/machine/gc @@ -7,6 +7,13 @@ # the Business Source License, use of this software will be governed # by the Apache License, Version 2.0. +# Disable inline writes so interesting things happen in Blob. 
+dyncfg
+persist_inline_update_threshold_bytes 0
+persist_inline_update_max_bytes 0
+----
+ok
+
 # This test uses a simplifying assumption that a batch is
 # always made up of exactly 1 batch part. This is because
 # batch parts are given random UUID names, meaning we can't
diff --git a/src/persist-client/tests/machine/restore_blob b/src/persist-client/tests/machine/restore_blob
index 95ffb040b32e8..8d0bc6e401c5b 100644
--- a/src/persist-client/tests/machine/restore_blob
+++ b/src/persist-client/tests/machine/restore_blob
@@ -7,6 +7,12 @@
 # the Business Source License, use of this software will be governed
 # by the Apache License, Version 2.0.
+# Disable inline writes so interesting things happen in Blob.
+dyncfg
+persist_inline_update_threshold_bytes 0
+persist_inline_update_max_bytes 0
+----
+ok
 # Pre-populate some non-trivial state in our shard.
diff --git a/src/persist-proc/src/lib.rs b/src/persist-proc/src/lib.rs
index 49207e446dea7..b6ed21bf52597 100644
--- a/src/persist-proc/src/lib.rs
+++ b/src/persist-proc/src/lib.rs
@@ -68,8 +68,18 @@ fn test_impl(attr: TokenStream, item: TokenStream) -> TokenStream {
 let dyncfgs = [
 {
- // Placeholder default configuration until inline writes PR.
- mz_dyncfg::ConfigUpdates::default()
+ // Inline writes disabled
+ let mut x = ::mz_dyncfg::ConfigUpdates::default();
+ x.add_dynamic("persist_inline_update_threshold_bytes", ::mz_dyncfg::ConfigVal::Usize(0));
+ x.add_dynamic("persist_inline_update_max_bytes", ::mz_dyncfg::ConfigVal::Usize(0));
+ x
+ },
+ {
+ // Inline writes enabled
+ let mut x = ::mz_dyncfg::ConfigUpdates::default();
+ x.add_dynamic("persist_inline_update_threshold_bytes", ::mz_dyncfg::ConfigVal::Usize(4 * 1024));
+ x.add_dynamic("persist_inline_update_max_bytes", ::mz_dyncfg::ConfigVal::Usize(1024 * 1024));
+ x
 },
 ];
diff --git a/src/persist/build.rs b/src/persist/build.rs
index 93c1e83c7108e..56e73f0d6b28f 100644
--- a/src/persist/build.rs
+++ b/src/persist/build.rs
@@ -14,6 +14,11 @@ fn main() {
 prost_build::Config::new()
 .btree_map(["."])
+ .type_attribute(
+ ".mz_persist.gen.persist.ProtoColumnarRecords",
+ "#[derive(serde::Serialize)]",
+ )
+ .bytes([".mz_persist.gen.persist.ProtoColumnarRecords"])
 .compile_protos(&["persist/src/persist.proto"], &[".."])
 .unwrap_or_else(|e| panic!("{e}"))
 }
diff --git a/src/persist/src/indexed/columnar.rs b/src/persist/src/indexed/columnar.rs
index 033d3f56ba8f4..51728c045e261 100644
--- a/src/persist/src/indexed/columnar.rs
+++ b/src/persist/src/indexed/columnar.rs
@@ -15,8 +15,12 @@ use std::sync::Arc;
 use std::{cmp, fmt};
 use arrow2::types::Index;
-use mz_ore::lgbytes::MetricsRegion;
+use bytes::Bytes;
+use mz_ore::lgbytes::{LgBytesMetrics, MetricsRegion};
+use mz_ore::metrics::MetricsRegistry;
+use mz_proto::{ProtoType, RustType, TryFromProtoError};
+use crate::gen::persist::ProtoColumnarRecords;
 use crate::metrics::ColumnarMetrics;
 pub mod arrow;
@@ -483,6 +487,40 @@ impl ColumnarRecordsBuilder {
 }
 }
+impl RustType<ProtoColumnarRecords> for ColumnarRecords {
+ fn into_proto(&self) -> ProtoColumnarRecords {
+ ProtoColumnarRecords {
+ len: self.len.into_proto(),
+ key_offsets: (*self.key_offsets).as_ref().to_vec(),
+ key_data: Bytes::copy_from_slice((*self.key_data).as_ref()),
+ val_offsets: (*self.val_offsets).as_ref().to_vec(),
+ val_data: Bytes::copy_from_slice((*self.val_data).as_ref()),
+ timestamps: (*self.timestamps).as_ref().to_vec(),
+ diffs: (*self.diffs).as_ref().to_vec(),
+ }
+ }
+
+ fn from_proto(proto: ProtoColumnarRecords) -> Result<Self, TryFromProtoError> {
+ // WIP plumb real metrics
+ let lgbytes =
LgBytesMetrics::new(&MetricsRegistry::new()); + // WIP or should we use MaybeLgBytes in ColumnarRecords? also no .to_vec + let ret = ColumnarRecords { + len: proto.len.into_rust()?, + key_offsets: Arc::new(lgbytes.persist_arrow.heap_region(proto.key_offsets)), + key_data: Arc::new(lgbytes.persist_arrow.heap_region(proto.key_data.to_vec())), + val_offsets: Arc::new(lgbytes.persist_arrow.heap_region(proto.val_offsets)), + val_data: Arc::new(lgbytes.persist_arrow.heap_region(proto.val_data.to_vec())), + timestamps: Arc::new(lgbytes.persist_arrow.heap_region(proto.timestamps)), + diffs: Arc::new(lgbytes.persist_arrow.heap_region(proto.diffs)), + }; + let () = ret + .borrow() + .validate() + .map_err(TryFromProtoError::InvalidPersistState)?; + Ok(ret) + } +} + #[cfg(test)] mod tests { use mz_persist_types::Codec64; diff --git a/src/persist/src/persist.proto b/src/persist/src/persist.proto index 36f02a1ad4339..7bca23a75c2ec 100644 --- a/src/persist/src/persist.proto +++ b/src/persist/src/persist.proto @@ -64,3 +64,13 @@ enum ProtoBatchFormat { // with a trie-like column structure. ParquetKvtd = 2; } + +message ProtoColumnarRecords { + uint64 len = 1; + repeated int32 key_offsets = 2; + bytes key_data = 3; + repeated int32 val_offsets = 4; + bytes val_data = 5; + repeated int64 timestamps = 6; + repeated int64 diffs = 7; +} diff --git a/test/sqllogictest/explain-pushdown.slt b/test/sqllogictest/explain-pushdown.slt index c7c861fb3248a..609d89d70053d 100644 --- a/test/sqllogictest/explain-pushdown.slt +++ b/test/sqllogictest/explain-pushdown.slt @@ -9,6 +9,12 @@ mode cockroach +# Disable persist inline writes so we get real part numbers below +simple conn=mz_system,user=mz_system +ALTER SYSTEM SET persist_inline_update_threshold_bytes = 0 +---- +COMPLETE 0 + # EXPLAIN FILTER PUSHDOWN statements are blocked by a feature flag statement ok CREATE TABLE numbers ( From c50e894a2fb6fc7ee9f159bb900674bcbe826ad5 Mon Sep 17 00:00:00 2001 From: Daniel Harrison <52528+danhhz@users.noreply.github.com> Date: Thu, 4 Apr 2024 09:55:19 -0700 Subject: [PATCH 2/2] fixup! 
persist: inline small writes directly in consensus State --- src/persist-client/src/batch.rs | 6 +-- src/persist-client/src/internal/compact.rs | 47 ++++++++++++++-------- 2 files changed, 34 insertions(+), 19 deletions(-) diff --git a/src/persist-client/src/batch.rs b/src/persist-client/src/batch.rs index 011f54504de59..569c2c42afa07 100644 --- a/src/persist-client/src/batch.rs +++ b/src/persist-client/src/batch.rs @@ -259,7 +259,7 @@ where .or_current(); let handle = mz_ore::task::spawn( || "batch::flush_to_blob", - BatchParts::write_part( + BatchParts::write_hollow_part( cfg.clone(), Arc::clone(&self.blob), Arc::clone(&self.metrics), @@ -935,7 +935,7 @@ impl BatchParts { debug_span!("batch::write_part", shard = %self.shard_metrics.shard_id).or_current(); mz_ore::task::spawn( || "batch::write_part", - BatchParts::write_part( + BatchParts::write_hollow_part( self.cfg.clone(), Arc::clone(&self.blob), Arc::clone(&self.metrics), @@ -966,7 +966,7 @@ impl BatchParts { } } - async fn write_part( + async fn write_hollow_part( cfg: BatchBuilderConfig, blob: Arc, metrics: Arc, diff --git a/src/persist-client/src/internal/compact.rs b/src/persist-client/src/internal/compact.rs index c037ebc91f928..ecfa2f0632ad5 100644 --- a/src/persist-client/src/internal/compact.rs +++ b/src/persist-client/src/internal/compact.rs @@ -28,7 +28,7 @@ use timely::progress::{Antichain, Timestamp}; use timely::PartialOrder; use tokio::sync::mpsc::Sender; use tokio::sync::{mpsc, oneshot, TryAcquireError}; -use tracing::{debug, debug_span, trace, warn, Instrument, Span}; +use tracing::{debug, debug_span, error, trace, warn, Instrument, Span}; use crate::async_runtime::IsolatedRuntime; use crate::batch::{BatchBuilderConfig, BatchBuilderInternal, PartDeletes}; @@ -79,12 +79,18 @@ pub struct CompactConfig { impl CompactConfig { /// Initialize the compaction config from Persist configuration. pub fn new(value: &PersistConfig, writer_id: &WriterId) -> Self { - CompactConfig { + let mut ret = CompactConfig { compaction_memory_bound_bytes: value.dynamic.compaction_memory_bound_bytes(), compaction_yield_after_n_updates: value.compaction_yield_after_n_updates, version: value.build_version.clone(), batch: BatchBuilderConfig::new(value, writer_id), - } + }; + // Use compaction as a method of getting inline writes out of state, to + // make room for more inline writes. We could instead do this at the end + // of compaction by flushing out the batch, but doing it here based on + // the config allows BatchBuilder to do its normal pipelining of writes. + ret.batch.inline_update_threshold_bytes = 0; + ret } } @@ -737,21 +743,30 @@ where tokio::task::yield_now().await; } let mut batch = batch.finish(&real_schemas, desc.upper().clone()).await?; - // Use compaction as a method of getting inline writes out of state, to - // make room for more inline writes. - let () = batch - .flush_to_blob( - &cfg.batch, - &metrics.compaction.batch, - &isolated_runtime, - &real_schemas, - ) - .await; - let hollow_batch = batch.into_hollow_batch(); - timings.record(&metrics); + // We use compaction as a method of getting inline writes out of state, + // to make room for more inline writes. This happens in + // `CompactConfig::new` by overriding the inline writes threshold + // config. This is a bit action-at-a-distance, so defensively detect if + // this breaks here and log and correct it if so. + let has_inline_parts = batch.batch.parts.iter().any(|x| match x { + BatchPart::Hollow(_) => false, + BatchPart::Inline { .. 
} => true, + }); + if has_inline_parts { + error!(%shard_id, ?cfg, "compaction result unexpectedly had inline writes"); + let () = batch + .flush_to_blob( + &cfg.batch, + &metrics.compaction.batch, + &isolated_runtime, + &real_schemas, + ) + .await; + } - Ok(hollow_batch) + timings.record(&metrics); + Ok(batch.into_hollow_batch()) } fn validate_req(req: &CompactReq) -> Result<(), anyhow::Error> {
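
To make the compaction-side behavior above concrete: `CompactConfig::new` zeroes the inline threshold so that compaction output should contain only hollow parts, and the defensive check flushes out anything that slips through. The following is a minimal, self-contained sketch of that pattern; `Part`, `flush_to_blob`, and `finish_compaction` are illustrative stand-in names, not the real persist-client types.

```rust
// Toy model of the compact.rs defensive flush; not the real crate API.
#[derive(Debug)]
enum Part {
    // Data held directly in consensus state.
    Inline(Vec<u8>),
    // Pointer to data already written to blob storage (s3).
    Hollow(String),
}

// Stand-in for Batch::flush_to_blob: replace inline data with a hollow pointer.
fn flush_to_blob(part: &mut Part) {
    if let Part::Inline(data) = part {
        *part = Part::Hollow(format!("s3://bucket/flushed-{}-bytes", data.len()));
    }
}

// Compaction output should already be all-hollow (its config zeroes the inline
// threshold); if not, log and correct it rather than hand inline data back to state.
fn finish_compaction(mut parts: Vec<Part>) -> Vec<Part> {
    let has_inline = parts.iter().any(|p| matches!(p, Part::Inline(_)));
    if has_inline {
        eprintln!("compaction result unexpectedly had inline writes");
        parts.iter_mut().for_each(flush_to_blob);
    }
    parts
}

fn main() {
    let out = finish_compaction(vec![
        Part::Hollow("s3://bucket/a".into()),
        Part::Inline(vec![1, 2, 3]),
    ]);
    assert!(out.iter().all(|p| matches!(p, Part::Hollow(_))));
    println!("{out:?}");
}
```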
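Similarly, the `InlineBackpressure` arm added to write.rs above retries the append after flushing inline parts out to blob. Below is a minimal, self-contained sketch of that retry loop, again using toy stand-in names (`AppendRes`, `try_append`) rather than the actual `WriteHandle`/`CompareAndAppendRes` API, and a simple byte budget in place of the real state-size accounting.

```rust
// Toy model of the writer-side retry on inline backpressure.
#[derive(Debug)]
enum Part {
    Inline(Vec<u8>),
    Hollow(String),
}

enum AppendRes {
    Success,
    // State already holds too many inline bytes; flush and retry.
    InlineBackpressure,
}

// Stand-in for flushing an inline part to blob storage.
fn flush_to_blob(part: &mut Part) {
    if let Part::Inline(data) = part {
        *part = Part::Hollow(format!("s3://bucket/part-{}-bytes", data.len()));
    }
}

// Stand-in for compare_and_append: reject batches whose inline bytes exceed a budget.
fn try_append(parts: &[Part], inline_budget: usize) -> AppendRes {
    let inline_bytes: usize = parts
        .iter()
        .map(|p| match p {
            Part::Inline(data) => data.len(),
            Part::Hollow(_) => 0,
        })
        .sum();
    if inline_bytes > inline_budget {
        AppendRes::InlineBackpressure
    } else {
        AppendRes::Success
    }
}

fn main() {
    let mut parts = vec![
        Part::Inline(vec![0u8; 2048]),
        Part::Hollow("s3://bucket/a".into()),
    ];
    // Mirror the loop in write.rs: on backpressure, flush inline parts to blob
    // and retry the append with only hollow references.
    loop {
        match try_append(&parts, 1024) {
            AppendRes::Success => break,
            AppendRes::InlineBackpressure => parts.iter_mut().for_each(flush_to_blob),
        }
    }
    println!("{parts:?}");
}
```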