Skip to content

Commit

Permalink
Setting init tip number allows skipping the synchronization of previo…
Browse files Browse the repository at this point in the history
…us blocks
  • Loading branch information
EthanYuan committed Nov 21, 2023
1 parent c6373ca commit 6acdba4
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 14 deletions.
6 changes: 5 additions & 1 deletion resource/ckb.toml
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,12 @@ block_uncles_cache_size = 30
# block_filter = "block.header.number.to_uint() > \"10000000\".to_uint()"
# cell_filter = "let script = output.type;script.code_hash == \"0xbbad126377d45f90a8ee120da988a2d7332c78ba8fd679aab478a19d6c133494\" && script.hash_type == \"data1\""
#
# [indexer_v2.indexer_r]
# # The 'ckb-indexer-r' is an indexer built on relational databases.
# [indexer_v2.indexer_r]
# # The initial tip number and hash can be set as the starting height for building indexes in indexer-r. Effective only during the initial index creation.
# init_tip_number = 0
# # The initial tip hash must match the tip number; otherwise, it will result in a rollback.
# init_tip_hash = "0x92b197aa1fba0f63633922c61c92375c9c074a93e85963554f5499fe1450d0e5"
# # By default, it uses an embedded SQLite database.
# # Alternatively, you can prepare a PostgreSQL database service and provide the connection parameters.
# db_type = "postgres"
Expand Down
9 changes: 9 additions & 0 deletions util/app-config/src/configs/indexer_r.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use ckb_types::H256;
use serde::{Deserialize, Serialize};
use std::{default::Default, path::PathBuf};

