diff --git a/resource/ckb.toml b/resource/ckb.toml
index d2ed2f4c4c1..67147e41713 100644
--- a/resource/ckb.toml
+++ b/resource/ckb.toml
@@ -202,8 +202,12 @@ block_uncles_cache_size = 30
 # block_filter = "block.header.number.to_uint() > \"10000000\".to_uint()"
 # cell_filter = "let script = output.type;script.code_hash == \"0xbbad126377d45f90a8ee120da988a2d7332c78ba8fd679aab478a19d6c133494\" && script.hash_type == \"data1\""
 #
-# [indexer_v2.indexer_r]
 # # The 'ckb-indexer-r' is an indexer built on relational databases.
+# [indexer_v2.indexer_r]
+# # The initial tip number and hash can be set as the starting height for building indexes in indexer-r. Effective only during the initial index creation.
+# init_tip_number = 0
+# # The initial tip hash must match the tip number; otherwise, it will result in a rollback.
+# init_tip_hash = "0x92b197aa1fba0f63633922c61c92375c9c074a93e85963554f5499fe1450d0e5"
 # # By default, it uses an embedded SQLite database.
 # # Alternatively, you can prepare a PostgreSQL database service and provide the connection parameters.
 # db_type = "postgres"
diff --git a/util/app-config/src/configs/indexer_r.rs b/util/app-config/src/configs/indexer_r.rs
index 4bfcfb48192..615f7802be8 100644
--- a/util/app-config/src/configs/indexer_r.rs
+++ b/util/app-config/src/configs/indexer_r.rs
@@ -1,3 +1,4 @@
+use ckb_types::H256;
 use serde::{Deserialize, Serialize};
 use std::{default::Default, path::PathBuf};
 
