diff --git a/influxdb3/tests/server/query.rs b/influxdb3/tests/server/query.rs
index 208674f3186..87775e33442 100644
--- a/influxdb3/tests/server/query.rs
+++ b/influxdb3/tests/server/query.rs
@@ -1046,8 +1046,7 @@ async fn api_v1_query_chunked() {
                         [1, "a", 0.9],
                         [2, "a", 0.89],
                         [3, "a", 0.85]
-                    ],
-                    "partial": true
+                    ]
                 }
             ],
             "statement_id": 0,
@@ -1108,8 +1107,7 @@ async fn api_v1_query_chunked() {
                     "columns": ["time","host","usage"],
                     "values": [
                         [3, "a", 0.85]
-                    ],
-                    "partial": true
+                    ]
                 }
             ],
             "statement_id": 0,
diff --git a/influxdb3_server/src/http/v1.rs b/influxdb3_server/src/http/v1.rs
index 8f9cc616d9d..149da6c61a7 100644
--- a/influxdb3_server/src/http/v1.rs
+++ b/influxdb3_server/src/http/v1.rs
@@ -393,6 +393,15 @@
         }
     }
 
+    /// This function returns true if the number of rows in the current series exceeds the chunk size
+    fn is_partial_series(&self) -> bool {
+        if let (Some(size), Some(m)) = (self.size, self.series.back()) {
+            m.1.len() > size
+        } else {
+            false
+        }
+    }
+
     /// The [`ChunkBuffer`] is operating in chunked mode, and can flush a chunk
     fn can_flush(&self) -> bool {
         if let (Some(size), Some(m)) = (self.size, self.series.back()) {
@@ -550,8 +559,8 @@ impl QueryResponseStream {
     fn flush_one(&mut self) -> QueryResponse {
         let columns = self.columns();
-        let partial = self.buffer.can_flush().then_some(true);
-
+        let partial_series = self.buffer.is_partial_series().then_some(true);
+        let partial_results = self.buffer.can_flush().then_some(true);
         // this unwrap is okay because we only ever call flush_one
         // after calling can_flush on the buffer:
         let (name, values) = self.buffer.flush_one().unwrap();
@@ -559,13 +568,13 @@
             name,
             columns,
             values,
-            partial,
+            partial: partial_series,
         }];
         QueryResponse {
            results: vec![StatementResponse {
                statement_id: self.statement_id,
                series,
-                partial,
+                partial: partial_results,
            }],
            format: self.format,
        }