feat(sync): store old block's pending until we download pending from new block (#1291)
ShahakShama authored Oct 22, 2023
1 parent 1587f79 commit d4b043f
Showing 5 changed files with 164 additions and 50 deletions.
1 change: 1 addition & 0 deletions Cargo.lock


12 changes: 11 additions & 1 deletion crates/papyrus_node/src/main.rs
@@ -19,6 +19,10 @@ use papyrus_sync::sources::base_layer::{BaseLayerSourceError, EthereumBaseLayerS
use papyrus_sync::sources::central::{CentralError, CentralSource};
use papyrus_sync::sources::pending::PendingSource;
use papyrus_sync::{StateSync, StateSyncError};
use starknet_api::block::BlockHash;
use starknet_api::hash::{StarkFelt, GENESIS_HASH};
use starknet_api::stark_felt;
use starknet_client::reader::objects::pending_data::PendingBlock;
use starknet_client::reader::PendingData;
use tokio::sync::RwLock;
use tracing::metadata::LevelFilter;
@@ -44,7 +44,13 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> {

// The sync is the only writer of the syncing state.
let shared_highest_block = Arc::new(RwLock::new(None));
let pending_data = Arc::new(RwLock::new(PendingData::default()));
let pending_data = Arc::new(RwLock::new(PendingData {
block: PendingBlock {
parent_block_hash: BlockHash(stark_felt!(GENESIS_HASH)),
..Default::default()
},
..Default::default()
}));
let pending_classes = Arc::new(RwLock::new(PendingClasses::default()));

// JSON-RPC server.
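The main.rs change above explicitly sets the initial pending data's parent block hash to GENESIS_HASH rather than relying on the default value. A rough, hypothetical sketch (not part of this commit; the helper name is invented) of why the parent hash matters to readers of the shared state: a consumer can check whether the stored pending data still refers to the latest stored block before using it.

use starknet_api::block::BlockHash;
use starknet_client::reader::PendingData;

// Hypothetical helper, for illustration only: pending data is built on top of the latest
// stored block, so if its parent hash no longer matches that block it is stale and can be
// ignored by the consumer. Before any block is stored, the latest block hash is the genesis
// hash, which is why the initial pending data above uses GENESIS_HASH as its parent.
fn pending_data_is_current(pending: &PendingData, latest_block_hash: BlockHash) -> bool {
    pending.block.parent_block_hash == latest_block_hash
}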
1 change: 1 addition & 0 deletions crates/papyrus_sync/Cargo.toml
@@ -41,4 +41,5 @@ papyrus_storage = { path = "../papyrus_storage", features = ["testing"] }
pretty_assertions.workspace = true
starknet_client = { path = "../starknet_client", features = ["testing"] }
starknet_api = { workspace = true, features = ["testing"] }
test_utils = { path = "../test_utils" }
tokio-stream.workspace = true
31 changes: 20 additions & 11 deletions crates/papyrus_sync/src/lib.rs
@@ -34,6 +34,10 @@ use sources::base_layer::BaseLayerSourceError;
use starknet_api::block::{Block, BlockHash, BlockNumber};
use starknet_api::core::{ClassHash, CompiledClassHash};
use starknet_api::deprecated_contract_class::ContractClass as DeprecatedContractClass;
// TODO(shahak): Consider adding genesis hash to the config to support chains that have
// different genesis hash.
use starknet_api::hash::{StarkFelt, GENESIS_HASH};
use starknet_api::stark_felt;
use starknet_api::state::StateDiff;
use starknet_client::reader::PendingData;
use tokio::sync::RwLock;
@@ -882,18 +886,19 @@ async fn sync_pending_data<TPendingSource: PendingSourceTrait + Sync + Send>(
pending_data: Arc<RwLock<PendingData>>,
sleep_duration: Duration,
) -> Result<(), StateSyncError> {
*pending_data.write().await = PendingData::default();
let txn = reader.begin_ro_txn()?;
let header_marker = txn.get_header_marker()?;
// TODO: Consider extracting this functionality to a different function.
let latest_block_hash = match header_marker {
// TODO: make sure this is the correct value for the genesis's parent block in all the
// environments.
BlockNumber(0) => BlockHash::default(),
BlockNumber(0) => BlockHash(stark_felt!(GENESIS_HASH)),
_ => {
txn.get_block_header(header_marker.prev().expect("Header marker isn't zero."))?
.expect("Block before the header marker must have header in the database.")
.block_hash
txn.get_block_header(
header_marker
.prev()
.expect("All blocks other than the first block should have a predecessor."),
)?
.expect("Block before the header marker must have header in the database.")
.block_hash
}
};
loop {
@@ -902,10 +907,14 @@ async fn sync_pending_data<TPendingSource: PendingSourceTrait + Sync + Send>(
debug!("A new block was found. Stopping pending sync.");
return Ok(());
};
let current_pending_num_transactions = pending_data.read().await.block.transactions.len();
if current_pending_num_transactions == 0
|| new_pending_data.block.transactions.len() > current_pending_num_transactions
{
let (current_pending_num_transactions, current_pending_parent_hash) = {
let pending_block = &pending_data.read().await.block;
(pending_block.transactions.len(), pending_block.parent_block_hash)
};
let is_new_pending_data_more_advanced = current_pending_parent_hash
!= new_pending_data.block.parent_block_hash
|| new_pending_data.block.transactions.len() > current_pending_num_transactions;
if is_new_pending_data_more_advanced {
debug!("Received new pending data.");
trace!("Pending data: {new_pending_data:#?}.");
*pending_data.write().await = new_pending_data;
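For clarity, the acceptance rule that sync_pending_data now applies can be restated as a standalone predicate. This is an illustrative sketch only, not code from the commit, and the function name is invented; it assumes the same BlockHash type used above.

// Sketch of the `is_new_pending_data_more_advanced` condition from the diff above: new
// pending data replaces the stored one when it builds on a different parent block (meaning
// the stored data belongs to the previous block) or when it builds on the same parent but
// carries more transactions.
fn should_replace_stored_pending(
    stored_parent: BlockHash,
    stored_tx_count: usize,
    new_parent: BlockHash,
    new_tx_count: usize,
) -> bool {
    stored_parent != new_parent || new_tx_count > stored_tx_count
}

Note that by the time this check runs, the loop above has already verified that the newly fetched pending data builds on the latest stored block; if it points at a different parent, pending sync returns so that the new block can be downloaded first.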
169 changes: 131 additions & 38 deletions crates/papyrus_sync/src/sync_test.rs
@@ -7,15 +7,18 @@ use indexmap::IndexMap;
use papyrus_storage::base_layer::BaseLayerStorageReader;
use papyrus_storage::header::HeaderStorageWriter;
use papyrus_storage::test_utils::get_test_storage;
use papyrus_storage::StorageWriter;
use papyrus_storage::{StorageReader, StorageWriter};
use pretty_assertions::assert_eq;
use starknet_api::block::{BlockHash, BlockHeader, BlockNumber, GasPrice};
use starknet_api::block::{BlockHash, BlockHeader, BlockNumber};
use starknet_api::core::{ClassHash, CompiledClassHash, ContractAddress, Nonce, PatriciaKey};
use starknet_api::deprecated_contract_class::ContractClass as DeprecatedContractClass;
use starknet_api::hash::{StarkFelt, StarkHash};
use starknet_api::hash::{StarkFelt, StarkHash, GENESIS_HASH};
use starknet_api::state::{ContractClass, StateDiff, StorageKey};
use starknet_api::{patricia_key, stark_felt};
use starknet_client::reader::objects::pending_data::PendingBlock;
use starknet_client::reader::objects::transaction::Transaction as ClientTransaction;
use starknet_client::reader::PendingData;
use test_utils::{get_rng, GetTestInstance};
use tokio::sync::RwLock;

use crate::sources::base_layer::MockBaseLayerSourceTrait;
@@ -223,52 +226,142 @@ fn add_headers(headers_num: u64, writer: &mut StorageWriter) {
}
}

#[tokio::test]
async fn pending_sync() {
// Storage with one default block header.
let (reader, mut writer) = get_test_storage().0;
writer
.begin_rw_txn()
.unwrap()
.append_header(BlockNumber(0), &BlockHeader::default())
.unwrap()
.commit()
.unwrap();

async fn test_pending_sync(
reader: StorageReader,
old_pending_data: PendingData,
new_pending_datas: Vec<PendingData>,
expected_pending_data: PendingData,
non_existing_hash: BlockHash,
) {
let mut mock_pending_source = MockPendingSourceTrait::new();
let pending_data_lock = Arc::new(RwLock::new(old_pending_data));

const PENDING_QUERIES: usize = 2;
for call_count in 0..=PENDING_QUERIES {
mock_pending_source.expect_get_pending_data().times(1).returning(move || {
let mut block = PendingData::default();
block.block.parent_block_hash = BlockHash::default();
block.block.gas_price = GasPrice(call_count as u128);
Ok(block)
});
for new_pending_data in new_pending_datas {
mock_pending_source
.expect_get_pending_data()
.times(1)
.return_once(move || Ok(new_pending_data));
}

// A different parent block hash than the last block in the database indicates that a new block
// was created, and pending sync should wait until the new block is written to the storage, so
// this pending data should not be written.
mock_pending_source.expect_get_pending_data().times(1).returning(|| {
let mut block = PendingData::default();
block.block.parent_block_hash = BlockHash(stark_felt!("0x1"));
Ok(block)
// The syncing will stop once we see a new parent_block_hash in the pending data. It won't
// store the pending data with the new hash in that case.
mock_pending_source.expect_get_pending_data().times(1).return_once(move || {
Ok(PendingData {
block: PendingBlock { parent_block_hash: non_existing_hash, ..Default::default() },
..Default::default()
})
});

let pending_data = Arc::new(RwLock::new(PendingData::default()));

sync_pending_data(
reader,
Arc::new(mock_pending_source),
pending_data.clone(),
Duration::from_millis(1),
pending_data_lock.clone(),
Duration::ZERO,
)
.await
.unwrap();

// The last query for pending data (with parent block hash 0x1) should not be written, so the gas
// price should be PENDING_QUERIES.
assert_eq!(pending_data.read().await.block.parent_block_hash, BlockHash::default());
assert_eq!(pending_data.read().await.block.gas_price, GasPrice(PENDING_QUERIES as u128));
assert_eq!(pending_data_lock.read().await.clone(), expected_pending_data);
}

#[tokio::test]
async fn pending_sync_advances_only_when_new_data_has_more_transactions() {
let genesis_hash = BlockHash(stark_felt!(GENESIS_HASH));
// Storage with no block headers.
let (reader, _) = get_test_storage().0;
let mut rng = get_rng();

let old_pending_data = PendingData {
block: PendingBlock {
parent_block_hash: genesis_hash,
transactions: vec![ClientTransaction::get_test_instance(&mut rng)],
..Default::default()
},
..Default::default()
};
let advanced_pending_data = PendingData {
block: PendingBlock {
parent_block_hash: genesis_hash,
transactions: vec![
ClientTransaction::get_test_instance(&mut rng),
ClientTransaction::get_test_instance(&mut rng),
ClientTransaction::get_test_instance(&mut rng),
],
..Default::default()
},
..Default::default()
};
let less_advanced_pending_data = PendingData {
block: PendingBlock {
parent_block_hash: genesis_hash,
transactions: vec![
ClientTransaction::get_test_instance(&mut rng),
ClientTransaction::get_test_instance(&mut rng),
],
..Default::default()
},
..Default::default()
};
let expected_pending_data = advanced_pending_data.clone();
test_pending_sync(
reader,
old_pending_data,
vec![advanced_pending_data, less_advanced_pending_data],
expected_pending_data,
BlockHash(StarkHash::ONE),
)
.await
}

#[tokio::test]
async fn pending_sync_new_data_has_more_advanced_hash_and_less_transactions() {
const FIRST_BLOCK_HASH: BlockHash = BlockHash(StarkHash::ONE);
let genesis_hash = BlockHash(stark_felt!(GENESIS_HASH));
// Storage with one block header.
let (reader, mut writer) = get_test_storage().0;
writer
.begin_rw_txn()
.unwrap()
.append_header(
BlockNumber(0),
&BlockHeader {
block_hash: FIRST_BLOCK_HASH,
parent_hash: genesis_hash,
block_number: BlockNumber(0),
..Default::default()
},
)
.unwrap()
.commit()
.unwrap();
let mut rng = get_rng();

let old_pending_data = PendingData {
block: PendingBlock {
parent_block_hash: genesis_hash,
transactions: vec![
ClientTransaction::get_test_instance(&mut rng),
ClientTransaction::get_test_instance(&mut rng),
],
..Default::default()
},
..Default::default()
};
let new_pending_data = PendingData {
block: PendingBlock {
parent_block_hash: FIRST_BLOCK_HASH,
transactions: vec![ClientTransaction::get_test_instance(&mut rng)],
..Default::default()
},
..Default::default()
};
let expected_pending_data = new_pending_data.clone();
test_pending_sync(
reader,
old_pending_data,
vec![new_pending_data],
expected_pending_data,
BlockHash(StarkHash::TWO),
)
.await
}
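The test_pending_sync harness above makes further scenarios cheap to express. As a purely hypothetical additional case (not part of this commit), new pending data with the same parent hash and the same number of transactions would be expected to leave the stored pending data untouched:

// Hypothetical extra test reusing the harness above; sketch only.
#[tokio::test]
async fn pending_sync_ignores_new_data_with_equal_transaction_count() {
    let genesis_hash = BlockHash(stark_felt!(GENESIS_HASH));
    // Storage with no block headers, so the latest stored block hash is the genesis hash.
    let (reader, _) = get_test_storage().0;
    let mut rng = get_rng();

    let old_pending_data = PendingData {
        block: PendingBlock {
            parent_block_hash: genesis_hash,
            transactions: vec![ClientTransaction::get_test_instance(&mut rng)],
            ..Default::default()
        },
        ..Default::default()
    };
    let new_pending_data = PendingData {
        block: PendingBlock {
            parent_block_hash: genesis_hash,
            transactions: vec![ClientTransaction::get_test_instance(&mut rng)],
            ..Default::default()
        },
        ..Default::default()
    };
    // Neither a different parent hash nor more transactions, so the old data should remain.
    let expected_pending_data = old_pending_data.clone();
    test_pending_sync(
        reader,
        old_pending_data,
        vec![new_pending_data],
        expected_pending_data,
        BlockHash(StarkHash::ONE),
    )
    .await
}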
