Skip to content

Commit

Permalink
fix: demo tumble
Browse files Browse the repository at this point in the history
  • Loading branch information
discord9 committed May 17, 2024
1 parent 2ff8bd8 commit f2d8fbf
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 67 deletions.
11 changes: 5 additions & 6 deletions src/common/meta/src/ddl/create_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,12 +119,11 @@ impl CreateFlowProcedure {
&sink_table_name.table_name,
))
.await?;
ensure!(
!exists,
error::TableAlreadyExistsSnafu {
table_name: sink_table_name.to_string(),
}
);
// TODO(discord9): due to undefined behavior in flow's plan in how to transform types in mfp, sometime flow can't deduce correct schema
// and require manually create sink table
if exists {
common_telemetry::warn!("Table already exists, table: {}", sink_table_name);
}

self.collect_source_tables().await?;
self.allocate_flow_id().await?;
Expand Down
113 changes: 67 additions & 46 deletions src/flow/src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use crate::adapter::worker::{create_worker, Worker, WorkerHandle};
use crate::compute::ErrCollector;
use crate::expr::GlobalId;
use crate::repr::{self, DiffRow, Row};
use crate::transform::sql_to_flow_plan;
use crate::transform::{register_function_to_query_engine, sql_to_flow_plan};

pub(crate) mod error;
mod flownode_impl;
Expand Down Expand Up @@ -120,6 +120,8 @@ impl FlownodeBuilder {
);
let query_engine = query_engine_factory.query_engine();

register_function_to_query_engine(&query_engine);

let (tx, rx) = oneshot::channel();

