diff --git a/src/client.rs b/src/client.rs index e5b7df8ec..e5edc417c 100644 --- a/src/client.rs +++ b/src/client.rs @@ -67,24 +67,9 @@ impl Client { /// Executes a query pub async fn execute(&self, query: &str) -> Result { let mut conn = self.conn.lock().await; - let mut resultset = - match self.call_locked(&mut conn, Request::Execute(query.into())).await? { - Response::Execute(rs) => rs, - resp => return Err(Error::Internal(format!("Unexpected response {:?}", resp))), - }; - if let ResultSet::Query { columns, .. } = resultset { - // FIXME We buffer rows for now to avoid lifetime hassles - let mut rows = Vec::new(); - while let Some(result) = conn.try_next().await? { - match result? { - Response::Row(Some(row)) => rows.push(row), - Response::Row(None) => break, - response => { - return Err(Error::Internal(format!("Unexpected response {:?}", response))) - } - } - } - resultset = ResultSet::Query { columns, rows: Box::new(rows.into_iter().map(Ok)) } + let resultset = match self.call_locked(&mut conn, Request::Execute(query.into())).await? { + Response::Execute(rs) => rs, + resp => return Err(Error::Internal(format!("Unexpected response {:?}", resp))), }; match &resultset { ResultSet::Begin { version, read_only } => self.txn.set(Some((*version, *read_only))), diff --git a/src/server.rs b/src/server.rs index 864402b3a..54966ca8d 100644 --- a/src/server.rs +++ b/src/server.rs @@ -4,7 +4,6 @@ use crate::sql; use crate::sql::engine::Engine as _; use crate::sql::execution::ResultSet; use crate::sql::schema::{Catalog as _, Table}; -use crate::sql::types::Row; use ::log::{debug, error, info}; use futures::sink::SinkExt as _; @@ -81,7 +80,6 @@ pub enum Request { #[derive(Debug, Serialize, Deserialize)] pub enum Response { Execute(ResultSet), - Row(Option), GetTable(Table), ListTables(Vec), Status(sql::engine::Status), @@ -106,29 +104,8 @@ impl Session { tokio_serde::formats::Bincode::default(), ); while let Some(request) = stream.try_next().await? { - let mut response = tokio::task::block_in_place(|| self.request(request)); - let mut rows: Box> + Send> = - Box::new(std::iter::empty()); - if let Ok(Response::Execute(ResultSet::Query { rows: ref mut resultrows, .. })) = - &mut response - { - rows = Box::new( - std::mem::replace(resultrows, Box::new(std::iter::empty())) - .map(|result| result.map(|row| Response::Row(Some(row)))) - .chain(std::iter::once(Ok(Response::Row(None)))) - .scan(false, |err_sent, response| match (&err_sent, &response) { - (true, _) => None, - (_, Err(error)) => { - *err_sent = true; - Some(Err(error.clone())) - } - _ => Some(response), - }) - .fuse(), - ); - } + let response = tokio::task::block_in_place(|| self.request(request)); stream.send(response).await?; - stream.send_all(&mut tokio_stream::iter(rows.map(Ok))).await?; } Ok(()) } diff --git a/src/sql/execution/aggregation.rs b/src/sql/execution/aggregation.rs index e06f19379..781885dc7 100644 --- a/src/sql/execution/aggregation.rs +++ b/src/sql/execution/aggregation.rs @@ -26,7 +26,7 @@ impl Executor for Aggregation { let agg_count = self.aggregates.len(); match self.source.execute(txn)? { ResultSet::Query { columns, mut rows } => { - while let Some(mut row) = rows.next().transpose()? { + for mut row in rows { self.accumulators .entry(row.split_off(self.aggregates.len())) .or_insert(self.aggregates.iter().map(::from).collect()) @@ -48,9 +48,13 @@ impl Executor for Aggregation { .enumerate() .map(|(i, c)| if i < agg_count { Column { name: None } } else { c }) .collect(), - rows: Box::new(self.accumulators.into_iter().map(|(bucket, accs)| { - Ok(accs.into_iter().map(|acc| acc.aggregate()).chain(bucket).collect()) - })), + rows: self + .accumulators + .into_iter() + .map(|(bucket, accs)| { + accs.into_iter().map(|acc| acc.aggregate()).chain(bucket).collect() + }) + .collect(), }) } r => Err(Error::Internal(format!("Unexpected result {:?}", r))), diff --git a/src/sql/execution/mod.rs b/src/sql/execution/mod.rs index 2566d75b8..1461777d8 100644 --- a/src/sql/execution/mod.rs +++ b/src/sql/execution/mod.rs @@ -76,60 +76,32 @@ impl dyn Executor { #[derivative(Debug, PartialEq)] pub enum ResultSet { // Transaction started - Begin { - version: u64, - read_only: bool, - }, + Begin { version: u64, read_only: bool }, // Transaction committed - Commit { - version: u64, - }, + Commit { version: u64 }, // Transaction rolled back - Rollback { - version: u64, - }, + Rollback { version: u64 }, // Rows created - Create { - count: u64, - }, + Create { count: u64 }, // Rows deleted - Delete { - count: u64, - }, + Delete { count: u64 }, // Rows updated - Update { - count: u64, - }, + Update { count: u64 }, // Table created - CreateTable { - name: String, - }, + CreateTable { name: String }, // Table dropped - DropTable { - name: String, - }, + DropTable { name: String }, // Query result - Query { - columns: Columns, - #[derivative(Debug = "ignore")] - #[derivative(PartialEq = "ignore")] - #[serde(skip, default = "ResultSet::empty_rows")] - rows: Rows, - }, + Query { columns: Columns, rows: Rows }, // Explain result Explain(Node), } impl ResultSet { - /// Creates an empty row iterator, for use by serde(default). - fn empty_rows() -> Rows { - Box::new(std::iter::empty()) - } - /// Converts the ResultSet into a row, or errors if not a query result with rows. pub fn into_row(self) -> Result { if let ResultSet::Query { mut rows, .. } = self { - rows.next().transpose()?.ok_or_else(|| Error::Value("No rows returned".into())) + rows.into_iter().next().ok_or_else(|| Error::Value("No rows returned".into())) } else { Err(Error::Value(format!("Not a query result: {:?}", self))) } diff --git a/src/sql/execution/source.rs b/src/sql/execution/source.rs index a098b5df3..38c339f1b 100644 --- a/src/sql/execution/source.rs +++ b/src/sql/execution/source.rs @@ -1,5 +1,5 @@ use super::super::engine::Transaction; -use super::super::types::{Column, Expression, Row, Value}; +use super::super::types::{Column, Expression, Row, Rows, Value}; use super::{Executor, ResultSet}; use crate::error::Result; @@ -21,8 +21,8 @@ impl Executor for Scan { fn execute(self: Box, txn: &mut T) -> Result { let table = txn.must_read_table(&self.table)?; Ok(ResultSet::Query { - columns: table.columns.iter().map(|c| Column { name: Some(c.name.clone()) }).collect(), - rows: Box::new(txn.scan(&table.name, self.filter)?), + rows: txn.scan(&table.name, self.filter)?.collect::>()?, + columns: table.columns.into_iter().map(|c| Column { name: Some(c.name) }).collect(), }) } } @@ -42,17 +42,13 @@ impl KeyLookup { impl Executor for KeyLookup { fn execute(self: Box, txn: &mut T) -> Result { let table = txn.must_read_table(&self.table)?; - - // FIXME Is there a way to pass the txn into an iterator closure instead? - let rows = self - .keys - .into_iter() - .filter_map(|key| txn.read(&table.name, &key).transpose()) - .collect::>>()?; - Ok(ResultSet::Query { - columns: table.columns.iter().map(|c| Column { name: Some(c.name.clone()) }).collect(), - rows: Box::new(rows.into_iter().map(Ok)), + rows: self + .keys + .into_iter() + .filter_map(|key| txn.read(&table.name, &key).transpose()) + .collect::>()?, + columns: table.columns.into_iter().map(|c| Column { name: Some(c.name) }).collect(), }) } } @@ -79,15 +75,14 @@ impl Executor for IndexLookup { pks.extend(txn.read_index(&self.table, &self.column, &value)?); } - // FIXME Is there a way to pass the txn into an iterator closure instead? let rows = pks .into_iter() .filter_map(|pk| txn.read(&table.name, &pk).transpose()) .collect::>>()?; Ok(ResultSet::Query { - columns: table.columns.iter().map(|c| Column { name: Some(c.name.clone()) }).collect(), - rows: Box::new(rows.into_iter().map(Ok)), + rows, + columns: table.columns.into_iter().map(|c| Column { name: Some(c.name) }).collect(), }) } } @@ -103,9 +98,6 @@ impl Nothing { impl Executor for Nothing { fn execute(self: Box, _: &mut T) -> Result { - Ok(ResultSet::Query { - columns: Vec::new(), - rows: Box::new(std::iter::once(Ok(Row::new()))), - }) + Ok(ResultSet::Query { columns: Vec::new(), rows: Vec::new() }) } } diff --git a/src/sql/types/mod.rs b/src/sql/types/mod.rs index bb7138bc0..490635377 100644 --- a/src/sql/types/mod.rs +++ b/src/sql/types/mod.rs @@ -177,8 +177,12 @@ impl From<&str> for Value { /// A row of values pub type Row = Vec; -/// A row iterator -pub type Rows = Box> + Send>; +/// A set of rows. +/// +/// For simplicitly, the SQL engine uses row vectors instead of streaming +/// iterators, since the Raft and SQL network protocols don't support streaming +/// reads anyway. +pub type Rows = Vec; /// A column (in a result set, see schema::Column for table columns) #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]