Expand Down Expand Up @@ -27,6 +28,12 @@ impl ToString for DBDriver {
/// IndexerR config options.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct IndexerRConfig {
/// The init tip block number
#[serde(default)]
pub init_tip_number: Option<u64>,
/// The init tip block hash
#[serde(default)]
pub init_tip_hash: Option<H256>,
/// IndexerR database type.
#[serde(default)]
pub db_type: DBDriver,
Expand Down Expand Up @@ -54,6 +61,8 @@ pub struct IndexerRConfig {
impl Default for IndexerRConfig {
fn default() -> Self {
Self {
init_tip_number: None,
init_tip_hash: None,
db_type: DBDriver::default(),
store: PathBuf::default(),
db_name: default_db_name(),
Expand Down
8 changes: 3 additions & 5 deletions util/indexer-r/src/indexer/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,11 @@ pub(crate) async fn append_block(
}

pub(crate) async fn append_block_with_filter_mode(
block_view: &BlockView,
block_hash: &[u8],
block_number: i64,
tx: &mut Transaction<'_, Any>,
) -> Result<(), Error> {
let block_row = (
block_view.hash().raw_data().to_vec(),
block_view.number() as i32,
);
let block_row = (block_hash, block_number);

// insert block
// build query str
Expand Down
7 changes: 6 additions & 1 deletion util/indexer-r/src/indexer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,12 @@ impl AsyncIndexerR {
append_block(block, &mut tx).await?;
self.insert_transactions(block, &mut tx).await?;
} else {
append_block_with_filter_mode(block, &mut tx).await?;
append_block_with_filter_mode(
&block.hash().raw_data().to_vec(),
block.number() as i64,
&mut tx,
)
.await?;
}
tx.commit().await.map_err(|err| Error::DB(err.to_string()))
}
Expand Down
32 changes: 25 additions & 7 deletions util/indexer-r/src/store/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod page;

use crate::indexer::append_block_with_filter_mode;
use page::COUNT_COLUMN;
pub use page::{build_next_cursor, PaginationRequest, PaginationResponse};

Expand Down Expand Up @@ -91,21 +92,22 @@ impl SQLXPool {
.set(pool)
.map_err(|_| anyhow!("set pool failed!"))?;
if require_init {
self.create_tables_for_sqlite().await?;
self.create_tables_for_sqlite(db_config).await?;
}
Ok(())
}
DBDriver::Postgres => {
let require_init = self.is_postgres_require_init(db_config).await?;
let uri = build_url_for_posgres(db_config);
let uri = build_url_for_postgres(db_config);
log::debug!("postgres uri: {}", uri);
let mut connection_options = AnyConnectOptions::from_str(&uri)?;
connection_options.log_statements(LevelFilter::Trace);
let pool = pool_options.connect_with(connection_options).await?;
self.pool
.set(pool)
.map_err(|_| anyhow!("set pool failed"))?;
if require_init {
self.create_tables_for_postgres().await?;
self.create_tables_for_postgres(db_config).await?;
}
Ok(())
}
Expand Down Expand Up @@ -230,28 +232,44 @@ impl SQLXPool {
self.max_conn
}

async fn create_tables_for_sqlite(&self) -> Result<()> {
async fn create_tables_for_sqlite(&self, config: &IndexerRConfig) -> Result<()> {
let mut tx = self.transaction().await?;
sqlx::query(SQL_CREATE_SQLITE).execute(&mut *tx).await?;
if config.init_tip_hash.is_some() && config.init_tip_number.is_some() {
append_block_with_filter_mode(
config.init_tip_hash.clone().unwrap().as_bytes(),
config.init_tip_number.unwrap() as i64,
&mut tx,
)
.await?;
}
tx.commit().await.map_err(Into::into)
}

async fn create_tables_for_postgres(&mut self) -> Result<()> {
async fn create_tables_for_postgres(&mut self, config: &IndexerRConfig) -> Result<()> {
let mut tx = self.transaction().await?;
let commands = SQL_CREATE_POSTGRES.split(';');
for command in commands {
if !command.trim().is_empty() {
sqlx::query(command).execute(&mut *tx).await?;
}
}
if config.init_tip_hash.is_some() && config.init_tip_number.is_some() {
append_block_with_filter_mode(
config.init_tip_hash.clone().unwrap().as_bytes(),
config.init_tip_number.unwrap() as i64,
&mut tx,
)
.await?;
}
tx.commit().await.map_err(Into::into)
}

pub async fn is_postgres_require_init(&mut self, db_config: &IndexerRConfig) -> Result<bool> {
// Connect to the "postgres" database first
let mut temp_config = db_config.clone();
temp_config.db_name = "postgres".to_string();
let uri = build_url_for_posgres(&temp_config);
let uri = build_url_for_postgres(&temp_config);
log::info!("postgres uri: {}", uri);
let mut connection_options = AnyConnectOptions::from_str(&uri)?;
connection_options.log_statements(LevelFilter::Trace);
Expand Down Expand Up @@ -283,7 +301,7 @@ fn build_url_for_sqlite(db_config: &IndexerRConfig) -> String {
db_config.db_type.to_string() + db_config.store.to_str().expect("get store path")
}

fn build_url_for_posgres(db_config: &IndexerRConfig) -> String {
fn build_url_for_postgres(db_config: &IndexerRConfig) -> String {
db_config.db_type.to_string()
+ db_config.db_user.as_str()
+ ":"
Expand Down
35 changes: 35 additions & 0 deletions util/indexer-r/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,3 +143,38 @@ async fn test_rollback_block() {
.unwrap()
);
}

#[tokio::test]
async fn test_block_filter_and_rollback_block() {
let storage = connect_sqlite(MEMORY_DB).await;
let indexer = AsyncIndexerR::new(
storage.clone(),
100,
1000,
None,
CustomFilters::new(
Some("block.header.number.to_uint() >= \"0x1\".to_uint()"),
None,
),
);

let data_path = String::from(BLOCK_DIR);
indexer
.append(&read_block_view(0, data_path.clone()).into())
.await
.unwrap();

assert_eq!(1, storage.fetch_count("block").await.unwrap());
assert_eq!(0, storage.fetch_count("ckb_transaction").await.unwrap());
assert_eq!(0, storage.fetch_count("output").await.unwrap());
assert_eq!(0, storage.fetch_count("input").await.unwrap());
assert_eq!(0, storage.fetch_count("script").await.unwrap());

indexer.rollback().await.unwrap();

assert_eq!(0, storage.fetch_count("block").await.unwrap());
assert_eq!(0, storage.fetch_count("ckb_transaction").await.unwrap());
assert_eq!(0, storage.fetch_count("output").await.unwrap());
assert_eq!(0, storage.fetch_count("input").await.unwrap());
assert_eq!(0, storage.fetch_count("script").await.unwrap());
}

0 comments on commit 6acdba4

Please sign in to comment.