From 18a0660950b96c90d11cd683a1f0b34f13ff7880 Mon Sep 17 00:00:00 2001
From: Steve Myers
Date: Tue, 19 Sep 2023 17:29:23 -0500
Subject: [PATCH] feat: add bdk_sqlite_store crate implementing PersistBackend backed by a SQLite database

---
 Cargo.toml                                    |   1 +
 crates/sqlite_store/Cargo.toml                |  21 ++
 crates/sqlite_store/README.md                 |  11 +
 crates/sqlite_store/src/lib.rs                |  61 ++++
 crates/sqlite_store/src/store.rs              | 319 ++++++++++++++++++
 .../sqlite_store/tests/test_sqlite_store.rs   |   1 +
 6 files changed, 414 insertions(+)
 create mode 100644 crates/sqlite_store/Cargo.toml
 create mode 100644 crates/sqlite_store/README.md
 create mode 100644 crates/sqlite_store/src/lib.rs
 create mode 100644 crates/sqlite_store/src/store.rs
 create mode 100644 crates/sqlite_store/tests/test_sqlite_store.rs

diff --git a/Cargo.toml b/Cargo.toml
index e818d89964..c0ba90cbdb 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -3,6 +3,7 @@ members = [
     "crates/bdk",
     "crates/chain",
     "crates/file_store",
+    "crates/sqlite_store",
     "crates/electrum",
     "crates/esplora",
     "example-crates/example_cli",
diff --git a/crates/sqlite_store/Cargo.toml b/crates/sqlite_store/Cargo.toml
new file mode 100644
index 0000000000..3fc2026b06
--- /dev/null
+++ b/crates/sqlite_store/Cargo.toml
@@ -0,0 +1,21 @@
+[package]
+name = "bdk_sqlite_store"
+version = "0.2.0"
+edition = "2021"
+license = "MIT OR Apache-2.0"
+repository = "https://github.com/bitcoindevkit/bdk"
+documentation = "https://docs.rs/bdk_sqlite_store"
+description = "A simple append-only SQLite based implementation of Persist for Bitcoin Dev Kit."
+keywords = ["bitcoin", "persist", "persistence", "bdk", "sqlite"]
+authors = ["Bitcoin Dev Kit Developers"]
+readme = "README.md"
+
+[dependencies]
+bdk_chain = { path = "../chain", version = "0.5.0", features = [ "serde", "miniscript" ] }
+rusqlite = { version = "0.27.0", features = ["bundled"]}
+serde = { version = "1", features = ["derive"] }
+log = { version = "0.4", features = [] }
+serde_json = "1.0.107"
+
+[dev-dependencies]
+tempfile = "3"
diff --git a/crates/sqlite_store/README.md b/crates/sqlite_store/README.md
new file mode 100644
index 0000000000..0fbece97e4
--- /dev/null
+++ b/crates/sqlite_store/README.md
@@ -0,0 +1,11 @@
+# BDK SQLite Store
+
+This is a simple append-only [SQLite] database backed implementation of
+[`Persist`](`bdk_chain::Persist`).
+
+The main structure is [`Store`](`crate::Store`), which can be used with [`bdk`]'s
+`Wallet` to persist wallet data into a SQLite database file.
+
+[`bdk`]: https://docs.rs/bdk/latest
+[`bdk_chain`]: https://docs.rs/bdk_chain/latest
+[SQLite]: https://www.sqlite.org/index.html
diff --git a/crates/sqlite_store/src/lib.rs b/crates/sqlite_store/src/lib.rs
new file mode 100644
index 0000000000..6a4c6fff1c
--- /dev/null
+++ b/crates/sqlite_store/src/lib.rs
@@ -0,0 +1,61 @@
+#![doc = include_str!("../README.md")]
+
+mod store;
+
+pub use store::*;
+
+/// Error that occurs while appending new change sets to the DB.
+#[derive(Debug)]
+pub enum AppendError {
+    /// JSON encoding error.
+    Json(serde_json::Error),
+    /// SQLite error.
+    Sqlite(rusqlite::Error),
+}
+
+impl core::fmt::Display for AppendError {
+    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
+        match self {
+            Self::Json(e) => write!(f, "json error trying to append change set: {}", e),
+            Self::Sqlite(e) => write!(f, "sqlite error trying to append change set: {}", e),
+        }
+    }
+}
+
+impl From<serde_json::Error> for AppendError {
+    fn from(error: serde_json::Error) -> Self {
+        Self::Json(error)
+    }
+}
+
+impl std::error::Error for AppendError {}
+
+/// Error that occurs while iterating stored change sets from the DB.
+#[derive(Debug)]
+pub enum IterError {
+    /// JSON decoding error for the row identified by `rowid`.
+    Json {
+        rowid: usize,
+        err: serde_json::Error,
+    },
+    /// SQLite error.
+    Sqlite(rusqlite::Error),
+    /// Error converting a SQLite value into the requested Rust type.
+    FromSql(rusqlite::types::FromSqlError),
+}
+
+impl core::fmt::Display for IterError {
+    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
+        match self {
+            IterError::Json { rowid, err } => write!(
+                f,
+                "json error trying to decode change set {}, {}",
+                rowid, err
+            ),
+            IterError::Sqlite(err) => write!(f, "Sqlite error {}", err),
+            IterError::FromSql(err) => write!(f, "FromSql error {}", err),
+        }
+    }
+}
+
+impl std::error::Error for IterError {}
diff --git a/crates/sqlite_store/src/store.rs b/crates/sqlite_store/src/store.rs
new file mode 100644
index 0000000000..edfcbca41f
--- /dev/null
+++ b/crates/sqlite_store/src/store.rs
@@ -0,0 +1,319 @@
+use log::info;
+use rusqlite::{named_params, Connection, Error};
+use std::marker::PhantomData;
+use std::time::{SystemTime, UNIX_EPOCH};
+use std::{fmt::Debug, path::Path};
+
+use bdk_chain::{Append, PersistBackend};
+
+use crate::{AppendError, IterError};
+
+const MIGRATIONS: &[&str] = &[
+    // schema version control
+    "CREATE TABLE version (version INTEGER)",
+    "INSERT INTO version VALUES (1)",
+    // changeset data
+    "CREATE TABLE changeset (timestamp INTEGER NOT NULL, json TEXT NOT NULL);",
+];
+
+/// Persists changesets (`C`) to a single SQLite DB file.
+///
+/// The changesets are the results of altering wallet blockchain data.
+#[derive(Debug)]
+pub struct Store<C> {
+    /// A rusqlite connection object to the SQLite database
+    pub conn: Connection,
+    marker: PhantomData<C>,
+}
+
+impl<C> PersistBackend<C> for Store<C>
+where
+    C: Default + Append + serde::Serialize + serde::de::DeserializeOwned,
+{
+    type WriteError = AppendError;
+
+    type LoadError = IterError;
+
+    fn write_changes(&mut self, changeset: &C) -> Result<(), Self::WriteError> {
+        self.append_changeset(changeset)
+    }
+
+    fn load_from_persistence(&mut self) -> Result<C, Self::LoadError> {
+        self.aggregate_changesets()
+    }
+}
+
+impl<C> Store<C>
+where
+    C: Default + Append + serde::Serialize + serde::de::DeserializeOwned,
+{
+    /// Creates a new store from a [`Path`].
+    ///
+    /// Opens the SQLite database at `path` and runs any pending schema migrations.
+    ///
+    /// [`Path`]: std::path::Path
+    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
+        let mut conn = Connection::open(path)?;
+        Self::migrate(&mut conn)?;
+
+        Ok(Self {
+            conn,
+            marker: Default::default(),
+        })
+    }
+
+    /// Creates a new memory db store.
+    pub fn new_memory() -> Result<Self, Error> {
+        let mut conn = Connection::open_in_memory()?;
+        Self::migrate(&mut conn)?;
+
+        Ok(Self {
+            conn,
+            marker: Default::default(),
+        })
+    }
+
+    /// Brings the database schema up to date.
+    ///
+    /// Every statement in `MIGRATIONS` past the stored schema version is executed inside a
+    /// single transaction, and the stored version is then advanced by the number of statements
+    /// run. The first failing statement aborts the migration and nothing is committed.
+    ///
+    /// # Errors
+    ///
+    /// Returns the first [`rusqlite::Error`] raised by any step of the migration.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the stored schema version is negative or greater than `MIGRATIONS.len()`.
+    pub fn migrate(conn: &mut Connection) -> Result<(), Error> {
+        let version = Self::get_schema_version(conn)?;
+        let stmts = &MIGRATIONS[(version as usize)..];
+
+        // dropping `tx` without committing (any `?` below) rolls the whole migration back
+        let tx = conn.transaction()?;
+
+        // stop at the first failing statement so a later success cannot mask it
+        let mut new_version = None;
+        for (offset, stmt) in stmts.iter().enumerate() {
+            let next_version = version + offset as i32 + 1;
+            info!("executing db migration {}: `{}`", next_version, stmt);
+            tx.execute(stmt, [])?;
+            new_version = Some(next_version);
+        }
+
+        if let Some(version) = new_version {
+            Self::set_schema_version(&tx, version)?;
+        } else {
+            info!("db up to date, no migration needed");
+        }
+
+        tx.commit()?;
+        Ok(())
+    }
+
+    /// Reads the stored schema version.
+    ///
+    /// A database without a `version` table, or with an empty one, is reported as version `0`.
+    fn get_schema_version(conn: &Connection) -> rusqlite::Result<i32> {
+        let mut stmt = match conn.prepare_cached("SELECT version FROM version") {
+            Ok(stmt) => stmt,
+            // a brand new database has no `version` table yet
+            Err(Error::SqliteFailure(_, Some(msg))) if msg == "no such table: version" => {
+                return Ok(0);
+            }
+            // anything else is a real failure and must not be mistaken for an empty database
+            Err(e) => return Err(e),
+        };
+
+        let mut rows = stmt.query([])?;
+        let version = match rows.next()? {
+            Some(row) => row.get(0)?,
+            None => 0,
+        };
+        Ok(version)
+    }
+
+    /// Overwrites the stored schema version, returning the number of rows updated.
+    fn set_schema_version(conn: &Connection, version: i32) -> rusqlite::Result<usize> {
+        conn.execute(
+            "UPDATE version SET version=:version",
+            named_params! {":version": version},
+        )
+    }
+
+    /// Loads all the changesets that have been stored as one giant changeset.
+    ///
+    /// Each row of the `changeset` table is deserialized from JSON and appended to the
+    /// aggregate. If a row fails to deserialize, the changesets read before it are returned as
+    /// `Ok`; the JSON error itself is not reported to the caller.
+    ///
+    /// Any SQLite failure while querying, reading or deleting rows is returned as an
+    /// [`IterError`]. Because dropped changesets may have changed the derivation indices of the
+    /// tracker, a full wallet scan with a stop-gap may be needed afterwards.
+    ///
+    /// **WARNING**: When a row fails to deserialize, that row and every row with a greater
+    /// `rowid` are deleted from the database.
+    pub fn aggregate_changesets(&mut self) -> Result<C, IterError> {
+        let mut stmt = self
+            .conn
+            .prepare_cached("SELECT rowid, json FROM changeset")
+            .map_err(IterError::Sqlite)?;
+
+        let rows = stmt
+            .query_map([], |row| {
+                let rowid = row.get_unwrap::<usize, usize>(0);
+                row.get_ref(1).and_then(|r| Ok(r.as_str()?)).map(|j| {
+                    serde_json::from_str::<C>(j).map_err(|e| IterError::Json { rowid, err: e })
+                })
+            })
+            .map_err(IterError::Sqlite)?;
+
+        let result = rows
+            .into_iter()
+            .try_fold(C::default(), |mut aggregate, row_changeset| {
+                let changeset_result = row_changeset.map_err(IterError::Sqlite);
+                match changeset_result {
+                    Ok(Ok(changeset)) => {
+                        aggregate.append(changeset);
+                        Ok(aggregate)
+                    }
+                    Ok(Err(e)) => Err((aggregate, e)),
+                    Err(e) => Err((aggregate, e)),
+                }
+            });
+
+        match result {
+            Ok(changeset) => Ok(changeset),
+            Err((changeset, IterError::Json { rowid, err: _ })) => {
+                // remove bad changesets starting with first errored rowid
+                let mut stmt = self
+                    .conn
+                    .prepare("DELETE FROM changeset where rowid >= :rowid")
+                    .map_err(IterError::Sqlite)?;
+
+                stmt.execute(named_params! {":rowid": rowid })
+                    .map_err(IterError::Sqlite)?;
+                Ok(changeset)
+            }
+            Err((_, err)) => Err(err),
+        }
+    }
+
+    /// Appends a new changeset as one row of the `changeset` table, stored as JSON together
+    /// with the current unix timestamp in seconds.
+    ///
+    /// An empty changeset is not written. Fails with [`AppendError`] if the changeset cannot be
+    /// serialized to JSON or the row cannot be inserted.
+    pub fn append_changeset(&mut self, changeset: &C) -> Result<(), AppendError> {
+        // no need to write anything if changeset is empty
+        if changeset.is_empty() {
+            return Ok(());
+        }
+
+        let conn = &self.conn;
+        let json = serde_json::to_string(changeset)?;
+        let timestamp = SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .expect("current timestamp")
+            .as_secs();
+
+        let mut stmt = conn
+            .prepare_cached("INSERT INTO changeset (timestamp, json) VALUES (:timestamp, :json)")
+            .map_err(AppendError::Sqlite)?;
+        let rows = stmt
+            .execute(named_params! {":timestamp": timestamp, ":json": json })
+            .map_err(AppendError::Sqlite)?;
+        assert_eq!(rows, 1);
+
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+
+    type TestChangeSet = Vec<String>;
+
+    #[derive(Debug)]
+    struct TestTracker;
+
+    #[test]
+    fn insert_and_load_aggregate_changesets() {
+        // three changesets to store, one row each
+        let changeset1 = vec!["one".into()];
+        let changeset2 = vec!["two".into()];
+        let changeset3 = vec!["three!".into()];
+
+        let mut store = Store::<TestChangeSet>::new_memory().expect("create new memory db store");
+        // let mut store = Store::<TestChangeSet>::new(Path::new("test_agg.db"))
+        //     .expect("create new file db store");
+
+        store
+            .append_changeset(&changeset1)
+            .expect("append changeset1");
+        store
+            .append_changeset(&changeset2)
+            .expect("append changeset2");
+        store
+            .append_changeset(&changeset3)
+            .expect("append changeset3");
+
+        let agg_changeset = store.aggregate_changesets().expect("aggregated changeset");
+
+        assert_eq!(agg_changeset, vec!["one", "two", "three!"]);
+    }
+
+    #[test]
+    fn remove_bad_rows_on_load_aggregate_changesets() {
+        // valid changesets that must survive the malformed rows inserted below
+        let changeset1: Vec<String> = vec!["one".into()];
+        let changeset2: Vec<String> = vec!["two".into()];
+
+        let mut store = Store::<TestChangeSet>::new_memory().expect("create new memory db store");
+        // let mut store = Store::<TestChangeSet>::new(Path::new("test_bad.db"))
+        //     .expect("create new file db store");
+
+        store.append_changeset(&changeset1).expect("appended");
+        store.append_changeset(&changeset2).expect("appended");
+
+        let timestamp = SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .expect("current timestamp")
+            .as_secs();
+
+        let bad_json1 = "bad1!";
+        let rows = store
+            .conn
+            .execute(
+                "INSERT INTO changeset (timestamp, json) VALUES (:timestamp, :json)",
+                named_params! {":timestamp": timestamp, ":json": bad_json1 },
+            )
+            .expect("insert");
+        assert_eq!(rows, 1);
+
+        let bad_json2 = "bad2!!";
+        let rows = store
+            .conn
+            .execute(
+                "INSERT INTO changeset (timestamp, json) VALUES (:timestamp, :json)",
+                named_params! {":timestamp": timestamp, ":json": bad_json2 },
+            )
+            .expect("insert");
+        assert_eq!(rows, 1);
+
+        let changeset5: Vec<String> = vec!["five".into()];
+        store.append_changeset(&changeset5).expect("appended");
+
+        let agg_changeset = store.aggregate_changesets().expect("aggregated changeset");
+
+        assert_eq!(agg_changeset, vec!["one", "two"]);
+
+        let rows: usize = store
+            .conn
+            .query_row("SELECT count(*) FROM changeset", [], |row| row.get(0))
+            .expect("number of rows");
+        assert_eq!(rows, 2);
+    }
+}
diff --git a/crates/sqlite_store/tests/test_sqlite_store.rs b/crates/sqlite_store/tests/test_sqlite_store.rs
new file mode 100644
index 0000000000..8b13789179
--- /dev/null
+++ b/crates/sqlite_store/tests/test_sqlite_store.rs
@@ -0,0 +1 @@
+