let node_id = Some(self.flow_node_id);
Expand Down Expand Up @@ -261,7 +263,7 @@ impl FlownodeManager {
let ctx = Arc::new(QueryContext::with(&catalog, &schema));
// TODO(discord9): instead of auto build table from request schema, actually build table
// before `create flow` to be able to assign pk and ts etc.
let (primary_keys, schema) = if let Some(table_id) = self
let (primary_keys, schema, is_auto_create) = if let Some(table_id) = self
.table_info_source
.get_table_id_from_name(&table_name)
.await?
Expand All @@ -278,50 +280,62 @@ impl FlownodeManager {
.map(|i| meta.schema.column_schemas[i].name.clone())
.collect_vec();
let schema = meta.schema.column_schemas;
(primary_keys, schema)
let is_auto_create = schema
.last()
.map(|s| s.name == "__ts_placeholder")
.unwrap_or(false);
(primary_keys, schema, is_auto_create)
} else {
// TODO(discord9): get ts column from `RelationType` once we are done rewriting flow plan to attach ts
let (primary_keys, schema) = {
let node_ctx = self.node_context.lock().await;
let gid: GlobalId = node_ctx
.table_repr
.get_by_name(&table_name)
.map(|x| x.1)
.unwrap();
let schema = node_ctx
.schema
.get(&gid)
.with_context(|| TableNotFoundSnafu {
name: format!("Table name = {:?}", table_name),
})?
.clone();
// TODO(discord9): use default key from schema
let primary_keys = schema
.keys
.first()
.map(|v| {
v.column_indices
.iter()
.map(|i| format!("Col_{i}"))
.collect_vec()
})
.unwrap_or_default();
let ts_col =
ColumnSchema::new("update_at", ConcreteDataType::datetime_datatype(), true);

let wout_ts = schema
.column_types
.into_iter()
.enumerate()
.map(|(idx, typ)| {
ColumnSchema::new(format!("Col_{idx}"), typ.scalar_type, typ.nullable)
})
.collect_vec();
let mut with_ts = wout_ts.clone();
with_ts.push(ts_col);
(primary_keys, with_ts)
};
(primary_keys, schema)
// TODO(discord9): condiser remove buggy auto create by schema

let node_ctx = self.node_context.lock().await;
let gid: GlobalId = node_ctx
.table_repr
.get_by_name(&table_name)
.map(|x| x.1)
.unwrap();
let schema = node_ctx
.schema
.get(&gid)
.with_context(|| TableNotFoundSnafu {
name: format!("Table name = {:?}", table_name),
})?
.clone();
// TODO(discord9): use default key from schema
let primary_keys = schema
.keys
.first()
.map(|v| {
v.column_indices
.iter()
.map(|i| format!("Col_{i}"))
.collect_vec()
})
.unwrap_or_default();
let update_at =
ColumnSchema::new("update_at", ConcreteDataType::datetime_datatype(), true);
// TODO(discord9): bugged so we can't infer time index from flow plan, so we have to manually set one
let ts_col = ColumnSchema::new(
"__ts_placeholder",
ConcreteDataType::timestamp_millisecond_datatype(),
true,
)
.with_time_index(true);

let wout_ts = schema
.column_types
.into_iter()
.enumerate()
.map(|(idx, typ)| {
ColumnSchema::new(format!("Col_{idx}"), typ.scalar_type, typ.nullable)
})
.collect_vec();

let mut with_ts = wout_ts.clone();
with_ts.push(update_at);
with_ts.push(ts_col);

(primary_keys, with_ts, true)
};

let proto_schema = column_schemas_to_proto(schema, &primary_keys)?;
Expand All @@ -348,7 +362,14 @@ impl FlownodeManager {
let rows_proto: Vec<v1::Row> = insert
.into_iter()
.map(|(mut row, _ts)| {
row.extend(Some(Value::from(common_time::DateTime::new(now))));
// `update_at` col
row.extend([Value::from(common_time::DateTime::new(now))]);
// ts col, if auto create
if is_auto_create {
row.extend([Value::from(
common_time::Timestamp::new_millisecond(0),
)]);
}
row.into()
})
.collect::<Vec<_>>();
Expand Down
37 changes: 22 additions & 15 deletions src/flow/src/expr/func.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ use std::sync::OnceLock;

use common_error::ext::BoxedError;
use common_telemetry::debug;
use common_time::DateTime;
use common_time::timestamp::TimeUnit;
use common_time::{DateTime, Timestamp};
use datafusion_expr::Operator;
use datatypes::data_type::ConcreteDataType;
use datatypes::types::cast;
Expand All @@ -33,8 +34,8 @@ use substrait::df_logical_plan::consumer::name_to_op;

use crate::adapter::error::{Error, ExternalSnafu, InvalidQuerySnafu, PlanSnafu};
use crate::expr::error::{
CastValueSnafu, DivisionByZeroSnafu, EvalError, InternalSnafu, TryFromValueSnafu,
TypeMismatchSnafu,
CastValueSnafu, DivisionByZeroSnafu, EvalError, InternalSnafu, OverflowSnafu,
TryFromValueSnafu, TypeMismatchSnafu,
};
use crate::expr::signature::{GenericFn, Signature};
use crate::expr::{InvalidArgumentSnafu, ScalarExpr, TypedExpr};
Expand Down Expand Up @@ -170,13 +171,13 @@ impl UnaryFunc {
generic_fn: GenericFn::Cast,
},
Self::TumbleWindowFloor { .. } => Signature {
input: smallvec![ConcreteDataType::datetime_datatype()],
output: ConcreteDataType::datetime_datatype(),
input: smallvec![ConcreteDataType::timestamp_millisecond_datatype()],
output: ConcreteDataType::timestamp_millisecond_datatype(),
generic_fn: GenericFn::TumbleWindow,
},
Self::TumbleWindowCeiling { .. } => Signature {
input: smallvec![ConcreteDataType::datetime_datatype()],
output: ConcreteDataType::datetime_datatype(),
input: smallvec![ConcreteDataType::timestamp_millisecond_datatype()],
output: ConcreteDataType::timestamp_millisecond_datatype(),
generic_fn: GenericFn::TumbleWindow,
},
}
Expand Down Expand Up @@ -276,33 +277,39 @@ impl UnaryFunc {
window_size,
start_time,
} => {
let ts = arg.as_datetime().with_context(|| TypeMismatchSnafu {
expected: ConcreteDataType::datetime_datatype(),
let ts = arg.as_timestamp().with_context(|| TypeMismatchSnafu {
expected: ConcreteDataType::timestamp_millisecond_datatype(),
actual: arg.data_type(),
})?;
let ts = ts.val();
let ts = ts
.convert_to(TimeUnit::Millisecond)
.context(OverflowSnafu)?
.value();
let start_time = start_time.map(|t| t.val()).unwrap_or(0);
let window_size = (window_size.to_nanosecond() / 1_000_000) as repr::Duration; // nanosecond to millisecond
let window_start = start_time + (ts - start_time) / window_size * window_size;

let ret = DateTime::new(window_start);
let ret = Timestamp::new_millisecond(window_start);
Ok(Value::from(ret))
}
Self::TumbleWindowCeiling {
window_size,
start_time,
} => {
let ts = arg.as_datetime().with_context(|| TypeMismatchSnafu {
expected: ConcreteDataType::datetime_datatype(),
let ts = arg.as_timestamp().with_context(|| TypeMismatchSnafu {
expected: ConcreteDataType::timestamp_millisecond_datatype(),
actual: arg.data_type(),
})?;
let ts = ts.val();
let ts = ts
.convert_to(TimeUnit::Millisecond)
.context(OverflowSnafu)?
.value();
let start_time = start_time.map(|t| t.val()).unwrap_or(0);
let window_size = (window_size.to_nanosecond() / 1_000_000) as repr::Duration; // nanosecond to millisecond
let window_start = start_time + (ts - start_time) / window_size * window_size;

let window_end = window_start + window_size;
let ret = DateTime::new(window_end);
let ret = Timestamp::new_millisecond(window_end);
Ok(Value::from(ret))
}
}
Expand Down

0 comments on commit f2d8fbf

Please sign in to comment.