From 9e8916cd0d156a85540b68378d366de6fb5aa406 Mon Sep 17 00:00:00 2001 From: Steve Myers Date: Tue, 19 Sep 2023 17:29:23 -0500 Subject: [PATCH] feat: add bdk_sqlite_store crate implementing PersistBackend backed by a SQLite database --- .gitignore | 1 + Cargo.toml | 1 + crates/sqlite_store/Cargo.toml | 22 + crates/sqlite_store/README.md | 10 + crates/sqlite_store/schema/schema_0.sql | 67 ++ crates/sqlite_store/src/lib.rs | 62 ++ crates/sqlite_store/src/persist.rs | 63 ++ crates/sqlite_store/src/schema.rs | 94 +++ crates/sqlite_store/src/store.rs | 879 ++++++++++++++++++++++++ crates/sqlite_store/src/wallet.rs | 30 + 10 files changed, 1229 insertions(+) create mode 100644 crates/sqlite_store/Cargo.toml create mode 100644 crates/sqlite_store/README.md create mode 100644 crates/sqlite_store/schema/schema_0.sql create mode 100644 crates/sqlite_store/src/lib.rs create mode 100644 crates/sqlite_store/src/persist.rs create mode 100644 crates/sqlite_store/src/schema.rs create mode 100644 crates/sqlite_store/src/store.rs create mode 100644 crates/sqlite_store/src/wallet.rs diff --git a/.gitignore b/.gitignore index 95285763a3..e2d4d770a8 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,4 @@ Cargo.lock # Example persisted files. *.db +*.sqlite* diff --git a/Cargo.toml b/Cargo.toml index 87428029af..636705445e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,7 @@ members = [ "crates/bdk", "crates/chain", "crates/file_store", + "crates/sqlite_store", "crates/electrum", "crates/esplora", "crates/bitcoind_rpc", diff --git a/crates/sqlite_store/Cargo.toml b/crates/sqlite_store/Cargo.toml new file mode 100644 index 0000000000..521c13caad --- /dev/null +++ b/crates/sqlite_store/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "bdk_sqlite_store" +version = "0.1.0" +edition = "2021" +license = "MIT OR Apache-2.0" +repository = "https://github.com/bitcoindevkit/bdk" +documentation = "https://docs.rs/bdk_sqlite_store" +description = "A simple SQLite based implementation of Persist for Bitcoin Dev Kit." +keywords = ["bitcoin", "persist", "persistence", "bdk", "sqlite"] +authors = ["Bitcoin Dev Kit Developers"] +readme = "README.md" + +[dependencies] +anyhow = { version = "1", default-features = false } +bdk_chain = { path = "../chain", version = "0.13.0", features = [ "serde", "miniscript" ] } +bdk_persist = { path = "../persist", version = "0.1.0" } +rusqlite = { version = "0.31.0", features = ["bundled"]} +serde = { version = "1", features = ["derive"] } +serde_json = "1" + +# optional +bdk = { path = "../bdk", optional = true } diff --git a/crates/sqlite_store/README.md b/crates/sqlite_store/README.md new file mode 100644 index 0000000000..0a08f0e69b --- /dev/null +++ b/crates/sqlite_store/README.md @@ -0,0 +1,10 @@ +# BDK SQLite Store + +This is a simple [SQLite] relational database schema backed implementation of +[`PersistBackend`](`bdk_chain::PersistBackend`). + +The main structure is [`Store`](`store::Store`) which works with any `bdk_chain` based changeset to persist data into a SQLite database file. To use `Store` with [`bdk`]'s `Wallet` enable the `bdk` feature. 
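+
+A rough usage sketch (`Keychain` stands in for your own keychain type, which must implement `Ord`, `serde::Serialize`/`Deserialize` and `Send`, and `changeset` for a changeset you want to persist; the block is marked `ignore` because it is not a complete program):
+
+```rust,ignore
+use bdk_chain::ConfirmationTimeHeightAnchor;
+use bdk_persist::PersistBackend;
+use bdk_sqlite_store::store::Store;
+
+// Open (or create) the SQLite database file and run any pending schema migrations.
+let mut store = Store::<Keychain, ConfirmationTimeHeightAnchor>::new("my_wallet.sqlite")?;
+
+// Persist a changeset, then reload the aggregate of everything written so far.
+store.write_changes(&changeset)?;
+let aggregate_changeset = store.load_from_persistence()?;
+```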
+ +[`bdk`]: https://docs.rs/bdk/latest +[`bdk_chain`]: https://docs.rs/bdk_chain/latest +[SQLite]: https://www.sqlite.org/index.html diff --git a/crates/sqlite_store/schema/schema_0.sql b/crates/sqlite_store/schema/schema_0.sql new file mode 100644 index 0000000000..2358849661 --- /dev/null +++ b/crates/sqlite_store/schema/schema_0.sql @@ -0,0 +1,67 @@ +-- schema version control +CREATE TABLE version (version INTEGER) STRICT; +INSERT INTO version VALUES (1); + +-- network is the valid network for all other table data +CREATE TABLE network +( + name TEXT UNIQUE NOT NULL +) STRICT; + +-- keychain is the json serialized keychain structure as JSONB, +-- descriptor_id is a sha256::Hash id of the descriptor string w/o the checksum, +-- descriptor is the complete descriptor string, +-- last active index is a u32 +CREATE TABLE keychain +( + keychain BLOB PRIMARY KEY NOT NULL, + descriptor TEXT UNIQUE NOT NULL, + descriptor_id BLOB UNIQUE NOT NULL, + last_revealed INTEGER +) STRICT; + +-- hash is block hash hex string, +-- block height is a u32, +-- previous block hash hex string +CREATE TABLE block +( + hash TEXT PRIMARY KEY NOT NULL, + height INTEGER NOT NULL +) STRICT; + +-- txid is transaction hash hex string (reversed) +-- whole_tx is a consensus encoded transaction, +-- last seen is a u64 unix epoch seconds +CREATE TABLE tx +( + txid TEXT PRIMARY KEY NOT NULL, + whole_tx BLOB NOT NULL, + last_seen INTEGER +) STRICT; + +-- Outpoint txid hash hex string (reversed) +-- Outpoint vout +-- TxOut value as SATs +-- TxOut script consensus encoded +CREATE TABLE txout +( + txid TEXT NOT NULL, + vout INTEGER NOT NULL, + value INTEGER NOT NULL, + script BLOB NOT NULL, + PRIMARY KEY (txid, vout) +) STRICT; + +-- set of block "anchors" and txids +-- block hash hex string +-- block height u32 +-- confirmation height u32 +-- confirmation time unix epoch seconds +-- txid is transaction hash hex string (reversed) +CREATE TABLE anchor +( + block_hash TEXT NOT NULL REFERENCES block (hash), + confirmation_height INTEGER, + confirmation_time INTEGER, + txid TEXT NOT NULL REFERENCES tx (txid) +) STRICT; \ No newline at end of file diff --git a/crates/sqlite_store/src/lib.rs b/crates/sqlite_store/src/lib.rs new file mode 100644 index 0000000000..45c359a50c --- /dev/null +++ b/crates/sqlite_store/src/lib.rs @@ -0,0 +1,62 @@ +#![doc = include_str!("../README.md")] + +pub mod persist; +mod schema; +pub mod store; +#[cfg(feature = "bdk")] +#[cfg_attr(docsrs, doc(cfg(feature = "wallet")))] +pub mod wallet; + +use bdk_chain::bitcoin::Network; +use bdk_chain::{indexed_tx_graph, keychain, local_chain, Anchor, Append}; +use serde::{Deserialize, Serialize}; + +/// Change set representing changes to [`local_chain::ChangeSet`] and [`indexed_tx_graph::ChangeSet`]. +/// +/// This structure is used to persist data with the SQLite based [`store::Store`] provided by this crate. +#[derive(Clone, Debug, PartialEq)] +pub struct ChangeSet Deserialize<'de> + Serialize, A: Anchor> { + pub network: Option, + pub chain: local_chain::ChangeSet, + pub tx_graph: indexed_tx_graph::ChangeSet>, +} + +impl Append for ChangeSet +where + K: Ord + for<'de> Deserialize<'de> + Serialize, + A: Anchor, +{ + fn append(&mut self, mut other: Self) { + assert_eq!(self.network, other.network); + self.chain.append(&mut other.chain); + self.tx_graph.append(other.tx_graph); + } + + fn is_empty(&self) -> bool { + self.chain.is_empty() && self.tx_graph.is_empty() + } +} + +/// Error that occurs while reading or writing change sets with the SQLite database. 
+#[derive(Debug)] +pub enum Error { + /// Invalid network, cannot change the one already stored in the database. + Network { expected: Network, given: Network }, + /// SQLite error. + Sqlite(rusqlite::Error), +} + +impl core::fmt::Display for Error { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + match self { + Self::Network { expected, given } => write!( + f, + "network error trying to read or write change set, expected {}, given {}", + expected, given + ), + Self::Sqlite(e) => write!(f, "sqlite error reading or writing changeset: {}", e), + } + } +} + +impl std::error::Error for Error {} diff --git a/crates/sqlite_store/src/persist.rs b/crates/sqlite_store/src/persist.rs new file mode 100644 index 0000000000..fca8c5d727 --- /dev/null +++ b/crates/sqlite_store/src/persist.rs @@ -0,0 +1,63 @@ +use crate::store::{ReadWrite, Store}; +use crate::ChangeSet; +use anyhow::anyhow; +use bdk_chain::{BlockId, ConfirmationHeightAnchor, ConfirmationTimeHeightAnchor}; +use bdk_persist::PersistBackend; +use serde::{Deserialize, Serialize}; + +impl PersistBackend> + for Store +where + K: Ord + for<'de> Deserialize<'de> + Serialize + Send, +{ + fn write_changes( + &mut self, + changeset: &ChangeSet, + ) -> anyhow::Result<()> { + self.write(changeset) + .map_err(|e| anyhow!(e).context("unable to write changes to sqlite database")) + } + + fn load_from_persistence( + &mut self, + ) -> anyhow::Result>> { + self.read() + .map_err(|e| anyhow!(e).context("unable to read changes from sqlite database")) + } +} + +impl PersistBackend> + for Store +where + K: Ord + for<'de> Deserialize<'de> + Serialize + Send, +{ + fn write_changes( + &mut self, + changeset: &ChangeSet, + ) -> anyhow::Result<()> { + self.write(changeset) + .map_err(|e| anyhow!(e).context("unable to write changes to sqlite database")) + } + + fn load_from_persistence( + &mut self, + ) -> anyhow::Result>> { + self.read() + .map_err(|e| anyhow!(e).context("unable to read changes from sqlite database")) + } +} + +impl PersistBackend> for Store +where + K: Ord + for<'de> Deserialize<'de> + Serialize + Send, +{ + fn write_changes(&mut self, changeset: &ChangeSet) -> anyhow::Result<()> { + self.write(changeset) + .map_err(|e| anyhow!(e).context("unable to write changes to sqlite database")) + } + + fn load_from_persistence(&mut self) -> anyhow::Result>> { + self.read() + .map_err(|e| anyhow!(e).context("unable to read changes from sqlite database")) + } +} diff --git a/crates/sqlite_store/src/schema.rs b/crates/sqlite_store/src/schema.rs new file mode 100644 index 0000000000..588c802cc3 --- /dev/null +++ b/crates/sqlite_store/src/schema.rs @@ -0,0 +1,94 @@ +use rusqlite::{named_params, Connection, Error}; + +const SCHEMA_0: &str = include_str!("../schema/schema_0.sql"); +const MIGRATIONS: &[&str] = &[SCHEMA_0]; + +pub(crate) trait Schema { + /// Migrate sqlite db schema to latest version. 
+ fn migrate(conn: &mut Connection) -> Result<(), Error> { + let stmts = &MIGRATIONS + .iter() + .flat_map(|stmt| { + // remove comment lines + let s = stmt + .split('\n') + .filter(|l| !l.starts_with("--") && !l.is_empty()) + .collect::>() + .join(" "); + // split into statements + s.split(';') + // remove extra spaces + .map(|s| { + s.trim() + .split(' ') + .filter(|s| !s.is_empty()) + .collect::>() + .join(" ") + }) + .collect::>() + }) + // remove empty statements + .filter(|s| !s.is_empty()) + .collect::>(); + + let version = Self::get_schema_version(conn)?; + let stmts = &stmts[(version as usize)..]; + + // begin transaction, all migration statements and new schema version commit or rollback + let tx = conn.transaction()?; + + // execute every statement and return `Some` new schema version + // if execution fails, return `Error::Rusqlite` + // if no statements executed returns `None` + let new_version = stmts + .iter() + .enumerate() + .map(|version_stmt| { + tx.execute(version_stmt.1.as_str(), []) + // map result value to next migration version + .map(|_| version_stmt.0 as i32 + version + 1) + }) + .last() + .transpose()?; + + // if `Some` new statement version, set new schema version + if let Some(version) = new_version { + Self::set_schema_version(&tx, version)?; + } + + // commit transaction + tx.commit()?; + Ok(()) + } + + fn get_schema_version(conn: &Connection) -> rusqlite::Result { + let statement = conn.prepare_cached("SELECT version FROM version"); + match statement { + Err(Error::SqliteFailure(e, Some(msg))) => { + if msg == "no such table: version" { + Ok(0) + } else { + Err(Error::SqliteFailure(e, Some(msg))) + } + } + Ok(mut stmt) => { + let mut rows = stmt.query([])?; + match rows.next()? { + Some(row) => { + let version: i32 = row.get(0)?; + Ok(version) + } + None => Ok(0), + } + } + _ => Ok(0), + } + } + + fn set_schema_version(conn: &Connection, version: i32) -> rusqlite::Result { + conn.execute( + "UPDATE version SET version=:version", + named_params! {":version": version}, + ) + } +} diff --git a/crates/sqlite_store/src/store.rs b/crates/sqlite_store/src/store.rs new file mode 100644 index 0000000000..57578f6d19 --- /dev/null +++ b/crates/sqlite_store/src/store.rs @@ -0,0 +1,879 @@ +use bdk_chain::bitcoin::consensus::{deserialize, serialize}; +use bdk_chain::bitcoin::hashes::Hash; +use bdk_chain::bitcoin::{Amount, Network, OutPoint, ScriptBuf, Transaction, TxOut}; +use bdk_chain::bitcoin::{BlockHash, Txid}; +use bdk_chain::miniscript::descriptor::{Descriptor, DescriptorPublicKey}; +use rusqlite::{named_params, Connection}; +use serde::{Deserialize, Serialize}; +use std::collections::{BTreeMap, BTreeSet}; +use std::marker::PhantomData; +use std::path::Path; +use std::str::FromStr; +use std::sync::{Arc, Mutex}; + +use crate::schema::Schema; +use crate::{ChangeSet, Error}; +use bdk_chain::{ + indexed_tx_graph, keychain, local_chain, tx_graph, Anchor, Append, BlockId, + ConfirmationHeightAnchor, ConfirmationTimeHeightAnchor, DescriptorExt, DescriptorId, +}; + +/// Persists [`super::ChangeSet`] data in to a relational schema based SQLite database file. +/// +/// The changesets loaded or stored represent changes to keychain and blockchain data. +#[derive(Debug)] +pub struct Store { + /// A rusqlite [`Connection`] object to the SQLite database + conn: Mutex, + keychain_marker: PhantomData, + anchor_marker: PhantomData, +} + +impl Store +where + K: Ord + for<'de> Deserialize<'de> + Serialize + Send, + A: Anchor + Send, +{ + /// Creates a new store from a [`Path`]. 
+ /// + /// The file must have been opened with read and write permissions. + /// + /// [`Path`]: std::path::Path + pub fn new>(path: P) -> Result { + let mut conn = Connection::open(path)?; + Self::migrate(&mut conn)?; + + Ok(Self { + conn: Mutex::new(conn), + keychain_marker: Default::default(), + anchor_marker: Default::default(), + }) + } + + /// Creates a new memory, not persisted database store. + /// + /// This is primarily used for testing. + pub fn new_memory() -> Result { + let mut conn = Connection::open_in_memory()?; + Self::migrate(&mut conn)?; + + Ok(Self { + conn: Mutex::new(conn), + keychain_marker: Default::default(), + anchor_marker: Default::default(), + }) + } + + pub(crate) fn db_transaction(&mut self) -> Result { + let connection = self.conn.get_mut().expect("unlocked connection mutex"); + connection.transaction().map_err(Error::Sqlite) + } +} + +impl Schema for Store +where + K: Ord + for<'de> Deserialize<'de> + Serialize + Send, + A: Anchor + Send, +{ +} + +pub(crate) trait NetworkStore { + /// Insert network for which all other tables data is valid. + /// Error if trying to insert different network value. + fn insert_network( + current_network: &Option, + db_transaction: &rusqlite::Transaction, + network_changeset: &Option, + ) -> Result<(), Error> { + if let Some(network) = network_changeset { + match current_network { + // if no network change do nothing + Some(current_network) if current_network == network => Ok(()), + // if new network not the same as current, error + Some(current_network) => Err(Error::Network { + expected: *current_network, + given: *network, + }), + // insert network if none exists + None => { + let insert_network_stmt = &mut db_transaction + .prepare("INSERT INTO network (name) VALUES (:name)") + .expect("insert network statement"); + let name = network.to_string(); + insert_network_stmt + .execute(named_params! {":name": name }) + .map_err(Error::Sqlite)?; + Ok(()) + } + } + } else { + Ok(()) + } + } + + /// Load network. + fn select_network(db_transaction: &rusqlite::Transaction) -> Result, Error> { + let mut select_network_stmt = db_transaction + .prepare_cached("SELECT name FROM network WHERE rowid = 1") + .expect("select network statement"); + + let network = select_network_stmt + .query_row([], |row| { + let network = row.get_unwrap::(0); + let network = Network::from_str(network.as_str()).expect("valid network"); + Ok(network) + }) + .map_err(Error::Sqlite); + match network { + Ok(network) => Ok(Some(network)), + Err(Error::Sqlite(rusqlite::Error::QueryReturnedNoRows)) => Ok(None), + Err(e) => Err(e), + } + } +} + +impl NetworkStore for Store +where + K: Send, + A: Send, +{ +} + +pub(crate) trait BlockStore { + /// Insert or delete local chain blocks. + /// Error if trying to insert existing block hash. + fn insert_or_delete_blocks( + db_transaction: &rusqlite::Transaction, + chain_changeset: &local_chain::ChangeSet, + ) -> Result<(), Error> { + for (height, hash) in chain_changeset.iter() { + match hash { + // add new hash at height + Some(hash) => { + let insert_block_stmt = &mut db_transaction + .prepare("INSERT INTO block (hash, height) VALUES (:hash, :height)") + .expect("insert block statement"); + let hash = hash.to_string(); + insert_block_stmt + .execute(named_params! 
{":hash": hash, ":height": height }) + .map_err(Error::Sqlite)?; + } + // delete block at height + None => { + let delete_block_stmt = &mut db_transaction + .prepare("DELETE FROM block WHERE height IS :height") + .expect("delete block statement"); + delete_block_stmt + .execute(named_params! {":height": height }) + .map_err(Error::Sqlite)?; + } + } + } + + Ok(()) + } + + /// Load blocks. + fn select_blocks( + db_transaction: &rusqlite::Transaction, + ) -> Result>, Error> { + let mut select_blocks_stmt = db_transaction + .prepare_cached("SELECT height, hash FROM block") + .expect("select blocks statement"); + + let blocks = select_blocks_stmt + .query_map([], |row| { + let height = row.get_unwrap::(0); + let hash = row.get_unwrap::(1); + let hash = Some(BlockHash::from_str(hash.as_str()).expect("block hash")); + Ok((height, hash)) + }) + .map_err(Error::Sqlite)?; + blocks + .into_iter() + .map(|row| row.map_err(Error::Sqlite)) + .collect() + } +} + +impl BlockStore for Store +where + K: Send, + A: Send, +{ +} + +pub(crate) trait KeychainStore +where + K: Ord + for<'de> Deserialize<'de> + Serialize + Send, + A: Anchor + Send, +{ + /// Insert keychain with descriptor and last active index. + /// If keychain exists only update last active index. + fn insert_keychains( + db_transaction: &rusqlite::Transaction, + tx_graph_changeset: &indexed_tx_graph::ChangeSet>, + ) -> Result<(), Error> { + let keychain_changeset = &tx_graph_changeset.indexer; + for (keychain, descriptor) in keychain_changeset.keychains_added.iter() { + let insert_keychain_stmt = &mut db_transaction + .prepare("INSERT INTO keychain (keychain, descriptor, descriptor_id) VALUES (jsonb(:keychain), :descriptor, :descriptor_id)") + .expect("insert keychain statement"); + let keychain = serde_json::to_string(keychain).expect("keychain json"); + let descriptor_id = descriptor.descriptor_id().to_byte_array(); + let descriptor = descriptor.to_string(); + insert_keychain_stmt.execute(named_params! {":keychain": keychain, ":descriptor": descriptor, ":descriptor_id": descriptor_id }) + .map_err(Error::Sqlite)?; + } + Ok(()) + } + + /// Update descriptor last revealed index. + fn update_last_revealed( + db_transaction: &rusqlite::Transaction, + tx_graph_changeset: &indexed_tx_graph::ChangeSet>, + ) -> Result<(), Error> { + let keychain_changeset = &tx_graph_changeset.indexer; + for (descriptor_id, last_revealed) in keychain_changeset.last_revealed.iter() { + let update_last_revealed_stmt = &mut db_transaction + .prepare( + "UPDATE keychain SET last_revealed = :last_revealed + WHERE descriptor_id = :descriptor_id", + ) + .expect("update last revealed statement"); + let descriptor_id = descriptor_id.to_byte_array(); + update_last_revealed_stmt.execute(named_params! {":descriptor_id": descriptor_id, ":last_revealed": * last_revealed }) + .map_err(Error::Sqlite)?; + } + Ok(()) + } + + /// Load keychains added. 
+ fn select_keychains( + db_transaction: &rusqlite::Transaction, + ) -> Result>, Error> { + let mut select_keychains_added_stmt = db_transaction + .prepare_cached("SELECT json_extract(keychain, '$'), descriptor FROM keychain") + .expect("select keychains statement"); + + let keychains = select_keychains_added_stmt + .query_map([], |row| { + let keychain = row.get_unwrap::(0); + let keychain: K = serde_json::from_str(keychain.as_str()).expect("keychain"); + let descriptor = row.get_unwrap::(1); + let descriptor = Descriptor::from_str(descriptor.as_str()).expect("descriptor"); + Ok((keychain, descriptor)) + }) + .map_err(Error::Sqlite)?; + keychains + .into_iter() + .map(|row| row.map_err(Error::Sqlite)) + .collect() + } + + /// Load descriptor last revealed indexes. + fn select_last_revealed( + db_transaction: &rusqlite::Transaction, + ) -> Result, Error> { + let mut select_last_revealed_stmt = db_transaction + .prepare_cached( + "SELECT descriptor, last_revealed FROM keychain WHERE last_revealed IS NOT NULL", + ) + .expect("select last revealed statement"); + + let last_revealed = select_last_revealed_stmt + .query_map([], |row| { + let descriptor = row.get_unwrap::(0); + let descriptor = Descriptor::from_str(descriptor.as_str()).expect("descriptor"); + let descriptor_id = descriptor.descriptor_id(); + let last_revealed = row.get_unwrap::(1); + Ok((descriptor_id, last_revealed)) + }) + .map_err(Error::Sqlite)?; + last_revealed + .into_iter() + .map(|row| row.map_err(Error::Sqlite)) + .collect() + } +} + +impl KeychainStore for Store +where + K: Ord + for<'de> Deserialize<'de> + Serialize + Send, + A: Anchor + Send, +{ +} + +pub(crate) trait TxStore { + /// Insert transactions. + /// Error if trying to insert existing txid. + fn insert_txs( + db_transaction: &rusqlite::Transaction, + tx_graph_changeset: &indexed_tx_graph::ChangeSet>, + ) -> Result<(), Error> { + for tx in tx_graph_changeset.graph.txs.iter() { + let insert_tx_stmt = &mut db_transaction + .prepare("INSERT INTO tx (txid, whole_tx) VALUES (:txid, :whole_tx)") + .expect("insert tx statement"); + let txid = tx.txid().to_string(); + let whole_tx = serialize(&tx); + insert_tx_stmt + .execute(named_params! {":txid": txid, ":whole_tx": whole_tx }) + .map_err(Error::Sqlite)?; + } + Ok(()) + } + + /// Load transactions. + fn select_txs( + db_transaction: &rusqlite::Transaction, + ) -> Result>, Error> { + let mut select_tx_stmt = db_transaction + .prepare_cached("SELECT whole_tx FROM tx") + .expect("select tx statement"); + + let txs = select_tx_stmt + .query_map([], |row| { + let whole_tx = row.get_unwrap::>(0); + let whole_tx: Transaction = deserialize(&whole_tx).expect("transaction"); + Ok(Arc::new(whole_tx)) + }) + .map_err(Error::Sqlite)?; + + txs.into_iter() + .map(|row| row.map_err(Error::Sqlite)) + .collect() + } + + fn select_last_seen( + db_transaction: &rusqlite::Transaction, + ) -> Result, Error> { + // load tx last_seen + let mut select_last_seen_stmt = db_transaction + .prepare_cached("SELECT txid, last_seen FROM tx WHERE last_seen IS NOT NULL") + .expect("select tx last seen statement"); + + let last_seen = select_last_seen_stmt + .query_map([], |row| { + let txid = row.get_unwrap::(0); + let txid = Txid::from_str(&txid).expect("txid"); + let last_seen = row.get_unwrap::(1); + Ok((txid, last_seen)) + }) + .map_err(Error::Sqlite)?; + last_seen + .into_iter() + .map(|row| row.map_err(Error::Sqlite)) + .collect() + } + + /// Insert txouts. + /// Error if trying to insert existing outpoint. 
+ fn insert_txouts( + db_transaction: &rusqlite::Transaction, + tx_graph_changeset: &indexed_tx_graph::ChangeSet>, + ) -> Result<(), Error> { + for txout in tx_graph_changeset.graph.txouts.iter() { + let insert_txout_stmt = &mut db_transaction + .prepare("INSERT INTO txout (txid, vout, value, script) VALUES (:txid, :vout, :value, :script)") + .expect("insert txout statement"); + let txid = txout.0.txid.to_string(); + let vout = txout.0.vout; + let value = txout.1.value.to_sat(); + let script = txout.1.script_pubkey.as_bytes(); + insert_txout_stmt.execute(named_params! {":txid": txid, ":vout": vout, ":value": value, ":script": script }) + .map_err(Error::Sqlite)?; + } + Ok(()) + } + + fn select_txouts( + db_transaction: &rusqlite::Transaction, + ) -> Result, Error> { + // load tx outs + let mut select_txout_stmt = db_transaction + .prepare_cached("SELECT txid, vout, value, script FROM txout") + .expect("select txout statement"); + + let txouts = select_txout_stmt + .query_map([], |row| { + let txid = row.get_unwrap::(0); + let txid = Txid::from_str(&txid).expect("txid"); + let vout = row.get_unwrap::(1); + let outpoint = OutPoint::new(txid, vout); + let value = row.get_unwrap::(2); + let script_pubkey = row.get_unwrap::>(3); + let script_pubkey = ScriptBuf::from_bytes(script_pubkey); + let txout = TxOut { + value: Amount::from_sat(value), + script_pubkey, + }; + Ok((outpoint, txout)) + }) + .map_err(Error::Sqlite)?; + txouts + .into_iter() + .map(|row| row.map_err(Error::Sqlite)) + .collect() + } + + /// Update transaction last seen times. + fn update_last_seen( + db_transaction: &rusqlite::Transaction, + tx_graph_changeset: &indexed_tx_graph::ChangeSet>, + ) -> Result<(), Error> { + for tx_last_seen in tx_graph_changeset.graph.last_seen.iter() { + let update_tx_last_seen_stmt = &mut db_transaction + .prepare("UPDATE tx SET last_seen = :last_seen WHERE txid = :txid") + .expect("update tx last seen statement"); + let txid = tx_last_seen.0.to_string(); + let last_seen = *tx_last_seen.1; + update_tx_last_seen_stmt + .execute(named_params! {":txid": txid, ":last_seen": last_seen }) + .map_err(Error::Sqlite)?; + } + Ok(()) + } +} + +impl TxStore for Store +where + K: Ord + for<'de> Deserialize<'de> + Serialize + Send, + A: Anchor + Send, +{ +} + +pub(crate) trait AnchorStore { + /// Insert anchors. + fn insert_anchors( + db_transaction: &rusqlite::Transaction, + tx_graph_changeset: &indexed_tx_graph::ChangeSet>, + ) -> Result<(), Error>; + + /// Load anchors. + fn select_anchors(db_transaction: &rusqlite::Transaction) + -> Result, Error>; +} + +impl AnchorStore for Store +where + K: Ord + for<'de> Deserialize<'de> + Serialize + Send, +{ + /// Insert anchors. + fn insert_anchors( + db_transaction: &rusqlite::Transaction, + tx_graph_changeset: &indexed_tx_graph::ChangeSet< + ConfirmationTimeHeightAnchor, + keychain::ChangeSet, + >, + ) -> Result<(), Error> { + for anchor in tx_graph_changeset.graph.anchors.iter() { + let insert_anchor_stmt = &mut db_transaction + .prepare("INSERT INTO anchor (block_hash, confirmation_height, confirmation_time, txid) VALUES (:block_hash, :confirmation_height, :confirmation_time, :txid)") + .expect("insert anchor statement"); + let block_hash = anchor.0.anchor_block.hash.to_string(); + let confirmation_height = anchor.0.confirmation_height; + let confirmation_time = anchor.0.confirmation_time; + let txid = anchor.1.to_string(); + insert_anchor_stmt.execute(named_params! 
{":block_hash": block_hash, ":confirmation_height": confirmation_height, ":confirmation_time": confirmation_time, ":txid": txid }) + .map_err(Error::Sqlite)?; + } + Ok(()) + } + + /// Load anchors. + fn select_anchors( + db_transaction: &rusqlite::Transaction, + ) -> Result, Error> { + let mut select_anchor_stmt = db_transaction + .prepare_cached("SELECT block_hash, block.height as block_height, confirmation_height, confirmation_time, txid + FROM anchor JOIN block ON anchor.block_hash = block.hash") + .expect("select anchor statement"); + let anchors = select_anchor_stmt + .query_map([], |row| { + let hash = row.get_unwrap::(0); + let hash = BlockHash::from_str(hash.as_str()).expect("block hash"); + let height = row.get_unwrap::(1); + let confirmation_height = row.get_unwrap::(2); + let confirmation_time = row.get_unwrap::(3); + let txid = row.get_unwrap::(4); + let txid = Txid::from_str(&txid).expect("txid"); + + let anchor_block = BlockId { height, hash }; + let anchor = ConfirmationTimeHeightAnchor { + confirmation_height, + confirmation_time, + anchor_block, + }; + Ok((anchor, txid)) + }) + .map_err(Error::Sqlite)?; + anchors + .into_iter() + .map(|row| row.map_err(Error::Sqlite)) + .collect() + } +} + +impl AnchorStore for Store +where + K: Ord + for<'de> Deserialize<'de> + Serialize + Send, +{ + fn insert_anchors( + db_transaction: &rusqlite::Transaction, + tx_graph_changeset: &indexed_tx_graph::ChangeSet< + ConfirmationHeightAnchor, + keychain::ChangeSet, + >, + ) -> Result<(), Error> { + for anchor in tx_graph_changeset.graph.anchors.iter() { + let insert_anchor_stmt = &mut db_transaction + .prepare("INSERT INTO anchor (block_hash, confirmation_height, txid) VALUES (:block_hash, :confirmation_height, :txid)") + .expect("insert anchor statement"); + let block_hash = anchor.0.anchor_block.hash.to_string(); + let confirmation_height = anchor.0.confirmation_height; + let txid = anchor.1.to_string(); + insert_anchor_stmt.execute(named_params! 
{":block_hash": block_hash, ":confirmation_height": confirmation_height, ":txid": txid }) + .map_err(Error::Sqlite)?; + } + Ok(()) + } + + fn select_anchors( + db_transaction: &rusqlite::Transaction, + ) -> Result, Error> { + let mut select_anchor_stmt = db_transaction + .prepare_cached( + "SELECT block_hash, block.height as block_height, confirmation_height, txid + FROM anchor JOIN block ON anchor.block_hash = block.hash", + ) + .expect("select anchor statement"); + let anchors = select_anchor_stmt + .query_map([], |row| { + let hash = row.get_unwrap::(0); + let hash = BlockHash::from_str(hash.as_str()).expect("block hash"); + let height = row.get_unwrap::(1); + let confirmation_height = row.get_unwrap::(2); + let txid = row.get_unwrap::(3); + let txid = Txid::from_str(&txid).expect("txid"); + + let anchor_block = BlockId { height, hash }; + let anchor = ConfirmationHeightAnchor { + confirmation_height, + anchor_block, + }; + Ok((anchor, txid)) + }) + .map_err(Error::Sqlite)?; + anchors + .into_iter() + .map(|row| row.map_err(Error::Sqlite)) + .collect() + } +} + +impl AnchorStore for Store +where + K: Ord + for<'de> Deserialize<'de> + Serialize + Send, +{ + fn insert_anchors( + db_transaction: &rusqlite::Transaction, + tx_graph_changeset: &indexed_tx_graph::ChangeSet>, + ) -> Result<(), Error> { + for anchor in tx_graph_changeset.graph.anchors.iter() { + let insert_anchor_stmt = &mut db_transaction + .prepare("INSERT INTO anchor (block_hash, txid) VALUES (:block_hash, :txid)") + .expect("insert anchor statement"); + let block_hash = anchor.0.hash.to_string(); + let txid = anchor.1.to_string(); + insert_anchor_stmt + .execute(named_params! {":block_hash": block_hash, ":txid": txid }) + .map_err(Error::Sqlite)?; + } + Ok(()) + } + + fn select_anchors( + db_transaction: &rusqlite::Transaction, + ) -> Result, Error> { + let mut select_anchor_stmt = db_transaction + .prepare_cached( + "SELECT block_hash, block.height as block_height, txid + FROM anchor JOIN block ON anchor.block_hash = block.hash", + ) + .expect("select anchor statement"); + let anchors = select_anchor_stmt + .query_map([], |row| { + let hash = row.get_unwrap::(0); + let hash = BlockHash::from_str(hash.as_str()).expect("block hash"); + let height = row.get_unwrap::(1); + let txid = row.get_unwrap::(2); + let txid = Txid::from_str(&txid).expect("txid"); + + let anchor = BlockId { height, hash }; + Ok((anchor, txid)) + }) + .map_err(Error::Sqlite)?; + anchors + .into_iter() + .map(|row| row.map_err(Error::Sqlite)) + .collect() + } +} + +pub(crate) trait ReadWrite: + NetworkStore + KeychainStore + BlockStore + TxStore + AnchorStore +where + K: Ord + for<'de> Deserialize<'de> + Serialize + Send, + A: Anchor + Send, +{ + fn db_transaction(&mut self) -> Result; + + fn write(&mut self, changeset: &ChangeSet) -> Result<(), Error> { + // no need to write anything if changeset is empty + if changeset.is_empty() { + return Ok(()); + } + + let db_transaction = self.db_transaction()?; + + let network_changeset = &changeset.network; + let current_network = Self::select_network(&db_transaction)?; + Self::insert_network(¤t_network, &db_transaction, network_changeset)?; + + let chain_changeset = &changeset.chain; + Self::insert_or_delete_blocks(&db_transaction, chain_changeset)?; + + let tx_graph_changeset = &changeset.tx_graph; + Self::insert_keychains(&db_transaction, tx_graph_changeset)?; + Self::update_last_revealed(&db_transaction, tx_graph_changeset)?; + Self::insert_txs(&db_transaction, tx_graph_changeset)?; + 
Self::insert_txouts(&db_transaction, tx_graph_changeset)?; + Self::insert_anchors(&db_transaction, tx_graph_changeset)?; + Self::update_last_seen(&db_transaction, tx_graph_changeset)?; + db_transaction.commit().map_err(Error::Sqlite) + } + + fn read(&mut self) -> Result>, Error> { + let db_transaction = self.db_transaction()?; + + let network = Self::select_network(&db_transaction)?; + let chain = Self::select_blocks(&db_transaction)?; + let keychains_added = Self::select_keychains(&db_transaction)?; + let last_revealed = Self::select_last_revealed(&db_transaction)?; + let txs = Self::select_txs(&db_transaction)?; + let last_seen = Self::select_last_seen(&db_transaction)?; + let txouts = Self::select_txouts(&db_transaction)?; + let anchors = Self::select_anchors(&db_transaction)?; + + let graph: tx_graph::ChangeSet = tx_graph::ChangeSet { + txs, + txouts, + anchors, + last_seen, + }; + + let indexer: keychain::ChangeSet = keychain::ChangeSet { + keychains_added, + last_revealed, + }; + + let tx_graph: indexed_tx_graph::ChangeSet> = + indexed_tx_graph::ChangeSet { graph, indexer }; + + if network.is_none() && chain.is_empty() && tx_graph.is_empty() { + Ok(None) + } else { + Ok(Some(ChangeSet { + network, + chain, + tx_graph, + })) + } + } +} + +impl ReadWrite for Store +where + K: Ord + for<'de> Deserialize<'de> + Serialize + Send, +{ + fn db_transaction(&mut self) -> Result { + self.db_transaction() + } +} + +impl ReadWrite for Store +where + K: Ord + for<'de> Deserialize<'de> + Serialize + Send, +{ + fn db_transaction(&mut self) -> Result { + self.db_transaction() + } +} + +impl ReadWrite for Store +where + K: Ord + for<'de> Deserialize<'de> + Serialize + Send, +{ + fn db_transaction(&mut self) -> Result { + self.db_transaction() + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::ChangeSet; + use bdk_chain::bitcoin::consensus::encode::deserialize; + use bdk_chain::bitcoin::constants::genesis_block; + use bdk_chain::bitcoin::hashes::hex::FromHex; + use bdk_chain::bitcoin::transaction::Transaction; + use bdk_chain::bitcoin::Network::Testnet; + use bdk_chain::bitcoin::{secp256k1, BlockHash, OutPoint}; + use bdk_chain::miniscript::Descriptor; + use bdk_chain::{ + indexed_tx_graph, keychain, tx_graph, ConfirmationTimeHeightAnchor, DescriptorExt, + }; + use bdk_persist::PersistBackend; + use std::str::FromStr; + use std::sync::Arc; + + #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Hash, Debug, Serialize, Deserialize)] + enum Keychain { + External { account: u32, name: String }, + Internal { account: u32, name: String }, + } + + #[test] + fn insert_and_load_aggregate_changesets_with_confirmation_time_height_anchor() { + let changeset = create_test_changeset(&|block_hash_1| ConfirmationTimeHeightAnchor { + confirmation_height: 1, + confirmation_time: 1296667328, + anchor_block: (1, block_hash_1).into(), + }); + + let mut store = Store::::new_memory() + .expect("create new memory db store"); + // let mut store = + // Store::::new(Path::new("test_agg.sqlite")) + // .expect("create new file db store"); + + store.write_changes(&changeset).expect("write changeset"); + + let agg_changeset = store.load_from_persistence().expect("aggregated changeset"); + + assert_eq!(agg_changeset, Some(changeset)); // TODO assert + } + + #[test] + fn insert_and_load_aggregate_changesets_with_confirmation_height_anchor() { + let changeset = create_test_changeset(&|block_hash_1| ConfirmationHeightAnchor { + confirmation_height: 1, + anchor_block: (1, block_hash_1).into(), + }); + + let mut store = 
Store::::new_memory() + .expect("create new memory db store"); + // let mut store = + // Store::::new(Path::new("test_agg.sqlite")) + // .expect("create new file db store"); + + store.write_changes(&changeset).expect("write changeset"); + + let agg_changeset = store.load_from_persistence().expect("aggregated changeset"); + + assert_eq!(agg_changeset, Some(changeset)); // TODO assert + } + + #[test] + fn insert_and_load_aggregate_changesets_with_blockid_anchor() { + let changeset = create_test_changeset(&|block_hash_1| BlockId { + height: 1, + hash: block_hash_1, + }); + + let mut store = + Store::::new_memory().expect("create new memory db store"); + // let mut store = Store::::new(Path::new("test_agg.sqlite")) + // .expect("create new file db store"); + + store.write_changes(&changeset).expect("write changeset"); + + let agg_changeset = store.load_from_persistence().expect("aggregated changeset"); + + assert_eq!(agg_changeset, Some(changeset)); // TODO assert + } + + fn create_test_changeset( + anchor1_fn: &dyn Fn(BlockHash) -> A, + ) -> ChangeSet { + let secp = &secp256k1::Secp256k1::signing_only(); + + let network_changeset = Some(Testnet); + + let block_hash_0: BlockHash = genesis_block(Testnet).block_hash(); + let block_hash_1 = + BlockHash::from_str("00000000b873e79784647a6c82962c70d228557d24a747ea4d1b8bbe878e1206") + .unwrap(); + let block_hash_2 = + BlockHash::from_str("000000006c02c8ea6e4ff69651f7fcde348fb9d557a06e6957b65552002a7820") + .unwrap(); + + let block_changeset = [ + (0, Some(block_hash_0)), + (1, Some(block_hash_1)), + (2, Some(block_hash_2)), + ] + .into(); + + let ext_keychain = Keychain::External { + account: 0, + name: "ext test".to_string(), + }; + let (ext_desc, _ext_keymap) = Descriptor::parse_descriptor(secp, "wpkh(tprv8ZgxMBicQKsPcx5nBGsR63Pe8KnRUqmbJNENAfGftF3yuXoMMoVJJcYeUw5eVkm9WBPjWYt6HMWYJNesB5HaNVBaFc1M6dRjWSYnmewUMYy/0/*)").unwrap(); + let ext_desc_id = ext_desc.descriptor_id(); + let int_keychain = Keychain::Internal { + account: 0, + name: "int test".to_string(), + }; + let (int_desc, _int_keymap) = Descriptor::parse_descriptor(secp, "wpkh(tprv8ZgxMBicQKsPcx5nBGsR63Pe8KnRUqmbJNENAfGftF3yuXoMMoVJJcYeUw5eVkm9WBPjWYt6HMWYJNesB5HaNVBaFc1M6dRjWSYnmewUMYy/1/*)").unwrap(); + let int_desc_id = int_desc.descriptor_id(); + + let tx0_hex = Vec::::from_hex("01000000010000000000000000000000000000000000000000000000000000000000000000ffffffff4d04ffff001d0104455468652054696d65732030332f4a616e2f32303039204368616e63656c6c6f72206f6e206272696e6b206f66207365636f6e64206261696c6f757420666f722062616e6b73ffffffff0100f2052a01000000434104678afdb0fe5548271967f1a67130b7105cd6a828e03909a67962e0ea1f61deb649f6bc3f4cef38c4f35504e51ec112de5c384df7ba0b8d578a4c702b6bf11d5fac00000000").unwrap(); + let tx0: Arc = Arc::new(deserialize(tx0_hex.as_slice()).unwrap()); + let tx1_hex = Vec::::from_hex("010000000001010000000000000000000000000000000000000000000000000000000000000000ffffffff025151feffffff0200f2052a010000001600149243f727dd5343293eb83174324019ec16c2630f0000000000000000776a24aa21a9ede2f61c3f71d1defd3fa999dfa36953755c690689799962b48bebd836974e8cf94c4fecc7daa2490047304402205e423a8754336ca99dbe16509b877ef1bf98d008836c725005b3c787c41ebe46022047246e4467ad7cc7f1ad98662afcaf14c115e0095a227c7b05c5182591c23e7e01000120000000000000000000000000000000000000000000000000000000000000000000000000").unwrap(); + let tx1: Arc = Arc::new(deserialize(tx1_hex.as_slice()).unwrap()); + + let outpoint0_0 = OutPoint::new(tx0.txid(), 0); + let txout0_0 = tx0.output.first().unwrap().clone(); + let outpoint1_0 = 
OutPoint::new(tx1.txid(), 0); + let txout1_0 = tx1.output.first().unwrap().clone(); + + let anchor1 = anchor1_fn(block_hash_1); + + let tx_graph_changeset = tx_graph::ChangeSet:: { + txs: [tx0.clone(), tx1.clone()].into(), + txouts: [(outpoint0_0, txout0_0), (outpoint1_0, txout1_0)].into(), + anchors: [(anchor1, tx0.txid()), (anchor1, tx1.txid())].into(), + last_seen: [(tx0.txid(), 1598918400), (tx1.txid(), 1598919121)].into(), + }; + + let keychain_changeset = keychain::ChangeSet { + keychains_added: [(ext_keychain, ext_desc), (int_keychain, int_desc)].into(), + last_revealed: [(ext_desc_id, 124), (int_desc_id, 421)].into(), + }; + + let graph_changeset: indexed_tx_graph::ChangeSet> = + indexed_tx_graph::ChangeSet { + graph: tx_graph_changeset, + indexer: keychain_changeset, + }; + + // test data to write to db + ChangeSet { + network: network_changeset, + chain: block_changeset, + tx_graph: graph_changeset, + } + } +} diff --git a/crates/sqlite_store/src/wallet.rs b/crates/sqlite_store/src/wallet.rs new file mode 100644 index 0000000000..624d1de2ab --- /dev/null +++ b/crates/sqlite_store/src/wallet.rs @@ -0,0 +1,30 @@ +use crate::store::{ReadWrite, Store}; +use crate::ChangeSet; +use anyhow::anyhow; +use bdk::KeychainKind; +use bdk_chain::ConfirmationTimeHeightAnchor; +use bdk_persist::PersistBackend; + +impl PersistBackend for Store { + fn write_changes(&mut self, changeset: &bdk::wallet::ChangeSet) -> anyhow::Result<()> { + let changeset = ChangeSet { + network: changeset.network, + chain: changeset.chain.clone(), + tx_graph: changeset.indexed_tx_graph.clone(), + }; + self.write(&changeset) + .map_err(|e| anyhow!(e).context("unable to write wallet changes")) + } + + fn load_from_persistence(&mut self) -> anyhow::Result> { + let changeset = self + .read() + .map_err(|e| anyhow!(e).context("unable to load wallet changes"))?; + + Ok(changeset.map(|changeset| bdk::wallet::ChangeSet { + network: changeset.network, + chain: changeset.chain, + indexed_tx_graph: changeset.tx_graph, + })) + } +}
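+
+// Illustrative sketch only (not part of the change above): with the `bdk` feature
+// enabled, the impl above lets a `Store<KeychainKind, ConfirmationTimeHeightAnchor>`
+// stand in wherever a `PersistBackend<bdk::wallet::ChangeSet>` is accepted, roughly:
+//
+//     let db = Store::<KeychainKind, ConfirmationTimeHeightAnchor>::new("wallet.sqlite")?;
+//     // hand `db` to a `bdk::Wallet` constructor as its persistence backend; the exact
+//     // constructor signature depends on the `bdk` version in use, so it is elided here.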