diff --git a/influxdb3_server/src/http/v1.rs b/influxdb3_server/src/http/v1.rs index a3e0af9af92..a6b16f542de 100644 --- a/influxdb3_server/src/http/v1.rs +++ b/influxdb3_server/src/http/v1.rs @@ -20,7 +20,8 @@ use arrow::{ use bytes::Bytes; use chrono::{format::SecondsFormat, DateTime}; use datafusion::physical_plan::SendableRecordBatchStream; -use futures::{ready, stream::Fuse, Stream, StreamExt}; +use futures::future::FusedFuture; +use futures::{pin_mut, ready, stream::Fuse, Stream, StreamExt}; use hyper::http::HeaderValue; use hyper::{ header::ACCEPT, header::CONTENT_TYPE, header::TRANSFER_ENCODING, Body, Request, Response, @@ -776,6 +777,7 @@ mod tests { use arrow::datatypes::{DataType, Field, Schema}; use arrow::record_batch::RecordBatch; use arrow_array::{Float64Array, Int64Array, StringArray, TimestampNanosecondArray}; + use datafusion::error::DataFusionError; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use futures::stream::{self, StreamExt}; use serde_json::json; @@ -791,15 +793,7 @@ mod tests { Arc::new(StringArray::from_iter(vals)) } - fn f64s(vals: &[Option]) -> ArrayRef { - Arc::new(Float64Array::from_iter(vals.iter())) - } - - fn i64s(vals: &[Option]) -> ArrayRef { - Arc::new(Int64Array::from_iter(vals.iter().cloned())) - } - - fn create_test_record_batch() -> RecordBatch { + fn create_test_record_batch() -> Vec> { let meta = serde_json::to_string(&json!({ "measurement_column_index": 0, "tag_key_columns": [], @@ -813,42 +807,57 @@ mod tests { DataType::Timestamp(TimeUnit::Nanosecond, None), false, ), - Field::new("cpu", DataType::Utf8, true), - Field::new("device", DataType::Utf8, true), - Field::new("usage_idle", DataType::Float64, true), - Field::new("free", DataType::Int64, true), + Field::new("value", DataType::Utf8, true), ], HashMap::from([("iox::influxql::group_key::metadata".to_owned(), meta)]), )); - RecordBatch::try_new( - schema, + let record_batch_0 = Ok(RecordBatch::try_new( + Arc::clone(&schema), + vec![ + strs(&[Some("cpu"), Some("cpu")]), + times(&[1157082300000000000, 1157082310000000000]), + strs(&[Some("cpu0"), Some("cpu0")]), + ], + ) + .unwrap()); + + let record_batch_1 = Ok(RecordBatch::try_new( + Arc::clone(&schema), vec![ - strs(&[Some("cpu"), Some("cpu"), Some("cpu"), Some("cpu")]), + strs(&[Some("cpu"), Some("cpu"), Some("cpu")]), times(&[ 1157082300000000000, - 1157082310000000000, 1157082400000000000, 1157082320000000000, ]), - strs(&[Some("cpu0"), Some("cpu0"), Some("cpu1"), Some("cpu2")]), - strs(&[Some("disk1s1"), None, Some("disk1s1"), None]), - f64s(&[Some(99.1), Some(99.8), Some(99.2), Some(99.3)]), - i64s(&[None, Some(2133), Some(4110), Some(1995)]), + strs(&[Some("cpu0"), Some("cpu0"), Some("cpu2")]), ], ) - .unwrap() + .unwrap()); + + let record_batch_2 = Ok(RecordBatch::try_new( + Arc::clone(&schema), + vec![ + strs(&[Some("mem"), Some("mem")]), + times(&[1157082500000000000, 1157082420000000000]), + strs(&[Some("mem0"), Some("mem2")]), + ], + ) + .unwrap()); + + vec![record_batch_0, record_batch_1, record_batch_2] } #[tokio::test] async fn test_partial_flag() { let batch = create_test_record_batch(); - let schema = batch.schema(); - let input_stream = stream::iter(vec![Ok(batch.clone())]); + let schema = batch[0].as_ref().unwrap().schema(); + let input_stream = stream::iter(batch); let input: SendableRecordBatchStream = Box::pin(RecordBatchStreamAdapter::new( schema, Box::pin(input_stream), )); - let chunk_size = Some(1); + let chunk_size = Some(2); let mut query_response_stream = QueryResponseStream::new(0, input, chunk_size, QueryFormat::Json, None).unwrap(); @@ -863,22 +872,26 @@ mod tests { match counter { 0 => { assert!(resp.results[0].partial.unwrap()); + assert!(resp.results[0].series[0].partial.unwrap()); assert_eq!(resp.results[0].series[0].name, "cpu"); - assert_eq!(resp.results[0].series[0].values.len(), 1); + assert_eq!(resp.results[0].series[0].values.len(), 2); } 1 => { assert!(resp.results[0].partial.unwrap()); + assert!(resp.results[0].series[0].partial.unwrap()); assert_eq!(resp.results[0].series[0].name, "cpu"); - assert_eq!(resp.results[0].series[0].values.len(), 1); + assert_eq!(resp.results[0].series[0].values.len(), 2); } 2 => { assert!(resp.results[0].partial.unwrap()); + assert_eq!(resp.results[0].series[0].partial, None); assert_eq!(resp.results[0].series[0].name, "cpu"); assert_eq!(resp.results[0].series[0].values.len(), 1); } 3 => { assert_eq!(resp.results[0].partial, None); - assert_eq!(resp.results[0].series[0].name, "cpu"); + assert_eq!(resp.results[0].series[0].partial, None); + assert_eq!(resp.results[0].series[0].name, "mem"); assert_eq!(resp.results[0].series[0].values.len(), 1); } _ => panic!("Received more responses than expected"),