Skip to content

Commit

Permalink
fix?: lagged missing data
Browse files Browse the repository at this point in the history
  • Loading branch information
discord9 committed May 30, 2024
1 parent 3b5439c commit 41d0a12
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 7 deletions.
3 changes: 2 additions & 1 deletion src/flow/src/adapter/node_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ pub struct SourceSender {
impl Default for SourceSender {
fn default() -> Self {
Self {
sender: broadcast::Sender::new(BROADCAST_CAP),
// TODO(discord9): found a better way then increase this to prevent lagging and hence missing input data
sender: broadcast::Sender::new(BROADCAST_CAP * 2),
send_buf: Default::default(),
}
}
Expand Down
33 changes: 27 additions & 6 deletions src/flow/src/compute/render/src_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ use common_telemetry::{debug, info};
use hydroflow::scheduled::graph_ext::GraphExt;
use itertools::Itertools;
use snafu::OptionExt;
use tokio::sync::broadcast::error::TryRecvError;
use tokio::sync::{broadcast, mpsc};

use crate::adapter::error::{Error, PlanSnafu};
use crate::compute::render::Context;
use crate::compute::types::{Arranged, Collection, CollectionBundle, Toff};
use crate::expr::GlobalId;
use crate::expr::error::InternalSnafu;
use crate::expr::{EvalError, GlobalId};
use crate::repr::{DiffRow, Row, BROADCAST_CAP};

#[allow(clippy::mutable_key_type)]
Expand Down Expand Up @@ -65,11 +67,30 @@ impl<'referred, 'df> Context<'referred, 'df> {
let mut to_send = Vec::new();
let mut to_arrange = Vec::new();
// TODO(discord9): handling tokio broadcast error
while let Ok((r, t, d)) = src_recv.try_recv() {
if t <= now {
to_send.push((r, t, d));
} else {
to_arrange.push(((r, Row::empty()), t, d));
loop {
match src_recv.try_recv() {
Ok((r, t, d)) => {
if t <= now {
to_send.push((r, t, d));
} else {
to_arrange.push(((r, Row::empty()), t, d));
}
}
Err(TryRecvError::Lagged(lag_offset)) => {
common_telemetry::error!("Flow missing {} rows behind", lag_offset);
break;
}
Err(err) => {
err_collector.run(|| -> Result<(), EvalError> {
InternalSnafu {
reason: format!(
"Error receiving from broadcast channel: {}",
err
),
}
.fail()
});
}
}
}
let all = prev_avail.chain(to_send).collect_vec();
Expand Down

0 comments on commit 41d0a12

Please sign in to comment.