Skip to content

Commit

Permalink
fix comm
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs committed Nov 22, 2024
1 parent 78d43ed commit 3718c92
Showing 1 changed file with 10 additions and 16 deletions.
26 changes: 10 additions & 16 deletions src/frontend/src/scheduler/plan_fragmenter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ impl Query {

#[derive(Debug, Clone)]
pub enum SourceFetchParameters {
IcebergPredicate(IcebergPredicate),
IcebergSpecificInfo(IcebergSpecificInfo),
KafkaTimebound {
lower: Option<i64>,
upper: Option<i64>,
Expand All @@ -290,12 +290,12 @@ pub struct SourceFetchInfo {
/// e.g. predicate pushdown for iceberg, timebound for kafka.
pub fetch_parameters: SourceFetchParameters,
pub as_of: Option<AsOf>,
pub iceberg_specific_info: Option<IcebergSpecificInfo>,
}

#[derive(Debug, Clone)]
pub struct IcebergSpecificInfo {
pub iceberg_scan_type: IcebergScanType,
pub predicate: IcebergPredicate,
}

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -367,7 +367,7 @@ impl SourceScanInfo {
}
(
ConnectorProperties::Iceberg(prop),
SourceFetchParameters::IcebergPredicate(predicate),
SourceFetchParameters::IcebergSpecificInfo(iceberg_specific_info),
) => {
let iceberg_enumerator =
IcebergSplitEnumerator::new(*prop, SourceEnumeratorContext::dummy().into())
Expand Down Expand Up @@ -395,18 +395,14 @@ impl SourceScanInfo {
}
None => None,
};
let iceberg_scan_type = fetch_info
.iceberg_specific_info
.ok_or_else(|| anyhow!("Missing iceberg specific info"))?
.iceberg_scan_type;

let split_info = iceberg_enumerator
.list_splits_batch(
fetch_info.schema,
time_travel_info,
batch_parallelism,
iceberg_scan_type,
predicate,
iceberg_specific_info.iceberg_scan_type,
iceberg_specific_info.predicate,
)
.await?
.into_iter()
Expand Down Expand Up @@ -1111,7 +1107,6 @@ impl BatchPlanFragmenter {
upper: timestamp_bound.1,
},
as_of: None,
iceberg_specific_info: None,
})));
}
} else if let Some(batch_iceberg_scan) = node.as_batch_iceberg_scan() {
Expand All @@ -1124,13 +1119,13 @@ impl BatchPlanFragmenter {
return Ok(Some(SourceScanInfo::new(SourceFetchInfo {
schema: batch_iceberg_scan.base.schema().clone(),
connector: property,
fetch_parameters: SourceFetchParameters::IcebergPredicate(
batch_iceberg_scan.predicate.clone(),
fetch_parameters: SourceFetchParameters::IcebergSpecificInfo(
IcebergSpecificInfo {
predicate: batch_iceberg_scan.predicate.clone(),
iceberg_scan_type: batch_iceberg_scan.iceberg_scan_type(),
},
),
as_of,
iceberg_specific_info: Some(IcebergSpecificInfo {
iceberg_scan_type: batch_iceberg_scan.iceberg_scan_type(),
}),
})));
}
} else if let Some(source_node) = node.as_batch_source() {
Expand All @@ -1146,7 +1141,6 @@ impl BatchPlanFragmenter {
connector: property,
fetch_parameters: SourceFetchParameters::Empty,
as_of,
iceberg_specific_info: None,
})));
}
}
Expand Down

0 comments on commit 3718c92

Please sign in to comment.