Skip to content

Commit

Permalink
fix(flow): batch builder with type (#5195)
Browse files Browse the repository at this point in the history
* fix: typed builder

* chore: clippy

* chore: rename

* fix: unit tests

* refactor: per review
  • Loading branch information
discord9 authored Dec 19, 2024
1 parent 7ea8a44 commit a4d61bc
Show file tree
Hide file tree
Showing 11 changed files with 434 additions and 64 deletions.
22 changes: 19 additions & 3 deletions src/flow/src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,8 @@ impl FlowWorkerManager {
let default_interval = Duration::from_secs(1);
let mut avg_spd = 0; // rows/sec
let mut since_last_run = tokio::time::Instant::now();
let run_per_trace = 10;
let mut run_cnt = 0;
loop {
// TODO(discord9): only run when new inputs arrive or scheduled to
let row_cnt = self.run_available(true).await.unwrap_or_else(|err| {
Expand Down Expand Up @@ -607,10 +609,19 @@ impl FlowWorkerManager {
} else {
(9 * avg_spd + cur_spd) / 10
};
trace!("avg_spd={} r/s, cur_spd={} r/s", avg_spd, cur_spd);
let new_wait = BATCH_SIZE * 1000 / avg_spd.max(1); //in ms
let new_wait = Duration::from_millis(new_wait as u64).min(default_interval);
trace!("Wait for {} ms, row_cnt={}", new_wait.as_millis(), row_cnt);

// print trace every `run_per_trace` times so that we can see if there is something wrong
// but also not get flooded with trace
if run_cnt >= run_per_trace {
trace!("avg_spd={} r/s, cur_spd={} r/s", avg_spd, cur_spd);
trace!("Wait for {} ms, row_cnt={}", new_wait.as_millis(), row_cnt);
run_cnt = 0;
} else {
run_cnt += 1;
}

METRIC_FLOW_RUN_INTERVAL_MS.set(new_wait.as_millis() as i64);
since_last_run = tokio::time::Instant::now();
tokio::time::sleep(new_wait).await;
Expand Down Expand Up @@ -670,13 +681,18 @@ impl FlowWorkerManager {
&self,
region_id: RegionId,
rows: Vec<DiffRow>,
batch_datatypes: &[ConcreteDataType],
) -> Result<(), Error> {
let rows_len = rows.len();
let table_id = region_id.table_id();
let _timer = METRIC_FLOW_INSERT_ELAPSED
.with_label_values(&[table_id.to_string().as_str()])
.start_timer();
self.node_context.read().await.send(table_id, rows).await?;
self.node_context
.read()
.await
.send(table_id, rows, batch_datatypes)
.await?;
trace!(
"Handling write request for table_id={} with {} rows",
table_id,
Expand Down
13 changes: 11 additions & 2 deletions src/flow/src/adapter/flownode_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use itertools::Itertools;
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;

use super::util::from_proto_to_data_type;
use crate::adapter::{CreateFlowArgs, FlowWorkerManager};
use crate::error::InternalSnafu;
use crate::metrics::METRIC_FLOW_TASK_COUNT;
Expand Down Expand Up @@ -206,9 +207,17 @@ impl Flownode for FlowWorkerManager {
})
.map(|r| (r, now, 1))
.collect_vec();
self.handle_write_request(region_id.into(), rows)
.await
let batch_datatypes = insert_schema
.iter()
.map(from_proto_to_data_type)
.collect::<std::result::Result<Vec<_>, _>>()
.map_err(to_meta_err)?;
self.handle_write_request(region_id.into(), rows, &batch_datatypes)
.await
.map_err(|err| {
common_telemetry::error!(err;"Failed to handle write request");
to_meta_err(err)
})?;
}
Ok(Default::default())
}
Expand Down
23 changes: 18 additions & 5 deletions src/flow/src/adapter/node_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use common_telemetry::trace;
use datatypes::prelude::ConcreteDataType;
use session::context::QueryContext;
use snafu::{OptionExt, ResultExt};
use table::metadata::TableId;
Expand Down Expand Up @@ -131,16 +132,23 @@ impl SourceSender {
}

/// return number of rows it actual send(including what's in the buffer)
pub async fn send_rows(&self, rows: Vec<DiffRow>) -> Result<usize, Error> {
pub async fn send_rows(
&self,
rows: Vec<DiffRow>,
batch_datatypes: &[ConcreteDataType],
) -> Result<usize, Error> {
METRIC_FLOW_INPUT_BUF_SIZE.add(rows.len() as _);
while self.send_buf_row_cnt.load(Ordering::SeqCst) >= BATCH_SIZE * 4 {
tokio::task::yield_now().await;
}
// row count metrics is approx so relaxed order is ok
self.send_buf_row_cnt
.fetch_add(rows.len(), Ordering::SeqCst);
let batch = Batch::try_from_rows(rows.into_iter().map(|(row, _, _)| row).collect())
.context(EvalSnafu)?;
let batch = Batch::try_from_rows_with_types(
rows.into_iter().map(|(row, _, _)| row).collect(),
batch_datatypes,
)
.context(EvalSnafu)?;
common_telemetry::trace!("Send one batch to worker with {} rows", batch.row_count());
self.send_buf_tx.send(batch).await.map_err(|e| {
crate::error::InternalSnafu {
Expand All @@ -157,14 +165,19 @@ impl FlownodeContext {
/// return number of rows it actual send(including what's in the buffer)
///
/// TODO(discord9): make this concurrent
pub async fn send(&self, table_id: TableId, rows: Vec<DiffRow>) -> Result<usize, Error> {
pub async fn send(
    &self,
    table_id: TableId,
    rows: Vec<DiffRow>,
    batch_datatypes: &[ConcreteDataType],
) -> Result<usize, Error> {
    // Look up the source sender registered for this table; a missing entry
    // means the flow was asked to ingest into a table it doesn't know about.
    let Some(sender) = self.source_sender.get(&table_id) else {
        return TableNotFoundSnafu {
            name: table_id.to_string(),
        }
        .fail();
    };
    sender.send_rows(rows, batch_datatypes).await
}

/// flush all sender's buf
Expand Down
15 changes: 15 additions & 0 deletions src/flow/src/adapter/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,27 @@ use api::helper::ColumnDataTypeWrapper;
use api::v1::column_def::options_from_column_schema;
use api::v1::{ColumnDataType, ColumnDataTypeExtension, SemanticType};
use common_error::ext::BoxedError;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use itertools::Itertools;
use snafu::ResultExt;

use crate::error::{Error, ExternalSnafu};

/// Convert the datatype description carried by a proto [`api::v1::ColumnSchema`]
/// (datatype id plus optional extension) into the engine's [`ConcreteDataType`].
///
/// Errors are boxed and wrapped as an external error, mirroring how other
/// proto-conversion helpers in this module report failures.
pub fn from_proto_to_data_type(
    column_schema: &api::v1::ColumnSchema,
) -> Result<ConcreteDataType, Error> {
    ColumnDataTypeWrapper::try_new(
        column_schema.datatype,
        column_schema.datatype_extension.clone(),
    )
    .map(ConcreteDataType::from)
    .map_err(BoxedError::new)
    .context(ExternalSnafu)
}

/// convert `ColumnSchema` lists to it's corresponding proto type
pub fn column_schemas_to_proto(
column_schemas: Vec<ColumnSchema>,
Expand Down
23 changes: 18 additions & 5 deletions src/flow/src/compute/render.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::compute::types::{Collection, CollectionBundle, ErrCollector, Toff};
use crate::error::{Error, InvalidQuerySnafu, NotImplementedSnafu};
use crate::expr::{self, Batch, GlobalId, LocalId};
use crate::plan::{Plan, TypedPlan};
use crate::repr::{self, DiffRow};
use crate::repr::{self, DiffRow, RelationType};

mod map;
mod reduce;
Expand Down Expand Up @@ -124,10 +124,10 @@ impl Context<'_, '_> {
/// Like `render_plan` but in Batch Mode
pub fn render_plan_batch(&mut self, plan: TypedPlan) -> Result<CollectionBundle<Batch>, Error> {
match plan.plan {
Plan::Constant { rows } => Ok(self.render_constant_batch(rows)),
Plan::Constant { rows } => Ok(self.render_constant_batch(rows, &plan.schema.typ)),
Plan::Get { id } => self.get_batch_by_id(id),
Plan::Let { id, value, body } => self.eval_batch_let(id, value, body),
Plan::Mfp { input, mfp } => self.render_mfp_batch(input, mfp),
Plan::Mfp { input, mfp } => self.render_mfp_batch(input, mfp, &plan.schema.typ),
Plan::Reduce {
input,
key_val_plan,
Expand Down Expand Up @@ -172,7 +172,11 @@ impl Context<'_, '_> {
/// render Constant, take all rows that have a timestamp not greater than the current time
/// This function is primarily used for testing
/// Always assume input is sorted by timestamp
pub fn render_constant_batch(&mut self, rows: Vec<DiffRow>) -> CollectionBundle<Batch> {
pub fn render_constant_batch(
&mut self,
rows: Vec<DiffRow>,
output_type: &RelationType,
) -> CollectionBundle<Batch> {
let (send_port, recv_port) = self.df.make_edge::<_, Toff<Batch>>("constant_batch");
let mut per_time: BTreeMap<repr::Timestamp, Vec<DiffRow>> = Default::default();
for (key, group) in &rows.into_iter().group_by(|(_row, ts, _diff)| *ts) {
Expand All @@ -185,6 +189,8 @@ impl Context<'_, '_> {
let scheduler_inner = scheduler.clone();
let err_collector = self.err_collector.clone();

let output_type = output_type.clone();

let subgraph_id =
self.df
.add_subgraph_source("ConstantBatch", send_port, move |_ctx, send_port| {
Expand All @@ -199,7 +205,14 @@ impl Context<'_, '_> {
not_great_than_now.into_iter().for_each(|(_ts, rows)| {
err_collector.run(|| {
let rows = rows.into_iter().map(|(row, _ts, _diff)| row).collect();
let batch = Batch::try_from_rows(rows)?;
let batch = Batch::try_from_rows_with_types(
rows,
&output_type
.column_types
.iter()
.map(|ty| ty.scalar_type().clone())
.collect_vec(),
)?;
send_port.give(vec![batch]);
Ok(())
});
Expand Down
3 changes: 2 additions & 1 deletion src/flow/src/compute/render/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::compute::types::{Arranged, Collection, CollectionBundle, ErrCollector
use crate::error::{Error, PlanSnafu};
use crate::expr::{Batch, EvalError, MapFilterProject, MfpPlan, ScalarExpr};
use crate::plan::TypedPlan;
use crate::repr::{self, DiffRow, KeyValDiffRow, Row};
use crate::repr::{self, DiffRow, KeyValDiffRow, RelationType, Row};
use crate::utils::ArrangeHandler;

impl Context<'_, '_> {
Expand All @@ -34,6 +34,7 @@ impl Context<'_, '_> {
&mut self,
input: Box<TypedPlan>,
mfp: MapFilterProject,
_output_type: &RelationType,
) -> Result<CollectionBundle<Batch>, Error> {
let input = self.render_plan_batch(*input)?;

Expand Down
35 changes: 21 additions & 14 deletions src/flow/src/compute/render/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ impl Context<'_, '_> {
})?;
let key_val_plan = key_val_plan.clone();

let output_type = output_type.clone();

let now = self.compute_state.current_time_ref();

let err_collector = self.err_collector.clone();
Expand Down Expand Up @@ -118,6 +120,7 @@ impl Context<'_, '_> {
src_data,
&key_val_plan,
&accum_plan,
&output_type,
SubgraphArg {
now,
err_collector: &err_collector,
Expand Down Expand Up @@ -354,6 +357,7 @@ fn reduce_batch_subgraph(
src_data: impl IntoIterator<Item = Batch>,
key_val_plan: &KeyValPlan,
accum_plan: &AccumulablePlan,
output_type: &RelationType,
SubgraphArg {
now,
err_collector,
Expand Down Expand Up @@ -535,17 +539,13 @@ fn reduce_batch_subgraph(
// this output part is not supposed to be resource intensive
// (because for every batch there wouldn't usually be as many output row?),
// so we can do some costly operation here
let output_types = all_output_dict.first_entry().map(|entry| {
entry
.key()
.iter()
.chain(entry.get().iter())
.map(|v| v.data_type())
.collect::<Vec<ConcreteDataType>>()
});
let output_types = output_type
.column_types
.iter()
.map(|t| t.scalar_type.clone())
.collect_vec();

if let Some(output_types) = output_types {
err_collector.run(|| {
err_collector.run(|| {
let column_cnt = output_types.len();
let row_cnt = all_output_dict.len();

Expand Down Expand Up @@ -585,7 +585,6 @@ fn reduce_batch_subgraph(

Ok(())
});
}
}

/// reduce subgraph, reduce the input data into a single row
Expand Down Expand Up @@ -1516,7 +1515,9 @@ mod test {
let mut ctx = harness_test_ctx(&mut df, &mut state);

let rows = vec![
(Row::new(vec![1i64.into()]), 1, 1),
(Row::new(vec![Value::Null]), -1, 1),
(Row::new(vec![1i64.into()]), 0, 1),
(Row::new(vec![Value::Null]), 1, 1),
(Row::new(vec![2i64.into()]), 2, 1),
(Row::new(vec![3i64.into()]), 3, 1),
(Row::new(vec![1i64.into()]), 4, 1),
Expand Down Expand Up @@ -1558,13 +1559,15 @@ mod test {
Box::new(input_plan.with_types(typ.into_unnamed())),
&key_val_plan,
&reduce_plan,
&RelationType::empty(),
&RelationType::new(vec![ColumnType::new(CDT::int64_datatype(), true)]),
)
.unwrap();

{
let now_inner = now.clone();
let expected = BTreeMap::<i64, Vec<i64>>::from([
(-1, vec![]),
(0, vec![1i64]),
(1, vec![1i64]),
(2, vec![3i64]),
(3, vec![6i64]),
Expand All @@ -1581,7 +1584,11 @@ mod test {

if let Some(expected) = expected.get(&now) {
let batch = expected.iter().map(|v| Value::from(*v)).collect_vec();
let batch = Batch::try_from_rows(vec![batch.into()]).unwrap();
let batch = Batch::try_from_rows_with_types(
vec![batch.into()],
&[CDT::int64_datatype()],
)
.unwrap();
assert_eq!(res.first(), Some(&batch));
}
});
Expand Down
Loading

0 comments on commit a4d61bc

Please sign in to comment.