@@ -27,6 +28,12 @@ impl ToString for DBDriver {
 /// IndexerR config options.
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct IndexerRConfig {
+    /// The init tip block number
+    #[serde(default)]
+    pub init_tip_number: Option<u64>,
+    /// The init tip block hash
+    #[serde(default)]
+    pub init_tip_hash: Option<H256>,
     /// IndexerR database type.
     #[serde(default)]
     pub db_type: DBDriver,
@@ -54,6 +61,8 @@ pub struct IndexerRConfig {
 impl Default for IndexerRConfig {
     fn default() -> Self {
         Self {
+            init_tip_number: None,
+            init_tip_hash: None,
             db_type: DBDriver::default(),
             store: PathBuf::default(),
             db_name: default_db_name(),
diff --git a/util/indexer-r/src/indexer/insert.rs b/util/indexer-r/src/indexer/insert.rs
index e62493807fd..1d413a80b17 100644
--- a/util/indexer-r/src/indexer/insert.rs
+++ b/util/indexer-r/src/indexer/insert.rs
@@ -38,13 +38,11 @@ pub(crate) async fn append_block(
 }
 
 pub(crate) async fn append_block_with_filter_mode(
-    block_view: &BlockView,
+    block_hash: &[u8],
+    block_number: i64,
     tx: &mut Transaction<'_, Any>,
 ) -> Result<(), Error> {
-    let block_row = (
-        block_view.hash().raw_data().to_vec(),
-        block_view.number() as i32,
-    );
+    let block_row = (block_hash, block_number);
 
     // insert block
     // build query str
diff --git a/util/indexer-r/src/indexer/mod.rs b/util/indexer-r/src/indexer/mod.rs
index cb535cda6be..995afb4498a 100644
--- a/util/indexer-r/src/indexer/mod.rs
+++ b/util/indexer-r/src/indexer/mod.rs
@@ -140,7 +140,12 @@ impl AsyncIndexerR {
             append_block(block, &mut tx).await?;
             self.insert_transactions(block, &mut tx).await?;
         } else {
-            append_block_with_filter_mode(block, &mut tx).await?;
+            append_block_with_filter_mode(
+                &block.hash().raw_data().to_vec(),
+                block.number() as i64,
+                &mut tx,
+            )
+            .await?;
         }
         tx.commit().await.map_err(|err| Error::DB(err.to_string()))
     }
diff --git a/util/indexer-r/src/store/mod.rs b/util/indexer-r/src/store/mod.rs
index 0cc20d37963..afb01976e10 100644
--- a/util/indexer-r/src/store/mod.rs
+++ b/util/indexer-r/src/store/mod.rs
@@ -1,5 +1,6 @@
 pub mod page;
 
+use crate::indexer::append_block_with_filter_mode;
 use page::COUNT_COLUMN;
 pub use page::{build_next_cursor, PaginationRequest, PaginationResponse};
 
@@ -91,13 +92,14 @@ impl SQLXPool {
                     .set(pool)
                     .map_err(|_| anyhow!("set pool failed!"))?;
                 if require_init {
-                    self.create_tables_for_sqlite().await?;
+                    self.create_tables_for_sqlite(db_config).await?;
                 }
                 Ok(())
             }
             DBDriver::Postgres => {
                 let require_init = self.is_postgres_require_init(db_config).await?;
-                let uri = build_url_for_posgres(db_config);
+                let uri = build_url_for_postgres(db_config);
+                log::debug!("postgres uri: {}", uri);
                 let mut connection_options = AnyConnectOptions::from_str(&uri)?;
                 connection_options.log_statements(LevelFilter::Trace);
                 let pool = pool_options.connect_with(connection_options).await?;
@@ -105,7 +107,7 @@ impl SQLXPool {
                     .set(pool)
                     .map_err(|_| anyhow!("set pool failed"))?;
                 if require_init {
-                    self.create_tables_for_postgres().await?;
+                    self.create_tables_for_postgres(db_config).await?;
                 }
                 Ok(())
             }
@@ -230,13 +232,21 @@ impl SQLXPool {
        self.max_conn
    }

-    async fn create_tables_for_sqlite(&self) -> Result<()> {
+    async fn create_tables_for_sqlite(&self, config: &IndexerRConfig) -> Result<()> {
        let mut tx = self.transaction().await?;
        sqlx::query(SQL_CREATE_SQLITE).execute(&mut *tx).await?;
+        if config.init_tip_hash.is_some() && config.init_tip_number.is_some() {
+            append_block_with_filter_mode(
+                config.init_tip_hash.clone().unwrap().as_bytes(),
+                config.init_tip_number.unwrap() as i64,
+                &mut tx,
+            )
+            .await?;
+        }
        tx.commit().await.map_err(Into::into)
    }

-    async fn create_tables_for_postgres(&mut self) -> Result<()> {
+    async fn create_tables_for_postgres(&mut self, config: &IndexerRConfig) -> Result<()> {
        let mut tx = self.transaction().await?;
        let commands = SQL_CREATE_POSTGRES.split(';');
        for command in commands {
@@ -244,6 +254,14 @@
                sqlx::query(command).execute(&mut *tx).await?;
            }
        }
+        if config.init_tip_hash.is_some() && config.init_tip_number.is_some() {
+            append_block_with_filter_mode(
+                config.init_tip_hash.clone().unwrap().as_bytes(),
+                config.init_tip_number.unwrap() as i64,
+                &mut tx,
+            )
+            .await?;
+        }
        tx.commit().await.map_err(Into::into)
    }

@@ -251,7 +269,7 @@
        // Connect to the "postgres" database first
        let mut temp_config = db_config.clone();
        temp_config.db_name = "postgres".to_string();
-        let uri = build_url_for_posgres(&temp_config);
+        let uri = build_url_for_postgres(&temp_config);
        log::info!("postgres uri: {}", uri);
        let mut connection_options = AnyConnectOptions::from_str(&uri)?;
        connection_options.log_statements(LevelFilter::Trace);
@@ -283,7 +301,7 @@ fn build_url_for_sqlite(db_config: &IndexerRConfig) -> String {
    db_config.db_type.to_string() + db_config.store.to_str().expect("get store path")
}

-fn build_url_for_posgres(db_config: &IndexerRConfig) -> String {
+fn build_url_for_postgres(db_config: &IndexerRConfig) -> String {
    db_config.db_type.to_string()
        + db_config.db_user.as_str()
        + ":"
diff --git a/util/indexer-r/src/tests/mod.rs b/util/indexer-r/src/tests/mod.rs
index 87a05e9e23c..099a4877d11 100644
--- a/util/indexer-r/src/tests/mod.rs
+++ b/util/indexer-r/src/tests/mod.rs
@@ -143,3 +143,38 @@ async fn test_rollback_block() {
        .unwrap()
    );
}
+
+#[tokio::test]
+async fn test_block_filter_and_rollback_block() {
+    let storage = connect_sqlite(MEMORY_DB).await;
+    let indexer = AsyncIndexerR::new(
+        storage.clone(),
+        100,
+        1000,
+        None,
+        CustomFilters::new(
+            Some("block.header.number.to_uint() >= \"0x1\".to_uint()"),
+            None,
+        ),
+    );
+
+    let data_path = String::from(BLOCK_DIR);
+    indexer
+        .append(&read_block_view(0, data_path.clone()).into())
+        .await
+        .unwrap();
+
+    assert_eq!(1, storage.fetch_count("block").await.unwrap());
+    assert_eq!(0, storage.fetch_count("ckb_transaction").await.unwrap());
+    assert_eq!(0, storage.fetch_count("output").await.unwrap());
+    assert_eq!(0, storage.fetch_count("input").await.unwrap());
+    assert_eq!(0, storage.fetch_count("script").await.unwrap());
+
+    indexer.rollback().await.unwrap();
+
+    assert_eq!(0, storage.fetch_count("block").await.unwrap());
+    assert_eq!(0, storage.fetch_count("ckb_transaction").await.unwrap());
+    assert_eq!(0, storage.fetch_count("output").await.unwrap());
+    assert_eq!(0, storage.fetch_count("input").await.unwrap());
+    assert_eq!(0, storage.fetch_count("script").await.unwrap());
+}