Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(flow): batch builder with type #5195

Merged
merged 5 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions src/flow/src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,8 @@ impl FlowWorkerManager {
let default_interval = Duration::from_secs(1);
let mut avg_spd = 0; // rows/sec
let mut since_last_run = tokio::time::Instant::now();
let run_per_trace = 10;
let mut run_cnt = 0;
loop {
// TODO(discord9): only run when new inputs arrive or scheduled to
let row_cnt = self.run_available(true).await.unwrap_or_else(|err| {
Expand Down Expand Up @@ -607,10 +609,19 @@ impl FlowWorkerManager {
} else {
(9 * avg_spd + cur_spd) / 10
};
trace!("avg_spd={} r/s, cur_spd={} r/s", avg_spd, cur_spd);
let new_wait = BATCH_SIZE * 1000 / avg_spd.max(1); //in ms
let new_wait = Duration::from_millis(new_wait as u64).min(default_interval);
trace!("Wait for {} ms, row_cnt={}", new_wait.as_millis(), row_cnt);

// print trace every `run_per_trace` times so that we can see if there is something wrong
// but also not get flooded with trace
if run_cnt >= run_per_trace {
trace!("avg_spd={} r/s, cur_spd={} r/s", avg_spd, cur_spd);
trace!("Wait for {} ms, row_cnt={}", new_wait.as_millis(), row_cnt);
run_cnt = 0;
} else {
run_cnt += 1;
}

METRIC_FLOW_RUN_INTERVAL_MS.set(new_wait.as_millis() as i64);
since_last_run = tokio::time::Instant::now();
tokio::time::sleep(new_wait).await;
Expand Down Expand Up @@ -670,13 +681,18 @@ impl FlowWorkerManager {
&self,
region_id: RegionId,
rows: Vec<DiffRow>,
batch_datatypes: &[ConcreteDataType],
) -> Result<(), Error> {
let rows_len = rows.len();
let table_id = region_id.table_id();
let _timer = METRIC_FLOW_INSERT_ELAPSED
.with_label_values(&[table_id.to_string().as_str()])
.start_timer();
self.node_context.read().await.send(table_id, rows).await?;
self.node_context
.read()
.await
.send(table_id, rows, batch_datatypes)
.await?;
trace!(
"Handling write request for table_id={} with {} rows",
table_id,
Expand Down
13 changes: 11 additions & 2 deletions src/flow/src/adapter/flownode_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use itertools::Itertools;
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;

use super::util::from_proto_to_data_type;
use crate::adapter::{CreateFlowArgs, FlowWorkerManager};
use crate::error::InternalSnafu;
use crate::metrics::METRIC_FLOW_TASK_COUNT;
Expand Down Expand Up @@ -206,9 +207,17 @@ impl Flownode for FlowWorkerManager {
})
.map(|r| (r, now, 1))
.collect_vec();
self.handle_write_request(region_id.into(), rows)
.await
let batch_datatypes = insert_schema
.iter()
.map(from_proto_to_data_type)
.collect::<std::result::Result<Vec<_>, _>>()
.map_err(to_meta_err)?;
self.handle_write_request(region_id.into(), rows, &batch_datatypes)
.await
.map_err(|err| {
common_telemetry::error!(err;"Failed to handle write request");
to_meta_err(err)
})?;
}
Ok(Default::default())
}
Expand Down
23 changes: 18 additions & 5 deletions src/flow/src/adapter/node_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use common_telemetry::trace;
use datatypes::prelude::ConcreteDataType;
use session::context::QueryContext;
use snafu::{OptionExt, ResultExt};
use table::metadata::TableId;
Expand Down Expand Up @@ -131,16 +132,23 @@ impl SourceSender {
}

/// Returns the number of rows actually sent (including what's in the buffer)
pub async fn send_rows(&self, rows: Vec<DiffRow>) -> Result<usize, Error> {
pub async fn send_rows(
&self,
rows: Vec<DiffRow>,
batch_datatypes: &[ConcreteDataType],
) -> Result<usize, Error> {
METRIC_FLOW_INPUT_BUF_SIZE.add(rows.len() as _);
while self.send_buf_row_cnt.load(Ordering::SeqCst) >= BATCH_SIZE * 4 {
tokio::task::yield_now().await;
}
// row count metrics is approx so relaxed order is ok
self.send_buf_row_cnt
.fetch_add(rows.len(), Ordering::SeqCst);
let batch = Batch::try_from_rows(rows.into_iter().map(|(row, _, _)| row).collect())
.context(EvalSnafu)?;
let batch = Batch::try_from_rows_with_types(
rows.into_iter().map(|(row, _, _)| row).collect(),
batch_datatypes,
)
.context(EvalSnafu)?;
common_telemetry::trace!("Send one batch to worker with {} rows", batch.row_count());
self.send_buf_tx.send(batch).await.map_err(|e| {
crate::error::InternalSnafu {
Expand All @@ -157,14 +165,19 @@ impl FlownodeContext {
/// Returns the number of rows actually sent (including what's in the buffer)
///
/// TODO(discord9): make this concurrent
pub async fn send(&self, table_id: TableId, rows: Vec<DiffRow>) -> Result<usize, Error> {
pub async fn send(
&self,
table_id: TableId,
rows: Vec<DiffRow>,
batch_datatypes: &[ConcreteDataType],
) -> Result<usize, Error> {
let sender = self
.source_sender
.get(&table_id)
.with_context(|| TableNotFoundSnafu {
name: table_id.to_string(),
})?;
sender.send_rows(rows).await
sender.send_rows(rows, batch_datatypes).await
}

/// flush all sender's buf
Expand Down
15 changes: 15 additions & 0 deletions src/flow/src/adapter/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,27 @@ use api::helper::ColumnDataTypeWrapper;
use api::v1::column_def::options_from_column_schema;
use api::v1::{ColumnDataType, ColumnDataTypeExtension, SemanticType};
use common_error::ext::BoxedError;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use itertools::Itertools;
use snafu::ResultExt;

use crate::error::{Error, ExternalSnafu};

/// Convert a protobuf `ColumnSchema` into the matching [`ConcreteDataType`].
///
/// The proto `datatype` and its (cloned) `datatype_extension` are first
/// validated through `ColumnDataTypeWrapper::try_new`; the resulting wrapper
/// is then turned into a `ConcreteDataType`.
///
/// # Errors
///
/// If `ColumnDataTypeWrapper::try_new` fails, its error is boxed and returned
/// under the `ExternalSnafu` context.
pub fn from_proto_to_data_type(
    column_schema: &api::v1::ColumnSchema,
) -> Result<ConcreteDataType, Error> {
    let extension = column_schema.datatype_extension.clone();

    ColumnDataTypeWrapper::try_new(column_schema.datatype, extension)
        .map(ConcreteDataType::from)
        .map_err(BoxedError::new)
        .context(ExternalSnafu)
}

/// Convert a list of `ColumnSchema` to its corresponding proto type
pub fn column_schemas_to_proto(
column_schemas: Vec<ColumnSchema>,
Expand Down
23 changes: 18 additions & 5 deletions src/flow/src/compute/render.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::compute::types::{Collection, CollectionBundle, ErrCollector, Toff};
use crate::error::{Error, InvalidQuerySnafu, NotImplementedSnafu};
use crate::expr::{self, Batch, GlobalId, LocalId};
use crate::plan::{Plan, TypedPlan};
use crate::repr::{self, DiffRow};
use crate::repr::{self, DiffRow, RelationType};

mod map;
mod reduce;
Expand Down Expand Up @@ -124,10 +124,10 @@ impl Context<'_, '_> {
/// Like `render_plan` but in Batch Mode
pub fn render_plan_batch(&mut self, plan: TypedPlan) -> Result<CollectionBundle<Batch>, Error> {
match plan.plan {
Plan::Constant { rows } => Ok(self.render_constant_batch(rows)),
Plan::Constant { rows } => Ok(self.render_constant_batch(rows, &plan.schema.typ)),
Plan::Get { id } => self.get_batch_by_id(id),
Plan::Let { id, value, body } => self.eval_batch_let(id, value, body),
Plan::Mfp { input, mfp } => self.render_mfp_batch(input, mfp),
Plan::Mfp { input, mfp } => self.render_mfp_batch(input, mfp, &plan.schema.typ),
Plan::Reduce {
input,
key_val_plan,
Expand Down Expand Up @@ -172,7 +172,11 @@ impl Context<'_, '_> {
/// render Constant, take all rows that have a timestamp not greater than the current time
/// This function is primarily used for testing
/// Always assume input is sorted by timestamp
pub fn render_constant_batch(&mut self, rows: Vec<DiffRow>) -> CollectionBundle<Batch> {
pub fn render_constant_batch(
&mut self,
rows: Vec<DiffRow>,
output_type: &RelationType,
) -> CollectionBundle<Batch> {
let (send_port, recv_port) = self.df.make_edge::<_, Toff<Batch>>("constant_batch");
let mut per_time: BTreeMap<repr::Timestamp, Vec<DiffRow>> = Default::default();
for (key, group) in &rows.into_iter().group_by(|(_row, ts, _diff)| *ts) {
Expand All @@ -185,6 +189,8 @@ impl Context<'_, '_> {
let scheduler_inner = scheduler.clone();
let err_collector = self.err_collector.clone();

let output_type = output_type.clone();

let subgraph_id =
self.df
.add_subgraph_source("ConstantBatch", send_port, move |_ctx, send_port| {
Expand All @@ -199,7 +205,14 @@ impl Context<'_, '_> {
not_great_than_now.into_iter().for_each(|(_ts, rows)| {
err_collector.run(|| {
let rows = rows.into_iter().map(|(row, _ts, _diff)| row).collect();
let batch = Batch::try_from_rows(rows)?;
let batch = Batch::try_from_rows_with_types(
rows,
&output_type
.column_types
.iter()
.map(|ty| ty.scalar_type().clone())
.collect_vec(),
)?;
send_port.give(vec![batch]);
Ok(())
});
Expand Down
3 changes: 2 additions & 1 deletion src/flow/src/compute/render/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::compute::types::{Arranged, Collection, CollectionBundle, ErrCollector
use crate::error::{Error, PlanSnafu};
use crate::expr::{Batch, EvalError, MapFilterProject, MfpPlan, ScalarExpr};
use crate::plan::TypedPlan;
use crate::repr::{self, DiffRow, KeyValDiffRow, Row};
use crate::repr::{self, DiffRow, KeyValDiffRow, RelationType, Row};
use crate::utils::ArrangeHandler;

impl Context<'_, '_> {
Expand All @@ -34,6 +34,7 @@ impl Context<'_, '_> {
&mut self,
input: Box<TypedPlan>,
mfp: MapFilterProject,
_output_type: &RelationType,
) -> Result<CollectionBundle<Batch>, Error> {
let input = self.render_plan_batch(*input)?;

Expand Down
35 changes: 21 additions & 14 deletions src/flow/src/compute/render/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ impl Context<'_, '_> {
})?;
let key_val_plan = key_val_plan.clone();

let output_type = output_type.clone();

let now = self.compute_state.current_time_ref();

let err_collector = self.err_collector.clone();
Expand Down Expand Up @@ -118,6 +120,7 @@ impl Context<'_, '_> {
src_data,
&key_val_plan,
&accum_plan,
&output_type,
SubgraphArg {
now,
err_collector: &err_collector,
Expand Down Expand Up @@ -354,6 +357,7 @@ fn reduce_batch_subgraph(
src_data: impl IntoIterator<Item = Batch>,
key_val_plan: &KeyValPlan,
accum_plan: &AccumulablePlan,
output_type: &RelationType,
SubgraphArg {
now,
err_collector,
Expand Down Expand Up @@ -535,17 +539,13 @@ fn reduce_batch_subgraph(
// this output part is not supposed to be resource intensive
// (because for every batch there wouldn't usually be as many output row?),
// so we can do some costly operation here
let output_types = all_output_dict.first_entry().map(|entry| {
entry
.key()
.iter()
.chain(entry.get().iter())
.map(|v| v.data_type())
.collect::<Vec<ConcreteDataType>>()
});
let output_types = output_type
.column_types
.iter()
.map(|t| t.scalar_type.clone())
.collect_vec();

if let Some(output_types) = output_types {
err_collector.run(|| {
err_collector.run(|| {
let column_cnt = output_types.len();
let row_cnt = all_output_dict.len();

Expand Down Expand Up @@ -585,7 +585,6 @@ fn reduce_batch_subgraph(

Ok(())
});
}
}

/// reduce subgraph, reduce the input data into a single row
Expand Down Expand Up @@ -1516,7 +1515,9 @@ mod test {
let mut ctx = harness_test_ctx(&mut df, &mut state);

let rows = vec![
(Row::new(vec![1i64.into()]), 1, 1),
(Row::new(vec![Value::Null]), -1, 1),
(Row::new(vec![1i64.into()]), 0, 1),
(Row::new(vec![Value::Null]), 1, 1),
(Row::new(vec![2i64.into()]), 2, 1),
(Row::new(vec![3i64.into()]), 3, 1),
(Row::new(vec![1i64.into()]), 4, 1),
Expand Down Expand Up @@ -1558,13 +1559,15 @@ mod test {
Box::new(input_plan.with_types(typ.into_unnamed())),
&key_val_plan,
&reduce_plan,
&RelationType::empty(),
&RelationType::new(vec![ColumnType::new(CDT::int64_datatype(), true)]),
)
.unwrap();

{
let now_inner = now.clone();
let expected = BTreeMap::<i64, Vec<i64>>::from([
(-1, vec![]),
(0, vec![1i64]),
(1, vec![1i64]),
(2, vec![3i64]),
(3, vec![6i64]),
Expand All @@ -1581,7 +1584,11 @@ mod test {

if let Some(expected) = expected.get(&now) {
let batch = expected.iter().map(|v| Value::from(*v)).collect_vec();
let batch = Batch::try_from_rows(vec![batch.into()]).unwrap();
let batch = Batch::try_from_rows_with_types(
vec![batch.into()],
&[CDT::int64_datatype()],
)
.unwrap();
assert_eq!(res.first(), Some(&batch));
}
});
Expand Down
Loading
Loading