Skip to content

Commit

Permalink
fix: after rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
discord9 committed Feb 22, 2024
1 parent 46927e3 commit 68a299d
Showing 1 changed file with 7 additions and 7 deletions.
14 changes: 7 additions & 7 deletions src/flow/src/expr/linear.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,14 @@ impl MapFilterProject {

// Validate column references.
assert!(predicate
.support()
.get_all_ref_columns()
.into_iter()
.all(|c| c < self.input_arity + self.expressions.len()));

// Insert predicate as eagerly as it can be evaluated:
// just after the largest column in its support is formed.
let max_support = predicate
.support()
.get_all_ref_columns()
.into_iter()
.max()
.map(|c| c + 1)
Expand All @@ -145,7 +145,7 @@ impl MapFilterProject {

// Validate column references.
assert!(expression
.support()
.get_all_ref_columns()
.into_iter()
.all(|c| c < self.input_arity + self.expressions.len()));

Expand Down Expand Up @@ -186,7 +186,7 @@ impl MapFilterProject {
///
/// The main behavior is extract temporal predicates, which cannot be evaluated
/// using the standard machinery.
pub fn into_plan(self) -> Result<MfpPlan, String> {
pub fn into_plan(self) -> Result<MfpPlan, EvalError> {
MfpPlan::create_from(self)
}

Expand All @@ -198,12 +198,12 @@ impl MapFilterProject {
pub fn demand(&self) -> BTreeSet<usize> {
let mut demanded = BTreeSet::new();
for (_index, pred) in self.predicates.iter() {
demanded.extend(pred.support());
demanded.extend(pred.get_all_ref_columns());
}
demanded.extend(self.projection.iter().cloned());
for index in (0..self.expressions.len()).rev() {
if demanded.contains(&(self.input_arity + index)) {
demanded.extend(self.expressions[index].support());
demanded.extend(self.expressions[index].get_all_ref_columns());
}
}
demanded.retain(|col| col < &self.input_arity);
Expand Down Expand Up @@ -355,7 +355,7 @@ pub struct MfpPlan {

impl MfpPlan {
/// find `now` in `predicates` and put them into lower/upper temporal bounds for temporal filter to use
pub fn create_from(mut mfp: MapFilterProject) -> Result<Self, String> {
pub fn create_from(mut mfp: MapFilterProject) -> Result<Self, EvalError> {
let mut lower_bounds = Vec::new();
let mut upper_bounds = Vec::new();

Expand Down

0 comments on commit 68a299d

Please sign in to comment.