Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
erikgrinaker committed Apr 1, 2024
1 parent 2f63025 commit 64b6fbb
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 106 deletions.
21 changes: 3 additions & 18 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,24 +67,9 @@ impl Client {
/// Executes a query
pub async fn execute(&self, query: &str) -> Result<ResultSet> {
let mut conn = self.conn.lock().await;
let mut resultset =
match self.call_locked(&mut conn, Request::Execute(query.into())).await? {
Response::Execute(rs) => rs,
resp => return Err(Error::Internal(format!("Unexpected response {:?}", resp))),
};
if let ResultSet::Query { columns, .. } = resultset {
// FIXME We buffer rows for now to avoid lifetime hassles
let mut rows = Vec::new();
while let Some(result) = conn.try_next().await? {
match result? {
Response::Row(Some(row)) => rows.push(row),
Response::Row(None) => break,
response => {
return Err(Error::Internal(format!("Unexpected response {:?}", response)))
}
}
}
resultset = ResultSet::Query { columns, rows: Box::new(rows.into_iter().map(Ok)) }
let resultset = match self.call_locked(&mut conn, Request::Execute(query.into())).await? {
Response::Execute(rs) => rs,
resp => return Err(Error::Internal(format!("Unexpected response {:?}", resp))),
};
match &resultset {
ResultSet::Begin { version, read_only } => self.txn.set(Some((*version, *read_only))),
Expand Down
25 changes: 1 addition & 24 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use crate::sql;
use crate::sql::engine::Engine as _;
use crate::sql::execution::ResultSet;
use crate::sql::schema::{Catalog as _, Table};
use crate::sql::types::Row;

use ::log::{debug, error, info};
use futures::sink::SinkExt as _;
Expand Down Expand Up @@ -81,7 +80,6 @@ pub enum Request {
#[derive(Debug, Serialize, Deserialize)]
pub enum Response {
Execute(ResultSet),
Row(Option<Row>),
GetTable(Table),
ListTables(Vec<String>),
Status(sql::engine::Status),
Expand All @@ -106,29 +104,8 @@ impl Session {
tokio_serde::formats::Bincode::default(),
);
while let Some(request) = stream.try_next().await? {
let mut response = tokio::task::block_in_place(|| self.request(request));
let mut rows: Box<dyn Iterator<Item = Result<Response>> + Send> =
Box::new(std::iter::empty());
if let Ok(Response::Execute(ResultSet::Query { rows: ref mut resultrows, .. })) =
&mut response
{
rows = Box::new(
std::mem::replace(resultrows, Box::new(std::iter::empty()))
.map(|result| result.map(|row| Response::Row(Some(row))))
.chain(std::iter::once(Ok(Response::Row(None))))
.scan(false, |err_sent, response| match (&err_sent, &response) {
(true, _) => None,
(_, Err(error)) => {
*err_sent = true;
Some(Err(error.clone()))
}
_ => Some(response),
})
.fuse(),
);
}
let response = tokio::task::block_in_place(|| self.request(request));
stream.send(response).await?;
stream.send_all(&mut tokio_stream::iter(rows.map(Ok))).await?;
}
Ok(())
}
Expand Down
12 changes: 8 additions & 4 deletions src/sql/execution/aggregation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ impl<T: Transaction> Executor<T> for Aggregation<T> {
let agg_count = self.aggregates.len();
match self.source.execute(txn)? {
ResultSet::Query { columns, mut rows } => {
while let Some(mut row) = rows.next().transpose()? {
for mut row in rows {
self.accumulators
.entry(row.split_off(self.aggregates.len()))
.or_insert(self.aggregates.iter().map(<dyn Accumulator>::from).collect())
Expand All @@ -48,9 +48,13 @@ impl<T: Transaction> Executor<T> for Aggregation<T> {
.enumerate()
.map(|(i, c)| if i < agg_count { Column { name: None } } else { c })
.collect(),
rows: Box::new(self.accumulators.into_iter().map(|(bucket, accs)| {
Ok(accs.into_iter().map(|acc| acc.aggregate()).chain(bucket).collect())
})),
rows: self
.accumulators
.into_iter()
.map(|(bucket, accs)| {
accs.into_iter().map(|acc| acc.aggregate()).chain(bucket).collect()
})
.collect(),
})
}
r => Err(Error::Internal(format!("Unexpected result {:?}", r))),
Expand Down
48 changes: 10 additions & 38 deletions src/sql/execution/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,60 +76,32 @@ impl<T: Transaction + 'static> dyn Executor<T> {
#[derivative(Debug, PartialEq)]
pub enum ResultSet {
// Transaction started
Begin {
version: u64,
read_only: bool,
},
Begin { version: u64, read_only: bool },
// Transaction committed
Commit {
version: u64,
},
Commit { version: u64 },
// Transaction rolled back
Rollback {
version: u64,
},
Rollback { version: u64 },
// Rows created
Create {
count: u64,
},
Create { count: u64 },
// Rows deleted
Delete {
count: u64,
},
Delete { count: u64 },
// Rows updated
Update {
count: u64,
},
Update { count: u64 },
// Table created
CreateTable {
name: String,
},
CreateTable { name: String },
// Table dropped
DropTable {
name: String,
},
DropTable { name: String },
// Query result
Query {
columns: Columns,
#[derivative(Debug = "ignore")]
#[derivative(PartialEq = "ignore")]
#[serde(skip, default = "ResultSet::empty_rows")]
rows: Rows,
},
Query { columns: Columns, rows: Rows },
// Explain result
Explain(Node),
}

impl ResultSet {
/// Creates an empty row iterator, for use by serde(default).
fn empty_rows() -> Rows {
Box::new(std::iter::empty())
}

/// Converts the ResultSet into a row, or errors if not a query result with rows.
pub fn into_row(self) -> Result<Row> {
if let ResultSet::Query { mut rows, .. } = self {
rows.next().transpose()?.ok_or_else(|| Error::Value("No rows returned".into()))
rows.into_iter().next().ok_or_else(|| Error::Value("No rows returned".into()))
} else {
Err(Error::Value(format!("Not a query result: {:?}", self)))
}
Expand Down
32 changes: 12 additions & 20 deletions src/sql/execution/source.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::super::engine::Transaction;
use super::super::types::{Column, Expression, Row, Value};
use super::super::types::{Column, Expression, Row, Rows, Value};
use super::{Executor, ResultSet};
use crate::error::Result;

Expand All @@ -21,8 +21,8 @@ impl<T: Transaction> Executor<T> for Scan {
fn execute(self: Box<Self>, txn: &mut T) -> Result<ResultSet> {
let table = txn.must_read_table(&self.table)?;
Ok(ResultSet::Query {
columns: table.columns.iter().map(|c| Column { name: Some(c.name.clone()) }).collect(),
rows: Box::new(txn.scan(&table.name, self.filter)?),
rows: txn.scan(&table.name, self.filter)?.collect::<Result<Rows>>()?,
columns: table.columns.into_iter().map(|c| Column { name: Some(c.name) }).collect(),
})
}
}
Expand All @@ -42,17 +42,13 @@ impl KeyLookup {
impl<T: Transaction> Executor<T> for KeyLookup {
fn execute(self: Box<Self>, txn: &mut T) -> Result<ResultSet> {
let table = txn.must_read_table(&self.table)?;

// FIXME Is there a way to pass the txn into an iterator closure instead?
let rows = self
.keys
.into_iter()
.filter_map(|key| txn.read(&table.name, &key).transpose())
.collect::<Result<Vec<Row>>>()?;

Ok(ResultSet::Query {
columns: table.columns.iter().map(|c| Column { name: Some(c.name.clone()) }).collect(),
rows: Box::new(rows.into_iter().map(Ok)),
rows: self
.keys
.into_iter()
.filter_map(|key| txn.read(&table.name, &key).transpose())
.collect::<Result<Rows>>()?,
columns: table.columns.into_iter().map(|c| Column { name: Some(c.name) }).collect(),
})
}
}
Expand All @@ -79,15 +75,14 @@ impl<T: Transaction> Executor<T> for IndexLookup {
pks.extend(txn.read_index(&self.table, &self.column, &value)?);
}

// FIXME Is there a way to pass the txn into an iterator closure instead?
let rows = pks
.into_iter()
.filter_map(|pk| txn.read(&table.name, &pk).transpose())
.collect::<Result<Vec<Row>>>()?;

Ok(ResultSet::Query {
columns: table.columns.iter().map(|c| Column { name: Some(c.name.clone()) }).collect(),
rows: Box::new(rows.into_iter().map(Ok)),
rows,
columns: table.columns.into_iter().map(|c| Column { name: Some(c.name) }).collect(),
})
}
}
Expand All @@ -103,9 +98,6 @@ impl Nothing {

impl<T: Transaction> Executor<T> for Nothing {
fn execute(self: Box<Self>, _: &mut T) -> Result<ResultSet> {
Ok(ResultSet::Query {
columns: Vec::new(),
rows: Box::new(std::iter::once(Ok(Row::new()))),
})
Ok(ResultSet::Query { columns: Vec::new(), rows: Vec::new() })
}
}
8 changes: 6 additions & 2 deletions src/sql/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,12 @@ impl From<&str> for Value {
/// A row of values
pub type Row = Vec<Value>;

/// A row iterator
pub type Rows = Box<dyn Iterator<Item = Result<Row>> + Send>;
/// A set of rows.
///
/// For simplicitly, the SQL engine uses row vectors instead of streaming
/// iterators, since the Raft and SQL network protocols don't support streaming
/// reads anyway.
pub type Rows = Vec<Row>;

/// A column (in a result set, see schema::Column for table columns)
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
Expand Down

0 comments on commit 64b6fbb

Please sign in to comment.