Skip to content

Commit

Permalink
Integrate with ObjInfo
Browse files Browse the repository at this point in the history
  • Loading branch information
lxfind committed Dec 26, 2024
1 parent b6ae76b commit b9e9130
Show file tree
Hide file tree
Showing 9 changed files with 64 additions and 126 deletions.
12 changes: 9 additions & 3 deletions crates/sui-indexer-alt-framework/src/pipeline/concurrent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,12 @@ pub trait Handler: Processor<Value: FieldCount> {

/// Clean up data between checkpoints `_from` and `_to` (inclusive) in the database, returning
/// the number of rows affected. This function is optional, and defaults to not pruning at all.
async fn prune(_from: u64, _to: u64, _conn: &mut db::Connection<'_>) -> anyhow::Result<usize> {
async fn prune(
&self,
_from: u64,
_to: u64,
_conn: &mut db::Connection<'_>,
) -> anyhow::Result<usize> {
// Default implementation: performs no pruning and reports zero rows affected.
Ok(0)
}
}
Expand Down Expand Up @@ -201,9 +206,10 @@ pub(crate) fn pipeline<H: Handler + Send + Sync + 'static>(
// the global cancel signal. We achieve this by creating a child cancel token that we call
// cancel on once the committer tasks have shut down.
let pruner_cancel = cancel.child_token();
let handler = Arc::new(handler);

let processor = processor(
handler,
handler.clone(),
checkpoint_rx,
processor_tx,
metrics.clone(),
Expand Down Expand Up @@ -246,7 +252,7 @@ pub(crate) fn pipeline<H: Handler + Send + Sync + 'static>(
pruner_cancel.clone(),
);

let pruner = pruner::<H>(pruner_config, db, metrics, pruner_cancel.clone());
let pruner = pruner(handler, pruner_config, db, metrics, pruner_cancel.clone());

tokio::spawn(async move {
let (_, _, _, _) = futures::join!(processor, collector, committer, commit_watermark);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ use super::{Handler, PrunerConfig};
///
/// The task will shut down if the `cancel` token is signalled. If the `config` is `None`, the task
/// will shut down immediately.
pub(super) fn pruner<H: Handler + 'static>(
pub(super) fn pruner<H: Handler + Send + Sync + 'static>(
handler: Arc<H>,
config: Option<PrunerConfig>,
db: Db,
metrics: Arc<IndexerMetrics>,
Expand Down Expand Up @@ -136,7 +137,7 @@ pub(super) fn pruner<H: Handler + 'static>(
};

let (from, to) = watermark.next_chunk(config.max_chunk_size);
let affected = match H::prune(from, to, &mut conn).await {
let affected = match handler.prune(from, to, &mut conn).await {
Ok(affected) => {
guard.stop_and_record();
watermark.pruner_hi = to as i64;
Expand Down
3 changes: 1 addition & 2 deletions crates/sui-indexer-alt-framework/src/pipeline/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub trait Processor {
/// The task will shut down if the `cancel` token is cancelled, or if any of the workers encounters
/// an error -- there is no retry logic at this level.
pub(super) fn processor<P: Processor + Send + Sync + 'static>(
processor: P,
processor: Arc<P>,
rx: mpsc::Receiver<Arc<CheckpointData>>,
tx: mpsc::Sender<IndexedCheckpoint<P>>,
metrics: Arc<IndexerMetrics>,
Expand All @@ -61,7 +61,6 @@ pub(super) fn processor<P: Processor + Send + Sync + 'static>(
&metrics.latest_processed_checkpoint_timestamp_lag_ms,
&metrics.latest_processed_checkpoint,
);
let processor = Arc::new(processor);

match ReceiverStream::new(rx)
.try_for_each_spawned(P::FANOUT, |checkpoint| {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ pub(crate) fn pipeline<H: Handler + Send + Sync + 'static>(
let (processor_tx, committer_rx) = mpsc::channel(H::FANOUT + PIPELINE_BUFFER);

let processor = processor(
handler,
Arc::new(handler),
checkpoint_rx,
processor_tx,
metrics.clone(),
Expand Down
9 changes: 3 additions & 6 deletions crates/sui-indexer-alt/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,6 @@ pub struct PipelineLayer {

// Concurrent pipelines with a lagged consistent pruner which is also a concurrent pipeline.
// Use concurrent layer for the pruner pipelines so that they could override checkpoint lag if needed.
pub obj_info: Option<CommitterLayer>,
pub obj_info_pruner: Option<ConcurrentLayer>,
pub coin_balance_buckets: Option<CommitterLayer>,
pub coin_balance_buckets_pruner: Option<ConcurrentLayer>,

Expand All @@ -150,6 +148,7 @@ pub struct PipelineLayer {
pub kv_objects: Option<ConcurrentLayer>,
pub kv_protocol_configs: Option<ConcurrentLayer>,
pub kv_transactions: Option<ConcurrentLayer>,
pub obj_info: Option<ConcurrentLayer>,
pub obj_versions: Option<ConcurrentLayer>,
pub tx_affected_addresses: Option<ConcurrentLayer>,
pub tx_affected_objects: Option<ConcurrentLayer>,
Expand Down Expand Up @@ -270,8 +269,6 @@ impl PipelineLayer {
PipelineLayer {
sum_displays: Some(Default::default()),
sum_packages: Some(Default::default()),
obj_info: Some(Default::default()),
obj_info_pruner: Some(Default::default()),
coin_balance_buckets: Some(Default::default()),
coin_balance_buckets_pruner: Some(Default::default()),
cp_sequence_numbers: Some(Default::default()),
Expand All @@ -284,6 +281,7 @@ impl PipelineLayer {
kv_objects: Some(Default::default()),
kv_protocol_configs: Some(Default::default()),
kv_transactions: Some(Default::default()),
obj_info: Some(Default::default()),
obj_versions: Some(Default::default()),
tx_affected_addresses: Some(Default::default()),
tx_affected_objects: Some(Default::default()),
Expand Down Expand Up @@ -405,8 +403,6 @@ impl Merge for PipelineLayer {
PipelineLayer {
sum_displays: self.sum_displays.merge(other.sum_displays),
sum_packages: self.sum_packages.merge(other.sum_packages),
obj_info: self.obj_info.merge(other.obj_info),
obj_info_pruner: self.obj_info_pruner.merge(other.obj_info_pruner),
coin_balance_buckets: self.coin_balance_buckets.merge(other.coin_balance_buckets),
coin_balance_buckets_pruner: self
.coin_balance_buckets_pruner
Expand All @@ -421,6 +417,7 @@ impl Merge for PipelineLayer {
kv_objects: self.kv_objects.merge(other.kv_objects),
kv_protocol_configs: self.kv_protocol_configs.merge(other.kv_protocol_configs),
kv_transactions: self.kv_transactions.merge(other.kv_transactions),
obj_info: self.obj_info.merge(other.obj_info),
obj_versions: self.obj_versions.merge(other.obj_versions),
tx_affected_addresses: self
.tx_affected_addresses
Expand Down
1 change: 0 additions & 1 deletion crates/sui-indexer-alt/src/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ pub(crate) mod kv_objects;
pub(crate) mod kv_protocol_configs;
pub(crate) mod kv_transactions;
pub(crate) mod obj_info;
pub(crate) mod obj_info_pruner;
pub(crate) mod obj_versions;
pub(crate) mod sum_displays;
pub(crate) mod sum_packages;
Expand Down
46 changes: 45 additions & 1 deletion crates/sui-indexer-alt/src/handlers/obj_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@
use std::{collections::BTreeMap, sync::Arc};

use anyhow::Result;
use diesel::sql_query;
use diesel_async::RunQueryDsl;
use sui_field_count::FieldCount;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_schema::{objects::StoredObjInfo, schema::obj_info};
use sui_pg_db as db;
use sui_types::{base_types::ObjectID, full_checkpoint_content::CheckpointData, object::Object};

use crate::consistent_pruning::PruningLookupTable;
use crate::consistent_pruning::{PruningInfo, PruningLookupTable};

#[derive(Default)]
pub(crate) struct ObjInfo {
Expand Down Expand Up @@ -41,6 +42,7 @@ impl Processor for ObjInfo {
.map(|o| (o.id(), o))
.collect::<BTreeMap<_, _>>();
let mut values: BTreeMap<ObjectID, Self::Value> = BTreeMap::new();
let mut prune_info = PruningInfo::new();
for object_id in checkpoint_input_objects.keys() {
if !latest_live_output_objects.contains_key(object_id) {
// If an input object is not in the latest live output objects, it must have been deleted
Expand All @@ -54,6 +56,7 @@ impl Processor for ObjInfo {
update: ProcessedObjInfoUpdate::Delete(*object_id),
},
);
prune_info.add_deleted_object(*object_id);
}
}
for (object_id, object) in latest_live_output_objects.iter() {
Expand All @@ -71,8 +74,15 @@ impl Processor for ObjInfo {
update: ProcessedObjInfoUpdate::Insert((*object).clone()),
},
);
// We do not need to prune if the object was created in this checkpoint,
// because this object would not have been in the table prior to this checkpoint.
if checkpoint_input_objects.contains_key(object_id) {
prune_info.add_mutated_object(*object_id);
}
}
}
self.pruning_lookup_table
.insert(cp_sequence_number, prune_info);

Ok(values.into_values().collect())
}
Expand All @@ -91,6 +101,40 @@ impl Handler for ObjInfo {
.execute(conn)
.await?)
}

/// Deletes superseded `obj_info` rows for the checkpoint range `from`..`to`.
///
/// The `(object_id, cp_sequence_number_exclusive)` pairs for the range are obtained from
/// `self.pruning_lookup_table.take(from, to)`. For each pair, every row in `obj_info` with a
/// matching `object_id` and a `cp_sequence_number` strictly less than the exclusive bound is
/// deleted in a single `DELETE ... USING` statement.
///
/// Returns the number of rows deleted, or `0` without touching the database when there is
/// nothing to prune for the range.
///
/// # Errors
///
/// Propagates any error from the pruning lookup table or from executing the query.
async fn prune(&self, from: u64, to: u64, conn: &mut db::Connection<'_>) -> Result<usize> {
    use sui_indexer_alt_schema::schema::obj_info::dsl;

    let to_prune = self.pruning_lookup_table.take(from, to)?;

    // For each (object_id, cp_sequence_number_exclusive), delete all entries in obj_info with
    // cp_sequence_number less than cp_sequence_number_exclusive that match the object_id.

    let values = to_prune
        .iter()
        .map(|(object_id, seq_number)| {
            let object_id_hex = hex::encode(object_id);
            format!("('\\x{}'::BYTEA, {}::BIGINT)", object_id_hex, seq_number)
        })
        .collect::<Vec<_>>()
        .join(",");

    // An empty `VALUES` list is a SQL syntax error, so a range with nothing to prune must
    // short-circuit instead of issuing a malformed statement.
    if values.is_empty() {
        return Ok(0);
    }

    let query = format!(
        "
WITH to_prune_data (object_id, cp_sequence_number_exclusive) AS (
VALUES {}
)
DELETE FROM obj_info
USING to_prune_data
WHERE obj_info.{:?} = to_prune_data.object_id
AND obj_info.{:?} < to_prune_data.cp_sequence_number_exclusive
",
        values,
        dsl::object_id,
        dsl::cp_sequence_number,
    );
    let rows_deleted = sql_query(query).execute(conn).await?;
    Ok(rows_deleted)
}
}

impl FieldCount for ProcessedObjInfo {
Expand Down
102 changes: 0 additions & 102 deletions crates/sui-indexer-alt/src/handlers/obj_info_pruner.rs

This file was deleted.

10 changes: 2 additions & 8 deletions crates/sui-indexer-alt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use bootstrap::bootstrap;
use config::{ConsistencyConfig, IndexerConfig, PipelineLayer};
use handlers::coin_balance_buckets::CoinBalanceBuckets;
use handlers::coin_balance_buckets_pruner::CoinBalanceBucketsPruner;
use handlers::obj_info_pruner::ObjInfoPruner;
use handlers::{
ev_emit_mod::EvEmitMod, ev_struct_inst::EvStructInst, kv_checkpoints::KvCheckpoints,
kv_epoch_ends::KvEpochEnds, kv_epoch_starts::KvEpochStarts, kv_feature_flags::KvFeatureFlags,
Expand Down Expand Up @@ -59,8 +58,6 @@ pub async fn start_indexer(
let PipelineLayer {
sum_displays,
sum_packages,
obj_info,
obj_info_pruner,
coin_balance_buckets,
coin_balance_buckets_pruner,
cp_sequence_numbers,
Expand All @@ -73,6 +70,7 @@ pub async fn start_indexer(
kv_objects,
kv_protocol_configs,
kv_transactions,
obj_info,
obj_versions,
tx_affected_addresses,
tx_affected_objects,
Expand Down Expand Up @@ -192,11 +190,6 @@ pub async fn start_indexer(
add_sequential!(SumDisplays, sum_displays);
add_sequential!(SumPackages, sum_packages);

add_consistent!(
ObjInfo::default(), obj_info;
ObjInfoPruner, obj_info_pruner
);

add_consistent!(
CoinBalanceBuckets, coin_balance_buckets;
CoinBalanceBucketsPruner, coin_balance_buckets_pruner
Expand All @@ -211,6 +204,7 @@ pub async fn start_indexer(
add_concurrent!(KvEpochStarts, kv_epoch_starts);
add_concurrent!(KvObjects, kv_objects);
add_concurrent!(KvTransactions, kv_transactions);
add_concurrent!(ObjInfo::default(), obj_info);
add_concurrent!(ObjVersions, obj_versions);
add_concurrent!(TxAffectedAddresses, tx_affected_addresses);
add_concurrent!(TxAffectedObjects, tx_affected_objects);
Expand Down

0 comments on commit b9e9130

Please sign in to comment.