Skip to content

Commit

Permalink
chore: add test
Browse files Browse the repository at this point in the history
  • Loading branch information
Taylor-lagrange committed May 16, 2024
1 parent 721a7d4 commit 6ab7d7f
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 20 deletions.
8 changes: 5 additions & 3 deletions src/promql/src/extension_plan/instant_manipulate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,9 +347,11 @@ impl InstantManipulateStream {
.column(self.time_index)
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.ok_or(DataFusionError::Execution(
"Time index Column downcast to TimestampMillisecondArray failed".into(),
))?;
.ok_or_else(|| {
DataFusionError::Execution(
"Time index Column downcast to TimestampMillisecondArray failed".into(),
)
})?;

// field column for staleness check
let field_column = self
Expand Down
8 changes: 5 additions & 3 deletions src/promql/src/extension_plan/normalize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,9 +254,11 @@ impl SeriesNormalizeStream {
.column(self.time_index)
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.ok_or(DataFusionError::Execution(
"Time index Column downcast to TimestampMillisecondArray failed".into(),
))?;
.ok_or_else(|| {
DataFusionError::Execution(
"Time index Column downcast to TimestampMillisecondArray failed".into(),
)
})?;

// bias the timestamp column by offset
let ts_column_biased = if self.offset == 0 {
Expand Down
8 changes: 5 additions & 3 deletions src/promql/src/extension_plan/range_manipulate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -480,9 +480,11 @@ impl RangeManipulateStream {
.column(self.time_index)
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.ok_or(DataFusionError::Execution(
"Time index Column downcast to TimestampMillisecondArray failed".into(),
))?;
.ok_or_else(|| {
DataFusionError::Execution(
"Time index Column downcast to TimestampMillisecondArray failed".into(),
)
})?;

let mut aligned_ts = vec![];
let mut ranges = vec![];
Expand Down
108 changes: 100 additions & 8 deletions src/promql/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -911,7 +911,8 @@ impl PromPlanner {
.resolve_table(table_ref.clone())
.await
.context(CatalogSnafu)?;
let table_schema = provider

let is_time_index_ms = provider
.as_any()
.downcast_ref::<DefaultTableSource>()
.context(UnknownTableSnafu)?
Expand All @@ -920,17 +921,13 @@ impl PromPlanner {
.downcast_ref::<DfTableProviderAdapter>()
.context(UnknownTableSnafu)?
.table()
.schema();

let time_index_type = &table_schema
.schema()
.timestamp_column()
.with_context(|| TimeIndexNotFoundSnafu {
table: table_ref.to_quoted_string(),
})?
.data_type;

let is_time_index_ms = *time_index_type == ConcreteDataType::time_millisecond_datatype()
|| *time_index_type == ConcreteDataType::timestamp_millisecond_datatype();
.data_type
== ConcreteDataType::timestamp_millisecond_datatype();

let mut scan_plan = LogicalPlanBuilder::scan(table_ref.clone(), provider, None)
.context(DataFusionPlanningSnafu)?
Expand Down Expand Up @@ -3029,4 +3026,99 @@ mod test {
assert!(plan.is_err(), "query: {:?}", query);
}
}

// Verifies PromQL planning over a table whose time index is NOT millisecond
// precision (nanosecond here): the generated plan must insert a projection
// that casts the time index to Timestamp(Millisecond, None), since the
// Prom* execution nodes downcast the time column to TimestampMillisecondArray.
#[tokio::test]
async fn test_non_ms_precision() {
let catalog_list = MemoryCatalogManager::with_default_setup();
// Build a schema with a NANOSECOND-precision time index column
// ("timestamp"), one tag ("tag") and one value field ("field").
let columns = vec![
ColumnSchema::new(
"tag".to_string(),
ConcreteDataType::string_datatype(),
false,
),
ColumnSchema::new(
"timestamp".to_string(),
ConcreteDataType::timestamp_nanosecond_datatype(),
false,
)
.with_time_index(true),
ColumnSchema::new(
"field".to_string(),
ConcreteDataType::float64_datatype(),
true,
),
];
let schema = Arc::new(Schema::new(columns));
// Table metadata: "tag" is the primary key (index 0), "field" the value
// column (index 2).
let table_meta = TableMetaBuilder::default()
.schema(schema)
.primary_key_indices(vec![0])
.value_indices(vec![2])
.next_column_id(1024)
.build()
.unwrap();
let table_info = TableInfoBuilder::default()
.name("metrics".to_string())
.meta(table_meta)
.build()
.unwrap();
// An empty (data-less) table is enough: only logical planning is tested.
let table = EmptyTable::from_table_info(&table_info);
// Register the table into the default catalog/schema so the planner can
// resolve "metrics".
assert!(catalog_list
.register_table_sync(RegisterTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "metrics".to_string(),
table_id: 1024,
table,
})
.is_ok());

// Case 1: a plain instant selector. Expect the scan to be wrapped in a
// Projection containing CAST(metrics.timestamp AS Timestamp(Millisecond,
// None)), so all downstream Prom* nodes see millisecond timestamps.
let plan = PromPlanner::stmt_to_plan(
DfTableSourceProvider::new(catalog_list.clone(), false, QueryContext::arc().as_ref()),
EvalStmt {
expr: parser::parse("metrics{tag = \"1\"}").unwrap(),
start: UNIX_EPOCH,
end: UNIX_EPOCH
.checked_add(Duration::from_secs(100_000))
.unwrap(),
interval: Duration::from_secs(5),
lookback_delta: Duration::from_secs(1),
},
)
.await
.unwrap();
assert_eq!(plan.display_indent_schema().to_string(),
"PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n  PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [false] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n    PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n      Sort: metrics.tag DESC NULLS LAST, metrics.timestamp DESC NULLS LAST [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n        Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-1000, None) AND metrics.timestamp <= TimestampMillisecond(100001000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n          Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(Millisecond, None)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n            TableScan: metrics [tag:Utf8, timestamp:Timestamp(Nanosecond, None), field:Float64;N]"
);
// Case 2: a range function (avg_over_time) over the same table. The same
// millisecond CAST projection must appear beneath the range-manipulate
// pipeline.
let plan = PromPlanner::stmt_to_plan(
DfTableSourceProvider::new(catalog_list.clone(), false, QueryContext::arc().as_ref()),
EvalStmt {
expr: parser::parse("avg_over_time(metrics{tag = \"1\"}[5s])").unwrap(),
start: UNIX_EPOCH,
end: UNIX_EPOCH
.checked_add(Duration::from_secs(100_000))
.unwrap(),
interval: Duration::from_secs(5),
lookback_delta: Duration::from_secs(1),
},
)
.await
.unwrap();
assert_eq!(plan.display_indent_schema().to_string(),
"Filter: prom_avg_over_time(timestamp_range,field) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_avg_over_time(timestamp_range,field):Float64;N, tag:Utf8]\
\n  Projection: metrics.timestamp, prom_avg_over_time(timestamp_range, field) AS prom_avg_over_time(timestamp_range,field), metrics.tag [timestamp:Timestamp(Millisecond, None), prom_avg_over_time(timestamp_range,field):Float64;N, tag:Utf8]\
\n    PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[5000], time index=[timestamp], values=[\"field\"] [field:Dictionary(Int64, Float64);N, tag:Utf8, timestamp:Timestamp(Millisecond, None), timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
\n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n        PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n          Sort: metrics.tag DESC NULLS LAST, metrics.timestamp DESC NULLS LAST [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n            Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-6000, None) AND metrics.timestamp <= TimestampMillisecond(100001000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n              Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(Millisecond, None)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n                TableScan: metrics [tag:Utf8, timestamp:Timestamp(Nanosecond, None), field:Float64;N]"
);
}
}
8 changes: 5 additions & 3 deletions src/query/src/range_select/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1116,9 +1116,11 @@ impl RangeSelectStream {
let ts_column_ref = ts_column
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.ok_or(DataFusionError::Execution(
"Time index Column downcast to TimestampMillisecondArray failed".into(),
))?;
.ok_or_else(|| {
DataFusionError::Execution(
"Time index Column downcast to TimestampMillisecondArray failed".into(),
)
})?;
for i in 0..self.range_exec.len() {
let args = self.evaluate_many(&batch, &self.range_exec[i].args)?;
// use self.modify_map record (hash, align_ts) => [row_nums]
Expand Down

0 comments on commit 6ab7d7f

Please sign in to comment.