fix(flow): correctness bugs #4018

Merged · 22 commits · May 30, 2024
123 changes: 82 additions & 41 deletions src/flow/src/adapter.rs
@@ -35,12 +35,12 @@ use itertools::Itertools;
use query::{QueryEngine, QueryEngineFactory};
use serde::{Deserialize, Serialize};
use session::context::QueryContext;
use snafu::{OptionExt, ResultExt};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::{ConcreteDataType, RegionId};
use table::metadata::TableId;
use tokio::sync::{oneshot, watch, Mutex, RwLock};

use crate::adapter::error::{ExternalSnafu, TableNotFoundSnafu, UnexpectedSnafu};
use crate::adapter::error::{ExternalSnafu, InternalSnafu, TableNotFoundSnafu, UnexpectedSnafu};
pub(crate) use crate::adapter::node_context::FlownodeContext;
use crate::adapter::table_source::TableSource;
use crate::adapter::util::column_schemas_to_proto;
@@ -66,6 +66,11 @@ use error::Error;

pub const PER_REQ_MAX_ROW_CNT: usize = 8192;

// TODO: replace this with `GREPTIME_TIMESTAMP` before v0.9
pub const AUTO_CREATED_PLACEHOLDER_TS_COL: &str = "__ts_placeholder";

pub const UPDATE_AT_TS_COL: &str = "update_at";

// TODO: refactor common types for flow to a separate module
/// FlowId is a unique identifier for a flow task
pub type FlowId = u64;
@@ -279,10 +284,16 @@ impl FlownodeManager {
.map(|i| meta.schema.column_schemas[i].name.clone())
.collect_vec();
let schema = meta.schema.column_schemas;
let is_auto_create = schema
.last()
.map(|s| s.name == "__ts_placeholder")
.unwrap_or(false);
// check if the last column is the auto-created timestamp column, which implies the table was
// auto-created from the flow's plan type
let is_auto_create = {
let correct_name = schema
.last()
.map(|s| s.name == AUTO_CREATED_PLACEHOLDER_TS_COL)
.unwrap_or(false);
let correct_time_index = meta.schema.timestamp_index == Some(schema.len() - 1);
correct_name && correct_time_index
};
(primary_keys, schema, is_auto_create)
} else {
// TODO(discord9): consider removing buggy auto create by schema
@@ -302,6 +313,7 @@ impl FlownodeManager {
.clone();
// TODO(discord9): use default key from schema
let primary_keys = schema
.typ()
.keys
.first()
.map(|v| {
@@ -312,24 +324,31 @@
})
.unwrap_or_default();
let update_at = ColumnSchema::new(
"update_at",
UPDATE_AT_TS_COL,
ConcreteDataType::timestamp_millisecond_datatype(),
true,
);
// TODO(discord9): we can't infer the time index from the flow plan yet, so we have to set one manually
let ts_col = ColumnSchema::new(
"__ts_placeholder",
AUTO_CREATED_PLACEHOLDER_TS_COL,
ConcreteDataType::timestamp_millisecond_datatype(),
true,
)
.with_time_index(true);

let wout_ts = schema
.typ()
.column_types
.clone()
.into_iter()
.enumerate()
.map(|(idx, typ)| {
ColumnSchema::new(format!("Col_{idx}"), typ.scalar_type, typ.nullable)
let name = schema
.names
.get(idx)
.cloned()
.unwrap_or(format!("Col_{}", idx));
ColumnSchema::new(name, typ.scalar_type, typ.nullable)
})
.collect_vec();

@@ -339,7 +358,7 @@ impl FlownodeManager {

(primary_keys, with_ts, true)
};

let schema_len = schema.len();
let proto_schema = column_schemas_to_proto(schema, &primary_keys)?;

debug!(
@@ -348,16 +367,7 @@ impl FlownodeManager {
table_name.join("."),
reqs
);
let now = SystemTime::now();
let now = now
.duration_since(SystemTime::UNIX_EPOCH)
.map(|s| s.as_millis() as repr::Timestamp)
.unwrap_or_else(|_| {
-(SystemTime::UNIX_EPOCH
.duration_since(now)
.unwrap()
.as_millis() as repr::Timestamp)
});
let now = self.tick_manager.tick();
for req in reqs {
match req {
DiffRequest::Insert(insert) => {
@@ -370,13 +380,23 @@
))]);
// ts col, if auto create
if is_auto_create {
ensure!(
row.len() == schema_len - 1,
InternalSnafu {
reason: format!(
"Row len mismatch, expect {} got {}",
schema_len - 1,
row.len()
)
}
);
row.extend([Value::from(
common_time::Timestamp::new_millisecond(0),
)]);
}
row.into()
Ok(row.into())
})
.collect::<Vec<_>>();
.collect::<Result<Vec<_>, Error>>()?;
let table_name = table_name.last().unwrap().clone();
let req = RowInsertRequest {
table_name,
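
A note on the hunk above: the new `ensure!` turns a silent column misalignment into a surfaced error before the placeholder timestamp is appended. A minimal, self-contained sketch of that guard, using plain `Vec<i64>` rows and `String` errors as stand-ins for the crate's `Row` and snafu types:

```rust
// Hypothetical stand-in for the ensure! + placeholder padding above: a row
// must be exactly one column short of the schema before we append the
// placeholder timestamp (always 0 ms for auto-created tables).
fn pad_placeholder_ts(mut row: Vec<i64>, schema_len: usize) -> Result<Vec<i64>, String> {
    if row.len() != schema_len - 1 {
        return Err(format!(
            "Row len mismatch, expect {} got {}",
            schema_len - 1,
            row.len()
        ));
    }
    row.push(0); // placeholder timestamp value
    Ok(row)
}
```
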
Expand Down Expand Up @@ -490,9 +510,12 @@ impl FlownodeManager {
debug!("Starting to run");
loop {
// TODO(discord9): only run when new inputs arrive or scheduled to
self.run_available().await.unwrap();
debug!("call run_available in run every second");
self.run_available(true).await.unwrap();
debug!("call send_writeback_requests in run every second");
// TODO(discord9): error handling
self.send_writeback_requests().await.unwrap();
debug!("call log_all_errors in run every second");
self.log_all_errors().await;
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
Expand All @@ -501,29 +524,44 @@ impl FlownodeManager {
/// Run all available subgraphs in the flow node.
/// This will try to run all dataflows in this node.
///
/// However, this is not blocking and can sometimes return while the actual computation is still running in the worker thread.
/// Set `blocking` to true to wait until the lock is acquired,
/// and false to return immediately if the lock is not acquired.
/// TODO(discord9): add flag for subgraph that have input since last run
pub async fn run_available(&self) -> Result<(), Error> {
let now = self.tick_manager.tick();

pub async fn run_available(&self, blocking: bool) -> Result<(), Error> {
loop {
let now = self.tick_manager.tick();
for worker in self.worker_handles.iter() {
// TODO(discord9): consider how to handle error in individual worker
worker.lock().await.run_available(now).await.unwrap();
if blocking {
worker.lock().await.run_available(now).await?;
} else if let Ok(worker) = worker.try_lock() {
worker.run_available(now).await?;
} else {
return Ok(());
}
}
// first check how many inputs were sent
let send_cnt = match self.node_context.lock().await.flush_all_sender() {
Ok(cnt) => cnt,
let (flush_res, buf_len) = if blocking {
let mut ctx = self.node_context.lock().await;
(ctx.flush_all_sender(), ctx.get_send_buf_size())
} else {
match self.node_context.try_lock() {
Ok(mut ctx) => (ctx.flush_all_sender(), ctx.get_send_buf_size()),
Err(_) => return Ok(()),
}
};
match flush_res {
Ok(_) => (),
Err(err) => {
common_telemetry::error!("Flush send buf errors: {:?}", err);
break;
}
};
// if no inputs
if send_cnt == 0 {
// if nothing in the send buf then break
if buf_len == 0 {
break;
} else {
debug!("FlownodeManager::run_available: send_cnt={}", send_cnt);
debug!("Send buf len = {}", buf_len);
}
}
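
For readers skimming the hunk above: the control flow boils down to "drain until empty, unless non-blocking and contended". A minimal sketch using one hypothetical send buffer (the real code iterates per-worker handles and the node context separately):

```rust
use std::sync::Arc;
use tokio::sync::Mutex;

// Minimal sketch: in blocking mode we await the lock and keep draining until
// the buffer is empty; in non-blocking mode we return as soon as the lock is
// contended, so callers never stall behind a busy worker.
async fn run_available(buf: Arc<Mutex<Vec<u64>>>, blocking: bool) -> Result<(), String> {
    loop {
        let drained = if blocking {
            buf.lock().await.drain(..).count()
        } else if let Ok(mut b) = buf.try_lock() {
            b.drain(..).count()
        } else {
            return Ok(()); // lock busy: give up immediately
        };
        if drained == 0 {
            break; // send buffer empty, nothing left to run
        }
    }
    Ok(())
}
```
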

Expand All @@ -543,6 +581,8 @@ impl FlownodeManager {
);
let table_id = region_id.table_id();
self.node_context.lock().await.send(table_id, rows)?;
// TODO(discord9): put it in a background task?
// self.run_available(false).await?;
Ok(())
}
}
@@ -653,21 +693,22 @@ impl FlownodeManager {
///
/// TODO(discord9): find a better way to do this, and avoid exposing the flow tick even to other flows,
/// to avoid TSO coordination mess
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct FlowTickManager {
/// The starting instant of the flow, used with `start_timestamp` to calculate the current timestamp
start: Instant,
}

impl std::fmt::Debug for FlowTickManager {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("FlowTickManager").finish()
}
/// The timestamp when the flow started
start_timestamp: repr::Timestamp,
}

impl FlowTickManager {
pub fn new() -> Self {
FlowTickManager {
start: Instant::now(),
start_timestamp: SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_millis() as repr::Timestamp,
}
}

@@ -677,6 +718,6 @@ impl FlowTickManager {
pub fn tick(&self) -> repr::Timestamp {
let current = Instant::now();
let since_the_epoch = current - self.start;
since_the_epoch.as_millis() as repr::Timestamp
since_the_epoch.as_millis() as repr::Timestamp + self.start_timestamp
}
}
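
The `FlowTickManager` change is the heart of the timestamp fix: previously `tick()` returned milliseconds since process start, producing values that looked like timestamps from 1970. A sketch of the fixed scheme, with `repr::Timestamp` replaced by a plain `i64` for self-containment:

```rust
use std::time::{Instant, SystemTime};

/// Sketch of the fixed tick: a monotonic clock (`Instant`) anchored to the
/// wall-clock time captured once at startup, so ticks are epoch-based
/// milliseconds that never go backwards between calls.
struct TickManager {
    start: Instant,
    start_timestamp: i64, // ms since UNIX epoch when the manager was created
}

impl TickManager {
    fn new() -> Self {
        let start_timestamp = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .expect("system clock before UNIX epoch")
            .as_millis() as i64;
        TickManager {
            start: Instant::now(),
            start_timestamp,
        }
    }

    fn tick(&self) -> i64 {
        // elapsed monotonic millis + epoch anchor = current epoch millis
        self.start.elapsed().as_millis() as i64 + self.start_timestamp
    }
}
```

Anchoring once and adding elapsed `Instant` time trades a little drift against the system clock for monotonicity, which matters more for tick-style progress tracking.
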
55 changes: 52 additions & 3 deletions src/flow/src/adapter/flownode_impl.rs
@@ -14,13 +14,17 @@

//! impl `FlowNode` trait for `FlownodeManager` so standalone mode can call it

use std::collections::HashMap;

use api::v1::flow::{flow_request, CreateRequest, DropRequest, FlowRequest, FlowResponse};
use api::v1::region::InsertRequests;
use common_error::ext::BoxedError;
use common_meta::error::{ExternalSnafu, Result, UnexpectedSnafu};
use common_meta::node_manager::Flownode;
use common_telemetry::debug;
use itertools::Itertools;
use snafu::ResultExt;
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;

use crate::adapter::FlownodeManager;
use crate::repr::{self, DiffRow};
@@ -101,12 +105,57 @@ impl Flownode for FlownodeManager {
async fn handle_inserts(&self, request: InsertRequests) -> Result<FlowResponse> {
for write_request in request.requests {
let region_id = write_request.region_id;
let rows_proto = write_request.rows.map(|r| r.rows).unwrap_or(vec![]);
let table_id = RegionId::from(region_id).table_id();

let (insert_schema, rows_proto) = write_request
.rows
.map(|r| (r.schema, r.rows))
.unwrap_or_default();

// TODO(discord9): reconsider time assignment mechanism
let now = self.tick_manager.tick();

let fetch_order = {
let ctx = self.node_context.lock().await;
let table_col_names = ctx
.table_repr
.get_by_table_id(&table_id)
.map(|r| r.1)
.and_then(|id| ctx.schema.get(&id))
.map(|desc| &desc.names)
.context(UnexpectedSnafu {
err_msg: format!("Table not found: {}", table_id),
})?;
let name_to_col = HashMap::<_, _>::from_iter(
insert_schema
.iter()
.enumerate()
.map(|(i, name)| (&name.column_name, i)),
);
let fetch_order: Vec<usize> = table_col_names
.iter()
.map(|names| {
name_to_col.get(names).copied().context(UnexpectedSnafu {
err_msg: format!("Column not found: {}", names),
})
})
.try_collect()?;
if !fetch_order.iter().enumerate().all(|(i, &v)| i == v) {
debug!("Reordering columns: {:?}", fetch_order)
}
fetch_order
};

let rows: Vec<DiffRow> = rows_proto
.into_iter()
.map(repr::Row::from)
.map(|r| {
let r = repr::Row::from(r);
let reordered = fetch_order
.iter()
.map(|&i| r.inner[i].clone())
.collect_vec();
repr::Row::new(reordered)
})
.map(|r| (r, now, 1))
.collect_vec();
self.handle_write_request(region_id.into(), rows)
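
To make the reordering logic in `handle_inserts` concrete: it builds a permutation from the table's column order into the insert request's column order, then applies it to every incoming row. A minimal sketch with hypothetical column names:

```rust
use std::collections::HashMap;

// Sketch of the fetch_order computation: for each table column, find its
// index in the insert schema; None means the insert is missing a column
// (the real code surfaces this as an UnexpectedSnafu error).
fn fetch_order(table_cols: &[&str], insert_cols: &[&str]) -> Option<Vec<usize>> {
    let name_to_col: HashMap<&str, usize> = insert_cols
        .iter()
        .enumerate()
        .map(|(i, name)| (*name, i))
        .collect();
    table_cols
        .iter()
        .map(|name| name_to_col.get(name).copied())
        .collect()
}

fn main() {
    // The insert sends columns as (host, cpu, ts); the table expects (ts, host, cpu).
    let order = fetch_order(&["ts", "host", "cpu"], &["host", "cpu", "ts"]).unwrap();
    assert_eq!(order, vec![2, 0, 1]);

    let row = ["h1", "0.5", "1717027200000"]; // values in insert order
    let reordered: Vec<_> = order.iter().map(|&i| row[i]).collect();
    assert_eq!(reordered, ["1717027200000", "h1", "0.5"]); // table order
}
```

Before this fix, rows were consumed positionally, so an insert whose schema ordered columns differently from the table silently wrote values into the wrong columns.
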