Skip to content

Commit

Permalink
[indexer-alt] add prune impls for each pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
emmazzz authored and wlmyng committed Dec 19, 2024
1 parent 4a54772 commit 438479b
Show file tree
Hide file tree
Showing 12 changed files with 120 additions and 12 deletions.
11 changes: 10 additions & 1 deletion crates/sui-indexer-alt/src/handlers/ev_emit_mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
use std::{collections::BTreeSet, sync::Arc};

use anyhow::Result;
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor, PrunableRange};
use sui_indexer_alt_schema::{events::StoredEvEmitMod, schema::ev_emit_mod};
use sui_pg_db as db;
use sui_types::full_checkpoint_content::CheckpointData;
Expand Down Expand Up @@ -57,4 +58,12 @@ impl Handler for EvEmitMod {
.execute(conn)
.await?)
}

async fn prune(range: PrunableRange, conn: &mut db::Connection<'_>) -> Result<usize> {
let (from, to) = range.tx_interval();
let filter = ev_emit_mod::table
.filter(ev_emit_mod::tx_sequence_number.between(from as i64, to as i64 - 1));

Ok(diesel::delete(filter).execute(conn).await?)
}
}
11 changes: 10 additions & 1 deletion crates/sui-indexer-alt/src/handlers/ev_struct_inst.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
use std::{collections::BTreeSet, sync::Arc};

use anyhow::{Context, Result};
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor, PrunableRange};
use sui_indexer_alt_schema::{events::StoredEvStructInst, schema::ev_struct_inst};
use sui_pg_db as db;
use sui_types::full_checkpoint_content::CheckpointData;
Expand Down Expand Up @@ -60,4 +61,12 @@ impl Handler for EvStructInst {
.execute(conn)
.await?)
}

async fn prune(range: PrunableRange, conn: &mut db::Connection<'_>) -> Result<usize> {
let (from, to) = range.tx_interval();
let filter = ev_struct_inst::table
.filter(ev_struct_inst::tx_sequence_number.between(from as i64, to as i64 - 1));

Ok(diesel::delete(filter).execute(conn).await?)
}
}
11 changes: 10 additions & 1 deletion crates/sui-indexer-alt/src/handlers/kv_checkpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
use std::sync::Arc;

use anyhow::{Context, Result};
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor, PrunableRange};
use sui_indexer_alt_schema::{checkpoints::StoredCheckpoint, schema::kv_checkpoints};
use sui_pg_db as db;
use sui_types::full_checkpoint_content::CheckpointData;
Expand Down Expand Up @@ -38,4 +39,12 @@ impl Handler for KvCheckpoints {
.execute(conn)
.await?)
}

async fn prune(range: PrunableRange, conn: &mut db::Connection<'_>) -> Result<usize> {
let (from, to) = range.checkpoint_interval();
let filter = kv_checkpoints::table
.filter(kv_checkpoints::sequence_number.between(from as i64, to as i64 - 1));

Ok(diesel::delete(filter).execute(conn).await?)
}
}
11 changes: 10 additions & 1 deletion crates/sui-indexer-alt/src/handlers/kv_epoch_ends.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
use std::sync::Arc;

use anyhow::{bail, Context, Result};
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor, PrunableRange};
use sui_indexer_alt_schema::{epochs::StoredEpochEnd, schema::kv_epoch_ends};
use sui_pg_db as db;
use sui_types::{
Expand Down Expand Up @@ -125,4 +126,12 @@ impl Handler for KvEpochEnds {
.execute(conn)
.await?)
}

async fn prune(range: PrunableRange, conn: &mut db::Connection<'_>) -> Result<usize> {
let (from, to) = range.containing_epochs();
let filter =
kv_epoch_ends::table.filter(kv_epoch_ends::epoch.between(from as i64, to as i64 - 1));

Ok(diesel::delete(filter).execute(conn).await?)
}
}
11 changes: 10 additions & 1 deletion crates/sui-indexer-alt/src/handlers/kv_epoch_starts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
use std::sync::Arc;

use anyhow::{bail, Context, Result};
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor, PrunableRange};
use sui_indexer_alt_schema::{epochs::StoredEpochStart, schema::kv_epoch_starts};
use sui_pg_db as db;
use sui_types::{
Expand Down Expand Up @@ -72,4 +73,12 @@ impl Handler for KvEpochStarts {
.execute(conn)
.await?)
}

async fn prune(range: PrunableRange, conn: &mut db::Connection<'_>) -> Result<usize> {
let (from, to) = range.containing_epochs();
let filter = kv_epoch_starts::table
.filter(kv_epoch_starts::epoch.between(from as i64, to as i64 - 1));

Ok(diesel::delete(filter).execute(conn).await?)
}
}
11 changes: 10 additions & 1 deletion crates/sui-indexer-alt/src/handlers/kv_transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
use std::sync::Arc;

use anyhow::{Context, Result};
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor, PrunableRange};
use sui_indexer_alt_schema::{schema::kv_transactions, transactions::StoredTransaction};
use sui_pg_db as db;
use sui_types::full_checkpoint_content::CheckpointData;
Expand Down Expand Up @@ -66,4 +67,12 @@ impl Handler for KvTransactions {
.execute(conn)
.await?)
}

