fix(flow): mfp operator missing rows (#4084)
* fix: mfp missing rows if run twice in same tick

* tests: run mfp multiple times

* refactor: make mfp less hacky

* feat: make channel larger

* chore: typos
discord9 authored Jun 4, 2024
1 parent a626939 commit 0a07130
Showing 3 changed files with 60 additions and 5 deletions.
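
Why rows went missing, as the hunks below suggest: mfp_subgraph first applied every evaluated update to the arrangement, then read its output back out of the range (from, now] (Bound::Excluded(from) to Bound::Included(now)). On a second run within the same tick, from == now, so that range is empty and updates stamped ts == now are written but never emitted. The fix sends current-tick updates directly and stores only strictly-future updates in the arrangement.

A minimal illustration of the empty-range effect, using a BTreeMap as a hypothetical stand-in for the arrangement (only the bound semantics mirror the code in this commit):

use std::collections::BTreeMap;
use std::ops::Bound;

fn main() {
    // arrangement stand-in: timestamp -> rows written at that time
    let mut arranged: BTreeMap<i64, Vec<i64>> = BTreeMap::new();
    arranged.insert(0, vec![42]); // an update written at ts == now

    // second run within the same tick: the read cursor already sits at `now`
    let (from, now) = (0i64, 0i64);
    let range = (Bound::Excluded(from), Bound::Included(now));
    // (0, 0] is empty, so the update written above would never be read back out
    assert_eq!(arranged.range(range).count(), 0);
}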
2 changes: 0 additions & 2 deletions src/flow/src/adapter.rs
@@ -64,8 +64,6 @@ mod table_source;
 
 use error::Error;
 
-pub const PER_REQ_MAX_ROW_CNT: usize = 8192;
-
 // TODO: replace this with `GREPTIME_TIMESTAMP` before v0.9
 pub const AUTO_CREATED_PLACEHOLDER_TS_COL: &str = "__ts_placeholder";
60 changes: 58 additions & 2 deletions src/flow/src/compute/render/map.rs
@@ -113,9 +113,21 @@ fn mfp_subgraph(
     scheduler: &Scheduler,
     send: &PortCtx<SEND, Toff>,
 ) {
+    // all updates that should be sent immediately
+    let mut output_now = vec![];
     let run_mfp = || {
-        let all_updates = eval_mfp_core(input, mfp_plan, now, err_collector);
-        arrange.write().apply_updates(now, all_updates)?;
+        let mut all_updates = eval_mfp_core(input, mfp_plan, now, err_collector);
+        all_updates.retain(|(kv, ts, d)| {
+            if *ts > now {
+                true
+            } else {
+                output_now.push((kv.clone(), *ts, *d));
+                false
+            }
+        });
+        let future_updates = all_updates;
+
+        arrange.write().apply_updates(now, future_updates)?;
         Ok(())
     };
     err_collector.run(run_mfp);
@@ -130,13 +142,19 @@
         std::ops::Bound::Excluded(from),
         std::ops::Bound::Included(now),
     );
+
+    // find all updates that need to be sent from the arrangement
     let output_kv = arrange.read().get_updates_in_range(range);
+
+    // the output is expected to be key -> empty val
     let output = output_kv
         .into_iter()
+        .chain(output_now) // chain the updates collected for immediate send
        .map(|((key, _v), ts, diff)| (key, ts, diff))
        .collect_vec();
     // send output
     send.give(output);
+
     let run_compaction = || {
         arrange.write().compact_to(now)?;
         Ok(())
@@ -305,4 +323,42 @@ mod test {
         ]);
         run_and_check(&mut state, &mut df, 1..5, expected, output);
     }
+
+    /// test that the mfp operator can run multiple times within the same tick
+    #[test]
+    fn test_render_mfp_multiple_times() {
+        let mut df = Hydroflow::new();
+        let mut state = DataflowState::default();
+        let mut ctx = harness_test_ctx(&mut df, &mut state);
+
+        let (sender, recv) = tokio::sync::broadcast::channel(1000);
+        let collection = ctx.render_source(recv).unwrap();
+        ctx.insert_global(GlobalId::User(1), collection);
+        let input_plan = Plan::Get {
+            id: expr::Id::Global(GlobalId::User(1)),
+        };
+        let typ = RelationType::new(vec![ColumnType::new_nullable(
+            ConcreteDataType::int64_datatype(),
+        )]);
+        // filter: col(0) > 1
+        let mfp = MapFilterProject::new(1)
+            .filter(vec![ScalarExpr::Column(0).call_binary(
+                ScalarExpr::literal(1.into(), ConcreteDataType::int32_datatype()),
+                BinaryFunc::Gt,
+            )])
+            .unwrap();
+        let bundle = ctx
+            .render_mfp(Box::new(input_plan.with_types(typ)), mfp)
+            .unwrap();
+
+        let output = get_output_handle(&mut ctx, bundle);
+        drop(ctx);
+        sender.send((Row::new(vec![2.into()]), 0, 1)).unwrap();
+        state.run_available_with_schedule(&mut df);
+        assert_eq!(output.borrow().len(), 1);
+        output.borrow_mut().clear();
+        sender.send((Row::new(vec![3.into()]), 0, 1)).unwrap();
+        state.run_available_with_schedule(&mut df);
+        assert_eq!(output.borrow().len(), 1);
+    }
 }
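
Distilled from the map.rs hunks above, a self-contained sketch of the retain-based split; Row, Timestamp, and Diff here are simplified stand-ins for the real flow types:

type Row = Vec<i64>;
type Timestamp = i64;
type Diff = i64;

/// split updates into (send immediately, write to arrangement)
fn split_updates(
    mut all_updates: Vec<(Row, Timestamp, Diff)>,
    now: Timestamp,
) -> (Vec<(Row, Timestamp, Diff)>, Vec<(Row, Timestamp, Diff)>) {
    let mut output_now = Vec::new();
    // retain keeps only strictly-future updates; current-tick ones move out
    all_updates.retain(|(row, ts, diff)| {
        if *ts > now {
            true
        } else {
            output_now.push((row.clone(), *ts, *diff));
            false
        }
    });
    (output_now, all_updates)
}

fn main() {
    let updates = vec![(vec![1], 0, 1), (vec![2], 5, 1)];
    let (now_batch, future_batch) = split_updates(updates, 0);
    assert_eq!(now_batch, vec![(vec![1], 0, 1)]);
    assert_eq!(future_batch, vec![(vec![2], 5, 1)]);
}

Vec::retain does the split in a single in-place pass over the batch, which is what makes this version less hacky than re-reading the arrangement for current-tick rows.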
3 changes: 2 additions & 1 deletion src/flow/src/repr.rs
@@ -53,7 +53,8 @@ pub type KeyValDiffRow = ((Row, Row), Timestamp, Diff);
 
 /// broadcast channel capacity, can be important to memory consumption, since this influences how many
 /// updates can be buffered in memory in the entire dataflow
-pub const BROADCAST_CAP: usize = 8192;
+/// TODO(discord9): add config for this, so cpu & mem usage can be balanced and configured by this
+pub const BROADCAST_CAP: usize = 65535;
 
 /// Convert a value that is or can be converted to Datetime to internal timestamp
 ///
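
The capacity bump matters because the dataflow fans updates out over tokio broadcast channels (the test above builds one with tokio::sync::broadcast::channel). When more values are in flight than the channel holds, the oldest are evicted and a slow receiver sees RecvError::Lagged, i.e. dropped updates. A minimal sketch of that failure mode, assuming the tokio crate with the sync, rt, and macros features; the tiny capacity is only for demonstration:

use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = broadcast::channel::<u64>(2);
    for i in 0..4 {
        tx.send(i).unwrap(); // the channel keeps only the 2 newest values
    }
    // the two oldest values were evicted; the receiver learns it lagged
    match rx.recv().await {
        Err(broadcast::error::RecvError::Lagged(n)) => assert_eq!(n, 2),
        other => panic!("unexpected: {other:?}"),
    }
}

A larger BROADCAST_CAP therefore trades memory for headroom before receivers start lagging, which is the balance the TODO above proposes making configurable.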
