Skip to content

Commit

Permalink
refactor: per reviews
Browse files Browse the repository at this point in the history
  • Loading branch information
discord9 committed May 24, 2024
1 parent 23ccfd1 commit 3535a52
Show file tree
Hide file tree
Showing 7 changed files with 23 additions and 9 deletions.
2 changes: 1 addition & 1 deletion src/flow/src/adapter/flownode_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,8 @@ impl Flownode for FlownodeManager {

let rows: Vec<DiffRow> = rows_proto
.into_iter()
.map(repr::Row::from)
.map(|r| {
let r = repr::Row::from(r);
let reordered = fetch_order
.iter()
.map(|&i| r.inner[i].clone())
Expand Down
2 changes: 1 addition & 1 deletion src/flow/src/adapter/node_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ impl FlownodeContext {
.get_by_name(table_name)
.map(|(_, gid)| gid)
.unwrap();
self.schema.insert(gid, schema.into_named(vec![]));
self.schema.insert(gid, schema.into_unnamed());
Ok(())
}

Expand Down
4 changes: 2 additions & 2 deletions src/flow/src/adapter/table_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,17 +122,17 @@ impl TableSource {
];

let raw_schema = table_info_value.table_info.meta.schema;
let (col_names, column_types): (Vec<_>, Vec<_>) = raw_schema
let (column_types, col_names): (Vec<_>, Vec<_>) = raw_schema
.column_schemas
.clone()
.into_iter()
.map(|col| {
(
col.name.clone(),
ColumnType {
nullable: col.is_nullable(),
scalar_type: col.data_type,
},
col.name,
)
})
.unzip();
Expand Down
9 changes: 9 additions & 0 deletions src/flow/src/repr/relation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,18 @@ impl RelationType {
true
}

/// Return relation describe with column names
pub fn into_named(self, names: Vec<ColumnName>) -> RelationDesc {
RelationDesc { typ: self, names }
}

/// Return relation describe without column names
pub fn into_unnamed(self) -> RelationDesc {
RelationDesc {
typ: self,
names: vec![],
}
}
}

/// The type of a `Value`
Expand Down
4 changes: 2 additions & 2 deletions src/flow/src/transform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ mod test {
let schema = RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)]);

tri_map.insert(Some(name.clone()), Some(1024), gid);
schemas.insert(gid, schema.into_named(vec![]));
schemas.insert(gid, schema.into_unnamed());
}

{
Expand All @@ -225,7 +225,7 @@ mod test {
ColumnType::new(CDT::uint32_datatype(), false),
ColumnType::new(CDT::datetime_datatype(), false),
]);
schemas.insert(gid, schema.into_named(vec![]));
schemas.insert(gid, schema.into_unnamed());
tri_map.insert(Some(name.clone()), Some(1025), gid);
}

Expand Down
2 changes: 0 additions & 2 deletions src/flow/src/transform/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ impl TypedExpr {
.unzip();

match arg_len {
// because variadic function can also have 1 arguments, we need to check if it's a variadic function first
1 if UnaryFunc::from_str_and_type(fn_name, None).is_ok() => {
let func = UnaryFunc::from_str_and_type(fn_name, None)?;
let arg = arg_exprs[0].clone();
Expand All @@ -124,7 +123,6 @@ impl TypedExpr {

Ok(TypedExpr::new(arg.call_unary(func), ret_type))
}
// because variadic function can also have 2 arguments, we need to check if it's a variadic function first
2 if BinaryFunc::from_str_expr_and_type(
fn_name,
&arg_exprs,
Expand Down
9 changes: 8 additions & 1 deletion src/flow/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,14 @@ impl KeyExpiryManager {
.or_default()
.insert(row.clone());

self.get_expire_duration(now, row)
if let Some(expire_time) = self.compute_expiration_timestamp(now) {
if expire_time > event_ts {
// return how much time it's expired
return Ok(Some(expire_time - event_ts));
}
}

Ok(None)
}

/// Get the expire duration of a key, if it's expired by now.
Expand Down

0 comments on commit 3535a52

Please sign in to comment.