This repository has been archived by the owner on Dec 26, 2024. It is now read-only.

feat(sync): central class cache #1279

Merged (1 commit, Oct 18, 2023)
12 changes: 11 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -63,6 +63,7 @@ jsonrpsee = "0.18.1"
jsonschema = "0.17.0"
lazy_static = "1.4.0"
libmdbx = "0.3.5"
lru = "0.12.0"
memmap2 = "0.8.0"
metrics = "0.21.0"
metrics-exporter-prometheus = "0.12.1"
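For context (not part of the diff), the new `lru` workspace dependency provides the `LruCache` used throughout this PR. Below is a minimal standalone sketch of the parts of the lru 0.12 API the rest of the diff relies on: a fixed, non-zero capacity, `get` refreshing recency, and `put` evicting the least-recently-used entry once the cache is full.

```rust
use std::num::NonZeroUsize;

use lru::LruCache;

fn main() {
    // Capacity must be non-zero, hence NonZeroUsize.
    let mut cache: LruCache<u64, &str> = LruCache::new(NonZeroUsize::new(2).unwrap());
    cache.put(1, "one");
    cache.put(2, "two");
    // `get` marks key 1 as most recently used.
    assert_eq!(cache.get(&1), Some(&"one"));
    // Inserting a third entry evicts the least recently used key (2).
    cache.put(3, "three");
    assert!(cache.get(&2).is_none());
}
```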
7 changes: 6 additions & 1 deletion config/default_config.json
@@ -9,6 +9,11 @@
"privacy": "Public",
"value": "0xc662c410C0ECf747543f5bA90660f6ABeBD9C8c4"
},
"central.class_cache_size": {
"description": "Size of class cache, must be a positive integer.",
"privacy": "Public",
"value": 30
},
"central.concurrent_requests": {
"description": "Maximum number of concurrent requests to Starknet feeder-gateway for getting a type of data (for example, blocks).",
"privacy": "Public",
@@ -189,4 +194,4 @@
"privacy": "Public",
"value": 1000
}
}
}
@@ -13,6 +13,13 @@ expression: dumped_default_config
"value": "0xc662c410C0ECf747543f5bA90660f6ABeBD9C8c4",
"privacy": "Public"
},
"central.class_cache_size": {
"description": "Size of class cache, must be a positive integer.",
"value": {
"$serde_json::private::Number": "30"
},
"privacy": "Public"
},
"central.concurrent_requests": {
"description": "Maximum number of concurrent requests to Starknet feeder-gateway for getting a type of data (for example, blocks).",
"value": {
1 change: 1 addition & 0 deletions crates/papyrus_sync/Cargo.toml
@@ -17,6 +17,7 @@ hex.workspace = true
indexmap = { workspace = true, features = ["serde"] }
itertools.workspace = true
libmdbx = { workspace = true, features = ["lifetimed-bytes"] }
lru.workspace = true
metrics.workspace = true
papyrus_storage = { path = "../papyrus_storage", version = "0.0.5" }
papyrus_base_layer = { path = "../papyrus_base_layer" }
21 changes: 19 additions & 2 deletions crates/papyrus_sync/src/sources/central.rs
@@ -4,7 +4,8 @@ mod central_test;
mod state_update_stream;

use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use std::num::NonZeroUsize;
use std::sync::{Arc, Mutex};

use async_stream::stream;
use async_trait::async_trait;
@@ -13,6 +14,7 @@ use futures::stream::BoxStream;
use futures_util::StreamExt;
use indexmap::IndexMap;
use itertools::chain;
use lru::LruCache;
#[cfg(test)]
use mockall::automock;
use papyrus_common::BlockHashAndNumber;
@@ -49,6 +51,8 @@ pub struct CentralSourceConfig {
pub max_state_updates_to_download: usize,
pub max_state_updates_to_store_in_memory: usize,
pub max_classes_to_download: usize,
// TODO(dan): validate that class_cache_size is a positive integer.
pub class_cache_size: usize,
pub retry_config: RetryConfig,
}

@@ -61,6 +65,7 @@ impl Default for CentralSourceConfig {
max_state_updates_to_download: 20,
max_state_updates_to_store_in_memory: 20,
max_classes_to_download: 20,
class_cache_size: 30,
retry_config: RetryConfig {
retry_base_millis: 30,
retry_max_delay_millis: 30000,
@@ -110,6 +115,12 @@ impl SerializeConfig for CentralSourceConfig {
"Maximum number of classes to download at a given time.",
ParamPrivacyInput::Public,
),
ser_param(
"class_cache_size",
&self.class_cache_size,
"Size of class cache, must be a positive integer.",
ParamPrivacyInput::Public,
),
]);
chain!(self_params_dump, append_sub_config_name(self.retry_config.dump(), "retry_config"))
.collect()
@@ -121,10 +132,11 @@ pub struct GenericCentralSource<TStarknetClient: StarknetReader + Send + Sync> {
pub starknet_client: Arc<TStarknetClient>,
pub storage_reader: StorageReader,
pub state_update_stream_config: StateUpdateStreamConfig,
pub(crate) class_cache: Arc<Mutex<LruCache<ClassHash, ApiContractClass>>>,
}

#[derive(Clone)]
enum ApiContractClass {
pub(crate) enum ApiContractClass {
DeprecatedContractClass(starknet_api::deprecated_contract_class::ContractClass),
ContractClass(starknet_api::state::ContractClass),
}
@@ -251,6 +263,7 @@ impl<TStarknetClient: StarknetReader + Send + Sync + 'static> CentralSourceTrait
self.starknet_client.clone(),
self.storage_reader.clone(),
self.state_update_stream_config.clone(),
self.class_cache.clone(),
)
.boxed()
}
@@ -395,6 +408,10 @@ impl CentralSource {
max_state_updates_to_store_in_memory: config.max_state_updates_to_store_in_memory,
max_classes_to_download: config.max_classes_to_download,
},
class_cache: Arc::from(Mutex::new(LruCache::new(
NonZeroUsize::new(config.class_cache_size)
.expect("class_cache_size should be a positive integer."),
))),
})
}
}
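Two design points worth noting in the `GenericCentralSource` changes above: the cache sits behind `Arc` so the same instance can be handed to each `StateUpdateStream`, and behind `Mutex` because `lru::LruCache::get` takes `&mut self` (it updates recency on every lookup), so even read paths need exclusive access. A simplified construction sketch follows, using stand-in key/value types rather than the real `ClassHash` and `ApiContractClass`:

```rust
use std::num::NonZeroUsize;
use std::sync::{Arc, Mutex};

use lru::LruCache;

// Stand-in types for the sketch; the PR uses ClassHash and ApiContractClass.
type ClassHash = u64;
type ApiContractClass = String;

fn build_class_cache(
    class_cache_size: usize,
) -> Arc<Mutex<LruCache<ClassHash, ApiContractClass>>> {
    // NonZeroUsize::new returns None for 0, so a zero-sized cache fails fast,
    // matching the "must be a positive integer" wording in the config description.
    Arc::new(Mutex::new(LruCache::new(
        NonZeroUsize::new(class_cache_size)
            .expect("class_cache_size should be a positive integer."),
    )))
}

fn main() {
    let cache = build_class_cache(30);
    // Each stream gets its own handle to the same underlying cache.
    let stream_handle = cache.clone();
    stream_handle
        .lock()
        .expect("Failed to lock class cache.")
        .put(1, "contract class".to_string());
    assert!(cache.lock().unwrap().get(&1).is_some());
}
```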
32 changes: 30 additions & 2 deletions crates/papyrus_sync/src/sources/central/state_update_stream.rs
@@ -1,11 +1,12 @@
use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::task::Poll;

use futures_util::stream::FuturesOrdered;
use futures_util::{Future, Stream, StreamExt};
use indexmap::IndexMap;
use lru::LruCache;
use papyrus_storage::state::StateStorageReader;
use papyrus_storage::StorageReader;
use starknet_api::block::BlockNumber;
@@ -39,6 +40,7 @@ pub(crate) struct StateUpdateStream<TStarknetClient: StarknetReader + Send + 'st
classes_to_download: VecDeque<ClassHash>,
download_class_tasks: TasksQueue<CentralResult<Option<ApiContractClass>>>,
downloaded_classes: VecDeque<ApiContractClass>,
class_cache: Arc<Mutex<LruCache<ClassHash, ApiContractClass>>>,
config: StateUpdateStreamConfig,
}

@@ -89,6 +91,7 @@ impl<TStarknetClient: StarknetReader + Send + Sync + 'static> StateUpdateStream<
starknet_client: Arc<TStarknetClient>,
storage_reader: StorageReader,
config: StateUpdateStreamConfig,
class_cache: Arc<Mutex<LruCache<ClassHash, ApiContractClass>>>,
) -> Self {
StateUpdateStream {
initial_block_number,
Expand All @@ -107,6 +110,7 @@ impl<TStarknetClient: StarknetReader + Send + Sync + 'static> StateUpdateStream<
config.max_state_updates_to_store_in_memory * 5,
),
config,
class_cache,
}
}

@@ -151,7 +155,9 @@ impl<TStarknetClient: StarknetReader + Send + Sync + 'static> StateUpdateStream<
};
let starknet_client = self.starknet_client.clone();
let storage_reader = self.storage_reader.clone();
let cache = self.class_cache.clone();
self.download_class_tasks.push_back(Box::pin(download_class_if_necessary(
cache,
class_hash,
starknet_client,
storage_reader,
@@ -330,10 +336,18 @@ fn client_to_central_state_update(
// If not found in the storage, the class is downloaded.
#[instrument(skip(starknet_client, storage_reader), level = "debug", err)]
async fn download_class_if_necessary<TStarknetClient: StarknetReader>(
cache: Arc<Mutex<LruCache<ClassHash, ApiContractClass>>>,
class_hash: ClassHash,
starknet_client: Arc<TStarknetClient>,
storage_reader: StorageReader,
) -> CentralResult<Option<ApiContractClass>> {
{
let mut cache = cache.lock().expect("Failed to lock class cache.");
if let Some(class) = cache.get(&class_hash) {
return Ok(Some(class.clone()));
}
}

let txn = storage_reader.begin_ro_txn()?;
let state_reader = txn.get_state_reader()?;
let block_number = txn.get_state_marker()?;
@@ -342,6 +356,10 @@ async fn download_class_if_necessary<TStarknetClient: StarknetReader>(
// Check declared classes.
if let Ok(Some(class)) = state_reader.get_class_definition_at(state_number, &class_hash) {
trace!("Class {:?} retrieved from storage.", class_hash);
{
let mut cache = cache.lock().expect("Failed to lock class cache.");
cache.put(class_hash, ApiContractClass::ContractClass(class.clone()));
}
return Ok(Some(ApiContractClass::ContractClass(class)));
};

@@ -350,6 +368,10 @@
state_reader.get_deprecated_class_definition_at(state_number, &class_hash)
{
trace!("Deprecated class {:?} retrieved from storage.", class_hash);
{
let mut cache = cache.lock().expect("Failed to lock class cache.");
cache.put(class_hash, ApiContractClass::DeprecatedContractClass(class.clone()));
}
return Ok(Some(ApiContractClass::DeprecatedContractClass(class)));
}

@@ -358,6 +380,12 @@
let client_class = starknet_client.class_by_hash(class_hash).await.map_err(Arc::new)?;
match client_class {
None => Ok(None),
Some(class) => Ok(Some(class.into())),
Some(class) => {
{
let mut cache = cache.lock().expect("Failed to lock class cache.");
cache.put(class_hash, class.clone().into());
}
Ok(Some(class.into()))
}
}
}
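Taken together, the changes to `download_class_if_necessary` implement a cache-aside lookup: check the shared LRU first, then local storage, then the feeder gateway, writing back to the cache on every miss path so repeated class hashes are served from memory. Below is a condensed, synchronous sketch of that flow, with simplified types and hypothetical `lookup_storage` / `fetch_from_gateway` helpers standing in for the storage reader and Starknet client:

```rust
use std::sync::{Arc, Mutex};

use lru::LruCache;

type ClassHash = u64;           // stand-in for the real ClassHash type
type ApiContractClass = String; // stand-in for the PR's ApiContractClass enum

// Hypothetical fallbacks for the sketch only.
fn lookup_storage(_hash: &ClassHash) -> Option<ApiContractClass> {
    None
}
fn fetch_from_gateway(_hash: &ClassHash) -> Option<ApiContractClass> {
    Some("downloaded class".to_string())
}

fn get_class(
    cache: &Arc<Mutex<LruCache<ClassHash, ApiContractClass>>>,
    class_hash: ClassHash,
) -> Option<ApiContractClass> {
    // 1. Cache hit: return a clone, keeping the lock scope as small as possible.
    {
        let mut cache = cache.lock().expect("Failed to lock class cache.");
        if let Some(class) = cache.get(&class_hash) {
            return Some(class.clone());
        }
    }
    // 2. Miss: fall back to storage, then to the gateway.
    let class = lookup_storage(&class_hash).or_else(|| fetch_from_gateway(&class_hash))?;
    // 3. Write back so the next request for this hash hits the cache.
    cache
        .lock()
        .expect("Failed to lock class cache.")
        .put(class_hash, class.clone());
    Some(class)
}

fn main() {
    let cache = Arc::new(Mutex::new(LruCache::new(
        std::num::NonZeroUsize::new(30).unwrap(),
    )));
    assert!(get_class(&cache, 7).is_some()); // gateway path, populates the cache
    assert!(get_class(&cache, 7).is_some()); // served from the cache
}
```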
15 changes: 14 additions & 1 deletion crates/papyrus_sync/src/sources/central_test.rs
@@ -1,9 +1,11 @@
use std::sync::Arc;
use std::num::NonZeroUsize;
use std::sync::{Arc, Mutex};

use assert_matches::assert_matches;
use cairo_lang_starknet::casm_contract_class::CasmContractClass;
use futures_util::pin_mut;
use indexmap::{indexmap, IndexMap};
use lru::LruCache;
use mockall::predicate;
use papyrus_storage::state::StateStorageWriter;
use papyrus_storage::test_utils::get_test_storage;
@@ -38,6 +40,7 @@ use starknet_client::ClientError;
use tokio_stream::StreamExt;

use super::state_update_stream::StateUpdateStreamConfig;
use super::ApiContractClass;
use crate::sources::central::{CentralError, CentralSourceTrait, GenericCentralSource};

const TEST_CONCURRENT_REQUESTS: usize = 300;
@@ -58,6 +61,7 @@ async fn last_block_number() {
concurrent_requests: TEST_CONCURRENT_REQUESTS,
storage_reader: reader,
state_update_stream_config: state_update_stream_config_for_test(),
class_cache: get_test_class_cache(),
};

let last_block_number = central_source.get_latest_block().await.unwrap().unwrap().block_number;
@@ -83,6 +87,7 @@ async fn stream_block_headers() {
starknet_client: Arc::new(mock),
storage_reader: reader,
state_update_stream_config: state_update_stream_config_for_test(),
class_cache: get_test_class_cache(),
};

let mut expected_block_num = BlockNumber(START_BLOCK_NUMBER);
@@ -120,6 +125,7 @@ async fn stream_block_headers_some_are_missing() {
starknet_client: Arc::new(mock),
storage_reader: reader,
state_update_stream_config: state_update_stream_config_for_test(),
class_cache: get_test_class_cache(),
};

let mut expected_block_num = BlockNumber(START_BLOCK_NUMBER);
@@ -172,6 +178,7 @@ async fn stream_block_headers_error() {
starknet_client: Arc::new(mock),
storage_reader: reader,
state_update_stream_config: state_update_stream_config_for_test(),
class_cache: get_test_class_cache(),
};

let mut expected_block_num = BlockNumber(START_BLOCK_NUMBER);
@@ -308,6 +315,7 @@ async fn stream_state_updates() {
starknet_client: Arc::new(mock),
storage_reader: reader,
state_update_stream_config: state_update_stream_config_for_test(),
class_cache: get_test_class_cache(),
};
let initial_block_num = BlockNumber(START_BLOCK_NUMBER);

@@ -418,6 +426,7 @@ async fn stream_compiled_classes() {
starknet_client: Arc::new(mock),
storage_reader: reader,
state_update_stream_config: state_update_stream_config_for_test(),
class_cache: get_test_class_cache(),
};

let stream = central_source.stream_compiled_classes(BlockNumber(0), BlockNumber(2));
@@ -442,3 +451,7 @@ fn state_update_stream_config_for_test() -> StateUpdateStreamConfig {
max_classes_to_download: 10,
}
}

fn get_test_class_cache() -> Arc<Mutex<LruCache<ClassHash, ApiContractClass>>> {
Arc::from(Mutex::new(LruCache::new(NonZeroUsize::new(2).unwrap())))
}