From 570de3c90b3e3c21a73f844f11e3faf867ce6f82 Mon Sep 17 00:00:00 2001 From: noel Date: Mon, 10 Jun 2024 14:21:39 +0800 Subject: [PATCH] apply adaptive rate limit immediately from first chunk --- .../executor/backfill/arrangement_backfill.rs | 44 +++++++++---------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/src/stream/src/executor/backfill/arrangement_backfill.rs b/src/stream/src/executor/backfill/arrangement_backfill.rs index d8db3bc4efbd9..9e299387ce97a 100644 --- a/src/stream/src/executor/backfill/arrangement_backfill.rs +++ b/src/stream/src/executor/backfill/arrangement_backfill.rs @@ -113,28 +113,6 @@ where let vnodes = upstream_table.vnodes().clone(); let mut rate_limit = self.rate_limit; - // These builders will build data chunks. - // We must supply them with the full datatypes which correspond to - // pk + output_indices. - let snapshot_data_types = self - .upstream - .schema() - .fields() - .iter() - .map(|field| field.data_type.clone()) - .collect_vec(); - let mut builders: Builders = upstream_table - .vnodes() - .iter_vnodes() - .map(|vnode| { - let builder = - create_builder(rate_limit, self.chunk_size, snapshot_data_types.clone()); - (vnode, builder) - }) - .collect(); - - let mut upstream = self.upstream.execute(); - // Query the current barrier latency from meta. // Permit a 2x fluctuation in barrier latency. Set threshold to 15s. let mut total_barrier_latency = Self::get_total_barrier_latency(&self.metrics); @@ -159,6 +137,28 @@ where rate_limit = Some(INITIAL_ADAPTIVE_RATE_LIMIT); } + // These builders will build data chunks. + // We must supply them with the full datatypes which correspond to + // pk + output_indices. + let snapshot_data_types = self + .upstream + .schema() + .fields() + .iter() + .map(|field| field.data_type.clone()) + .collect_vec(); + let mut builders: Builders = upstream_table + .vnodes() + .iter_vnodes() + .map(|vnode| { + let builder = + create_builder(rate_limit, self.chunk_size, snapshot_data_types.clone()); + (vnode, builder) + }) + .collect(); + + let mut upstream = self.upstream.execute(); + // Poll the upstream to get the first barrier. let first_barrier = expect_first_barrier(&mut upstream).await?; let mut paused = first_barrier.is_pause_on_startup();