feat: initial export snapshot skeleton (#40)
* feat: initial export snapshot template

* doc: clarify miniblocks
zeapoz authored Nov 16, 2023
1 parent d457b88 commit 5408f3f
Showing 7 changed files with 349 additions and 1 deletion.
1 change: 1 addition & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

1 change: 1 addition & 0 deletions Cargo.toml
@@ -10,6 +10,7 @@ members = ["state-reconstruct-fetcher"]
[dependencies]
async-trait = "0.1.74"
blake2 = "0.10.6"
chrono = "0.4.31"
clap = { version = "4.4.7", features = ["derive", "env"] }
ethers = "1.0.2"
eyre = "0.6.8"
8 changes: 8 additions & 0 deletions src/cli.rs
@@ -72,6 +72,14 @@ pub enum Command {
#[arg(short, long, env = "ZK_SYNC_DB_PATH")]
db_path: Option<String>,
},

/// Export the reconstructed state as a snapshot file.
ExportSnapshot {
#[command(flatten)]
l1_fetcher_options: L1FetcherOptions,
/// The path of the file to export the snapshot to.
file: Option<String>,
},
}

#[derive(Parser)]
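With clap's derive naming, this variant should surface as an `export-snapshot` subcommand. A hedged invocation sketch (the `state-reconstruct` binary name, flag spelling, and RPC URL are assumptions inferred from `L1FetcherOptions`, not confirmed by this diff):

    # hypothetical usage; omitting the file argument falls back to ./snapshot_export
    state-reconstruct export-snapshot --http-url http://127.0.0.1:8545 ./my_snapshot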
25 changes: 25 additions & 0 deletions src/main.rs
@@ -16,6 +16,7 @@ use std::{
use clap::Parser;
use cli::{Cli, Command, ReconstructSource};
use eyre::Result;
use processor::snapshot::SnapshotExporter;
use state_reconstruct_fetcher::{
constants::storage,
l1_fetcher::{L1Fetcher, L1FetcherOptions},
@@ -52,6 +53,7 @@ fn start_logger(default_level: LevelFilter) {
}

#[tokio::main]
#[allow(clippy::too_many_lines)]
async fn main() -> Result<()> {
start_logger(LevelFilter::INFO);

@@ -150,6 +152,29 @@ async fn main() -> Result<()> {
println!("{result}");
}
}
Command::ExportSnapshot {
l1_fetcher_options,
file,
} => {
let fetcher_options = L1FetcherOptions {
http_url: l1_fetcher_options.http_url,
start_block: l1_fetcher_options.start_block,
block_step: l1_fetcher_options.block_step,
block_count: l1_fetcher_options.block_count,
disable_polling: l1_fetcher_options.disable_polling,
};

let fetcher = L1Fetcher::new(fetcher_options, None)?;
let processor = SnapshotExporter::new(file);

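// The bounded channel (capacity 5) applies backpressure: the fetcher blocks
// once five unprocessed blocks are queued ahead of the exporter.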
let (tx, rx) = mpsc::channel::<CommitBlockInfoV1>(5);
let processor_handle = tokio::spawn(async move {
processor.run(rx).await;
});

fetcher.run(tx).await?;
processor_handle.await?;
}
}

Ok(())
3 changes: 2 additions & 1 deletion src/processor/mod.rs
@@ -3,9 +3,10 @@ use state_reconstruct_fetcher::types::CommitBlockInfoV1;
use tokio::sync::mpsc;

pub mod json;
pub mod snapshot;
pub mod tree;

#[async_trait]
pub trait Processor {
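/// Consumes commit blocks from `rx` until the channel closes; the receiver is
/// taken as `mut` because `tokio::sync::mpsc::Receiver::recv` needs `&mut self`.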
async fn run(self, rx: mpsc::Receiver<CommitBlockInfoV1>);
async fn run(self, mut rx: mpsc::Receiver<CommitBlockInfoV1>);
}
238 changes: 238 additions & 0 deletions src/processor/snapshot/mod.rs
@@ -0,0 +1,238 @@
use std::{collections::HashMap, fmt, fs, path::PathBuf, str::FromStr};

mod types;

use async_trait::async_trait;
use blake2::{Blake2s256, Digest};
use ethers::types::{Address, H256, U256, U64};
use eyre::Result;
use indexmap::IndexSet;
use state_reconstruct_fetcher::{
constants::{ethereum, storage},
types::CommitBlockInfoV1,
};
use tokio::sync::mpsc;

use self::types::{SnapshotStorageLog, StorageKey, StorageValue};
use super::Processor;
use crate::processor::snapshot::types::MiniblockNumber;

// NOTE: What file extension to use?
const DEFAULT_EXPORT_PATH: &str = "snapshot_export";

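/// Accumulates storage logs from L1 commit data and serializes them to disk.
/// `index_to_key_map` records keys in insertion order so that repeated writes,
/// which refer to keys by 1-based enumeration index, can be resolved back to
/// full storage keys.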
pub struct SnapshotExporter {
storage_log_entries: HashMap<StorageKey, SnapshotStorageLog>,
index_to_key_map: IndexSet<U256>,
path: PathBuf,
}

impl SnapshotExporter {
pub fn new(path: Option<String>) -> Self {
let path = match path {
Some(p) => PathBuf::from(p),
None => PathBuf::from(DEFAULT_EXPORT_PATH),
};

let mut index_to_key_map = IndexSet::new();
let mut storage_log_entries = HashMap::new();

reconstruct_genesis_state(
&mut storage_log_entries,
&mut index_to_key_map,
storage::INITAL_STATE_PATH,
)
.unwrap();

Self {
storage_log_entries,
index_to_key_map,
path,
}
}
}

impl fmt::Display for SnapshotExporter {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut s = String::new();

for log in self.storage_log_entries.values() {
s.push_str(&log.to_string());
s.push('\n');
}

write!(f, "{s}")
}
}

#[async_trait]
impl Processor for SnapshotExporter {
async fn run(mut self, mut rx: mpsc::Receiver<CommitBlockInfoV1>) {
// TODO: Send from fetcher.
let l1_block_number = U64::from(0);

while let Some(block) = rx.recv().await {
// Initial calldata: first writes to each storage key, given as (full key, value).
for (key, value) in &block.initial_storage_changes {
let key = U256::from_little_endian(key);
let value = H256::from(value);
self.index_to_key_map.insert(key);

let log = self
.storage_log_entries
.entry(key)
.or_insert(SnapshotStorageLog {
key,
value: StorageValue::default(),
// NOTE: This isn't stored in L1, can we procure it some other way?
miniblock_number_of_initial_write: U64::from(0),
l1_batch_number_of_initial_write: l1_block_number,
enumeration_index: 0,
});
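// Last write wins: overwrite the value on every later occurrence of the key.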
log.value = value;
}

// Repeated calldata: writes to already-seen keys, referenced by enumeration index.
for (index, value) in &block.repeated_storage_changes {
let index = usize::try_from(*index).expect("truncation failed");
// Index is 1-based so we subtract 1.
let key = *self.index_to_key_map.get_index(index - 1).unwrap();
let value = H256::from(value);

self.storage_log_entries
.entry(key)
.and_modify(|log| log.value = value);
}

// TODO: We need to index these by hash.
// Factory dependencies.
// for dep in &block.factory_deps {}
}

fs::write(&self.path, self.to_string()).expect("failed to export snapshot");
tracing::info!("Successfully exported snapshot to {}", self.path.display());
}
}

// TODO: Can this be made somewhat generic?
/// Attempts to reconstruct the genesis state from a CSV file.
fn reconstruct_genesis_state(
storage_log_entries: &mut HashMap<U256, SnapshotStorageLog>,
index_to_key: &mut IndexSet<U256>,
path: &str,
) -> Result<()> {
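// Each field looks like a PostgreSQL escaped bytea literal (E'\\x…'); strip
// the wrapper to recover the raw hex digits.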
fn cleanup_encoding(input: &'_ str) -> &'_ str {
input
.strip_prefix("E'\\\\x")
.unwrap()
.strip_suffix('\'')
.unwrap()
}

let mut block_batched_accesses = vec![];

let input = fs::read_to_string(path)?;
for line in input.lines() {
let mut separated = line.split(',');
let _derived_key = separated.next().unwrap();
let address = separated.next().unwrap();
let key = separated.next().unwrap();
let value = separated.next().unwrap();
let op_number: u32 = separated.next().unwrap().parse()?;
let _ = separated.next().unwrap();
let miniblock_number: u32 = separated.next().unwrap().parse()?;

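// The dump is assumed ordered by miniblock; only miniblock 0 is genesis,
// so stop at the first later entry.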
if miniblock_number != 0 {
break;
}

let address = Address::from_str(cleanup_encoding(address))?;
let key = U256::from_str_radix(cleanup_encoding(key), 16)?;
let value = U256::from_str_radix(cleanup_encoding(value), 16)?;

let record = (address, key, value, op_number, miniblock_number);
block_batched_accesses.push(record);
}

// Sort the accesses by address, then key, then operation number.
block_batched_accesses.sort_by(|a, b| match a.0.cmp(&b.0) {
std::cmp::Ordering::Equal => match a.1.cmp(&b.1) {
std::cmp::Ordering::Equal => match a.3.cmp(&b.3) {
std::cmp::Ordering::Equal => {
panic!("must be unique")
}
a => a,
},
a => a,
},
a => a,
});

let mut key_set = std::collections::HashSet::new();

// Batch.
for el in &block_batched_accesses {
let derived_key = derive_final_address_for_params(&el.0, &el.1);
key_set.insert(derived_key);
}

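// Accesses are now sorted by (address, key, op_number); emitting the entry
// just before each (address, key) boundary keeps only the final write per slot.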
let mut batched = vec![];
let mut it = block_batched_accesses.into_iter();
let mut previous = it.next().unwrap();
for el in it {
if el.0 != previous.0 || el.1 != previous.1 {
batched.push((previous.0, previous.1, previous.2, previous.4));
}

previous = el;
}

// Finalize.
batched.push((previous.0, previous.1, previous.2, previous.4));

tracing::trace!("Have {} unique keys in the tree", key_set.len());

for (address, key, value, miniblock_number) in batched {
let derived_key = derive_final_address_for_params(&address, &key);
// TODO: what to do here?
// let version = tree.latest_version().unwrap_or_default();
// let _leaf = tree.read_leaves(version, &[key]);

// let existing_value = U256::from_big_endian(existing_leaf.leaf.value());
// if existing_value == value {
// // we downgrade to read
// // println!("Downgrading to read")
// } else {
// we write
let mut tmp = [0u8; 32];
value.to_big_endian(&mut tmp);

let key = U256::from_little_endian(&derived_key);
let value = H256::from(tmp);

let log = storage_log_entries
.entry(key)
.or_insert(SnapshotStorageLog {
key,
value: StorageValue::default(),
miniblock_number_of_initial_write: MiniblockNumber::from(miniblock_number),
l1_batch_number_of_initial_write: U64::from(ethereum::GENESIS_BLOCK),
enumeration_index: 0,
});

log.value = value;
index_to_key.insert(key);
}

Ok(())
}

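/// Derives the storage-tree key for an (address, slot) pair: Blake2s-256 over
/// a 64-byte buffer holding the zero-padded address (bytes 12..32) followed by
/// the big-endian slot key (bytes 32..64).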
fn derive_final_address_for_params(address: &Address, key: &U256) -> [u8; 32] {
let mut buffer = [0u8; 64];
buffer[12..32].copy_from_slice(&address.0);
key.to_big_endian(&mut buffer[32..64]);

let mut result = [0u8; 32];
result.copy_from_slice(Blake2s256::digest(buffer).as_slice());

result
}
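A minimal sanity-check sketch for the key derivation above (not part of the commit; the test name and assertions are illustrative only):

#[cfg(test)]
mod tests {
    use super::*;

    // Hypothetical test: the derivation is deterministic and sensitive to
    // the slot key (and, by the same construction, to the address).
    #[test]
    fn derived_key_is_deterministic_and_distinct() {
        let addr = Address::zero();
        let a = derive_final_address_for_params(&addr, &U256::from(1));
        let b = derive_final_address_for_params(&addr, &U256::from(2));
        assert_eq!(a, derive_final_address_for_params(&addr, &U256::from(1)));
        assert_ne!(a, b);
    }
}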