async fn prune(range: PrunableRange, conn: &mut db::Connection<'_>) -> Result<usize> {
let (from, to) = range.checkpoint_interval();
let filter = kv_transactions::table
.filter(kv_transactions::cp_sequence_number.between(from as i64, to as i64 - 1));

Ok(diesel::delete(filter).execute(conn).await?)
}
}
11 changes: 10 additions & 1 deletion crates/sui-indexer-alt/src/handlers/tx_affected_addresses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
use std::sync::Arc;

use anyhow::Result;
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use itertools::Itertools;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor, PrunableRange};
use sui_indexer_alt_schema::{
schema::tx_affected_addresses, transactions::StoredTxAffectedAddress,
};
Expand Down Expand Up @@ -69,4 +70,12 @@ impl Handler for TxAffectedAddresses {
.execute(conn)
.await?)
}

async fn prune(range: PrunableRange, conn: &mut db::Connection<'_>) -> Result<usize> {
let (from, to) = range.tx_interval();
let filter = tx_affected_addresses::table
.filter(tx_affected_addresses::tx_sequence_number.between(from as i64, to as i64 - 1));

Ok(diesel::delete(filter).execute(conn).await?)
}
}
11 changes: 10 additions & 1 deletion crates/sui-indexer-alt/src/handlers/tx_affected_objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
use std::sync::Arc;

use anyhow::Result;
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor, PrunableRange};
use sui_indexer_alt_schema::{schema::tx_affected_objects, transactions::StoredTxAffectedObject};
use sui_pg_db as db;
use sui_types::{effects::TransactionEffectsAPI, full_checkpoint_content::CheckpointData};
Expand Down Expand Up @@ -59,4 +60,12 @@ impl Handler for TxAffectedObjects {
.execute(conn)
.await?)
}

async fn prune(range: PrunableRange, conn: &mut db::Connection<'_>) -> Result<usize> {
let (from, to) = range.tx_interval();
let filter = tx_affected_objects::table
.filter(tx_affected_objects::tx_sequence_number.between(from as i64, to as i64 - 1));

Ok(diesel::delete(filter).execute(conn).await?)
}
}
11 changes: 10 additions & 1 deletion crates/sui-indexer-alt/src/handlers/tx_balance_changes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
use std::{collections::BTreeMap, sync::Arc};

use anyhow::{Context, Result};
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor, PrunableRange};
use sui_indexer_alt_schema::{
schema::tx_balance_changes,
transactions::{BalanceChange, StoredTxBalanceChange},
Expand Down Expand Up @@ -65,6 +66,14 @@ impl Handler for TxBalanceChanges {
.execute(conn)
.await?)
}

async fn prune(range: PrunableRange, conn: &mut db::Connection<'_>) -> Result<usize> {
let (from, to) = range.tx_interval();
let filter = tx_balance_changes::table
.filter(tx_balance_changes::tx_sequence_number.between(from as i64, to as i64 - 1));

Ok(diesel::delete(filter).execute(conn).await?)
}
}

/// Calculate balance changes based on the object's input and output objects.
Expand Down
11 changes: 10 additions & 1 deletion crates/sui-indexer-alt/src/handlers/tx_calls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
use std::sync::Arc;

use anyhow::{Ok, Result};
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor, PrunableRange};
use sui_indexer_alt_schema::{schema::tx_calls, transactions::StoredTxCalls};
use sui_pg_db as db;
use sui_types::full_checkpoint_content::CheckpointData;
Expand Down Expand Up @@ -62,4 +63,12 @@ impl Handler for TxCalls {
.execute(conn)
.await?)
}

async fn prune(range: PrunableRange, conn: &mut db::Connection<'_>) -> Result<usize> {
let (from, to) = range.tx_interval();
let filter = tx_calls::table
.filter(tx_calls::tx_sequence_number.between(from as i64, to as i64 - 1));

Ok(diesel::delete(filter).execute(conn).await?)
}
}
11 changes: 10 additions & 1 deletion crates/sui-indexer-alt/src/handlers/tx_digests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
use std::sync::Arc;

use anyhow::Result;
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor, PrunableRange};
use sui_indexer_alt_schema::{schema::tx_digests, transactions::StoredTxDigest};
use sui_pg_db as db;
use sui_types::full_checkpoint_content::CheckpointData;
Expand Down Expand Up @@ -49,4 +50,12 @@ impl Handler for TxDigests {
.execute(conn)
.await?)
}

async fn prune(range: PrunableRange, conn: &mut db::Connection<'_>) -> Result<usize> {
let (from, to) = range.tx_interval();
let filter = tx_digests::table
.filter(tx_digests::tx_sequence_number.between(from as i64, to as i64 - 1));

Ok(diesel::delete(filter).execute(conn).await?)
}
}
11 changes: 10 additions & 1 deletion crates/sui-indexer-alt/src/handlers/tx_kinds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
use std::sync::Arc;

use anyhow::Result;
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor, PrunableRange};
use sui_indexer_alt_schema::{
schema::tx_kinds,
transactions::{StoredKind, StoredTxKind},
Expand Down Expand Up @@ -60,4 +61,12 @@ impl Handler for TxKinds {
.execute(conn)
.await?)
}

async fn prune(range: PrunableRange, conn: &mut db::Connection<'_>) -> Result<usize> {
let (from, to) = range.tx_interval();
let filter = tx_kinds::table
.filter(tx_kinds::tx_sequence_number.between(from as i64, to as i64 - 1));

Ok(diesel::delete(filter).execute(conn).await?)
}
}

0 comments on commit 438479b

Please sign in to comment.