diff --git a/src/flow/src/adapter/flownode_impl.rs b/src/flow/src/adapter/flownode_impl.rs index 3131212ad532..99d02d27fe46 100644 --- a/src/flow/src/adapter/flownode_impl.rs +++ b/src/flow/src/adapter/flownode_impl.rs @@ -147,8 +147,8 @@ impl Flownode for FlownodeManager { let rows: Vec = rows_proto .into_iter() - .map(repr::Row::from) .map(|r| { + let r = repr::Row::from(r); let reordered = fetch_order .iter() .map(|&i| r.inner[i].clone()) diff --git a/src/flow/src/adapter/node_context.rs b/src/flow/src/adapter/node_context.rs index a5db59698dc0..1ef5823a477e 100644 --- a/src/flow/src/adapter/node_context.rs +++ b/src/flow/src/adapter/node_context.rs @@ -303,7 +303,7 @@ impl FlownodeContext { .get_by_name(table_name) .map(|(_, gid)| gid) .unwrap(); - self.schema.insert(gid, schema.into_named(vec![])); + self.schema.insert(gid, schema.into_unnamed()); Ok(()) } diff --git a/src/flow/src/adapter/table_source.rs b/src/flow/src/adapter/table_source.rs index 36e9cd7561eb..53932cd692c2 100644 --- a/src/flow/src/adapter/table_source.rs +++ b/src/flow/src/adapter/table_source.rs @@ -122,17 +122,17 @@ impl TableSource { ]; let raw_schema = table_info_value.table_info.meta.schema; - let (col_names, column_types): (Vec<_>, Vec<_>) = raw_schema + let (column_types, col_names): (Vec<_>, Vec<_>) = raw_schema .column_schemas .clone() .into_iter() .map(|col| { ( - col.name.clone(), ColumnType { nullable: col.is_nullable(), scalar_type: col.data_type, }, + col.name, ) }) .unzip(); diff --git a/src/flow/src/repr/relation.rs b/src/flow/src/repr/relation.rs index 48f4de2894f0..09e0b88344b7 100644 --- a/src/flow/src/repr/relation.rs +++ b/src/flow/src/repr/relation.rs @@ -263,9 +263,18 @@ impl RelationType { true } + /// Return relation describe with column names pub fn into_named(self, names: Vec) -> RelationDesc { RelationDesc { typ: self, names } } + + /// Return relation describe without column names + pub fn into_unnamed(self) -> RelationDesc { + RelationDesc { + typ: self, + names: vec![], + } + } } /// The type of a `Value` diff --git a/src/flow/src/transform.rs b/src/flow/src/transform.rs index 03927f516962..6f93e36e9682 100644 --- a/src/flow/src/transform.rs +++ b/src/flow/src/transform.rs @@ -211,7 +211,7 @@ mod test { let schema = RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)]); tri_map.insert(Some(name.clone()), Some(1024), gid); - schemas.insert(gid, schema.into_named(vec![])); + schemas.insert(gid, schema.into_unnamed()); } { @@ -225,7 +225,7 @@ mod test { ColumnType::new(CDT::uint32_datatype(), false), ColumnType::new(CDT::datetime_datatype(), false), ]); - schemas.insert(gid, schema.into_named(vec![])); + schemas.insert(gid, schema.into_unnamed()); tri_map.insert(Some(name.clone()), Some(1025), gid); } diff --git a/src/flow/src/transform/expr.rs b/src/flow/src/transform/expr.rs index 9058b98c0971..dfbc2c9125f9 100644 --- a/src/flow/src/transform/expr.rs +++ b/src/flow/src/transform/expr.rs @@ -101,7 +101,6 @@ impl TypedExpr { .unzip(); match arg_len { - // because variadic function can also have 1 arguments, we need to check if it's a variadic function first 1 if UnaryFunc::from_str_and_type(fn_name, None).is_ok() => { let func = UnaryFunc::from_str_and_type(fn_name, None)?; let arg = arg_exprs[0].clone(); @@ -124,7 +123,6 @@ impl TypedExpr { Ok(TypedExpr::new(arg.call_unary(func), ret_type)) } - // because variadic function can also have 2 arguments, we need to check if it's a variadic function first 2 if BinaryFunc::from_str_expr_and_type( fn_name, &arg_exprs, diff --git a/src/flow/src/utils.rs b/src/flow/src/utils.rs index 1762d0255282..30d48f0319d4 100644 --- a/src/flow/src/utils.rs +++ b/src/flow/src/utils.rs @@ -101,7 +101,14 @@ impl KeyExpiryManager { .or_default() .insert(row.clone()); - self.get_expire_duration(now, row) + if let Some(expire_time) = self.compute_expiration_timestamp(now) { + if expire_time > event_ts { + // return how much time it's expired + return Ok(Some(expire_time - event_ts)); + } + } + + Ok(None) } /// Get the expire duration of a key, if it's expired by now.