From 4419dae7d5b27bee0b31ee1b05143b98f8be0e55 Mon Sep 17 00:00:00 2001
From: Shaowei Song
Date: Thu, 17 Aug 2023 14:42:53 +0800
Subject: [PATCH] Add CryptoLog to layer 1-crypto

---
 Cargo.toml                        |  10 +-
 src/error.rs                      |   6 +
 src/layers/0-bio/block_log.rs     |  88 +++-
 src/layers/0-bio/mod.rs           |   3 +-
 src/layers/1-crypto/crypto_log.rs | 837 +++++++++++++++++++++++++++---
 src/layers/1-crypto/mod.rs        |   4 +-
 src/lib.rs                        |   1 +
 src/prelude.rs                    |   4 +-
 src/tx/mod.rs                     |   2 +-
 src/util/mod.rs                   |   8 +
 10 files changed, 888 insertions(+), 75 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index f37ff96..5ebe035 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -10,15 +10,17 @@ inherit-methods-macro = {git = "https://github.com/jinzhao-dev/inherit-methods-m
 pod = {git = "https://github.com/jinzhao-dev/pod", rev = "7fa2ed2"}
 
 anymap = {version = "0.12.1"}
+array-init = "2.1.0"
 bitvec = {version = "1.0.1", default-features = false, features = ["atomic", "alloc"]}
+lending-iterator = "0.1.7"
+libc = "0.2.147"
+log = "0.4"
+lru = "0.11.0"
+openssl = "0.10.55"
 serde = {version = "1.0", features = ["derive"]}
 spin = "0.9.8"
 static_assertions = "1.1.0"
 typetag = "0.2"
-openssl = "0.10.55"
-libc = "0.2.147"
-log = "0.4"
-lending-iterator = "0.1.7"
 
 [lib]
 doctest = false
diff --git a/src/error.rs b/src/error.rs
index c66073f..d4a82f3 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -43,6 +43,12 @@ impl Error {
     }
 }
 
+impl From<Errno> for Error {
+    fn from(errno: Errno) -> Self {
+        Error::new(errno)
+    }
+}
+
 #[macro_export]
 macro_rules! return_errno {
     ($errno: expr) => {
diff --git a/src/layers/0-bio/block_log.rs b/src/layers/0-bio/block_log.rs
index 483761b..19f5168 100644
--- a/src/layers/0-bio/block_log.rs
+++ b/src/layers/0-bio/block_log.rs
@@ -1,6 +1,8 @@
-use super::{BufMut, BufRef};
+use super::{Buf, BufMut, BufRef};
+use crate::os::Mutex;
 use crate::prelude::*;
+use core::sync::atomic::{AtomicUsize, Ordering};
 use inherit_methods_macro::inherit_methods;
 
 /// A log of data blocks that can support random reads and append-only
@@ -43,3 +45,87 @@ impl_blocklog_for!(&T, "(**self)");
 impl_blocklog_for!(&mut T, "(**self)");
 impl_blocklog_for!(Box<T>, "(**self)");
 impl_blocklog_for!(Arc<T>, "(**self)");
+
+/// An in-memory log that impls `BlockLog`.
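+/// All of its data lives in a single heap buffer guarded by a mutex, which makes it
+/// handy for testing the layers built on top of `BlockLog`.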
+pub struct MemLog {
+    log: Mutex<Buf>,
+    append_pos: AtomicUsize,
+}
+
+impl BlockLog for MemLog {
+    fn read(&self, pos: BlockId, mut buf: BufMut) -> Result<()> {
+        let nblocks = buf.nblocks();
+        if pos + nblocks > self.nblocks() {
+            return_errno_with_msg!(InvalidArgs, "read range out of bound");
+        }
+        let log = self.log.lock();
+        let read_buf = &log.as_slice()[Self::offset(pos)..Self::offset(pos) + nblocks * BLOCK_SIZE];
+        buf.as_mut_slice().copy_from_slice(&read_buf);
+        Ok(())
+    }
+
+    fn append(&self, buf: BufRef) -> Result<BlockId> {
+        let nblocks = buf.nblocks();
+        let mut log = self.log.lock();
+        let pos = self.append_pos.load(Ordering::Relaxed);
+        if pos + nblocks > log.nblocks() {
+            return_errno_with_msg!(InvalidArgs, "append range out of bound");
+        }
+        let write_buf =
+            &mut log.as_mut_slice()[Self::offset(pos)..Self::offset(pos) + nblocks * BLOCK_SIZE];
+        write_buf.copy_from_slice(buf.as_slice());
+        self.append_pos.fetch_add(nblocks, Ordering::Release);
+        Ok(pos)
+    }
+
+    fn flush(&self) -> Result<()> {
+        Ok(())
+    }
+
+    fn nblocks(&self) -> usize {
+        self.append_pos.load(Ordering::Relaxed)
+    }
+}
+
+impl MemLog {
+    pub fn create(num_blocks: usize) -> Result<Self> {
+        Ok(Self {
+            log: Mutex::new(Buf::alloc(num_blocks)?),
+            append_pos: AtomicUsize::new(0),
+        })
+    }
+
+    fn offset(pos: BlockId) -> usize {
+        pos * BLOCK_SIZE
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn mem_log() -> Result<()> {
+        let total_blocks = 64;
+        let append_nblocks = 8;
+        let mem_log = MemLog::create(total_blocks)?;
+        assert_eq!(mem_log.nblocks(), 0);
+
+        let mut append_buf = Buf::alloc(append_nblocks)?;
+        let content = 5_u8;
+        append_buf.as_mut_slice().fill(content);
+        let append_pos = mem_log.append(append_buf.as_ref())?;
+        assert_eq!(append_pos, 0);
+        assert_eq!(mem_log.nblocks(), append_nblocks);
+
+        mem_log.flush()?;
+        let mut read_buf = Buf::alloc(1)?;
+        let read_pos = 7 as BlockId;
+        mem_log.read(read_pos, read_buf.as_mut())?;
+        assert_eq!(
+            read_buf.as_slice(),
+            &append_buf.as_slice()[read_pos * BLOCK_SIZE..(read_pos + 1) * BLOCK_SIZE]
+        );
+        Ok(())
+    }
+}
diff --git a/src/layers/0-bio/mod.rs b/src/layers/0-bio/mod.rs
index 2311e4c..c75dc0c 100644
--- a/src/layers/0-bio/mod.rs
+++ b/src/layers/0-bio/mod.rs
@@ -6,12 +6,13 @@ mod block_ring;
 mod block_set;
 
 pub use self::block_buf::{Buf, BufMut, BufRef};
-pub use self::block_log::BlockLog;
+pub use self::block_log::{BlockLog, MemLog};
 pub use self::block_ring::BlockRing;
 pub use self::block_set::{BlockSet, MemDisk};
 
 pub type BlockId = usize;
 pub const BLOCK_SIZE: usize = 0x1000;
+pub const BID_SIZE: usize = core::mem::size_of::<BlockId>();
 
 // This definition of BlockId assumes the target architecture is 64-bit
 assert_eq_size!(usize, u64);
diff --git a/src/layers/1-crypto/crypto_log.rs b/src/layers/1-crypto/crypto_log.rs
index 4fc5f0f..41d7ee4 100644
--- a/src/layers/1-crypto/crypto_log.rs
+++ b/src/layers/1-crypto/crypto_log.rs
@@ -1,118 +1,223 @@
+use super::{Iv, Key, Mac};
+use crate::layers::bio::{BlockId, BlockLog, Buf, BufMut, BufRef, BLOCK_SIZE};
+use crate::os::Aead as OsAead;
+use crate::prelude::*;
+use crate::util::{Aead, RandomInit};
+
+use alloc::collections::VecDeque;
+use core::any::Any;
+use core::fmt::{self, Debug};
+use core::mem::size_of;
+use core::sync::atomic::{AtomicUsize, Ordering};
+use lending_iterator::LendingIterator;
+use pod::Pod;
+use serde::{Deserialize, Serialize};
+use spin::{Mutex, RwLock};
+use static_assertions::const_assert;
+
 /// A cryptographically-protected log of user data blocks.
-/// 
+///
 /// `CryptoLog<L>`, which is backed by an untrusted block log (`L`),
-/// serves as a secure log file that supports random reads and append-only 
-/// writes of data blocks. `CryptoLog` encrypts the data blocks and 
+/// serves as a secure log file that supports random reads and append-only
+/// writes of data blocks. `CryptoLog` encrypts the data blocks and
 /// protects them with a Merkle Hash Tree (MHT), which itself is also encrypted.
-/// 
+///
 /// # Security
-/// 
+///
 /// Each instance of `CryptoLog` is assigned a randomly-generated root key
 /// upon its creation. The root key is used to encrypt the root MHT block only.
 /// Each new version of the root MHT block is encrypted with the same key, but
 /// different random IVs. This arrangement ensures the confidentiality of
 /// the root block.
-/// 
+///
 /// After flushing a `CryptoLog`, a new root MHT (as well as other MHT nodes)
 /// shall be appended to the backend block log (`L`).
 /// The metadata of the root MHT, including its position, encryption
 /// key, IV, and MAC, must be kept by the user of `CryptoLog` so that
 /// he or she can use the metadata to re-open the `CryptoLog`.
-/// The information contained in the metadata is sufficient to verify the 
+/// The information contained in the metadata is sufficient to verify the
 /// integrity and freshness of the root MHT node, and thus the whole `CryptoLog`.
-/// 
+///
 /// Other MHT nodes as well as data nodes are encrypted with randomly-generated,
 /// unique keys. Their metadata, including their positions, encryption keys, IVs, and
 /// MACs, are kept securely in their parent MHT nodes, which are also encrypted.
 /// Thus, the confidentiality and integrity of non-root nodes are protected.
-/// 
+///
 /// # Performance
-/// 
-/// Thanks to its append-only nature, `CryptoLog` avoids MHT's high 
-/// performance overheads under the workload of random writes 
+///
+/// Thanks to its append-only nature, `CryptoLog` avoids MHT's high
+/// performance overheads under the workload of random writes
 /// due to "cascades of updates".
-/// 
-/// Behind the scenes, `CryptoLog` keeps a cache for nodes so that frequently 
-/// or recently accessed nodes can be found in the cache, avoiding the I/O 
+///
+/// Behind the scenes, `CryptoLog` keeps a cache for nodes so that frequently
+/// or recently accessed nodes can be found in the cache, avoiding the I/O
 /// and decryption cost incurred when re-reading these nodes.
-/// The cache is also used for buffering new data so that multiple writes to 
+/// The cache is also used for buffering new data so that multiple writes to
 /// individual nodes can be merged into a large write to the underlying block log.
 /// Therefore, `CryptoLog` is efficient for both reads and writes.
-/// 
+///
 /// # Disk space
-/// 
-/// One consequence of using an append-only block log (`L`) as the backend is 
+///
+/// One consequence of using an append-only block log (`L`) as the backend is
 /// that `CryptoLog` cannot do in-place updates to existing MHT nodes.
 /// This means the new version of MHT nodes are appended to the underlying block
 /// log and the invalid blocks occupied by old versions are not reclaimed.
-/// 
+///
-/// But lucky for us, this block reclaimation problem is not an issue in pratice.
+/// But lucky for us, this block reclamation problem is not an issue in practice.
 /// This is because a `CryptoLog` is created for one of the following two
 /// use cases.
-/// 
-/// 1. Write-once-then-read-many. In this use case, all the content of a 
-/// `CryptoLog` is written in a single run. 
+///
+/// 1. Write-once-then-read-many. In this use case, all the content of a
+/// `CryptoLog` is written in a single run.
 /// Writing in a single run won't trigger any updates to MHT nodes and thus
 /// wastes no disk space.
 /// After the writing is done, the `CryptoLog` becomes read-only.
-/// 
+///
-/// 2. Write-many-then-read-once. In this use case, the content of a 
+/// 2. Write-many-then-read-once. In this use case, the content of a
 /// `CryptoLog` may be written in many runs. But the number of `CryptoLog`
 /// under such workloads is limited and their lengths are also limited.
 /// So the disk space wasted by such `CryptoLog` is bounded.
 /// And after such `CryptoLog`s are done writing, they will be read once and
-/// then discarded. 
+/// then discarded.
 pub struct CryptoLog<L> {
     block_log: L,
     key: Key,
-    root_meta: Option<RootMhtMeta>,
+    root_meta: Mutex<Option<RootMhtMeta>>,
+    cache: Arc<dyn NodeCache>,
+    append_buf: RwLock<AppendBuf>,
+    append_pos: AtomicUsize,
+    lock: Mutex<()>,
 }
 
+type Lbid = BlockId; // Logical block position, as seen by the user of `CryptoLog`
+type Pbid = BlockId; // Physical block position in the underlying block log
+
 /// The metadata of the root MHT node of a `CryptoLog`.
+#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
 pub struct RootMhtMeta {
-    pub pos: BlockId,
+    pub pos: Pbid,
     pub mac: Mac,
     pub iv: Iv,
 }
 
+/// The Merkle Hash Tree (MHT) node (internal).
+/// It contains a header with the node's metadata and an array of entries that manage its child nodes.
+#[repr(C)]
+#[derive(Clone, Copy, Pod)]
 struct MhtNode {
-    // The height of the MHT tree whose root is this node
-    height: u8,
-    // The total number of valid data blocks covered by this node
-    num_data_blocks: u32,
-    // The child nodes
-    children: [NodeMeta; NUM_BRANCHES],
+    header: MhtNodeHeader,
+    entries: [MhtNodeEntry; MHT_NBRANCHES],
 }
+const_assert!(size_of::<MhtNode>() <= BLOCK_SIZE);
 
-struct DataNode([u8; BLOCK_SIZE]);
-
-const NUM_BRANCHES: usize = (BLOCK_SIZE - 8) / mem::size_of::<NodeMeta>();
+/// The header contains metadata of the current MHT node.
+#[repr(C)]
+#[derive(Clone, Copy, Debug, Pod)]
+struct MhtNodeHeader {
+    // The height of the MHT whose root is this node
+    height: u8,
+    // The total number of valid data nodes covered by this node
+    num_data_nodes: u32,
+    // The number of valid children within this node
+    num_valid_child: u16,
+}
 
-struct NodeMeta {
-    pos: BlockId,
+/// The entry of an MHT node, which contains the
+/// metadata of a child MHT/data node.
+#[repr(C)]
+#[derive(Clone, Copy, Debug, Pod)]
+struct MhtNodeEntry {
+    pos: Pbid,
     key: Key,
     mac: Mac,
 }
 
-// Cache must be provided because Cache also needs to be TX-aware
-// TODO: Need a node cache (only cache mht nodes)
+// Number of branches of one MHT node. (102 for now)
+const MHT_NBRANCHES: usize = (BLOCK_SIZE - size_of::<MhtNodeHeader>()) / size_of::<MhtNodeEntry>();
 
+/// The data node (leaf). It contains a block of data.
+#[repr(C)]
+#[derive(Clone, Copy, Pod)]
+struct DataNode([u8; BLOCK_SIZE]);
+
+/// The node cache used by `CryptoLog`. A user-defined node cache
+/// can achieve TX-awareness.
+pub trait NodeCache {
+    /// Gets an owned value from the cache corresponding to the position.
+    fn get(&self, pos: Pbid) -> Option<Arc<dyn Any + Send + Sync>>;
+
+    /// Puts a position-value pair into the cache. If the value of that position
+    /// already exists, updates it and returns the old value. Otherwise, `None` is returned.
+    fn put(
+        &self,
+        pos: Pbid,
+        value: Arc<dyn Any + Send + Sync>,
+    ) -> Option<Arc<dyn Any + Send + Sync>>;
+}
+
+impl<L: BlockLog> CryptoLog<L> {
+    // Buffer capacity for appended data nodes.
+    const APPEND_BUF_CAPACITY: usize = 1024;
+
-impl<L: BlockLog> CryptoLog<L> {
     /// Creates a new `CryptoLog`.
-    /// 
+    ///
     /// A newly-created instance won't occupy any space on the `block_log`
     /// until the first flush, which triggers writing the root MHT node.
-    pub fn new(block_log: L) -> Self {
-        todo!()
+    pub fn new(block_log: L, key: Key, cache: Arc<dyn NodeCache>) -> Self {
+        let append_pos = 0 as Lbid;
+        Self {
+            block_log,
+            key,
+            root_meta: Mutex::new(None),
+            cache,
+            append_buf: RwLock::new(AppendBuf::new(append_pos, Self::APPEND_BUF_CAPACITY)),
+            append_pos: AtomicUsize::new(append_pos),
+            lock: Mutex::new(()),
+        }
     }
 
     /// Opens an existing `CryptoLog` backed by a `block_log`.
-    /// 
+    ///
     /// The given key and the metadata of the root MHT are sufficient to
     /// load and verify the root node of the `CryptoLog`.
-    pub fn open(block_log: L, key: Key, root_meta: RootMhtMeta) -> Result<()> {
-        todo!()
+    pub fn open(
+        block_log: L,
+        key: Key,
+        root_meta: RootMhtMeta,
+        cache: Arc<dyn NodeCache>,
+    ) -> Result<Self> {
+        let append_pos = {
+            let root_node = if let Some(node) = cache.get(root_meta.pos) {
+                node.downcast::<MhtNode>().map_err(|_| {
+                    Error::with_msg(InvalidArgs, "cache node downcasts to root MHT node failed")
+                })?
+            } else {
+                let mut cipher = Buf::alloc(1)?;
+                let mut plain = Buf::alloc(1)?;
+                block_log.read(root_meta.pos, cipher.as_mut())?;
+                OsAead::new().decrypt(
+                    cipher.as_slice(),
+                    &key,
+                    &root_meta.iv,
+                    &[],
+                    &root_meta.mac,
+                    plain.as_mut_slice(),
+                )?;
+                Arc::new(MhtNode::from_bytes(plain.as_slice()))
+            };
+            let append_pos = root_node.header.num_data_nodes as Lbid;
+            cache.put(root_meta.pos, root_node);
+            append_pos
+        };
+        Ok(Self {
+            block_log,
+            key,
+            root_meta: Mutex::new(Some(root_meta)),
+            cache,
+            append_buf: RwLock::new(AppendBuf::new(append_pos, Self::APPEND_BUF_CAPACITY)),
+            append_pos: AtomicUsize::new(append_pos),
+            lock: Mutex::new(()),
+        })
     }
 
     /// Gets the root key.
@@ -120,38 +225,640 @@ impl<L: BlockLog> CryptoLog<L> {
         &self.key
     }
 
-    /// Gets the metadata of the root block.
-    pub fn root_meta(&self) -> Option<&RootMhtMeta> {
-        self.root_meta.as_ref()
+    /// Gets the metadata of the root MHT node. Returns `None` if no append
+    /// or flush has built a root yet.
+    pub fn root_meta(&self) -> Option<RootMhtMeta> {
+        *self.root_meta.lock()
     }
 
-    /// Gets the number of data blocks.
+    /// Gets the number of data nodes (blocks).
     pub fn nblocks(&self) -> usize {
-        todo!()
+        let append_buf_nblocks = self.append_buf.read().num_append();
+        let Ok(root_node) = self.root_mht_node() else {
+            return append_buf_nblocks;
+        };
+        append_buf_nblocks + root_node.header.num_data_nodes as usize
     }
 
     /// Reads one or multiple data blocks at a specified position.
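+    ///
+    /// Blocks that are still sitting in the append buffer are served directly from
+    /// memory; all other blocks are located by walking the MHT from the root, then
+    /// decrypted and verified before being copied into `buf`.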
-    pub fn read(&self, pos: BlockId, buf: &mut impl BlockBuf) -> Result<()> {
-        todo!()
+    pub fn read(&self, mut pos: Lbid, mut buf: BufMut) -> Result<()> {
+        let (mut lookup_mht_node, mut nth_entry) = (None, 0);
+        let mut iter_mut = buf.iter_mut();
+        while let Some(mut block_buf) = iter_mut.next() {
+            // Read from the append buffer first
+            if let Some(data_node) = self.append_buf.read().get_data(pos) {
+                block_buf.as_mut_slice().copy_from_slice(&data_node.0);
+                pos += 1;
+                continue;
+            }
+
+            if lookup_mht_node.is_none() {
+                // Locate the target last-level MHT node given the position
+                let (node, nth) = self.locate_last_level_mht_node_with_nth_entry(pos)?;
+                debug_assert_eq!(node.header.height, 1);
+                let _ = lookup_mht_node.insert(node);
+                nth_entry = nth;
+            }
+            let target_mht_node = lookup_mht_node.as_ref().unwrap();
+
+            // Read the target data node
+            let target_entry = &target_mht_node.entries[nth_entry];
+            self.read_data_node(target_entry, block_buf)?;
+
+            nth_entry += 1;
+            pos += 1;
+            // Look up the next target node if the current lookup reaches this MHT node's end
+            if nth_entry == target_mht_node.header.num_valid_child as _ {
+                let _ = lookup_mht_node.take();
+            }
+        }
+        Ok(())
+    }
+
+    // Given a position, locates the last-level MHT node (whose height always equals 1)
+    // and the index of the entry that points to the target data node.
+    fn locate_last_level_mht_node_with_nth_entry(
+        &self,
+        pos: Lbid,
+    ) -> Result<(Arc<MhtNode>, usize)> {
+        let root_node = self.root_mht_node()?;
+        if root_node.header.num_data_nodes <= pos as _ {
+            return_errno_with_msg!(InvalidArgs, "read out of bound");
+        }
+
+        let mut offset = pos;
+        let mut lookup_node = root_node;
+        // Loop until we reach the leaf level
+        while lookup_node.header.height != 1 {
+            // Locate the target entry of the current node and update the offset.
+            // Each child of this node covers `max_num_data_nodes() / MHT_NBRANCHES` data nodes.
+            let nth_entry = offset / (lookup_node.max_num_data_nodes() / MHT_NBRANCHES);
+            debug_assert!(nth_entry < lookup_node.header.num_valid_child as _);
+            let entry = lookup_node.entries[nth_entry];
+            offset -= nth_entry * (lookup_node.max_num_data_nodes() / MHT_NBRANCHES);
+
+            // Look up the target child MHT node
+            lookup_node =
+                self.read_mht_node(entry.pos, &entry.key, &entry.mac, &Iv::new_zeroed())?;
+        }
+
+        Ok((lookup_node, offset))
     }
 
     /// Appends one or multiple data blocks at the end.
-    pub fn append(&self, buf: &impl BlockBuf) -> Result<()> {
-        todo!()
+    pub fn append(&self, buf: BufRef) -> Result<()> {
+        let _lock = self.lock.lock();
+        let mut append_buf = self.append_buf.write();
+        for block_buf in buf.iter() {
+            let mut data_node = DataNode::new_uninit();
+            data_node.0.copy_from_slice(block_buf.as_slice());
+
+            // Flush if the append buffer reaches its capacity
+            if append_buf.append_data(Arc::new(data_node)) {
+                drop(append_buf);
+                self.flush_append_buf()?;
+                append_buf = self.append_buf.write();
+            }
+        }
+        Ok(())
     }
 
     /// Ensures that all new data are persisted.
-    /// 
+    ///
     /// Each successful flush triggers writing a new version of the root MHT
     /// node to the underlying block log. The metadata of the latest root MHT
     /// can be obtained via the `root_meta` method.
     pub fn flush(&self) -> Result<()> {
        let lock = self.lock.lock();
        self.flush_append_buf()?;
        drop(lock);

        self.block_log.flush()
     }
+
+    // Flushes all buffered data nodes to the underlying block log. A new root MHT node
+    // is built and the whole MHT is updated based on the newly-appended data.
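+    // Only the nodes on the path that covers the newly-appended data are rewritten;
+    // complete subtrees from earlier flushes are reused through their existing entries.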
+    fn flush_append_buf(&self) -> Result<()> {
+        let data_nodes = self.append_buf.read().all_buffered_data();
+        if data_nodes.is_empty() {
+            return Ok(());
+        }
+
+        let new_root_meta = self.build_mht(&data_nodes)?;
+        let _ = self.root_meta.lock().insert(new_root_meta);
+
+        self.append_buf.write().clear();
+        Ok(())
+    }
+
+    // Builds an MHT from the given data nodes (along with the incomplete MHT nodes
+    // from the previous build). On success, returns the metadata of the generated root node.
+    fn build_mht(&self, data_nodes: &Vec<Arc<DataNode>>) -> Result<RootMhtMeta> {
+        let root_node_opt = self.root_mht_node().ok();
+        let root_height = root_node_opt
+            .as_ref()
+            .map_or_else(|| 0, |node| node.header.height);
+
+        // Collect incomplete MHT nodes first
+        let mut incomplete_node_queue = VecDeque::new();
+        if let Some(root_node) = root_node_opt {
+            self.collect_incomplete_mht_nodes(&root_node, &mut incomplete_node_queue)?;
+        }
+
+        self.do_build_mht(data_nodes, incomplete_node_queue, root_height)
+    }
+
+    fn do_build_mht(
+        &self,
+        data_nodes: &Vec<Arc<DataNode>>,
+        mut incomplete_node_queue: VecDeque<Arc<MhtNode>>,
+        root_height: u8,
+    ) -> Result<RootMhtMeta> {
+        let need_incomplete_nodes_for_building = !incomplete_node_queue.is_empty();
+        let mut height = 1;
+
+        // Prepare two queues that are consumed level by level
+        let mut curr_level_entries: VecDeque<MhtNodeEntry> = {
+            let mut entries = VecDeque::with_capacity(data_nodes.len());
+            // Append the new data nodes and collect the corresponding MHT node entries
+            for data_node in data_nodes {
+                entries.push_back(self.append_data_node(&data_node)?);
+            }
+            entries
+        };
+        let mut curr_level_num_data_nodes: VecDeque<u32> =
+            curr_level_entries.iter().map(|_| 1_u32).collect();
+
+        if need_incomplete_nodes_for_building {
+            update_level_queues_with_incomplete_nodes(
+                &mut incomplete_node_queue,
+                &mut curr_level_entries,
+                &mut curr_level_num_data_nodes,
+                height,
+            );
+        }
+
+        // Loop to construct the MHT hierarchically, from bottom to top
+        let new_root_node = 'outer: loop {
+            let level_size = curr_level_entries.len();
+            // Each iteration builds a whole level of MHT nodes
+            for idx in 0..level_size {
+                let is_complete = (idx + 1) % MHT_NBRANCHES == 0;
+                if !is_complete && idx != level_size - 1 {
+                    // Neither completes a full MHT node nor is the last entry of the current level
+                    continue;
+                }
+
+                let new_mht_node = {
+                    let num_valid_child = if is_complete {
+                        MHT_NBRANCHES
+                    } else {
+                        // The last, incomplete node of the current level
+                        level_size % MHT_NBRANCHES
+                    };
+                    let mut num_data_nodes = 0;
+                    let entries = array_init::array_init(|i| {
+                        if i < num_valid_child {
+                            num_data_nodes += curr_level_num_data_nodes.pop_front().unwrap();
+                            curr_level_entries.pop_front().unwrap()
+                        } else {
+                            // Pad the rest with invalid entries
+                            MhtNodeEntry::new_uninit()
+                        }
+                    });
+                    Arc::new(MhtNode {
+                        header: MhtNodeHeader {
+                            height,
+                            num_data_nodes,
+                            num_valid_child: num_valid_child as _,
+                        },
+                        entries,
+                    })
+                };
+
+                // If the whole current level fits into a single MHT node and that node
+                // is at least as high as the previous root, it becomes the new root node
+                if height >= root_height && level_size <= MHT_NBRANCHES {
+                    break 'outer new_mht_node;
+                }
+
+                let new_entry: MhtNodeEntry = self.append_mht_node(&new_mht_node)?;
+                curr_level_entries.push_back(new_entry);
+                curr_level_num_data_nodes.push_back(new_mht_node.header.num_data_nodes);
+            }
+            height += 1;
+
+            if need_incomplete_nodes_for_building {
+                update_level_queues_with_incomplete_nodes(
+                    &mut incomplete_node_queue,
+                    &mut curr_level_entries,
+                    &mut curr_level_num_data_nodes,
+                    height,
+                );
+            }
+        };
+
+        return self.append_root_mht_node(&new_root_node);
+
+        // If there are incomplete nodes from the previous build, update the two level queues
+        fn update_level_queues_with_incomplete_nodes(
+            incomplete_node_queue: &mut VecDeque<Arc<MhtNode>>,
+            level_entries: &mut VecDeque<MhtNodeEntry>,
+            level_num_data_nodes: &mut VecDeque<u32>,
+            level_height: u8,
+        ) {
+            // Incomplete nodes only take part in building the level of the same height
+            if incomplete_node_queue.is_empty()
+                || incomplete_node_queue.back().unwrap().header.height != level_height
+            {
+                return;
+            }
+
+            let incomplete_node = incomplete_node_queue.pop_back().unwrap();
+            let header = &incomplete_node.header;
+            // Only collect the complete entries from the incomplete node; ignore the last
+            // incomplete entry (if any), since its corresponding node has already been
+            // processed in the previous building round
+            let num_complete_entries: usize =
+                if header.num_data_nodes as usize % MHT_NBRANCHES == 0 || header.height == 1 {
+                    header.num_valid_child as _
+                } else {
+                    (header.num_valid_child - 1) as _
+                };
+            let num_data_nodes_each_entry = incomplete_node.max_num_data_nodes() / MHT_NBRANCHES;
+
+            for i in (0..num_complete_entries).rev() {
+                // Put them at the front of the queues for later building
+                level_entries.push_front(incomplete_node.entries[i]);
+                level_num_data_nodes.push_front(num_data_nodes_each_entry as _);
+            }
+        }
+    }
+
+    /// Collects the incomplete (not fully filled) MHT nodes along the rightmost path
+    /// from the given root node.
+    fn collect_incomplete_mht_nodes(
+        &self,
+        root_node: &Arc<MhtNode>,
+        res_queue: &mut VecDeque<Arc<MhtNode>>,
+    ) -> Result<()> {
+        let mut lookup_node = root_node.clone();
+        let root_height = root_node.header.height;
+        loop {
+            let header = lookup_node.header;
+            // Loop until we reach the last complete MHT node,
+            // or the bottom level of MHT nodes
+            if !lookup_node.is_incomplete() || header.height == 1 {
+                if header.height == root_height {
+                    // The root node must be collected for later building,
+                    // whether it is incomplete or not
+                    res_queue.push_back(lookup_node);
+                }
+                break;
+            }
+            res_queue.push_back(lookup_node.clone());
+
+            // The next lookup node must be the last valid child
+            let entry: MhtNodeEntry = lookup_node.entries[(header.num_valid_child - 1) as usize];
+            lookup_node =
+                self.read_mht_node(entry.pos, &entry.key, &entry.mac, &Iv::new_zeroed())?;
+        }
+        Ok(())
+    }
+
+    fn append_root_mht_node(&self, node: &Arc<MhtNode>) -> Result<RootMhtMeta> {
+        let (cipher, mac, iv) = {
+            let plain = node.as_bytes();
+            let mut cipher = Buf::alloc(1)?;
+            let iv = Iv::random();
+            let mac =
+                OsAead::new().encrypt(&plain, &self.key, &iv, &[], &mut cipher.as_mut_slice())?;
+            (cipher, mac, iv)
+        };
+        let pos = self.block_log.append(cipher.as_ref())?;
+        let append_pos = self.append_pos.fetch_add(1, Ordering::Release);
+        debug_assert_eq!(pos, append_pos);
+
+        self.cache.put(pos, node.clone());
+        Ok(RootMhtMeta { pos, mac, iv })
+    }
+
+    fn append_mht_node(&self, node: &Arc<MhtNode>) -> Result<MhtNodeEntry> {
+        let (cipher, key, mac) = {
+            let plain = node.as_bytes();
+            let mut cipher = Buf::alloc(1)?;
+            let key = Key::random();
+            let mac = OsAead::new().encrypt(
+                &plain,
+                &key,
+                &Iv::new_zeroed(),
+                &[],
+                &mut cipher.as_mut_slice(),
+            )?;
+            (cipher, key, mac)
+        };
+        let pos = self.block_log.append(cipher.as_ref())?;
+        let append_pos = self.append_pos.fetch_add(1, Ordering::Release);
+        debug_assert_eq!(pos, append_pos);
+
+        self.cache.put(pos, node.clone());
+        Ok(MhtNodeEntry { pos, key, mac })
+    }
+
+    fn append_data_node(&self, node: &DataNode) -> Result<MhtNodeEntry> {
+        let (cipher, key, mac) = {
+            let mut cipher = Buf::alloc(1)?;
+            let key = Key::random();
+            let mac = OsAead::new().encrypt(
+                &node.0,
+                &key,
+                &Iv::new_zeroed(),
+                &[],
+                &mut cipher.as_mut_slice(),
+            )?;
+            (cipher, key, mac)
+        };
+        let pos = self.block_log.append(cipher.as_ref())?;
+        let append_pos = self.append_pos.fetch_add(1, Ordering::Release);
+        debug_assert_eq!(pos, append_pos);
+
+        Ok(MhtNodeEntry { pos, key, mac })
+    }
+
+    fn root_mht_node(&self) -> Result<Arc<MhtNode>> {
+        let root_meta = self
+            .root_meta
+            .lock()
+            .ok_or(Error::with_msg(NotFound, "root MHT node not found"))?;
+        self.read_mht_node(root_meta.pos, &self.key, &root_meta.mac, &root_meta.iv)
+    }
+
+    fn read_mht_node(&self, pos: Pbid, key: &Key, mac: &Mac, iv: &Iv) -> Result<Arc<MhtNode>> {
+        if let Some(node) = self.cache.get(pos) {
+            return Ok(node.downcast::<MhtNode>().map_err(|_| {
+                Error::with_msg(InvalidArgs, "cache node downcasts to MHT node failed")
+            })?);
+        }
+
+        let mht_node = {
+            let mut cipher = Buf::alloc(1)?;
+            let mut plain = Buf::alloc(1)?;
+            self.block_log.read(pos, cipher.as_mut())?;
+            OsAead::new().decrypt(cipher.as_slice(), key, iv, &[], mac, plain.as_mut_slice())?;
+            Arc::new(MhtNode::from_bytes(plain.as_slice()))
+        };
+
+        self.cache.put(pos, mht_node.clone());
+        Ok(mht_node)
+    }
+
+    fn read_data_node(&self, entry: &MhtNodeEntry, mut buf: BufMut) -> Result<()> {
+        let mut cipher = Buf::alloc(1)?;
+        self.block_log.read(entry.pos, cipher.as_mut())?;
+        OsAead::new().decrypt(
+            cipher.as_slice(),
+            &entry.key,
+            &Iv::new_zeroed(),
+            &[],
+            &entry.mac,
+            buf.as_mut_slice(),
+        )
+    }
+
+    pub fn display_mht(&self) {
+        println!("{:?}", CryptoLogDisplayer(self));
+    }
+}
+
+impl MhtNode {
+    pub fn is_incomplete(&self) -> bool {
+        self.header.num_data_nodes != self.max_num_data_nodes() as _
+    }
+
+    pub fn max_num_data_nodes(&self) -> usize {
+        MHT_NBRANCHES.pow(self.header.height as _)
+    }
+}
+
+impl<L: BlockLog> Debug for CryptoLog<L> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("CryptoLog")
+            .field("key", &self.key)
+            .field("root_mht_meta", &self.root_meta.lock())
+            .field("append_buf_nblocks", &self.append_buf.read().num_append())
+            .field("append_pos", &self.append_pos.load(Ordering::Relaxed))
+            .finish()
+    }
+}
+
+impl Debug for MhtNode {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("MhtNode")
+            .field("height", &self.header.height)
+            .field("num_data_nodes", &self.header.num_data_nodes)
+            .field("num_valid_child", &self.header.num_valid_child)
+            .finish()
+    }
+}
+
+impl Debug for DataNode {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("DataNode")
+            .field("first 16 bytes", &&self.0[..16])
+            .finish()
+    }
+}
+
+struct CryptoLogDisplayer<'a, L>(&'a CryptoLog<L>);
+
+impl<'a, L: BlockLog> Debug for CryptoLogDisplayer<'a, L> {
+    // A heavy implementation to display the whole MHT.
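+    // For example, with MHT_NBRANCHES = 102, a log holding 103 data blocks prints as a
+    // height-2 root with two height-1 children: one full child covering 102 data nodes
+    // and one child covering the single remaining data node.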
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        let mut debug_struct = f.debug_struct("CryptoLog");
+
+        // Display the root MHT node
+        let root_meta = self.0.root_meta();
+        debug_struct.field("\nroot_meta", &root_meta);
+        if root_meta.is_none() {
+            return debug_struct.finish();
+        }
+        let root_mht_node = self.0.root_mht_node().unwrap();
+        debug_struct.field("\n-> root_mht_node", &root_mht_node);
+        let mut height = root_mht_node.header.height;
+        if height == 1 {
+            return debug_struct.finish();
+        }
+
+        // Display the internal MHT nodes level by level
+        let mut level_entries: VecDeque<MhtNodeEntry> = root_mht_node
+            .entries
+            .into_iter()
+            .take(root_mht_node.header.num_valid_child as _)
+            .collect();
+        'outer: loop {
+            let level_size = level_entries.len();
+            for _ in 0..level_size {
+                let entry = level_entries.pop_front().unwrap();
+                let node = self
+                    .0
+                    .read_mht_node(entry.pos, &entry.key, &entry.mac, &Iv::new_zeroed())
+                    .unwrap();
+                debug_struct.field("\n node_entry", &entry);
+                debug_struct.field("\n -> mht_node", &node);
+                for i in 0..node.header.num_valid_child {
+                    level_entries.push_back(node.entries[i as usize]);
+                }
+            }
+            height -= 1;
+            if height == 1 {
+                break 'outer;
+            }
+        }
+        debug_struct.finish()
+    }
+}
+
+/// A buffer that contains appended data.
+struct AppendBuf {
+    data_queue: Vec<Arc<DataNode>>,
+    append_pos: Lbid,
+    capacity: usize,
 }
-
-// CryptoLog can derive CryptoLogSnapshot, which is a read-only snapshot
-// of the CryptoLog.
-// A derived CryptoLogSnapshot shared the same cache with its parent.
-//
-// But CryptoLog does not support rollback. How to do it?
\ No newline at end of file
+
+impl AppendBuf {
+    pub fn new(append_pos: Lbid, capacity: usize) -> Self {
+        Self {
+            data_queue: Vec::new(),
+            append_pos,
+            capacity,
+        }
+    }
+
+    pub fn all_buffered_data(&self) -> Vec<Arc<DataNode>> {
+        self.data_queue.iter().map(|data| data.clone()).collect()
+    }
+
+    pub fn append_data(&mut self, node: Arc<DataNode>) -> bool {
+        self.data_queue.push(node);
+        // Returns whether the buffer is at capacity
+        self.data_queue.len() >= self.capacity
+    }
+
+    pub fn get_data(&self, pos: Lbid) -> Option<Arc<DataNode>> {
+        let append_pos = self.append_pos;
+        if pos < append_pos || pos - append_pos >= self.data_queue.len() {
+            return None;
+        }
+        Some(self.data_queue[pos - append_pos].clone())
+    }
+
+    pub fn num_append(&self) -> usize {
+        self.data_queue.len()
+    }
+
+    pub fn clear(&mut self) {
+        self.data_queue.clear();
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::layers::bio::MemLog;
+
+    struct NoCache;
+    impl NodeCache for NoCache {
+        fn get(&self, _pos: Pbid) -> Option<Arc<dyn Any + Send + Sync>> {
+            None
+        }
+        fn put(
+            &self,
+            _pos: Pbid,
+            _value: Arc<dyn Any + Send + Sync>,
+        ) -> Option<Arc<dyn Any + Send + Sync>> {
+            None
+        }
+    }
+
+    fn create_crypto_log() -> Result<CryptoLog<MemLog>> {
+        let mem_log = MemLog::create(4 * 1024)?;
+        let key = Key::random();
+        let cache = Arc::new(NoCache {});
+        Ok(CryptoLog::new(mem_log, key, cache))
+    }
+
+    #[test]
+    fn crypto_log_fns() -> Result<()> {
+        let log = create_crypto_log()?;
+        let append_cnt = MHT_NBRANCHES - 1;
+        let mut buf = Buf::alloc(1)?;
+        for i in 0..append_cnt {
+            buf.as_mut_slice().fill(i as _);
+            log.append(buf.as_ref())?;
+        }
+        log.flush()?;
+        println!("{:?}", log);
+        log.display_mht();
+
+        let content = 5u8;
+        buf.as_mut_slice().fill(content);
+        log.append(buf.as_ref())?;
+        log.flush()?;
+        log.display_mht();
+        log.append(buf.as_ref())?;
+        log.flush()?;
+        log.display_mht();
+
+        let (root_meta, root_node) = (log.root_meta().unwrap(), log.root_mht_node()?);
+        assert_eq!(root_meta.pos, 107);
+        assert_eq!(root_node.header.height, 2);
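+        // 101 + 1 + 1 appended blocks => 103 data nodes under a height-2 root with two height-1 children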
+        assert_eq!(root_node.header.num_data_nodes as usize, append_cnt + 2);
+        assert_eq!(root_node.header.num_valid_child, 2);
+
+        log.read(5 as BlockId, buf.as_mut())?;
+        assert_eq!(buf.as_slice(), [content; BLOCK_SIZE]);
+        let mut buf = Buf::alloc(2)?;
+        log.read((MHT_NBRANCHES - 1) as BlockId, buf.as_mut())?;
+        assert_eq!(buf.as_slice(), [content; 2 * BLOCK_SIZE]);
+        Ok(())
+    }
+
+    #[test]
+    fn write_once_read_many() -> Result<()> {
+        let log = create_crypto_log()?;
+        let append_cnt = 1024_usize;
+        let mut buf = Buf::alloc(append_cnt)?;
+        for i in 0..append_cnt {
+            buf.as_mut_slice()[i * BLOCK_SIZE..(i + 1) * BLOCK_SIZE].fill(i as _);
+        }
+        log.append(buf.as_ref())?;
+        log.flush()?;
+        log.display_mht();
+
+        let mut buf = Buf::alloc(1)?;
+        for i in (0..append_cnt).rev() {
+            log.read(i as Lbid, buf.as_mut())?;
+            assert_eq!(buf.as_slice(), [i as u8; BLOCK_SIZE]);
+        }
+        Ok(())
+    }
+
+    #[test]
+    fn write_many_read_once() -> Result<()> {
+        let log = create_crypto_log()?;
+        let append_cnt = 1024_usize;
+        let mut buf = Buf::alloc(1)?;
+        for i in 0..append_cnt {
+            buf.as_mut_slice().fill(i as _);
+            log.append(buf.as_ref())?;
+        }
+        log.flush()?;
+        log.display_mht();
+
+        let mut buf = Buf::alloc(append_cnt)?;
+        log.read(0 as Lbid, buf.as_mut())?;
+        for i in 0..append_cnt {
+            assert_eq!(
+                buf.as_slice()[i * BLOCK_SIZE..(i + 1) * BLOCK_SIZE],
+                [i as u8; BLOCK_SIZE]
+            );
+        }
+        Ok(())
+    }
+}
diff --git a/src/layers/1-crypto/mod.rs b/src/layers/1-crypto/mod.rs
index c6b0c0d..57c378a 100644
--- a/src/layers/1-crypto/mod.rs
+++ b/src/layers/1-crypto/mod.rs
@@ -2,11 +2,11 @@
 
 mod crypto_blob;
 mod crypto_chain;
-// mod crypto_log; // Uncomment this when it's ready
+mod crypto_log;
 
 pub use self::crypto_blob::CryptoBlob;
 pub use self::crypto_chain::CryptoChain;
-// pub use self::crypto_log::{CryptoLog, RootMhtMeta};
+pub use self::crypto_log::{CryptoLog, NodeCache, RootMhtMeta};
 
 pub type Key = crate::os::AeadKey;
 pub type Iv = crate::os::AeadIv;
diff --git a/src/lib.rs b/src/lib.rs
index 5fef1bf..e2e751a 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -2,6 +2,7 @@
 #![feature(let_chains)]
 #![feature(negative_impls)]
 #![feature(new_uninit)]
+#![feature(slice_concat_trait)]
 
 mod error;
 mod layers;
diff --git a/src/prelude.rs b/src/prelude.rs
index db1ac01..98ce432 100644
--- a/src/prelude.rs
+++ b/src/prelude.rs
@@ -1,6 +1,8 @@
 pub(crate) use crate::error::{Errno::*, Error};
 pub(crate) use crate::layers::bio::{BlockId, BLOCK_SIZE};
-pub(crate) use crate::util::{Aead as _, RandomInit, Rng as _, Skcipher as _};
+pub(crate) use crate::util::{
+    align_down, align_up, Aead as _, RandomInit, Rng as _, Skcipher as _,
+};
 pub(crate) use crate::{return_errno, return_errno_with_msg};
 
 pub(crate) type Result<T> = core::result::Result<T, Error>;
diff --git a/src/tx/mod.rs b/src/tx/mod.rs
index 3d982f0..73f9318 100644
--- a/src/tx/mod.rs
+++ b/src/tx/mod.rs
@@ -285,7 +285,7 @@ pub type TxId = u64;
 pub trait TxData: anymap::any::Any {}
 
 #[cfg(test)]
-mod test {
+mod tests {
     use super::*;
     use alloc::collections::BTreeSet;
     use spin::Mutex;
diff --git a/src/util/mod.rs b/src/util/mod.rs
index adb1da9..e1f781e 100644
--- a/src/util/mod.rs
+++ b/src/util/mod.rs
@@ -3,3 +3,11 @@ mod lazy_delete;
 
 pub use self::crypto::{Aead, RandomInit, Rng, Skcipher};
 pub use self::lazy_delete::LazyDelete;
+
+pub(crate) const fn align_up(x: usize, align: usize) -> usize {
+    ((x + align - 1) / align) * align
+}
+
+pub(crate) const fn align_down(x: usize, align: usize) -> usize {
+    (x / align) * align
+}
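For illustration only (not part of the patch): the `lru` dependency added to Cargo.toml above is not yet used by this diff, but a `NodeCache` implementation backed by it could look roughly like the sketch below. The type name `LruNodeCache`, the capacity handling, and the choice of a spin mutex for interior mutability are assumptions for this sketch, not something this patch defines.

// A minimal sketch, assuming `NodeCache` and `BlockId` are in scope via the crate's
// re-exports and that the `lru = "0.11"` dependency is pulled in.
use alloc::sync::Arc;
use core::any::Any;
use core::num::NonZeroUsize;
use lru::LruCache;
use spin::Mutex;

pub struct LruNodeCache {
    // `LruCache::get` takes `&mut self` (it updates recency), so guard it with a mutex
    // to satisfy the `&self` methods of `NodeCache`.
    inner: Mutex<LruCache<BlockId, Arc<dyn Any + Send + Sync>>>,
}

impl LruNodeCache {
    pub fn new(capacity: usize) -> Self {
        Self {
            inner: Mutex::new(LruCache::new(NonZeroUsize::new(capacity).unwrap())),
        }
    }
}

impl NodeCache for LruNodeCache {
    fn get(&self, pos: BlockId) -> Option<Arc<dyn Any + Send + Sync>> {
        // Clone the `Arc`, leaving the cached node in place.
        self.inner.lock().get(&pos).cloned()
    }

    fn put(
        &self,
        pos: BlockId,
        value: Arc<dyn Any + Send + Sync>,
    ) -> Option<Arc<dyn Any + Send + Sync>> {
        // Returns the old value if `pos` was already cached, matching the trait's contract.
        self.inner.lock().put(pos, value)
    }
}

A TX-aware cache, the use case called out in the `NodeCache` doc comment, could keep the same shape while staging `put`s until the enclosing transaction commits.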