Skip to content

Commit

Permalink
encoding: add format module for raw key/value formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
erikgrinaker committed Jun 29, 2024
1 parent 63c1aa6 commit 0787f87
Show file tree
Hide file tree
Showing 49 changed files with 743 additions and 724 deletions.
9 changes: 3 additions & 6 deletions src/bin/toydump.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@
//! human-readable form. It only prints live BitCask data, not garbage entries.
#![warn(clippy::all)]

use toydb::errdata;
use toydb::encoding::format::{self, Formatter as _};
use toydb::error::Result;
use toydb::storage::debug;
use toydb::storage::{BitCask, Engine};

fn main() -> Result<()> {
Expand All @@ -17,10 +16,8 @@ fn main() -> Result<()> {
let mut engine = BitCask::new(file.into())?;
let mut scan = engine.scan(..);
while let Some((key, value)) = scan.next().transpose()? {
let (fkey, Some(fvalue)) = debug::format_key_value(&key, &Some(value)) else {
return errdata!("unexpected missing value at key {:?}", key);
};
println!("{fkey} → {fvalue}");
// TODO: handle SQL and Raft data too.
println!("{}", format::MVCC::<format::Raw>::key_value(&key, &value));
}
Ok(())
}
150 changes: 150 additions & 0 deletions src/encoding/format.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
//! Decodes and formats raw keys and values -- Raft, MVCC, SQL, and user data
//! alike -- recursively as needed.
use super::{bincode, Key as _};
use crate::raft;
use crate::storage::mvcc;

use itertools::Itertools as _;
use std::collections::BTreeSet;

/// Formats raw key/value pairs.
///
/// All methods are associated functions without a `self` receiver, so an
/// implementor acts purely as a type-level selector (e.g. `Raw::key(bytes)`).
pub trait Formatter {
    /// Formats a key.
    fn key(key: &[u8]) -> String;

    /// Formats a value. The key is passed along too, since an implementor may
    /// need to decode it to know how the value should be interpreted.
    fn value(key: &[u8], value: &[u8]) -> String;

    /// Formats a key/value pair as `key → value`.
    fn key_value(key: &[u8], value: &[u8]) -> String {
        let fkey = Self::key(key);
        let fvalue = Self::value(key, value);
        format!("{fkey} → {fvalue}")
    }

    /// Formats a key/value pair, where the value may not exist. A missing
    /// value is rendered as the literal text `None`.
    fn key_maybe_value(key: &[u8], value: &Option<Vec<u8>>) -> String {
        let fkey = Self::key(key);
        // Build the "None" placeholder lazily: `unwrap_or("None".to_string())`
        // would allocate it on every call, even when a value is present.
        let fvalue = value.as_deref().map_or_else(|| "None".to_string(), |v| Self::value(key, v));
        format!("{fkey} → {fvalue}")
    }
}

/// Formats raw byte slices.
pub struct Raw;

impl Raw {
    /// Formats a byte slice as a double-quoted, ASCII-escaped string.
    ///
    /// Each byte is escaped following `std::ascii::escape_default` rules:
    /// printable ASCII is emitted as-is, `\t`, `\r`, `\n`, `\'`, `\"` and `\\`
    /// get backslash escapes, and every other byte becomes `\xNN`.
    pub fn bytes(bytes: &[u8]) -> String {
        // `escape_ascii` applies `std::ascii::escape_default` byte by byte and
        // implements `Display`, so the escaped text is written straight into
        // the output without an intermediate buffer or UTF-8 re-validation.
        format!("\"{}\"", bytes.escape_ascii())
    }
}

impl Formatter for Raw {
fn key(key: &[u8]) -> String {
Self::bytes(key)
}

fn value(_: &[u8], value: &[u8]) -> String {
Self::bytes(value)
}
}

/// Formats Raft log entries.
pub struct Raft;

impl Raft {
    /// Formats a Raft log entry as `index@term command`. The command is
    /// rendered via `Raw::bytes`, or as `None` when the entry carries no
    /// command.
    pub fn entry(entry: &raft::Entry) -> String {
        // Only allocate the "None" placeholder when the command is absent;
        // `unwrap_or("None".to_string())` would allocate it unconditionally.
        let command = entry.command.as_deref().map_or_else(|| "None".to_string(), Raw::bytes);
        format!("{}@{} {}", entry.index, entry.term, command)
    }
}

impl Formatter for Raft {
    /// Formats a Raft key as `raft:` followed by the decoded key's `Debug`
    /// output. Keys that fail to decode fall back to raw byte formatting.
    fn key(key: &[u8]) -> String {
        let Ok(key) = raft::Key::decode(key) else {
            return Raw::key(key);
        };
        format!("raft:{key:?}")
    }

    /// Formats a Raft value according to the key it is stored under:
    ///
    /// * `CommitIndex`: `index@term`.
    /// * `TermVote`: `term=<term> vote=<node>`, with `None` for a missing vote.
    /// * `Entry`: the same output as `Raft::entry`.
    ///
    /// If the key or the value fails to decode, the value is formatted as raw
    /// bytes instead.
    fn value(key: &[u8], value: &[u8]) -> String {
        let Ok(key) = raft::Key::decode(key) else {
            return Raw::value(key, value);
        };
        match key {
            raft::Key::CommitIndex => {
                match bincode::deserialize::<(raft::Index, raft::Term)>(value) {
                    Ok((index, term)) => format!("{index}@{term}"),
                    Err(_) => Raw::bytes(value),
                }
            }
            raft::Key::TermVote => {
                match bincode::deserialize::<(raft::Term, Option<raft::NodeID>)>(value) {
                    Ok((term, vote)) => format!(
                        "term={term} vote={}",
                        // Lazily build the placeholder so it is only allocated
                        // when no vote has been recorded.
                        vote.map_or_else(|| "None".to_string(), |v| v.to_string())
                    ),
                    Err(_) => Raw::bytes(value),
                }
            }
            raft::Key::Entry(_) => match bincode::deserialize::<raft::Entry>(value) {
                Ok(entry) => Self::entry(&entry),
                Err(_) => Raw::bytes(value),
            },
        }
    }
}

/// Formatter for MVCC keys and values. The type parameter `I` selects the
/// formatter applied to the inner key/value carried by an MVCC key; it is only
/// used at the type level, hence the `PhantomData`.
pub struct MVCC<I: Formatter>(std::marker::PhantomData<I>);

impl<I: Formatter> Formatter for MVCC<I> {
    /// Formats an MVCC key with an `mvcc:` prefix. Variants that wrap an inner
    /// key format it with `I::key`; the rest use their `Debug` output. Keys
    /// that fail to decode are formatted as raw bytes.
    fn key(key: &[u8]) -> String {
        let key = match mvcc::Key::decode(key) {
            Ok(decoded) => decoded,
            Err(_) => return Raw::key(key),
        };
        match key {
            mvcc::Key::NextVersion | mvcc::Key::TxnActive(_) | mvcc::Key::TxnActiveSnapshot(_) => {
                format!("mvcc:{key:?}")
            }
            mvcc::Key::TxnWrite(version, inner) => {
                format!("mvcc:TxnWrite({version}, {})", I::key(&inner))
            }
            mvcc::Key::Version(inner, version) => {
                format!("mvcc:Version({}, {version})", I::key(&inner))
            }
            mvcc::Key::Unversioned(inner) => {
                format!("mvcc:Unversioned({})", I::key(&inner))
            }
        }
    }

    /// Formats an MVCC value based on the key it belongs to:
    ///
    /// * `NextVersion`: the decoded version number.
    /// * `TxnActiveSnapshot`: the decoded set as `{a,b,c}`.
    /// * `TxnActive`, `TxnWrite`: raw bytes.
    /// * `Version`: the decoded optional value via `I::value`, or `None`.
    /// * `Unversioned`: the value as-is via `I::value`.
    ///
    /// Any key or value that fails to decode is formatted as raw bytes.
    fn value(key: &[u8], value: &[u8]) -> String {
        let decoded = match mvcc::Key::decode(key) {
            Ok(decoded) => decoded,
            Err(_) => return Raw::bytes(value),
        };
        match decoded {
            mvcc::Key::TxnActive(_) | mvcc::Key::TxnWrite(_, _) => Raw::bytes(value),
            mvcc::Key::NextVersion => match bincode::deserialize::<mvcc::Version>(value) {
                Ok(version) => version.to_string(),
                Err(_) => Raw::bytes(value),
            },
            mvcc::Key::TxnActiveSnapshot(_) => {
                match bincode::deserialize::<BTreeSet<u64>>(value) {
                    Ok(active) => {
                        let versions = active.iter().map(|v| v.to_string()).join(",");
                        format!("{{{}}}", versions)
                    }
                    Err(_) => Raw::bytes(value),
                }
            }
            mvcc::Key::Unversioned(userkey) => I::value(&userkey, value),
            mvcc::Key::Version(userkey, _) => match bincode::deserialize(value) {
                // The stored value is itself optional; the inner formatter only
                // sees it when present.
                Ok(Some(inner)) => I::value(&userkey, inner),
                Ok(None) => "None".to_string(),
                Err(_) => Raw::bytes(value),
            },
        }
    }
}
1 change: 1 addition & 0 deletions src/encoding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
//! * bincode: used for values in the key/value store and network protocols.
pub mod bincode;
pub mod format;
pub mod keycode;

use crate::error::Result;
Expand Down
58 changes: 25 additions & 33 deletions src/raft/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ impl<'a> std::iter::Iterator for Iterator<'a> {
#[cfg(test)]
mod tests {
use super::*;
use crate::encoding::format::{self, Formatter as _};
use crossbeam::channel::Receiver;
use regex::Regex;
use std::fmt::Write as _;
Expand Down Expand Up @@ -374,7 +375,7 @@ mod tests {
args.reject_rest()?;
let index = self.log.append(command)?;
let entry = self.log.get(index)?.expect("entry not found");
output.push_str(&format!("append → {}\n", Self::format_entry(&entry)));
output.push_str(&format!("append → {}\n", format::Raft::entry(&entry)));
}

// commit INDEX
Expand All @@ -384,7 +385,7 @@ mod tests {
args.reject_rest()?;
let index = self.log.commit(index)?;
let entry = self.log.get(index)?.expect("entry not found");
output.push_str(&format!("commit → {}\n", Self::format_entry(&entry)));
output.push_str(&format!("commit → {}\n", format::Raft::entry(&entry)));
}

// dump
Expand All @@ -393,8 +394,12 @@ mod tests {
let range = (std::ops::Bound::Unbounded, std::ops::Bound::Unbounded);
let mut scan = self.log.engine.scan_dyn(range);
while let Some((key, value)) = scan.next().transpose()? {
output.push_str(&Self::format_key_value(&key, &value));
output.push('\n');
writeln!(
output,
"{} [{}]",
format::Raft::key_value(&key, &value),
format::Raw::key_value(&key, &value)
)?;
}
}

Expand All @@ -409,7 +414,7 @@ mod tests {
.log
.get(index)?
.as_ref()
.map(Self::format_entry)
.map(format::Raft::entry)
.unwrap_or("None".to_string());
output.push_str(&format!("{entry}\n"));
}
Expand Down Expand Up @@ -459,7 +464,7 @@ mod tests {
args.reject_rest()?;
let mut scan = self.log.scan(range);
while let Some(entry) = scan.next().transpose()? {
output.push_str(&format!("{}\n", Self::format_entry(&entry)));
output.push_str(&format!("{}\n", format::Raft::entry(&entry)));
}
}

Expand All @@ -471,7 +476,7 @@ mod tests {
args.reject_rest()?;
let mut scan = self.log.scan_apply(applied_index);
while let Some(entry) = scan.next().transpose()? {
output.push_str(&format!("{}\n", Self::format_entry(&entry)));
output.push_str(&format!("{}\n", format::Raft::entry(&entry)));
}
}

Expand Down Expand Up @@ -499,7 +504,7 @@ mod tests {
args.reject_rest()?;
let index = self.log.splice(entries)?;
let entry = self.log.get(index)?.expect("entry not found");
output.push_str(&format!("splice → {}\n", Self::format_entry(&entry)));
output.push_str(&format!("splice → {}\n", format::Raft::entry(&entry)));
}

// status [engine=BOOL]
Expand Down Expand Up @@ -544,13 +549,19 @@ mod tests {
while let Ok(op) = self.op_rx.try_recv() {
match op {
_ if !show_ops => {}
Operation::Delete { key } => {
writeln!(output, "engine delete {}", Self::format_key(&key))?
}
Operation::Delete { key } => writeln!(
output,
"engine delete {} [{}]",
format::Raft::key(&key),
format::Raw::key(&key)
)?,
Operation::Flush => writeln!(output, "engine flush")?,
Operation::Set { key, value } => {
writeln!(output, "engine set {}", Self::format_key_value(&key, &value))?
}
Operation::Set { key, value } => writeln!(
output,
"engine set {} [{}]",
format::Raft::key_value(&key, &value),
format::Raw::key_value(&key, &value)
)?,
}
}
Ok(output)
Expand All @@ -571,25 +582,6 @@ mod tests {
Self { log, op_rx, tempdir }
}

/// Formats a log entry.
fn format_entry(entry: &Entry) -> String {
let command = match entry.command.as_ref() {
Some(raw) => std::str::from_utf8(raw).expect("invalid command"),
None => "None",
};
format!("{}@{} {command}", entry.index, entry.term)
}

/// Formats a raw key.
fn format_key(key: &[u8]) -> String {
format!("{:?} 0x{}", Key::decode(key).expect("invalid key"), hex::encode(key))
}

/// Formats a raw key/value pair.
fn format_key_value(key: &[u8], value: &[u8]) -> String {
format!("{} = 0x{}", Self::format_key(key), hex::encode(value))
}

/// Parses an index@term pair.
fn parse_index_term(s: &str) -> Result<(Index, Term), Box<dyn Error>> {
let re = Regex::new(r"^(\d+)@(\d+)$").expect("invalid regex");
Expand Down
2 changes: 1 addition & 1 deletion src/raft/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ mod message;
mod node;
mod state;

pub use log::{Entry, Index, Log};
pub use log::{Entry, Index, Key, Log};
pub use message::{Envelope, Message, ReadSequence, Request, RequestID, Response, Status};
pub use node::{Node, NodeID, Options, Term, Ticks};
pub use state::State;
Expand Down
30 changes: 15 additions & 15 deletions src/raft/testscripts/log/append
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ Panic: can't append entry in term 0
set_term 2
append foo [ops]
---
append → 1@2 foo
engine set Entry(1) 0x000000000000000001 = 0x01020103666f6f
append → 1@2 "foo"
engine set raft:Entry(1) → 1@2 "foo" ["\x00\x00\x00\x00\x00\x00\x00\x00\x01" → "\x01\x02\x01\x03foo"]
engine flush

# Appending a noop entry (no command) also works.
append [ops]
---
append → 2@2 None
engine set Entry(2) 0x000000000000000002 = 0x020200
engine set raft:Entry(2) → 2@2 None ["\x00\x00\x00\x00\x00\x00\x00\x00\x02" → "\x02\x02\x00"]
engine flush

# Check that the last index/term is updated (commit index isn't), and that
Expand All @@ -26,19 +26,19 @@ scan
dump
---
term=2 last=2@2 commit=0@0 vote=None
1@2 foo
1@2 "foo"
2@2 None
Entry(1) 0x000000000000000001 = 0x01020103666f6f
Entry(2) 0x000000000000000002 = 0x020200
TermVote 0x01 = 0x0200
raft:Entry(1) → 1@2 "foo" ["\x00\x00\x00\x00\x00\x00\x00\x00\x01" → "\x01\x02\x01\x03foo"]
raft:Entry(2) → 2@2 None ["\x00\x00\x00\x00\x00\x00\x00\x00\x02" → "\x02\x02\x00"]
raft:TermVote → term=2 vote=None ["\x01" → "\x02\x00"]

# Skipping a term then appending is allowed.
set_term 3
append command
set_term 5
append
---
append → 3@3 command
append → 3@3 "command"
append → 4@5 None

# Dump the final status and data.
Expand All @@ -47,12 +47,12 @@ scan
dump
---
term=5 last=4@5 commit=0@0 vote=None
1@2 foo
1@2 "foo"
2@2 None
3@3 command
3@3 "command"
4@5 None
Entry(1) 0x000000000000000001 = 0x01020103666f6f
Entry(2) 0x000000000000000002 = 0x020200
Entry(3) 0x000000000000000003 = 0x03030107636f6d6d616e64
Entry(4) 0x000000000000000004 = 0x040500
TermVote 0x01 = 0x0500
raft:Entry(1) → 1@2 "foo" ["\x00\x00\x00\x00\x00\x00\x00\x00\x01" → "\x01\x02\x01\x03foo"]
raft:Entry(2) → 2@2 None ["\x00\x00\x00\x00\x00\x00\x00\x00\x02" → "\x02\x02\x00"]
raft:Entry(3) → 3@3 "command" ["\x00\x00\x00\x00\x00\x00\x00\x00\x03" → "\x03\x03\x01\x07command"]
raft:Entry(4) → 4@5 None ["\x00\x00\x00\x00\x00\x00\x00\x00\x04" → "\x04\x05\x00"]
raft:TermVote → term=5 vote=None ["\x01" → "\x05\x00"]
Loading

0 comments on commit 0787f87

Please sign in to comment.