diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs index 04337f57274ff..d2eb0e268dc6f 100644 --- a/src/frontend/src/scheduler/plan_fragmenter.rs +++ b/src/frontend/src/scheduler/plan_fragmenter.rs @@ -272,7 +272,7 @@ impl Query { #[derive(Debug, Clone)] pub enum SourceFetchParameters { - IcebergPredicate(IcebergPredicate), + IcebergSpecificInfo(IcebergSpecificInfo), KafkaTimebound { lower: Option, upper: Option, @@ -290,12 +290,12 @@ pub struct SourceFetchInfo { /// e.g. predicate pushdown for iceberg, timebound for kafka. pub fetch_parameters: SourceFetchParameters, pub as_of: Option, - pub iceberg_specific_info: Option, } #[derive(Debug, Clone)] pub struct IcebergSpecificInfo { pub iceberg_scan_type: IcebergScanType, + pub predicate: IcebergPredicate, } #[derive(Clone, Debug)] @@ -367,7 +367,7 @@ impl SourceScanInfo { } ( ConnectorProperties::Iceberg(prop), - SourceFetchParameters::IcebergPredicate(predicate), + SourceFetchParameters::IcebergSpecificInfo(iceberg_specific_info), ) => { let iceberg_enumerator = IcebergSplitEnumerator::new(*prop, SourceEnumeratorContext::dummy().into()) @@ -395,18 +395,14 @@ impl SourceScanInfo { } None => None, }; - let iceberg_scan_type = fetch_info - .iceberg_specific_info - .ok_or_else(|| anyhow!("Missing iceberg specific info"))? - .iceberg_scan_type; let split_info = iceberg_enumerator .list_splits_batch( fetch_info.schema, time_travel_info, batch_parallelism, - iceberg_scan_type, - predicate, + iceberg_specific_info.iceberg_scan_type, + iceberg_specific_info.predicate, ) .await? .into_iter() @@ -1111,7 +1107,6 @@ impl BatchPlanFragmenter { upper: timestamp_bound.1, }, as_of: None, - iceberg_specific_info: None, }))); } } else if let Some(batch_iceberg_scan) = node.as_batch_iceberg_scan() { @@ -1124,13 +1119,13 @@ impl BatchPlanFragmenter { return Ok(Some(SourceScanInfo::new(SourceFetchInfo { schema: batch_iceberg_scan.base.schema().clone(), connector: property, - fetch_parameters: SourceFetchParameters::IcebergPredicate( - batch_iceberg_scan.predicate.clone(), + fetch_parameters: SourceFetchParameters::IcebergSpecificInfo( + IcebergSpecificInfo { + predicate: batch_iceberg_scan.predicate.clone(), + iceberg_scan_type: batch_iceberg_scan.iceberg_scan_type(), + }, ), as_of, - iceberg_specific_info: Some(IcebergSpecificInfo { - iceberg_scan_type: batch_iceberg_scan.iceberg_scan_type(), - }), }))); } } else if let Some(source_node) = node.as_batch_source() { @@ -1146,7 +1141,6 @@ impl BatchPlanFragmenter { connector: property, fetch_parameters: SourceFetchParameters::Empty, as_of, - iceberg_specific_info: None, }))); } }