diff --git a/doc/developer/design/20240401_persist_inline_writes.md b/doc/developer/design/20240401_persist_inline_writes.md
new file mode 100644
index 0000000000000..c23e115e8f4d5
--- /dev/null
+++ b/doc/developer/design/20240401_persist_inline_writes.md
@@ -0,0 +1,64 @@
+# Persist Inline Writes
+
+- Associated: [#24832](https://github.com/MaterializeInc/materialize/pull/24832)
+
+## The Problem
+
+A non-empty persist write currently always requires a write to S3 and a write to
+CockroachDB. S3 writes are much slower than CRDB writes and incur a per-PUT
+charge, so in the case of very small writes, this is wasteful of both latency
+and cost.
+
+Persist latencies directly impact the user experience of Materialize in a number
+of ways. The above waste is particularly egregious in DDL, which may serially
+perform a number of small persist writes.
+
+## Success Criteria
+
+Eligible persist writes have latency of `O(crdb write)` and not `O(s3 write)`.
+
+## Out of Scope
+
+- Latency guarantees: In particular, to keep persist metadata small, inline
+  writes are an optimization; they are not guaranteed.
+- Read latencies.
+
+## Solution Proposal
+
+Persist internally has the concept of a _part_, which corresponds 1:1:1 with a
+persist _blob_ and an _object_ in S3. Currently, all data in a persist shard is
+stored in S3 and then a _hollow_ reference to it is stored in CRDB. This
+reference also includes metadata, such as pushdown statistics and the encoded
+size.
+
+We make this reference instead a two-variant enum: `Hollow` and `Inline`. The
+`Inline` variant stores the same data as would be written to S3, but in an
+encoded protobuf. This protobuf is only decoded in data fetch paths, and is
+otherwise passed around as opaque bytes to save allocations and CPU cycles.
+Pushdown statistics and the encoded size are both unnecessary for inline parts.
+
+The persist state stored in CRDB is a control plane concept, so there is both a
+performance and a stability risk from mixing the data plane into it. We reduce
+the inline parts over time by making compaction flush them out to S3, never
+inlining them. However, nothing prevents new writes from arriving faster than
+compaction can pull them out. We protect the control plane with the following
+two limits to create a hard upper bound on how much data can be inline:
+
+- `persist_inline_update_threshold_bytes`: An (exclusive) maximum size of a
+  write that persist will inline in metadata.
+- `persist_inline_update_max_bytes`: An (inclusive) maximum total size of inline
+  writes in metadata. Any attempted writes beyond this threshold will instead
+  fall through to the S3 path.
+
+## Alternatives
+
+- In addition to S3, also store _blobs_ in CRDB (or a third technology). CRDB is
+  not tuned as a blob store and doesn't handle these workloads well. A third
+  technology would not be worth the additional operational burden.
+- Make S3 faster. This is not actionable in the short term.
+
+## Open Questions
+
+- How do we tune the two thresholds?
+- Should every inline write immediately result in a compaction request that
+  flushes the new batch out to S3?
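For illustration only, here is a minimal Rust sketch of the two ideas in the design doc above; `PartRef`, `build_part`, and `admit_inline_write` are hypothetical stand-ins (the PR's actual types are `BatchPart`, the batch writer, and the `compare_and_append` check in `state.rs`), and the two size parameters mirror the `persist_inline_update_threshold_bytes` and `persist_inline_update_max_bytes` configs.

```rust
// Hypothetical, simplified sketch (not the PR's real types). A part reference
// is either a pointer to a blob in S3 or the encoded updates stored inline.
enum PartRef {
    Hollow {
        key: String,
        encoded_size_bytes: usize,
    },
    Inline {
        encoded_updates: Vec<u8>,
    },
}

impl PartRef {
    /// Bytes this part contributes to the inline budget (zero for hollow parts).
    fn inline_bytes(&self) -> usize {
        match self {
            PartRef::Hollow { .. } => 0,
            PartRef::Inline { encoded_updates } => encoded_updates.len(),
        }
    }
}

/// Writer side: parts under the threshold skip the S3 PUT and ride along in
/// the CRDB metadata write.
fn build_part(key: String, encoded_updates: Vec<u8>, threshold_bytes: usize) -> PartRef {
    if encoded_updates.len() < threshold_bytes {
        PartRef::Inline { encoded_updates }
    } else {
        PartRef::Hollow {
            encoded_size_bytes: encoded_updates.len(),
            key,
        }
    }
}

/// State side: cap the total inline bytes held in metadata. A rejected write
/// is flushed out to S3 by the writer and then retried as a hollow part.
fn admit_inline_write(
    existing_inline_bytes: usize,
    new_inline_bytes: usize,
    max_bytes: usize,
) -> bool {
    new_inline_bytes == 0 || existing_inline_bytes + new_inline_bytes < max_bytes
}
```

In the actual change below, exceeding the second limit surfaces as `CompareAndAppendBreak::InlineBackpressure`, which the write handle reacts to by calling `flush_to_blob` on the batch and retrying the append.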
diff --git a/misc/python/materialize/mzcompose/__init__.py b/misc/python/materialize/mzcompose/__init__.py index 4a9b6f92c182b..b16a2f5d90c08 100644 --- a/misc/python/materialize/mzcompose/__init__.py +++ b/misc/python/materialize/mzcompose/__init__.py @@ -87,6 +87,8 @@ "enable_worker_core_affinity": "true", "persist_batch_delete_enabled": "true", "persist_fast_path_limit": "1000", + "persist_inline_update_max_bytes": "1048576", + "persist_inline_update_threshold_bytes": "4096", "persist_pubsub_client_enabled": "true", "persist_pubsub_push_diff_enabled": "true", "persist_sink_minimum_batch_updates": "128", diff --git a/src/persist-client/build.rs b/src/persist-client/build.rs index 2971d76adb37c..73ec5e4ceeb10 100644 --- a/src/persist-client/build.rs +++ b/src/persist-client/build.rs @@ -66,6 +66,7 @@ fn main() { // is to re-run if any file in the crate changes; that's still a bit too // broad, but it's better. .emit_rerun_if_changed(false) + .extern_path(".mz_persist", "::mz_persist") .extern_path(".mz_persist_types", "::mz_persist_types") .extern_path(".mz_proto", "::mz_proto") .compile_with_config( diff --git a/src/persist-client/src/batch.rs b/src/persist-client/src/batch.rs index 34fd78a21d3aa..569c2c42afa07 100644 --- a/src/persist-client/src/batch.rs +++ b/src/persist-client/src/batch.rs @@ -43,11 +43,11 @@ use tracing::{debug_span, error, trace_span, warn, Instrument}; use crate::async_runtime::IsolatedRuntime; use crate::cfg::MiB; use crate::error::InvalidUsage; -use crate::internal::encoding::{LazyPartStats, Schemas}; +use crate::internal::encoding::{LazyInlineBatchPart, LazyPartStats, Schemas}; use crate::internal::machine::retry_external; use crate::internal::metrics::{BatchWriteMetrics, Metrics, RetryMetrics, ShardMetrics}; use crate::internal::paths::{PartId, PartialBatchKey, WriterKey}; -use crate::internal::state::{BatchPart, HollowBatch, HollowBatchPart}; +use crate::internal::state::{BatchPart, HollowBatch, HollowBatchPart, ProtoInlineBatchPart}; use crate::stats::{ part_stats_for_legacy_part, untrimmable_columns, STATS_BUDGET_BYTES, STATS_COLLECTION_ENABLED, }; @@ -228,6 +228,56 @@ where self.mark_consumed(); ret } + + pub(crate) async fn flush_to_blob( + &mut self, + cfg: &BatchBuilderConfig, + batch_metrics: &BatchWriteMetrics, + isolated_runtime: &Arc, + stats_schemas: &Schemas, + ) { + // It's necessary for correctness to keep the parts in the same order. + // We could introduce concurrency here with FuturesOrdered, but it would + // be pretty unexpected to have inline writes in more than one part, so + // don't bother. + let mut parts = Vec::new(); + for part in self.batch.parts.drain(..) 
{ + let (updates, key_lower, ts_rewrite) = match part { + BatchPart::Hollow(x) => { + parts.push(BatchPart::Hollow(x)); + continue; + } + BatchPart::Inline { + updates, + key_lower, + ts_rewrite, + } => (updates, key_lower, ts_rewrite), + }; + let updates = updates.decode::().expect("valid inline part"); + let write_span = + debug_span!("batch::flush_to_blob", shard = %self.shard_metrics.shard_id) + .or_current(); + let handle = mz_ore::task::spawn( + || "batch::flush_to_blob", + BatchParts::write_hollow_part( + cfg.clone(), + Arc::clone(&self.blob), + Arc::clone(&self.metrics), + Arc::clone(&self.shard_metrics), + batch_metrics.clone(), + Arc::clone(isolated_runtime), + updates, + key_lower, + ts_rewrite, + stats_schemas.clone(), + ) + .instrument(write_span), + ); + let part = handle.await.expect("part write task failed"); + parts.push(part); + } + self.batch.parts = parts; + } } /// Indicates what work was done in a call to [BatchBuilder::add] @@ -248,6 +298,7 @@ pub struct BatchBuilderConfig { pub(crate) blob_target_size: usize, pub(crate) batch_delete_enabled: bool, pub(crate) batch_builder_max_outstanding_parts: usize, + pub(crate) inline_update_threshold_bytes: usize, pub(crate) stats_collection_enabled: bool, pub(crate) stats_budget: usize, pub(crate) stats_untrimmable_columns: Arc, @@ -272,6 +323,20 @@ pub(crate) const BLOB_TARGET_SIZE: Config = Config::new( "A target maximum size of persist blob payloads in bytes (Materialize).", ); +pub(crate) const INLINE_UPDATE_THRESHOLD_BYTES: Config = Config::new( + "persist_inline_update_threshold_bytes", + 0, + "The (exclusive) maximum size of a write that persist will inline in metadata.", +); + +pub(crate) const INLINE_UPDATE_MAX_BYTES: Config = Config::new( + "persist_inline_update_max_bytes", + 0, + "\ + The (inclusive) maximum total size of inline writes in metadata before \ + persist will backpressure them by flushing out to s3.", +); + impl BatchBuilderConfig { /// Initialize a batch builder config based on a snapshot of the Persist config. 
pub fn new(value: &PersistConfig, _writer_id: &WriterId) -> Self { @@ -283,6 +348,7 @@ impl BatchBuilderConfig { batch_builder_max_outstanding_parts: value .dynamic .batch_builder_max_outstanding_parts(), + inline_update_threshold_bytes: INLINE_UPDATE_THRESHOLD_BYTES.get(value), stats_collection_enabled: STATS_COLLECTION_ENABLED.get(value), stats_budget: STATS_BUDGET_BYTES.get(value), stats_untrimmable_columns: Arc::new(untrimmable_columns(value)), @@ -837,107 +903,53 @@ impl BatchParts { since: Antichain, ) { let desc = Description::new(self.lower.clone(), upper, since); - let metrics = Arc::clone(&self.metrics); - let shard_metrics = Arc::clone(&self.shard_metrics); - let blob = Arc::clone(&self.blob); - let isolated_runtime = Arc::clone(&self.isolated_runtime); let batch_metrics = self.batch_metrics.clone(); - let partial_key = PartialBatchKey::new(&self.cfg.writer_key, &PartId::new()); - let key = partial_key.complete(&self.shard_id); let index = u64::cast_from(self.finished_parts.len() + self.writing_parts.len()); - let stats_collection_enabled = self.cfg.stats_collection_enabled; - let stats_budget = self.cfg.stats_budget; - let schemas = schemas.clone(); - let untrimmable_columns = Arc::clone(&self.cfg.stats_untrimmable_columns); - - let write_span = debug_span!("batch::write_part", shard = %self.shard_id).or_current(); - let handle = mz_ore::task::spawn( - || "batch::write_part", - async move { - let goodbytes = updates.goodbytes(); - let batch = BlobTraceBatchPart { - desc, - updates: vec![updates], - index, - }; - - let (stats, (buf, encode_time)) = isolated_runtime - .spawn_named(|| "batch::encode_part", async move { - let stats = if stats_collection_enabled { - let stats_start = Instant::now(); - match part_stats_for_legacy_part(&schemas, &batch.updates) { - Ok(x) => { - let mut trimmed_bytes = 0; - let x = LazyPartStats::encode(&x, |s| { - trimmed_bytes = trim_to_budget(s, stats_budget, |s| { - untrimmable_columns.should_retain(s) - }); - }); - Some((x, stats_start.elapsed(), trimmed_bytes)) - } - Err(err) => { - error!("failed to construct part stats: {}", err); - None - } - } - } else { - None - }; - - let encode_start = Instant::now(); - let mut buf = Vec::new(); - batch.encode(&mut buf); - - // Drop batch as soon as we can to reclaim its memory. - drop(batch); - (stats, (Bytes::from(buf), encode_start.elapsed())) - }) - .instrument(debug_span!("batch::encode_part")) - .await - .expect("part encode task failed"); - // Can't use the `CodecMetrics::encode` helper because of async. 
- metrics.codecs.batch.encode_count.inc(); - metrics - .codecs - .batch - .encode_seconds - .inc_by(encode_time.as_secs_f64()); - - let start = Instant::now(); - let payload_len = buf.len(); - let () = retry_external(&metrics.retries.external.batch_set, || async { - shard_metrics.blob_sets.inc(); - blob.set(&key, Bytes::clone(&buf)).await - }) - .instrument(trace_span!("batch::set", payload_len)) - .await; - batch_metrics.seconds.inc_by(start.elapsed().as_secs_f64()); - batch_metrics.bytes.inc_by(u64::cast_from(payload_len)); - batch_metrics.goodbytes.inc_by(u64::cast_from(goodbytes)); - let stats = stats.map(|(stats, stats_step_timing, trimmed_bytes)| { - batch_metrics - .step_stats - .inc_by(stats_step_timing.as_secs_f64()); - if trimmed_bytes > 0 { - metrics.pushdown.parts_stats_trimmed_count.inc(); - metrics - .pushdown - .parts_stats_trimmed_bytes - .inc_by(u64::cast_from(trimmed_bytes)); + let ts_rewrite = None; + + let handle = if updates.goodbytes() < self.cfg.inline_update_threshold_bytes { + let span = debug_span!("batch::inline_part", shard = %self.shard_id).or_current(); + mz_ore::task::spawn( + || "batch::inline_part", + async move { + let updates = LazyInlineBatchPart::from(&ProtoInlineBatchPart { + desc: Some(desc.into_proto()), + index: index.into_proto(), + updates: Some(updates.into_proto()), + }); + BatchPart::Inline { + updates, + key_lower, + ts_rewrite, } - stats - }); - - BatchPart::Hollow(HollowBatchPart { - key: partial_key, - encoded_size_bytes: payload_len, + } + .instrument(span), + ) + } else { + let part = BlobTraceBatchPart { + desc, + updates: vec![updates], + index, + }; + let write_span = + debug_span!("batch::write_part", shard = %self.shard_metrics.shard_id).or_current(); + mz_ore::task::spawn( + || "batch::write_part", + BatchParts::write_hollow_part( + self.cfg.clone(), + Arc::clone(&self.blob), + Arc::clone(&self.metrics), + Arc::clone(&self.shard_metrics), + batch_metrics.clone(), + Arc::clone(&self.isolated_runtime), + part, key_lower, - stats, - ts_rewrite: None, - }) - } - .instrument(write_span), - ); + ts_rewrite, + schemas.clone(), + ) + .instrument(write_span), + ) + }; self.writing_parts.push_back(handle); while self.writing_parts.len() > self.cfg.batch_builder_max_outstanding_parts { @@ -954,6 +966,98 @@ impl BatchParts { } } + async fn write_hollow_part( + cfg: BatchBuilderConfig, + blob: Arc, + metrics: Arc, + shard_metrics: Arc, + batch_metrics: BatchWriteMetrics, + isolated_runtime: Arc, + updates: BlobTraceBatchPart, + key_lower: Vec, + ts_rewrite: Option>, + schemas: Schemas, + ) -> BatchPart { + let partial_key = PartialBatchKey::new(&cfg.writer_key, &PartId::new()); + let key = partial_key.complete(&shard_metrics.shard_id); + let goodbytes = updates.updates.iter().map(|x| x.goodbytes()).sum::(); + + let (stats, (buf, encode_time)) = isolated_runtime + .spawn_named(|| "batch::encode_part", async move { + let stats = if cfg.stats_collection_enabled { + let stats_start = Instant::now(); + match part_stats_for_legacy_part(&schemas, &updates.updates) { + Ok(x) => { + let mut trimmed_bytes = 0; + let x = LazyPartStats::encode(&x, |s| { + trimmed_bytes = trim_to_budget(s, cfg.stats_budget, |s| { + cfg.stats_untrimmable_columns.should_retain(s) + }); + }); + Some((x, stats_start.elapsed(), trimmed_bytes)) + } + Err(err) => { + error!("failed to construct part stats: {}", err); + None + } + } + } else { + None + }; + + let encode_start = Instant::now(); + let mut buf = Vec::new(); + updates.encode(&mut buf); + + // Drop batch as soon as we 
can to reclaim its memory. + drop(updates); + (stats, (Bytes::from(buf), encode_start.elapsed())) + }) + .instrument(debug_span!("batch::encode_part")) + .await + .expect("part encode task failed"); + // Can't use the `CodecMetrics::encode` helper because of async. + metrics.codecs.batch.encode_count.inc(); + metrics + .codecs + .batch + .encode_seconds + .inc_by(encode_time.as_secs_f64()); + + let start = Instant::now(); + let payload_len = buf.len(); + let () = retry_external(&metrics.retries.external.batch_set, || async { + shard_metrics.blob_sets.inc(); + blob.set(&key, Bytes::clone(&buf)).await + }) + .instrument(trace_span!("batch::set", payload_len)) + .await; + batch_metrics.seconds.inc_by(start.elapsed().as_secs_f64()); + batch_metrics.bytes.inc_by(u64::cast_from(payload_len)); + batch_metrics.goodbytes.inc_by(u64::cast_from(goodbytes)); + let stats = stats.map(|(stats, stats_step_timing, trimmed_bytes)| { + batch_metrics + .step_stats + .inc_by(stats_step_timing.as_secs_f64()); + if trimmed_bytes > 0 { + metrics.pushdown.parts_stats_trimmed_count.inc(); + metrics + .pushdown + .parts_stats_trimmed_bytes + .inc_by(u64::cast_from(trimmed_bytes)); + } + stats + }); + + BatchPart::Hollow(HollowBatchPart { + key: partial_key, + encoded_size_bytes: payload_len, + key_lower, + stats, + ts_rewrite, + }) + } + #[instrument(level = "debug", name = "batch::finish_upload", fields(shard = %self.shard_id))] pub(crate) async fn finish(self) -> Vec> { let mut parts = self.finished_parts; @@ -1016,11 +1120,15 @@ pub(crate) fn validate_truncate_batch( pub(crate) struct PartDeletes(BTreeSet); impl PartDeletes { - // Adds the part to the set to be deleted and returns true if it was already - // present. + // Adds the part to the set to be deleted and returns true if it was newly + // inserted. pub fn add(&mut self, part: &BatchPart) -> bool { match part { BatchPart::Hollow(x) => self.0.insert(x.key.clone()), + BatchPart::Inline { .. } => { + // Nothing to delete. + true + } } } @@ -1176,6 +1284,7 @@ mod tests { for part in &batch.batch.parts { let part = match part { BatchPart::Hollow(x) => x, + BatchPart::Inline { .. } => panic!("batch unexpectedly used inline part"), }; match BlobKey::parse_ids(&part.key.complete(&shard_id)) { Ok((shard, PartialBlobKey::Batch(writer, _))) => { @@ -1226,6 +1335,7 @@ mod tests { for part in &batch.batch.parts { let part = match part { BatchPart::Hollow(x) => x, + BatchPart::Inline { .. 
} => panic!("batch unexpectedly used inline part"), }; match BlobKey::parse_ids(&part.key.complete(&shard_id)) { Ok((shard, PartialBlobKey::Batch(writer, _))) => { diff --git a/src/persist-client/src/cfg.rs b/src/persist-client/src/cfg.rs index 0cd3aa6301f8b..d9918f2a1903e 100644 --- a/src/persist-client/src/cfg.rs +++ b/src/persist-client/src/cfg.rs @@ -312,6 +312,8 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet { mz_persist::cfg::all_dyn_configs(configs) .add(&crate::batch::BATCH_DELETE_ENABLED) .add(&crate::batch::BLOB_TARGET_SIZE) + .add(&crate::batch::INLINE_UPDATE_MAX_BYTES) + .add(&crate::batch::INLINE_UPDATE_THRESHOLD_BYTES) .add(&crate::cfg::CONSENSUS_CONNECTION_POOL_TTL_STAGGER) .add(&crate::cfg::CONSENSUS_CONNECTION_POOL_TTL) .add(&crate::cfg::CRDB_CONNECT_TIMEOUT) diff --git a/src/persist-client/src/cli/inspect.rs b/src/persist-client/src/cli/inspect.rs index 551740ae86028..01d231208b0b1 100644 --- a/src/persist-client/src/cli/inspect.rs +++ b/src/persist-client/src/cli/inspect.rs @@ -39,7 +39,7 @@ use crate::internal::encoding::{Rollup, UntypedState}; use crate::internal::paths::{ BlobKey, BlobKeyPrefix, PartialBatchKey, PartialBlobKey, PartialRollupKey, WriterKey, }; -use crate::internal::state::{BatchPart, HollowBatchPart, ProtoRollup, ProtoStateDiff, State}; +use crate::internal::state::{BatchPart, ProtoRollup, ProtoStateDiff, State}; use crate::rpc::NoopPubSubSender; use crate::usage::{HumanBytes, StorageUsageClient}; use crate::{Metrics, PersistClient, PersistConfig, ShardId}; @@ -347,17 +347,11 @@ pub async fn blob_batch_part( let parsed = BlobTraceBatchPart::::decode(&buf, &metrics.columnar).expect("decodable"); let desc = parsed.desc.clone(); - let part = HollowBatchPart { - key, - encoded_size_bytes: 0, - key_lower: vec![], - stats: None, - ts_rewrite: None, - }; let encoded_part = EncodedPart::new( metrics.read.snapshot.clone(), parsed.desc.clone(), - &part, + &key, + None, parsed, ); let mut out = BatchPartOutput { @@ -605,6 +599,7 @@ pub async fn unreferenced_blobs(args: &StateArgs) -> Result known_parts.insert(x.key.clone()), + BatchPart::Inline { .. 
} => continue, }; } } diff --git a/src/persist-client/src/fetch.rs b/src/persist-client/src/fetch.rs index 6948c86fd2528..94fefddbdefc8 100644 --- a/src/persist-client/src/fetch.rs +++ b/src/persist-client/src/fetch.rs @@ -36,7 +36,7 @@ use crate::batch::{ ProtoLeasedBatchPart, }; use crate::error::InvalidUsage; -use crate::internal::encoding::{LazyPartStats, LazyProto, Schemas}; +use crate::internal::encoding::{LazyInlineBatchPart, LazyPartStats, LazyProto, Schemas}; use crate::internal::machine::retry_external; use crate::internal::metrics::{Metrics, ReadMetrics, ShardMetrics}; use crate::internal::paths::BlobKey; @@ -120,6 +120,15 @@ where part: x.clone(), } } + BatchPart::Inline { + updates, + key_lower: _, + ts_rewrite, + } => FetchedBlobBuf::Inline { + desc: part.desc.clone(), + updates: updates.clone(), + ts_rewrite: ts_rewrite.clone(), + }, }; let fetched_blob = FetchedBlob { metrics: Arc::clone(&self.metrics), @@ -324,7 +333,7 @@ where read_metrics.part_goodbytes.inc_by(u64::cast_from( parsed.updates.iter().map(|x| x.goodbytes()).sum::(), )); - EncodedPart::new(read_metrics.clone(), registered_desc, part, parsed) + EncodedPart::from_hollow(read_metrics.clone(), registered_desc, part, parsed) }) } @@ -475,6 +484,11 @@ enum FetchedBlobBuf { buf: SegmentedBytes, part: HollowBatchPart, }, + Inline { + desc: Description, + updates: LazyInlineBatchPart, + ts_rewrite: Option>, + }, } impl Clone for FetchedBlob { @@ -506,6 +520,19 @@ impl FetchedBlob { + let parsed = EncodedPart::from_inline( + self.read_metrics.clone(), + desc.clone(), + updates, + ts_rewrite.as_ref(), + ); + (parsed, None) + } }; FetchedPart::new( Arc::clone(&self.metrics), @@ -711,14 +738,50 @@ where ) .await } + BatchPart::Inline { + updates, + key_lower: _, + ts_rewrite, + } => Ok(EncodedPart::from_inline( + read_metrics.clone(), + registered_desc.clone(), + updates, + ts_rewrite.as_ref(), + )), } } - pub(crate) fn new( + pub(crate) fn from_inline( + metrics: ReadMetrics, + desc: Description, + x: &LazyInlineBatchPart, + ts_rewrite: Option<&Antichain>, + ) -> Self { + let parsed = x.decode().expect("valid inline part"); + Self::new(metrics, desc, "inline", ts_rewrite, parsed) + } + + pub(crate) fn from_hollow( metrics: ReadMetrics, registered_desc: Description, part: &HollowBatchPart, parsed: BlobTraceBatchPart, + ) -> Self { + Self::new( + metrics, + registered_desc, + &part.key, + part.ts_rewrite.as_ref(), + parsed, + ) + } + + pub(crate) fn new( + metrics: ReadMetrics, + registered_desc: Description, + key: &str, + ts_rewrite: Option<&Antichain>, + parsed: BlobTraceBatchPart, ) -> Self { // There are two types of batches in persist: // - Batches written by a persist user (either directly or indirectly @@ -738,11 +801,11 @@ where assert!( PartialOrder::less_equal(inline_desc.lower(), registered_desc.lower()), "key={} inline={:?} registered={:?}", - part.key, + key, inline_desc, registered_desc ); - if part.ts_rewrite.is_none() { + if ts_rewrite.is_none() { // The ts rewrite feature allows us to advance the registered // upper of a batch that's already been staged (the inline // upper), so if it's been used, then there's no useful @@ -750,7 +813,7 @@ where assert!( PartialOrder::less_equal(registered_desc.upper(), inline_desc.upper()), "key={} inline={:?} registered={:?}", - part.key, + key, inline_desc, registered_desc ); @@ -763,7 +826,7 @@ where inline_desc.since(), &Antichain::from_elem(T::minimum()), "key={} inline={:?} registered={:?}", - part.key, + key, inline_desc, registered_desc ); @@ -771,7 +834,7 @@ 
where assert_eq!( inline_desc, ®istered_desc, "key={} inline={:?} registered={:?}", - part.key, inline_desc, registered_desc + key, inline_desc, registered_desc ); } @@ -780,7 +843,7 @@ where registered_desc, part: Arc::new(parsed), needs_truncation, - ts_rewrite: part.ts_rewrite.clone(), + ts_rewrite: ts_rewrite.cloned(), } } diff --git a/src/persist-client/src/internal/compact.rs b/src/persist-client/src/internal/compact.rs index 9a48c224c3a9a..ecfa2f0632ad5 100644 --- a/src/persist-client/src/internal/compact.rs +++ b/src/persist-client/src/internal/compact.rs @@ -28,7 +28,7 @@ use timely::progress::{Antichain, Timestamp}; use timely::PartialOrder; use tokio::sync::mpsc::Sender; use tokio::sync::{mpsc, oneshot, TryAcquireError}; -use tracing::{debug, debug_span, trace, warn, Instrument, Span}; +use tracing::{debug, debug_span, error, trace, warn, Instrument, Span}; use crate::async_runtime::IsolatedRuntime; use crate::batch::{BatchBuilderConfig, BatchBuilderInternal, PartDeletes}; @@ -79,12 +79,18 @@ pub struct CompactConfig { impl CompactConfig { /// Initialize the compaction config from Persist configuration. pub fn new(value: &PersistConfig, writer_id: &WriterId) -> Self { - CompactConfig { + let mut ret = CompactConfig { compaction_memory_bound_bytes: value.dynamic.compaction_memory_bound_bytes(), compaction_yield_after_n_updates: value.compaction_yield_after_n_updates, version: value.build_version.clone(), batch: BatchBuilderConfig::new(value, writer_id), - } + }; + // Use compaction as a method of getting inline writes out of state, to + // make room for more inline writes. We could instead do this at the end + // of compaction by flushing out the batch, but doing it here based on + // the config allows BatchBuilder to do its normal pipelining of writes. + ret.batch.inline_update_threshold_bytes = 0; + ret } } @@ -685,7 +691,7 @@ where metrics.compaction.batch.clone(), desc.lower().clone(), Arc::clone(&blob), - isolated_runtime, + Arc::clone(&isolated_runtime), shard_id.clone(), cfg.version.clone(), desc.since().clone(), @@ -736,12 +742,31 @@ where } tokio::task::yield_now().await; } - let batch = batch.finish(&real_schemas, desc.upper().clone()).await?; - let hollow_batch = batch.into_hollow_batch(); + let mut batch = batch.finish(&real_schemas, desc.upper().clone()).await?; + + // We use compaction as a method of getting inline writes out of state, + // to make room for more inline writes. This happens in + // `CompactConfig::new` by overriding the inline writes threshold + // config. This is a bit action-at-a-distance, so defensively detect if + // this breaks here and log and correct it if so. + let has_inline_parts = batch.batch.parts.iter().any(|x| match x { + BatchPart::Hollow(_) => false, + BatchPart::Inline { .. } => true, + }); + if has_inline_parts { + error!(%shard_id, ?cfg, "compaction result unexpectedly had inline writes"); + let () = batch + .flush_to_blob( + &cfg.batch, + &metrics.compaction.batch, + &isolated_runtime, + &real_schemas, + ) + .await; + } timings.record(&metrics); - - Ok(hollow_batch) + Ok(batch.into_hollow_batch()) } fn validate_req(req: &CompactReq) -> Result<(), anyhow::Error> { @@ -873,6 +898,7 @@ mod tests { assert_eq!(res.output.parts.len(), 1); let part = match &res.output.parts[0] { BatchPart::Hollow(x) => x, + BatchPart::Inline { .. 
} => panic!("test outputs a hollow part"), }; let (part, updates) = expect_fetch_part( write.blob.as_ref(), @@ -958,6 +984,7 @@ mod tests { assert_eq!(res.output.parts.len(), 1); let part = match &res.output.parts[0] { BatchPart::Hollow(x) => x, + BatchPart::Inline { .. } => panic!("test outputs a hollow part"), }; let (part, updates) = expect_fetch_part( write.blob.as_ref(), diff --git a/src/persist-client/src/internal/encoding.rs b/src/persist-client/src/internal/encoding.rs index b5538fe41a1a8..6ae01748257e2 100644 --- a/src/persist-client/src/internal/encoding.rs +++ b/src/persist-client/src/internal/encoding.rs @@ -10,6 +10,7 @@ use std::cmp::Ordering; use std::collections::BTreeMap; use std::fmt::Debug; +use std::hash::{Hash, Hasher}; use std::marker::PhantomData; use std::sync::Arc; @@ -17,6 +18,7 @@ use bytes::{Buf, Bytes}; use differential_dataflow::lattice::Lattice; use differential_dataflow::trace::Description; use mz_ore::halt; +use mz_persist::indexed::encoding::BlobTraceBatchPart; use mz_persist::location::{SeqNo, VersionedData}; use mz_persist_types::stats::{PartStats, ProtoStructStats}; use mz_persist_types::{Codec, Codec64}; @@ -25,6 +27,7 @@ use proptest::prelude::Arbitrary; use proptest::strategy::Strategy; use prost::Message; use semver::Version; +use serde::ser::SerializeStruct; use serde::{Deserialize, Serialize}; use timely::progress::{Antichain, Timestamp}; use uuid::Uuid; @@ -38,10 +41,11 @@ use crate::internal::state::{ HollowBatchPart, HollowRollup, IdempotencyToken, LeasedReaderState, OpaqueState, ProtoCriticalReaderState, ProtoFuelingMerge, ProtoHandleDebugState, ProtoHollowBatch, ProtoHollowBatchPart, ProtoHollowRollup, ProtoIdFuelingMerge, ProtoIdHollowBatch, - ProtoIdSpineBatch, ProtoInlinedDiffs, ProtoLeasedReaderState, ProtoRollup, ProtoSpineBatch, - ProtoSpineId, ProtoStateDiff, ProtoStateField, ProtoStateFieldDiffType, ProtoStateFieldDiffs, - ProtoTrace, ProtoU64Antichain, ProtoU64Description, ProtoVersionedData, ProtoWriterState, - State, StateCollections, TypedState, WriterState, + ProtoIdSpineBatch, ProtoInlineBatchPart, ProtoInlinedDiffs, ProtoLeasedReaderState, + ProtoRollup, ProtoSpineBatch, ProtoSpineId, ProtoStateDiff, ProtoStateField, + ProtoStateFieldDiffType, ProtoStateFieldDiffs, ProtoTrace, ProtoU64Antichain, + ProtoU64Description, ProtoVersionedData, ProtoWriterState, State, StateCollections, TypedState, + WriterState, }; use crate::internal::state_diff::{ ProtoStateFieldDiff, ProtoStateFieldDiffsWriter, StateDiff, StateFieldDiff, StateFieldValDiff, @@ -133,6 +137,13 @@ impl Ord for LazyProto { } } +impl Hash for LazyProto { + fn hash(&self, state: &mut H) { + let LazyProto { buf, _phantom } = self; + buf.hash(state); + } +} + impl From<&T> for LazyProto { fn from(value: &T) -> Self { let buf = Bytes::from(value.encode_to_vec()); @@ -1240,6 +1251,17 @@ impl RustType for BatchPart { key_stats: x.stats.into_proto(), ts_rewrite: x.ts_rewrite.as_ref().map(|x| x.into_proto()), }, + BatchPart::Inline { + updates, + key_lower, + ts_rewrite, + } => ProtoHollowBatchPart { + kind: Some(proto_hollow_batch_part::Kind::Inline(updates.into_proto())), + encoded_size_bytes: 0, + key_lower: Bytes::copy_from_slice(key_lower), + key_stats: None, + ts_rewrite: ts_rewrite.as_ref().map(|x| x.into_proto()), + }, } } @@ -1248,17 +1270,27 @@ impl RustType for BatchPart { Some(ts_rewrite) => Some(ts_rewrite.into_rust()?), None => None, }; - let encoded_size_bytes = proto.encoded_size_bytes.into_rust()?; match proto.kind { 
Some(proto_hollow_batch_part::Kind::Key(key)) => { Ok(BatchPart::Hollow(HollowBatchPart { key: key.into_rust()?, - encoded_size_bytes, + encoded_size_bytes: proto.encoded_size_bytes.into_rust()?, key_lower: proto.key_lower.into(), stats: proto.key_stats.into_rust()?, ts_rewrite, })) } + Some(proto_hollow_batch_part::Kind::Inline(x)) => { + assert_eq!(proto.encoded_size_bytes, 0); + assert!(proto.key_stats.is_none()); + let key_lower = proto.key_lower.into(); + let updates = LazyInlineBatchPart(x.into_rust()?); + Ok(BatchPart::Inline { + updates, + key_lower, + ts_rewrite, + }) + } None => Err(TryFromProtoError::unknown_enum_variant( "ProtoHollowBatchPart::kind", )), @@ -1327,6 +1359,81 @@ impl Arbitrary for LazyPartStats { } } +impl RustType for BlobTraceBatchPart { + fn into_proto(&self) -> ProtoInlineBatchPart { + // BlobTraceBatchPart has a Vec. Inline writes only + // needs one and it's nice to only have to model one at the + // ProtoInlineBatchPart level. I'm _pretty_ sure that the actual + // BlobTraceBatchPart we've serialized into parquet always have exactly + // one (_maaaaaybe_ zero or one), but that's a scary thing to start + // enforcing, so separate it out (so we can e.g. use sentry errors! to + // confirm before rolling anything out). In the meantime, just construct + // the ProtoInlineBatchPart directly in BatchParts where it still knows + // that it has exactly one ColumnarRecords. + unreachable!("unused") + } + + fn from_proto(proto: ProtoInlineBatchPart) -> Result { + Ok(BlobTraceBatchPart { + desc: proto.desc.into_rust_if_some("ProtoInlineBatchPart::desc")?, + index: proto.index.into_rust()?, + updates: vec![proto + .updates + .into_rust_if_some("ProtoInlineBatchPart::updates")?], + }) + } +} + +/// A batch part stored inlined in State. +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct LazyInlineBatchPart(LazyProto); + +impl From<&ProtoInlineBatchPart> for LazyInlineBatchPart { + fn from(value: &ProtoInlineBatchPart) -> Self { + LazyInlineBatchPart(value.into()) + } +} + +impl Serialize for LazyInlineBatchPart { + fn serialize(&self, s: S) -> Result { + // NB: This serialize impl is only used for QA and debugging, so emit a + // truncated version. + let proto = self.0.decode().expect("valid proto"); + let mut s = s.serialize_struct("InlineBatchPart", 3)?; + let () = s.serialize_field("desc", &proto.desc)?; + let () = s.serialize_field("index", &proto.index)?; + let () = s.serialize_field("updates[len]", &proto.updates.map_or(0, |x| x.len))?; + s.end() + } +} + +impl LazyInlineBatchPart { + pub(crate) fn encoded_size_bytes(&self) -> usize { + self.0.buf.len() + } + + /// Decodes and returns PartStats from the encoded representation. + /// + /// This does not cache the returned value, it decodes each time it's + /// called. 
+ pub fn decode( + &self, + ) -> Result, TryFromProtoError> { + let proto = self.0.decode().expect("valid proto"); + proto.into_rust() + } +} + +impl RustType for LazyInlineBatchPart { + fn into_proto(&self) -> Bytes { + self.0.into_proto() + } + + fn from_proto(proto: Bytes) -> Result { + Ok(LazyInlineBatchPart(proto.into_rust()?)) + } +} + impl RustType for HollowRollup { fn into_proto(&self) -> ProtoHollowRollup { ProtoHollowRollup { @@ -1392,7 +1499,7 @@ mod tests { use crate::internal::paths::PartialRollupKey; use crate::internal::state::tests::any_state; - use crate::internal::state::HandleDebugState; + use crate::internal::state::{BatchPart, HandleDebugState}; use crate::internal::state_diff::StateDiff; use crate::tests::new_test_client_cache; use crate::ShardId; diff --git a/src/persist-client/src/internal/gc.rs b/src/persist-client/src/internal/gc.rs index d6b24bb624e31..dd62371ff6f7c 100644 --- a/src/persist-client/src/internal/gc.rs +++ b/src/persist-client/src/internal/gc.rs @@ -471,6 +471,7 @@ where BatchPart::Hollow(x) => { assert_eq!(batch_parts_to_delete.get(&x.key), None) } + BatchPart::Inline { .. } => {} } } } diff --git a/src/persist-client/src/internal/machine.rs b/src/persist-client/src/internal/machine.rs index 9c0d1cc7229b4..73d9843966a1c 100644 --- a/src/persist-client/src/internal/machine.rs +++ b/src/persist-client/src/internal/machine.rs @@ -32,6 +32,7 @@ use timely::PartialOrder; use tracing::{debug, info, trace_span, warn, Instrument}; use crate::async_runtime::IsolatedRuntime; +use crate::batch::INLINE_UPDATE_MAX_BYTES; use crate::cache::StateCache; use crate::cfg::RetryParameters; use crate::critical::CriticalReaderId; @@ -240,6 +241,9 @@ where CompareAndAppendRes::InvalidUsage(x) => { return CompareAndAppendRes::InvalidUsage(x) } + CompareAndAppendRes::InlineBackpressure => { + return CompareAndAppendRes::InlineBackpressure + } CompareAndAppendRes::UpperMismatch(seqno, _current_upper) => { // If the state machine thinks that the shard upper is not // far enough along, it could be because the caller of this @@ -376,7 +380,7 @@ where loop { let cmd_res = self .applier - .apply_unbatched_cmd(&metrics.cmds.compare_and_append, |_, _, state| { + .apply_unbatched_cmd(&metrics.cmds.compare_and_append, |_, cfg, state| { writer_was_present = state.writers.contains_key(writer_id); state.compare_and_append( batch, @@ -385,6 +389,7 @@ where lease_duration_ms, idempotency_token, debug_info, + INLINE_UPDATE_MAX_BYTES.get(cfg), ) }) .await; @@ -451,6 +456,11 @@ where assert!(indeterminate.is_none()); return CompareAndAppendRes::InvalidUsage(err); } + Err(CompareAndAppendBreak::InlineBackpressure) => { + // We tried to write an inline part, but there was already + // too much in state. Flush it out to s3 and try again. 
+ return CompareAndAppendRes::InlineBackpressure; + } Err(CompareAndAppendBreak::Upper { shard_upper, writer_upper, @@ -1008,6 +1018,7 @@ pub(crate) enum CompareAndAppendRes { Success(SeqNo, WriterMaintenance), InvalidUsage(InvalidUsage), UpperMismatch(SeqNo, Antichain), + InlineBackpressure, } #[cfg(test)] @@ -1583,7 +1594,7 @@ pub mod datadriven { val: Arc::new(UnitSchema), }; let builder = BatchBuilderInternal::new( - cfg, + cfg.clone(), Arc::clone(&datadriven.client.metrics), Arc::clone(&datadriven.machine.applier.shard_metrics), schemas.clone(), @@ -1599,18 +1610,32 @@ pub mod datadriven { ); let mut builder = BatchBuilder { builder, - stats_schemas: schemas, + stats_schemas: schemas.clone(), }; for ((k, ()), t, d) in updates { builder.add(&k, &(), &t, &d).await.expect("invalid batch"); } - let batch = builder.finish(upper).await?.into_hollow_batch(); + let mut batch = builder.finish(upper).await?; + // We can only reasonably use parts_size_override with hollow batches, + // so if it's set, flush any inline batches out. + if parts_size_override.is_some() { + batch + .flush_to_blob( + &cfg, + &datadriven.client.metrics.user, + &datadriven.client.isolated_runtime, + &schemas, + ) + .await; + } + let batch = batch.into_hollow_batch(); if let Some(size) = parts_size_override { let mut batch = batch.clone(); for part in batch.parts.iter_mut() { match part { - BatchPart::Hollow(x) => x.encoded_size_bytes = size, + BatchPart::Hollow(part) => part.encoded_size_bytes = size, + BatchPart::Inline { .. } => unreachable!("flushed out above"), } } datadriven.batches.insert(output.to_owned(), batch); @@ -1651,6 +1676,7 @@ pub mod datadriven { } }; } + BatchPart::Inline { .. } => {} }; let part = EncodedPart::fetch( &datadriven.shard_id, @@ -1717,7 +1743,10 @@ pub mod datadriven { let batch = datadriven.batches.get_mut(input).expect("unknown batch"); for part in batch.parts.iter_mut() { match part { - BatchPart::Hollow(part) => part.encoded_size_bytes = size, + BatchPart::Hollow(x) => x.encoded_size_bytes = size, + BatchPart::Inline { .. } => { + panic!("set_batch_parts_size only supports hollow parts") + } } } Ok("ok\n".to_string()) diff --git a/src/persist-client/src/internal/metrics.rs b/src/persist-client/src/internal/metrics.rs index f5a08500451e0..4bf1494557ba6 100644 --- a/src/persist-client/src/internal/metrics.rs +++ b/src/persist-client/src/internal/metrics.rs @@ -96,6 +96,8 @@ pub struct Metrics { pub tasks: TasksMetrics, /// Metrics for columnar data encoding and decoding. pub columnar: ColumnarMetrics, + /// Metrics for inline writes. + pub inline: InlineMetrics, /// Metrics for the persist sink. pub sink: SinkMetrics, @@ -153,6 +155,7 @@ impl Metrics { blob_cache_mem: BlobMemCache::new(registry), tasks: TasksMetrics::new(registry), columnar, + inline: InlineMetrics::new(registry), sink: SinkMetrics::new(registry), s3_blob, postgres_consensus: PostgresClientMetrics::new(registry, "mz_persist"), @@ -1209,7 +1212,9 @@ pub struct ShardsMetrics { backpressure_emitted_bytes: IntCounterVec, backpressure_last_backpressured_bytes: UIntGaugeVec, backpressure_retired_bytes: IntCounterVec, - rewrite_part_count: mz_ore::metrics::UIntGaugeVec, + rewrite_part_count: UIntGaugeVec, + inline_part_count: UIntGaugeVec, + inline_part_bytes: UIntGaugeVec, // We hand out `Arc` to read and write handles, but store it // here as `Weak`. This allows us to discover if it's no longer in use and // so we can remove it from the map. 
@@ -1399,6 +1404,16 @@ impl ShardsMetrics { help: "count of batch parts with rewrites by shard", var_labels: ["shard", "name"], )), + inline_part_count: registry.register(metric!( + name: "mz_persist_shard_inline_part_count", + help: "count of parts inline in shard metadata", + var_labels: ["shard", "name"], + )), + inline_part_bytes: registry.register(metric!( + name: "mz_persist_shard_inline_part_bytes", + help: "total size of parts inline in shard metadata", + var_labels: ["shard", "name"], + )), shards, } } @@ -1477,6 +1492,8 @@ pub struct ShardMetrics { Arc>>, pub backpressure_retired_bytes: Arc>>, pub rewrite_part_count: DeleteOnDropGauge<'static, AtomicU64, Vec>, + pub inline_part_count: DeleteOnDropGauge<'static, AtomicU64, Vec>, + pub inline_part_bytes: DeleteOnDropGauge<'static, AtomicU64, Vec>, } impl ShardMetrics { @@ -1588,6 +1605,12 @@ impl ShardMetrics { ), rewrite_part_count: shards_metrics .rewrite_part_count + .get_delete_on_drop_gauge(vec![shard.clone(), name.to_string()]), + inline_part_count: shards_metrics + .inline_part_count + .get_delete_on_drop_gauge(vec![shard.clone(), name.to_string()]), + inline_part_bytes: shards_metrics + .inline_part_bytes .get_delete_on_drop_gauge(vec![shard, name.to_string()]), } } @@ -2763,6 +2786,19 @@ impl TasksMetrics { } } +#[derive(Debug)] +pub struct InlineMetrics { + pub(crate) backpressure: BatchWriteMetrics, +} + +impl InlineMetrics { + fn new(registry: &MetricsRegistry) -> Self { + InlineMetrics { + backpressure: BatchWriteMetrics::new(registry, "inline_backpressure"), + } + } +} + fn blob_key_shard_id(key: &str) -> Option { let (shard_id, _) = BlobKey::parse_ids(key).ok()?; Some(shard_id.to_string()) diff --git a/src/persist-client/src/internal/restore.rs b/src/persist-client/src/internal/restore.rs index bbe6c694f0c5d..0b58ec37f870f 100644 --- a/src/persist-client/src/internal/restore.rs +++ b/src/persist-client/src/internal/restore.rs @@ -88,6 +88,7 @@ pub(crate) async fn restore_blob( for part in &batch.parts { let key = match part { BatchPart::Hollow(x) => x.key.complete(&shard_id), + BatchPart::Inline { .. } => continue, }; check_restored(&key, blob.restore(&key).await); } @@ -98,6 +99,7 @@ pub(crate) async fn restore_blob( for part in &after.parts { let key = match part { BatchPart::Hollow(x) => x.key.complete(&shard_id), + BatchPart::Inline { .. 
} => continue, }; check_restored(&key, blob.restore(&key).await); } diff --git a/src/persist-client/src/internal/state.proto b/src/persist-client/src/internal/state.proto index 4f32742f48d72..d2c7b2a5c3b8c 100644 --- a/src/persist-client/src/internal/state.proto +++ b/src/persist-client/src/internal/state.proto @@ -13,6 +13,8 @@ syntax = "proto3"; package mz_persist_client.internal.state; +import "persist/src/persist.proto"; + message ProtoU64Antichain { repeated int64 elements = 1; } @@ -26,16 +28,24 @@ message ProtoU64Description { message ProtoHollowBatchPart { oneof kind { string key = 1; + bytes inline = 5; } - uint64 encoded_size_bytes = 2; - bytes key_lower = 3; ProtoU64Antichain ts_rewrite = 4; + // Only set when Kind is Inline + uint64 encoded_size_bytes = 2; optional bytes key_stats = 536870906; + reserved 536870907 to 536870911; } +message ProtoInlineBatchPart { + ProtoU64Description desc = 1; + uint64 index = 2; + mz_persist.gen.persist.ProtoColumnarRecords updates = 3; +} + message ProtoHollowBatch { ProtoU64Description desc = 1; repeated ProtoHollowBatchPart parts = 4; diff --git a/src/persist-client/src/internal/state.rs b/src/persist-client/src/internal/state.rs index de2684bd6bf80..e2286bce9791f 100644 --- a/src/persist-client/src/internal/state.rs +++ b/src/persist-client/src/internal/state.rs @@ -35,7 +35,7 @@ use uuid::Uuid; use crate::critical::CriticalReaderId; use crate::error::InvalidUsage; -use crate::internal::encoding::{parse_id, LazyPartStats}; +use crate::internal::encoding::{parse_id, LazyInlineBatchPart, LazyPartStats}; use crate::internal::gc::GcReq; use crate::internal::paths::{PartialBatchKey, PartialRollupKey}; use crate::internal::trace::{ApplyMergeResult, FueledMergeReq, FueledMergeRes, Trace}; @@ -176,12 +176,18 @@ pub struct HandleDebugState { #[serde(tag = "type")] pub enum BatchPart { Hollow(HollowBatchPart), + Inline { + updates: LazyInlineBatchPart, + key_lower: Vec, + ts_rewrite: Option>, + }, } impl BatchPart { pub fn encoded_size_bytes(&self) -> usize { match self { BatchPart::Hollow(x) => x.encoded_size_bytes, + BatchPart::Inline { updates, .. } => updates.encoded_size_bytes(), } } @@ -190,24 +196,28 @@ impl BatchPart { pub fn printable_name(&self) -> &str { match self { BatchPart::Hollow(x) => x.key.0.as_str(), + BatchPart::Inline { .. } => "", } } pub fn stats(&self) -> Option<&LazyPartStats> { match self { BatchPart::Hollow(x) => x.stats.as_ref(), + BatchPart::Inline { .. } => None, } } pub fn key_lower(&self) -> &[u8] { match self { BatchPart::Hollow(x) => x.key_lower.as_slice(), + BatchPart::Inline { key_lower, .. } => key_lower.as_slice(), } } pub fn ts_rewrite(&self) -> Option<&Antichain> { match self { BatchPart::Hollow(x) => x.ts_rewrite.as_ref(), + BatchPart::Inline { ts_rewrite, .. } => ts_rewrite.as_ref(), } } } @@ -222,6 +232,29 @@ impl Ord for BatchPart { fn cmp(&self, other: &Self) -> Ordering { match (self, other) { (BatchPart::Hollow(s), BatchPart::Hollow(o)) => s.cmp(o), + ( + BatchPart::Inline { + updates: s_updates, + key_lower: s_key_lower, + ts_rewrite: s_ts_rewrite, + }, + BatchPart::Inline { + updates: o_updates, + key_lower: o_key_lower, + ts_rewrite: o_ts_rewrite, + }, + ) => ( + s_updates, + s_key_lower, + s_ts_rewrite.as_ref().map(|x| x.elements()), + ) + .cmp(&( + o_updates, + o_key_lower, + o_ts_rewrite.as_ref().map(|x| x.elements()), + )), + (BatchPart::Hollow(_), BatchPart::Inline { .. }) => Ordering::Less, + (BatchPart::Inline { .. 
}, BatchPart::Hollow(_)) => Ordering::Greater, } } } @@ -363,6 +396,16 @@ impl HollowBatch { emitted_implicit: false, } } + + pub(crate) fn inline_bytes(&self) -> usize { + self.parts + .iter() + .map(|x| match x { + BatchPart::Inline { updates, .. } => updates.encoded_size_bytes(), + BatchPart::Hollow(_) => 0, + }) + .sum() + } } pub(crate) struct HollowBatchRunIter<'a, T> { @@ -456,6 +499,7 @@ impl HollowBatch { for part in &mut self.parts { match part { BatchPart::Hollow(part) => part.ts_rewrite = Some(frontier.clone()), + BatchPart::Inline { ts_rewrite, .. } => *ts_rewrite = Some(frontier.clone()), } } Ok(()) @@ -558,6 +602,7 @@ pub enum CompareAndAppendBreak { writer_upper: Antichain, }, InvalidUsage(InvalidUsage), + InlineBackpressure, } #[derive(Debug)] @@ -699,6 +744,7 @@ where lease_duration_ms: u64, idempotency_token: &IdempotencyToken, debug_info: &HandleDebugState, + inline_update_max_bytes: usize, ) -> ControlFlow, Vec>> { // We expire all writers if the upper and since both advance to the // empty antichain. Gracefully handle this. At the same time, @@ -775,6 +821,16 @@ where }); } + let new_inline_bytes = batch.inline_bytes(); + if new_inline_bytes > 0 { + let mut existing_inline_bytes = 0; + self.trace + .map_batches(|x| existing_inline_bytes += x.inline_bytes()); + if existing_inline_bytes + new_inline_bytes >= inline_update_max_bytes { + return Break(CompareAndAppendBreak::InlineBackpressure); + } + } + let merge_reqs = if batch.desc.upper() != batch.desc.lower() { self.trace.push_batch(batch.clone()) } else { @@ -1391,6 +1447,13 @@ where if x.ts_rewrite().is_some() { ret.rewrite_part_count += 1; } + match x { + BatchPart::Hollow(_) => {} + BatchPart::Inline { updates, .. } => { + ret.inline_part_count += 1; + ret.inline_part_bytes += updates.encoded_size_bytes(); + } + } } ret.largest_batch_bytes = std::cmp::max(ret.largest_batch_bytes, batch_size); ret.state_batches_bytes += batch_size; @@ -1655,6 +1718,8 @@ pub struct StateSizeMetrics { pub state_batches_bytes: usize, pub state_rollups_bytes: usize, pub state_rollup_count: usize, + pub inline_part_count: usize, + pub inline_part_bytes: usize, } #[derive(Default)] @@ -1675,9 +1740,11 @@ pub struct Upper(pub Antichain); pub(crate) mod tests { use std::ops::Range; + use bytes::Bytes; use mz_build_info::DUMMY_BUILD_INFO; use mz_dyncfg::ConfigUpdates; use mz_ore::now::SYSTEM_TIME; + use mz_proto::RustType; use proptest::prelude::*; use proptest::strategy::ValueTree; @@ -1705,7 +1772,7 @@ pub(crate) mod tests { any::(), any::(), any::(), - proptest::collection::vec(any_hollow_batch_part::(), 0..3), + proptest::collection::vec(any_batch_part::(), 0..3), any::(), any::(), ), @@ -1716,7 +1783,6 @@ pub(crate) mod tests { (Antichain::from_elem(t1), Antichain::from_elem(t0)) }; let since = Antichain::from_elem(since); - let parts = parts.into_iter().map(BatchPart::Hollow).collect::>(); let runs = if runs { vec![parts.len()] } else { vec![] }; HollowBatch { desc: Description::new(lower, upper, since), @@ -1728,6 +1794,30 @@ pub(crate) mod tests { ) } + pub fn any_batch_part() -> impl Strategy> { + Strategy::prop_map( + ( + any::(), + any_hollow_batch_part(), + proptest::collection::vec(any::(), 0..3), + any::>(), + ), + |(is_hollow, hollow, key_lower, ts_rewrite)| { + if is_hollow { + BatchPart::Hollow(hollow) + } else { + let updates = LazyInlineBatchPart::from_proto(Bytes::new()).unwrap(); + let ts_rewrite = ts_rewrite.map(Antichain::from_elem); + BatchPart::Inline { + updates, + key_lower, + ts_rewrite, + } + } + }, + ) + } + 
pub fn any_hollow_batch_part( ) -> impl Strategy> { Strategy::prop_map( @@ -2073,6 +2163,7 @@ pub(crate) mod tests { LEASE_DURATION_MS, &IdempotencyToken::new(), &debug_state(), + 0, ), Break(CompareAndAppendBreak::Upper { shard_upper: Antichain::from_elem(0), @@ -2089,6 +2180,7 @@ pub(crate) mod tests { LEASE_DURATION_MS, &IdempotencyToken::new(), &debug_state(), + 0, ) .is_continue()); @@ -2101,6 +2193,7 @@ pub(crate) mod tests { LEASE_DURATION_MS, &IdempotencyToken::new(), &debug_state(), + 0, ), Break(CompareAndAppendBreak::InvalidUsage(InvalidBounds { lower: Antichain::from_elem(5), @@ -2117,6 +2210,7 @@ pub(crate) mod tests { LEASE_DURATION_MS, &IdempotencyToken::new(), &debug_state(), + 0, ), Break(CompareAndAppendBreak::InvalidUsage( InvalidEmptyTimeInterval { @@ -2136,6 +2230,7 @@ pub(crate) mod tests { LEASE_DURATION_MS, &IdempotencyToken::new(), &debug_state(), + 0, ) .is_continue()); } @@ -2180,6 +2275,7 @@ pub(crate) mod tests { LEASE_DURATION_MS, &IdempotencyToken::new(), &debug_state(), + 0, ) .is_continue()); @@ -2250,6 +2346,7 @@ pub(crate) mod tests { LEASE_DURATION_MS, &IdempotencyToken::new(), &debug_state(), + 0, ) .is_continue()); @@ -2278,6 +2375,7 @@ pub(crate) mod tests { LEASE_DURATION_MS, &IdempotencyToken::new(), &debug_state(), + 0, ) .is_continue()); @@ -2338,6 +2436,7 @@ pub(crate) mod tests { LEASE_DURATION_MS, &IdempotencyToken::new(), &debug_state(), + 0, ) .is_continue()); assert!(state @@ -2349,6 +2448,7 @@ pub(crate) mod tests { LEASE_DURATION_MS, &IdempotencyToken::new(), &debug_state(), + 0, ) .is_continue()); @@ -2403,6 +2503,7 @@ pub(crate) mod tests { LEASE_DURATION_MS, &IdempotencyToken::new(), &debug_state(), + 0, ) .is_continue()); @@ -2421,6 +2522,7 @@ pub(crate) mod tests { LEASE_DURATION_MS, &IdempotencyToken::new(), &debug_state(), + 0, ) .is_continue()); } @@ -2453,6 +2555,7 @@ pub(crate) mod tests { LEASE_DURATION_MS, &IdempotencyToken::new(), &debug_state(), + 0, ); assert_eq!(state.maybe_gc(false), None); diff --git a/src/persist-client/src/internal/state_serde.json b/src/persist-client/src/internal/state_serde.json index 2d396b10e888f..1c371e0f827a7 100644 --- a/src/persist-client/src/internal/state_serde.json +++ b/src/persist-client/src/internal/state_serde.json @@ -88,7 +88,7 @@ 16259358130896200968 ], "upper": [ - 18139951554495009546 + 14783176325274669853 ], "batches": [ { @@ -96,50 +96,40 @@ 0 ], "upper": [ - 6077620655969140312 + 3215901267041653792 ], "since": [ - 11695933775649927238 + 16107715624355499630 ], - "len": 7, - "part_runs": [] - }, - { - "lower": [ - 6077620655969140312 - ], - "upper": [ - 9048289724302440207 - ], - "since": [ - 16259358130896200968 - ], - "len": 8, + "len": 3, "part_runs": [ [ { "type": "Hollow", - "key": "𝕏`𑜈Ⱥ5:<", - "encoded_size_bytes": 16872256722443379164, - "key_lower": "009abac79278769a8279207fb05f71d6fa9de6df3a3175e60606a6f4b7e471b019f49aaf03c0ca34281ee4c943fb5497062565268d24444fd890cab724176dca822cc29101aef59d92d9093e", + "key": "𖭠 { @@ -1118,22 +1128,35 @@ impl ReferencedBlobValidator { assert_eq!(inc_lower, full_lower); assert_eq!(inc_upper, full_upper); + fn part_unique(x: &BatchPart) -> String { + match x { + BatchPart::Hollow(x) => x.key.to_string(), + BatchPart::Inline { + updates, + key_lower, + ts_rewrite, + } => { + let mut h = DefaultHasher::new(); + updates.hash(&mut h); + key_lower.hash(&mut h); + ts_rewrite.as_ref().map(|x| x.elements()).hash(&mut h); + h.finish().to_string() + } + } + } + // Check that the overall set of parts contained in both representations is the same. 
let inc_parts: HashSet<_> = self .inc_batches .iter() .flat_map(|x| x.parts.iter()) - .map(|x| match x { - BatchPart::Hollow(x) => &x.key, - }) + .map(part_unique) .collect(); let full_parts = self .full_batches .iter() .flat_map(|x| x.parts.iter()) - .map(|x| match x { - BatchPart::Hollow(x) => &x.key, - }) + .map(part_unique) .collect(); assert_eq!(inc_parts, full_parts); diff --git a/src/persist-client/src/iter.rs b/src/persist-client/src/iter.rs index 4f3a6c33d7f40..d3a45fb8b722d 100644 --- a/src/persist-client/src/iter.rs +++ b/src/persist-client/src/iter.rs @@ -85,6 +85,9 @@ impl FetchData { FetchData::Unleased { part, .. } => part.key.split().0 >= min_version, FetchData::Leased { part, .. } => match &part.part { BatchPart::Hollow(x) => x.key.split().0 >= min_version, + // Inline parts are only written directly by the user and so may + // be unconsolidated. + BatchPart::Inline { .. } => true, }, FetchData::AlreadyFetched => false, } @@ -308,6 +311,21 @@ impl Consolidator { + let read_metrics = read_metrics(&metrics.read).clone(); + let part = EncodedPart::from_inline( + read_metrics, + desc.clone(), + updates, + ts_rewrite.as_ref(), + ); + let c_part = ConsolidationPart::from_encoded(part, &self.filter, true); + (c_part, updates.encoded_size_bytes()) + } }) .collect(); self.runs.push(run); diff --git a/src/persist-client/src/operators/shard_source.rs b/src/persist-client/src/operators/shard_source.rs index ce29eeb4a4e7c..16f039ededd58 100644 --- a/src/persist-client/src/operators/shard_source.rs +++ b/src/persist-client/src/operators/shard_source.rs @@ -383,10 +383,13 @@ where for mut part_desc in parts { // TODO: Push the filter down into the Subscribe? if STATS_FILTER_ENABLED.get(&cfg) { - let should_fetch = part_desc.part.stats().map_or(true, |stats| { - should_fetch_part(&stats.decode(), current_frontier.borrow()) - }); - let bytes = u64::cast_from(part_desc.part.encoded_size_bytes()); + let should_fetch = match &part_desc.part { + BatchPart::Hollow(x) => x.stats.as_ref().map_or(true, |stats| { + should_fetch_part(&stats.decode(), current_frontier.borrow()) + }), + BatchPart::Inline { .. } => true, + }; + let bytes = u64::cast_from(part_desc.encoded_size_bytes()); if should_fetch { audit_budget_bytes = audit_budget_bytes.saturating_add(part_desc.part.encoded_size_bytes()); @@ -401,6 +404,7 @@ where x.key.hash(&mut h); usize::cast_from(h.finish()) % 100 < STATS_AUDIT_PERCENT.get(&cfg) } + BatchPart::Inline { .. } => false, }; if should_audit && part_desc.part.encoded_size_bytes() < audit_budget_bytes { diff --git a/src/persist-client/src/usage.rs b/src/persist-client/src/usage.rs index 25010c499fa1d..e21d3826232de 100644 --- a/src/persist-client/src/usage.rs +++ b/src/persist-client/src/usage.rs @@ -454,6 +454,7 @@ impl StorageUsageClient { for part in x.parts.iter() { let part = match part { BatchPart::Hollow(x) => x, + BatchPart::Inline { .. } => continue, }; let parsed = BlobKey::parse_ids(&part.key.complete(&shard_id)); if let Ok((_, PartialBlobKey::Batch(writer_id, _))) = parsed { @@ -481,6 +482,7 @@ impl StorageUsageClient { for part in x.parts.iter() { let part = match part { BatchPart::Hollow(x) => x, + BatchPart::Inline { .. 
} => continue, }; current_state_batches_bytes += u64::cast_from(part.encoded_size_bytes); } @@ -740,7 +742,7 @@ mod tests { use semver::Version; use timely::progress::Antichain; - use crate::batch::BLOB_TARGET_SIZE; + use crate::batch::{BLOB_TARGET_SIZE, INLINE_UPDATE_THRESHOLD_BYTES}; use crate::internal::paths::{PartialRollupKey, RollupId}; use crate::tests::new_test_client; use crate::ShardId; @@ -758,6 +760,7 @@ mod tests { ]; let client = new_test_client(&dyncfgs).await; + let inline_writes_enabled = INLINE_UPDATE_THRESHOLD_BYTES.get(&client.cfg) > 0; let build_version = client.cfg.build_version.clone(); let shard_id_one = ShardId::new(); let shard_id_two = ShardId::new(); @@ -818,7 +821,12 @@ mod tests { assert!(shard_one_size > 0); assert!(shard_two_size > 0); - assert!(shard_one_size < shard_two_size); + if inline_writes_enabled { + // Allow equality, but only if inline writes are enabled. + assert!(shard_one_size <= shard_two_size); + } else { + assert!(shard_one_size < shard_two_size); + } assert_eq!( shard_two_size, writer_one_size + writer_two_size + versioned_size + rollups_size @@ -866,6 +874,7 @@ mod tests { let shard_id = ShardId::new(); let mut client = new_test_client(&dyncfgs).await; + let inline_writes_enabled = INLINE_UPDATE_THRESHOLD_BYTES.get(&client.cfg) > 0; let (mut write0, _) = client .expect_open::(shard_id) @@ -901,9 +910,11 @@ mod tests { let usage = StorageUsageClient::open(client); let shard_usage_audit = usage.shard_usage_audit(shard_id).await; let shard_usage_referenced = usage.shard_usage_referenced(shard_id).await; - // We've written data. - assert!(shard_usage_audit.current_state_batches_bytes > 0); - assert!(shard_usage_referenced.batches_bytes > 0); + if !inline_writes_enabled { + // We've written data. + assert!(shard_usage_audit.current_state_batches_bytes > 0); + assert!(shard_usage_referenced.batches_bytes > 0); + } // There's always at least one rollup. assert!(shard_usage_audit.current_state_rollups_bytes > 0); assert!(shard_usage_referenced.rollup_bytes > 0); @@ -914,8 +925,10 @@ mod tests { // // write0 wrote a batch, but never linked it in, but is still active. assert!(shard_usage_audit.not_leaked_not_referenced_bytes > 0); - // write0 wrote a batch, but never linked it in, and is now expired. - assert!(shard_usage_audit.leaked_bytes > 0); + if !inline_writes_enabled { + // write0 wrote a batch, but never linked it in, and is now expired. 
+ assert!(shard_usage_audit.leaked_bytes > 0); + } } #[mz_persist_proc::test(tokio::test)] diff --git a/src/persist-client/src/write.rs b/src/persist-client/src/write.rs index d4a37aff20dfe..d4167a85918e8 100644 --- a/src/persist-client/src/write.rs +++ b/src/persist-client/src/write.rs @@ -16,6 +16,8 @@ use std::sync::Arc; use differential_dataflow::difference::Semigroup; use differential_dataflow::lattice::Lattice; use differential_dataflow::trace::Description; +use futures::stream::FuturesUnordered; +use futures::StreamExt; use mz_ore::instrument; use mz_ore::task::RuntimeExt; use mz_persist::location::Blob; @@ -511,23 +513,47 @@ where ) .await; - let maintenance = match res { - CompareAndAppendRes::Success(_seqno, maintenance) => { - self.upper = desc.upper().clone(); - for batch in batches.iter_mut() { - batch.mark_consumed(); + let maintenance = loop { + match res { + CompareAndAppendRes::Success(_seqno, maintenance) => { + self.upper = desc.upper().clone(); + for batch in batches.iter_mut() { + batch.mark_consumed(); + } + break maintenance; + } + CompareAndAppendRes::InvalidUsage(invalid_usage) => return Err(invalid_usage), + CompareAndAppendRes::UpperMismatch(_seqno, current_upper) => { + // We tried to to a compare_and_append with the wrong expected upper, that + // won't work. Update the cached upper to the current upper. + self.upper = current_upper.clone(); + return Ok(Err(UpperMismatch { + current: current_upper, + expected: expected_upper, + })); + } + CompareAndAppendRes::InlineBackpressure => { + // We tried to write an inline part, but there was already + // too much in state. Flush it out to s3 and try again. + let cfg = BatchBuilderConfig::new(&self.cfg, &self.writer_id); + // We could have a large number of inline parts (imagine the + // sharded persist_sink), do this flushing concurrently. + let batches = batches + .iter_mut() + .map(|batch| async { + batch + .flush_to_blob( + &cfg, + &self.metrics.inline.backpressure, + &self.isolated_runtime, + &self.schemas, + ) + .await + }) + .collect::>(); + let () = batches.collect::<()>().await; + continue; } - maintenance - } - CompareAndAppendRes::InvalidUsage(invalid_usage) => return Err(invalid_usage), - CompareAndAppendRes::UpperMismatch(_seqno, current_upper) => { - // We tried to to a compare_and_append with the wrong expected upper, that - // won't work. Update the cached upper to the current upper. - self.upper = current_upper.clone(); - return Ok(Err(UpperMismatch { - current: current_upper, - expected: expected_upper, - })); } }; diff --git a/src/persist-client/tests/machine/gc b/src/persist-client/tests/machine/gc index b14540012bda8..329783ef307ca 100644 --- a/src/persist-client/tests/machine/gc +++ b/src/persist-client/tests/machine/gc @@ -7,6 +7,13 @@ # the Business Source License, use of this software will be governed # by the Apache License, Version 2.0. +# Disable inline writes so interesting things happen in Blob. +dyncfg +persist_inline_update_threshold_bytes 0 +persist_inline_update_max_bytes 0 +---- +ok + # This test uses a simplifying assumption that a batch is # always made up of exactly 1 batch part. 
diff --git a/src/persist-client/tests/machine/restore_blob b/src/persist-client/tests/machine/restore_blob
index 95ffb040b32e8..8d0bc6e401c5b 100644
--- a/src/persist-client/tests/machine/restore_blob
+++ b/src/persist-client/tests/machine/restore_blob
@@ -7,6 +7,12 @@
 # the Business Source License, use of this software will be governed
 # by the Apache License, Version 2.0.
 
+# Disable inline writes so interesting things happen in Blob.
+dyncfg
+persist_inline_update_threshold_bytes 0
+persist_inline_update_max_bytes 0
+----
+ok
 
 # Pre-populate some non-trivial state in our shard.
diff --git a/src/persist-proc/src/lib.rs b/src/persist-proc/src/lib.rs
index 49207e446dea7..b6ed21bf52597 100644
--- a/src/persist-proc/src/lib.rs
+++ b/src/persist-proc/src/lib.rs
@@ -68,8 +68,18 @@ fn test_impl(attr: TokenStream, item: TokenStream) -> TokenStream {
         let dyncfgs = [
             {
-                // Placeholder default configuration until inline writes PR.
-                mz_dyncfg::ConfigUpdates::default()
+                // Inline writes disabled
+                let mut x = ::mz_dyncfg::ConfigUpdates::default();
+                x.add_dynamic("persist_inline_update_threshold_bytes", ::mz_dyncfg::ConfigVal::Usize(0));
+                x.add_dynamic("persist_inline_update_max_bytes", ::mz_dyncfg::ConfigVal::Usize(0));
+                x
+            },
+            {
+                // Inline writes enabled
+                let mut x = ::mz_dyncfg::ConfigUpdates::default();
+                x.add_dynamic("persist_inline_update_threshold_bytes", ::mz_dyncfg::ConfigVal::Usize(4 * 1024));
+                x.add_dynamic("persist_inline_update_max_bytes", ::mz_dyncfg::ConfigVal::Usize(1024 * 1024));
+                x
             },
         ];
diff --git a/src/persist/build.rs b/src/persist/build.rs
index 93c1e83c7108e..56e73f0d6b28f 100644
--- a/src/persist/build.rs
+++ b/src/persist/build.rs
@@ -14,6 +14,11 @@ fn main() {
 
     prost_build::Config::new()
         .btree_map(["."])
+        .type_attribute(
+            ".mz_persist.gen.persist.ProtoColumnarRecords",
+            "#[derive(serde::Serialize)]",
+        )
+        .bytes([".mz_persist.gen.persist.ProtoColumnarRecords"])
         .compile_protos(&["persist/src/persist.proto"], &[".."])
         .unwrap_or_else(|e| panic!("{e}"))
 }
diff --git a/src/persist/src/indexed/columnar.rs b/src/persist/src/indexed/columnar.rs
index 033d3f56ba8f4..51728c045e261 100644
--- a/src/persist/src/indexed/columnar.rs
+++ b/src/persist/src/indexed/columnar.rs
@@ -15,8 +15,12 @@ use std::sync::Arc;
 use std::{cmp, fmt};
 
 use arrow2::types::Index;
-use mz_ore::lgbytes::MetricsRegion;
+use bytes::Bytes;
+use mz_ore::lgbytes::{LgBytesMetrics, MetricsRegion};
+use mz_ore::metrics::MetricsRegistry;
+use mz_proto::{ProtoType, RustType, TryFromProtoError};
 
+use crate::gen::persist::ProtoColumnarRecords;
 use crate::metrics::ColumnarMetrics;
 
 pub mod arrow;
@@ -483,6 +487,40 @@ impl ColumnarRecordsBuilder {
     }
 }
 
+impl RustType<ProtoColumnarRecords> for ColumnarRecords {
+    fn into_proto(&self) -> ProtoColumnarRecords {
+        ProtoColumnarRecords {
+            len: self.len.into_proto(),
+            key_offsets: (*self.key_offsets).as_ref().to_vec(),
+            key_data: Bytes::copy_from_slice((*self.key_data).as_ref()),
+            val_offsets: (*self.val_offsets).as_ref().to_vec(),
+            val_data: Bytes::copy_from_slice((*self.val_data).as_ref()),
+            timestamps: (*self.timestamps).as_ref().to_vec(),
+            diffs: (*self.diffs).as_ref().to_vec(),
+        }
+    }
+
+    fn from_proto(proto: ProtoColumnarRecords) -> Result<Self, TryFromProtoError> {
+        // WIP: plumb real metrics instead of a throwaway registry.
+        let lgbytes = LgBytesMetrics::new(&MetricsRegistry::new());
+        // WIP: or should ColumnarRecords use MaybeLgBytes, which would also
+        // avoid the .to_vec copies below?
+        let ret = ColumnarRecords {
+            len: proto.len.into_rust()?,
+            key_offsets: Arc::new(lgbytes.persist_arrow.heap_region(proto.key_offsets)),
+            key_data: Arc::new(lgbytes.persist_arrow.heap_region(proto.key_data.to_vec())),
+            val_offsets: Arc::new(lgbytes.persist_arrow.heap_region(proto.val_offsets)),
+            val_data: Arc::new(lgbytes.persist_arrow.heap_region(proto.val_data.to_vec())),
+            timestamps: Arc::new(lgbytes.persist_arrow.heap_region(proto.timestamps)),
+            diffs: Arc::new(lgbytes.persist_arrow.heap_region(proto.diffs)),
+        };
+        let () = ret
+            .borrow()
+            .validate()
+            .map_err(TryFromProtoError::InvalidPersistState)?;
+        Ok(ret)
+    }
+}
+
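The dyncfg matrix added to persist-proc above is what populates the `dyncfgs` value used by `new_test_client(&dyncfgs)` in the tests earlier in this diff: each `#[mz_persist_proc::test(tokio::test)]` test body is run once per entry, i.e. once with inline writes disabled and once enabled. A hypothetical test written against that harness; the name and body are illustrative only and not part of this PR:

// Hypothetical example of the test-matrix pattern, not code from this PR.
use mz_dyncfg::ConfigUpdates;

#[mz_persist_proc::test(tokio::test)]
async fn small_writes_roundtrip(dyncfgs: ConfigUpdates) {
    // new_test_client applies the ConfigUpdates to the client's config, so this
    // body exercises both the inline and the S3 write paths across the matrix.
    let client = new_test_client(&dyncfgs).await;
    // ... open a shard, perform a tiny write, read it back ...
}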
 #[cfg(test)]
 mod tests {
     use mz_persist_types::Codec64;
diff --git a/src/persist/src/persist.proto b/src/persist/src/persist.proto
index 36f02a1ad4339..7bca23a75c2ec 100644
--- a/src/persist/src/persist.proto
+++ b/src/persist/src/persist.proto
@@ -64,3 +64,13 @@ enum ProtoBatchFormat {
   // with a trie-like column structure.
   ParquetKvtd = 2;
 }
+
+message ProtoColumnarRecords {
+  uint64 len = 1;
+  repeated int32 key_offsets = 2;
+  bytes key_data = 3;
+  repeated int32 val_offsets = 4;
+  bytes val_data = 5;
+  repeated int64 timestamps = 6;
+  repeated int64 diffs = 7;
+}
diff --git a/test/sqllogictest/explain-pushdown.slt b/test/sqllogictest/explain-pushdown.slt
index c7c861fb3248a..609d89d70053d 100644
--- a/test/sqllogictest/explain-pushdown.slt
+++ b/test/sqllogictest/explain-pushdown.slt
@@ -9,6 +9,12 @@
 mode cockroach
 
+# Disable persist inline writes so we get real part numbers below
+simple conn=mz_system,user=mz_system
+ALTER SYSTEM SET persist_inline_update_threshold_bytes = 0
+----
+COMPLETE 0
+
 # EXPLAIN FILTER PUSHDOWN statements are blocked by a feature flag
 statement ok
 CREATE TABLE numbers (