Skip to content

Commit

Permalink
format: improve SQL formatting, including Raft SQL commands
Browse files Browse the repository at this point in the history
  • Loading branch information
erikgrinaker committed Jul 9, 2024
1 parent 457b04c commit 755ccf2
Show file tree
Hide file tree
Showing 24 changed files with 1,229 additions and 1,137 deletions.
2 changes: 1 addition & 1 deletion src/bin/toydump.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ fn main() -> Result<()> {
let mut scan = engine.scan(..);
while let Some((key, value)) = scan.next().transpose()? {
let mut string = match raft {
true => format::Raft::key_value(&key, &value),
true => format::Raft::<format::SQLCommand>::key_value(&key, &value),
false => format::MVCC::<format::SQL>::key_value(&key, &value),
};
if raw {
Expand Down
128 changes: 102 additions & 26 deletions src/encoding/format.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Decodes and formats raw keys and values, recursively as needed. Handles both
//! both Raft, MVCC, SQL, and raw binary data.
use super::{bincode, Key as _};
use super::{bincode, Key as _, Value as _};
use crate::raft;
use crate::sql;
use crate::storage::mvcc;
Expand All @@ -19,9 +19,7 @@ pub trait Formatter {

/// Formats a key/value pair.
fn key_value(key: &[u8], value: &[u8]) -> String {
let fkey = Self::key(key);
let fvalue = Self::value(key, value);
format!("{fkey} → {fvalue}")
Self::key_maybe_value(key, Some(value))
}

/// Formats a key/value pair, where the value may not exist.
Expand Down Expand Up @@ -53,23 +51,18 @@ impl Formatter for Raw {
}
}

/// Formats Raft log entries.
///
/// TODO: decode the commands as well.
pub struct Raft;
/// Formats Raft log entries. Dispatches to I for command formatting.
pub struct Raft<I: Formatter>(std::marker::PhantomData<I>);

impl Raft {
impl<I: Formatter> Raft<I> {
pub fn entry(entry: &raft::Entry) -> String {
format!(
"{}@{} {}",
entry.index,
entry.term,
entry.command.as_deref().map(Raw::bytes).unwrap_or("None".to_string())
)
let fcommand =
entry.command.as_deref().map(|c| I::value(&[], c)).unwrap_or("None".to_string());
format!("{}@{} {fcommand}", entry.index, entry.term)
}
}

impl Formatter for Raft {
impl<I: Formatter> Formatter for Raft<I> {
fn key(key: &[u8]) -> String {
let Ok(key) = raft::Key::decode(key) else {
return Raw::key(key);
Expand Down Expand Up @@ -154,41 +147,124 @@ impl<I: Formatter> Formatter for MVCC<I> {
}

/// Formats SQL keys/values.
///
/// TODO: consider more terse formatting, e.g. dropping the value type names and
/// instead relying on unambiguous string formatting.
///
/// TODO: decode and format the applied_index key.
pub struct SQL;

impl SQL {
fn literal(value: &sql::types::Value) -> String {
match value {
sql::types::Value::String(s) => format!("{s:?}"), // quoted string
value => value.to_string(),
}
}

fn values(values: impl IntoIterator<Item = sql::types::Value>) -> String {
values.into_iter().map(|v| Self::literal(&v)).join(",")
}

fn schema(table: sql::types::Table) -> String {
let re = regex::Regex::new(r#"\n\s*"#).expect("regex failed");
re.replace_all(&table.to_string(), " ").into_owned()
}
}

impl Formatter for SQL {
fn key(key: &[u8]) -> String {
// Special-case the applied_index key.
if key == sql::engine::Raft::APPLIED_INDEX_KEY {
return String::from_utf8_lossy(key).into_owned();
}

let Ok(key) = sql::engine::Key::decode(key) else { return Raw::key(key) };
format!("sql:{key:?}")
match key {
sql::engine::Key::Table(name) => format!("sql:Table({name})"),
sql::engine::Key::Index(table, column, value) => {
format!("sql:Index({table}.{column}, {})", Self::literal(&value))
}
sql::engine::Key::Row(table, id) => {
format!("sql:Row({table}, {})", Self::literal(&id))
}
}
}

fn value(key: &[u8], value: &[u8]) -> String {
// Special-case the applied_index key.
if key == sql::engine::Raft::APPLIED_INDEX_KEY {
if let Ok(applied_index) = bincode::deserialize::<raft::Index>(value) {
return applied_index.to_string();
}
}

let Ok(key) = sql::engine::Key::decode(key) else { return Raw::key(value) };
match key {
sql::engine::Key::Table(_) => {
let Ok(table) = bincode::deserialize::<sql::types::Table>(value) else {
return Raw::bytes(value);
};
let re = regex::Regex::new(r#"\n\s*"#).expect("regex failed");
re.replace_all(&format!("{table}"), " ").into_owned()
Self::schema(table)
}
sql::engine::Key::Row(_, _) => {
let Ok(row) = bincode::deserialize::<sql::types::Row>(value) else {
return Raw::bytes(value);
};
row.into_iter().map(|v| format!("{v:?}")).join(",")
Self::values(row)
}
sql::engine::Key::Index(_, _, _) => {
let Ok(index) = bincode::deserialize::<BTreeSet<sql::types::Value>>(value) else {
return Raw::bytes(value);
};
index.into_iter().map(|v| format!("{v:?}")).join(",")
Self::values(index)
}
}
}
}

/// Formats SQL Raft write commands, from the Raft log.
pub struct SQLCommand;

impl Formatter for SQLCommand {
/// There is no key, since they're wrapped in a Raft log entry.
fn key(_: &[u8]) -> String {
panic!("SQL commands don't have a key");
}

fn value(_: &[u8], value: &[u8]) -> String {
let Ok(write) = sql::engine::Write::decode(value) else {
return Raw::bytes(value);
};

let txn = match &write {
sql::engine::Write::Begin => None,
sql::engine::Write::Commit(txn)
| sql::engine::Write::Rollback(txn)
| sql::engine::Write::Delete { txn, .. }
| sql::engine::Write::Insert { txn, .. }
| sql::engine::Write::Update { txn, .. }
| sql::engine::Write::CreateTable { txn, .. }
| sql::engine::Write::DeleteTable { txn, .. } => Some(txn),
};
let ftxn =
txn.filter(|t| !t.read_only).map(|t| format!("t{} ", t.version)).unwrap_or_default();

let fcommand = match write {
sql::engine::Write::Begin => "BEGIN".to_string(),
sql::engine::Write::Commit(_) => "COMMIT".to_string(),
sql::engine::Write::Rollback(_) => "ROLLBACK".to_string(),
sql::engine::Write::Delete { table, ids, .. } => {
format!("DELETE {table} {}", ids.iter().map(|id| id.to_string()).join(","))
}
sql::engine::Write::Insert { table, rows, .. } => {
format!(
"INSERT {table} {}",
rows.into_iter().map(|row| format!("({})", SQL::values(row))).join(" ")
)
}
sql::engine::Write::Update { table, rows, .. } => format!(
"UPDATE {table} {}",
rows.into_iter().map(|(id, row)| format!("{id}→({})", SQL::values(row))).join(" ")
),
sql::engine::Write::CreateTable { schema, .. } => SQL::schema(schema),
sql::engine::Write::DeleteTable { table, .. } => format!("DROP TABLE {table}"),
};
format!("{ftxn}{fcommand}")
}
}
29 changes: 20 additions & 9 deletions src/raft/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,10 @@ mod tests {
args.reject_rest()?;
let index = self.log.append(command)?;
let entry = self.log.get(index)?.expect("entry not found");
output.push_str(&format!("append → {}\n", format::Raft::entry(&entry)));
output.push_str(&format!(
"append → {}\n",
format::Raft::<format::Raw>::entry(&entry)
));
}

// commit INDEX
Expand All @@ -385,7 +388,10 @@ mod tests {
args.reject_rest()?;
let index = self.log.commit(index)?;
let entry = self.log.get(index)?.expect("entry not found");
output.push_str(&format!("commit → {}\n", format::Raft::entry(&entry)));
output.push_str(&format!(
"commit → {}\n",
format::Raft::<format::Raw>::entry(&entry)
));
}

// dump
Expand All @@ -397,7 +403,7 @@ mod tests {
writeln!(
output,
"{} [{}]",
format::Raft::key_value(&key, &value),
format::Raft::<format::Raw>::key_value(&key, &value),
format::Raw::key_value(&key, &value)
)?;
}
Expand All @@ -414,7 +420,7 @@ mod tests {
.log
.get(index)?
.as_ref()
.map(format::Raft::entry)
.map(format::Raft::<format::Raw>::entry)
.unwrap_or("None".to_string());
output.push_str(&format!("{entry}\n"));
}
Expand Down Expand Up @@ -464,7 +470,8 @@ mod tests {
args.reject_rest()?;
let mut scan = self.log.scan(range);
while let Some(entry) = scan.next().transpose()? {
output.push_str(&format!("{}\n", format::Raft::entry(&entry)));
output
.push_str(&format!("{}\n", format::Raft::<format::Raw>::entry(&entry)));
}
}

Expand All @@ -476,7 +483,8 @@ mod tests {
args.reject_rest()?;
let mut scan = self.log.scan_apply(applied_index);
while let Some(entry) = scan.next().transpose()? {
output.push_str(&format!("{}\n", format::Raft::entry(&entry)));
output
.push_str(&format!("{}\n", format::Raft::<format::Raw>::entry(&entry)));
}
}

Expand Down Expand Up @@ -504,7 +512,10 @@ mod tests {
args.reject_rest()?;
let index = self.log.splice(entries)?;
let entry = self.log.get(index)?.expect("entry not found");
output.push_str(&format!("splice → {}\n", format::Raft::entry(&entry)));
output.push_str(&format!(
"splice → {}\n",
format::Raft::<format::Raw>::entry(&entry)
));
}

// status [engine=BOOL]
Expand Down Expand Up @@ -552,14 +563,14 @@ mod tests {
Operation::Delete { key } => writeln!(
output,
"engine delete {} [{}]",
format::Raft::key(&key),
format::Raft::<format::Raw>::key(&key),
format::Raw::key(&key)
)?,
Operation::Flush => writeln!(output, "engine flush")?,
Operation::Set { key, value } => writeln!(
output,
"engine set {} [{}]",
format::Raft::key_value(&key, &value),
format::Raft::<format::Raw>::key_value(&key, &value),
format::Raw::key_value(&key, &value)
)?,
}
Expand Down
2 changes: 1 addition & 1 deletion src/sql/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ mod session;

pub use engine::{Catalog, Engine, Transaction};
pub use local::{Key, Local};
pub use raft::{Raft, Status};
pub use raft::{Raft, Status, Write};
pub use session::{Session, StatementResult};
17 changes: 11 additions & 6 deletions src/sql/engine/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ pub struct Raft {
}

impl Raft {
/// The unversioned key used to store the applied index. Just use a string
/// for simplicity.
pub const APPLIED_INDEX_KEY: &'static [u8] = b"applied_index";

/// Creates a new Raft-based SQL engine, given a Raft request channel to the
/// local Raft node.
pub fn new(tx: Sender<(raft::Request, Sender<Result<raft::Response>>)>) -> Self {
Expand Down Expand Up @@ -236,7 +240,7 @@ impl<E: storage::Engine> State<E> {
pub fn new(engine: E) -> Result<Self> {
let local = super::Local::new(engine);
let applied_index = local
.get_unversioned(b"applied_index")?
.get_unversioned(Raft::APPLIED_INDEX_KEY)?
.map(|b| bincode::deserialize(&b))
.transpose()?
.unwrap_or(0);
Expand Down Expand Up @@ -298,7 +302,7 @@ impl<E: storage::Engine> raft::State for State<E> {
// lose a tail of the state machine writes (e.g. if the machine
// crashes). Raft will replay the log from the last known applied index.
self.applied_index = entry.index;
self.local.set_unversioned(b"applied_index", bincode::serialize(&entry.index))?;
self.local.set_unversioned(Raft::APPLIED_INDEX_KEY, bincode::serialize(&entry.index))?;
result
}

Expand Down Expand Up @@ -341,8 +345,8 @@ impl<E: storage::Engine> raft::State for State<E> {

/// A Raft engine read. Values correspond to engine method parameters. Uses
/// Cows to allow borrowed encoding and owned decoding.
#[derive(Serialize, Deserialize)]
enum Read<'a> {
#[derive(Debug, Serialize, Deserialize)]
pub enum Read<'a> {
BeginReadOnly {
as_of: Option<mvcc::Version>,
},
Expand Down Expand Up @@ -378,8 +382,8 @@ impl<'a> encoding::Value for Read<'a> {}

/// A Raft engine write. Values correspond to engine method parameters. Uses
/// Cows to allow borrowed encoding (for borrowed params) and owned decoding.
#[derive(Serialize, Deserialize)]
enum Write<'a> {
#[derive(Debug, Serialize, Deserialize)]
pub enum Write<'a> {
Begin,
Commit(Cow<'a, mvcc::TransactionState>),
Rollback(Cow<'a, mvcc::TransactionState>),
Expand All @@ -389,6 +393,7 @@ enum Write<'a> {
Update { txn: Cow<'a, mvcc::TransactionState>, table: Cow<'a, str>, rows: BTreeMap<Value, Row> },

CreateTable { txn: Cow<'a, mvcc::TransactionState>, schema: Table },
// TODO: rename to DropTable.
DeleteTable { txn: Cow<'a, mvcc::TransactionState>, table: Cow<'a, str>, if_exists: bool },
}

Expand Down
8 changes: 4 additions & 4 deletions src/sql/testscripts/schema/create_table
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ CreateTable: test
CreateTable { name: "test" }
storage set mvcc:NextVersion → 2 ["\x00" → "\x02"]
storage set mvcc:TxnActive(1) → "" ["\x01\x00\x00\x00\x00\x00\x00\x00\x01" → ""]
storage set mvcc:TxnWrite(1, sql:Table("test")) → "" ["\x03\x00\x00\x00\x00\x00\x00\x00\x01\x00\xfftest\x00\xff\x00\xff\x00\x00" → ""]
storage set mvcc:Version(sql:Table("test"), 1) → CREATE TABLE test ( id INTEGER PRIMARY KEY ) ["\x04\x00\xfftest\x00\xff\x00\xff\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01" → "\x01\x10\x04test\x00\x01\x02id\x01\x00\x00\x01\x00\x00"]
storage delete mvcc:TxnWrite(1, sql:Table("test")) ["\x03\x00\x00\x00\x00\x00\x00\x00\x01\x00\xfftest\x00\xff\x00\xff\x00\x00"]
storage set mvcc:TxnWrite(1, sql:Table(test)) → "" ["\x03\x00\x00\x00\x00\x00\x00\x00\x01\x00\xfftest\x00\xff\x00\xff\x00\x00" → ""]
storage set mvcc:Version(sql:Table(test), 1) → CREATE TABLE test ( id INTEGER PRIMARY KEY ) ["\x04\x00\xfftest\x00\xff\x00\xff\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01" → "\x01\x10\x04test\x00\x01\x02id\x01\x00\x00\x01\x00\x00"]
storage delete mvcc:TxnWrite(1, sql:Table(test)) ["\x03\x00\x00\x00\x00\x00\x00\x00\x01\x00\xfftest\x00\xff\x00\xff\x00\x00"]
storage delete mvcc:TxnActive(1) ["\x01\x00\x00\x00\x00\x00\x00\x00\x01"]

dump
---
mvcc:NextVersion → 2 ["\x00" → "\x02"]
mvcc:Version(sql:Table("test"), 1) → CREATE TABLE test ( id INTEGER PRIMARY KEY ) ["\x04\x00\xfftest\x00\xff\x00\xff\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01" → "\x01\x10\x04test\x00\x01\x02id\x01\x00\x00\x01\x00\x00"]
mvcc:Version(sql:Table(test), 1) → CREATE TABLE test ( id INTEGER PRIMARY KEY ) ["\x04\x00\xfftest\x00\xff\x00\xff\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01" → "\x01\x10\x04test\x00\x01\x02id\x01\x00\x00\x01\x00\x00"]

# Errors if table already exists.
!> CREATE TABLE test (id INTEGER PRIMARY KEY)
Expand Down
6 changes: 3 additions & 3 deletions src/sql/testscripts/schema/create_table_index
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
---
storage set mvcc:NextVersion → 2 ["\x00" → "\x02"]
storage set mvcc:TxnActive(1) → "" ["\x01\x00\x00\x00\x00\x00\x00\x00\x01" → ""]
storage set mvcc:TxnWrite(1, sql:Table("indexed")) → "" ["\x03\x00\x00\x00\x00\x00\x00\x00\x01\x00\xffindexed\x00\xff\x00\xff\x00\x00" → ""]
storage set mvcc:Version(sql:Table("indexed"), 1) → CREATE TABLE indexed ( id INTEGER PRIMARY KEY, "index" INTEGER DEFAULT NULL INDEX ) ["\x04\x00\xffindexed\x00\xff\x00\xff\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01" → "\x01 \x07indexed\x00\x02\x02id\x01\x00\x00\x01\x00\x00\x05index\x01\x01\x01\x00\x00\x01\x00"]
storage delete mvcc:TxnWrite(1, sql:Table("indexed")) ["\x03\x00\x00\x00\x00\x00\x00\x00\x01\x00\xffindexed\x00\xff\x00\xff\x00\x00"]
storage set mvcc:TxnWrite(1, sql:Table(indexed)) → "" ["\x03\x00\x00\x00\x00\x00\x00\x00\x01\x00\xffindexed\x00\xff\x00\xff\x00\x00" → ""]
storage set mvcc:Version(sql:Table(indexed), 1) → CREATE TABLE indexed ( id INTEGER PRIMARY KEY, "index" INTEGER DEFAULT NULL INDEX ) ["\x04\x00\xffindexed\x00\xff\x00\xff\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01" → "\x01 \x07indexed\x00\x02\x02id\x01\x00\x00\x01\x00\x00\x05index\x01\x01\x01\x00\x00\x01\x00"]
storage delete mvcc:TxnWrite(1, sql:Table(indexed)) ["\x03\x00\x00\x00\x00\x00\x00\x00\x01\x00\xffindexed\x00\xff\x00\xff\x00\x00"]
storage delete mvcc:TxnActive(1) ["\x01\x00\x00\x00\x00\x00\x00\x00\x01"]

schema
Expand Down
Loading

0 comments on commit 755ccf2

Please sign in to comment.