Skip to content

Commit

Permalink
adjust all prune fns to use PrunableRange::get_range if needed
Browse files Browse the repository at this point in the history
  • Loading branch information
wlmyng committed Dec 26, 2024
1 parent ceec5a8 commit 4726c70
Show file tree
Hide file tree
Showing 12 changed files with 75 additions and 51 deletions.
10 changes: 6 additions & 4 deletions crates/sui-indexer-alt/src/handlers/ev_emit_mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use std::{collections::BTreeSet, sync::Arc};
use anyhow::Result;
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor, PrunableRange};
use sui_indexer_alt_framework::handlers::cp_sequence_numbers::PrunableRange;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_schema::{events::StoredEvEmitMod, schema::ev_emit_mod};
use sui_pg_db as db;
use sui_types::full_checkpoint_content::CheckpointData;
Expand Down Expand Up @@ -59,10 +60,11 @@ impl Handler for EvEmitMod {
.await?)
}

async fn prune(range: PrunableRange, conn: &mut db::Connection<'_>) -> Result<usize> {
let (from, to) = range.tx_interval();
async fn prune(from: u64, to: u64, conn: &mut db::Connection<'_>) -> Result<usize> {
let range_mapping = PrunableRange::get_range(conn, from, to).await?;
let (from_tx, to_tx) = range_mapping.tx_interval();
let filter = ev_emit_mod::table
.filter(ev_emit_mod::tx_sequence_number.between(from as i64, to as i64 - 1));
.filter(ev_emit_mod::tx_sequence_number.between(from_tx as i64, to_tx as i64 - 1));

Ok(diesel::delete(filter).execute(conn).await?)
}
Expand Down
10 changes: 6 additions & 4 deletions crates/sui-indexer-alt/src/handlers/ev_struct_inst.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use std::{collections::BTreeSet, sync::Arc};
use anyhow::{Context, Result};
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor, PrunableRange};
use sui_indexer_alt_framework::handlers::cp_sequence_numbers::PrunableRange;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_schema::{events::StoredEvStructInst, schema::ev_struct_inst};
use sui_pg_db as db;
use sui_types::full_checkpoint_content::CheckpointData;
Expand Down Expand Up @@ -62,10 +63,11 @@ impl Handler for EvStructInst {
.await?)
}

async fn prune(range: PrunableRange, conn: &mut db::Connection<'_>) -> Result<usize> {
let (from, to) = range.tx_interval();
async fn prune(from: u64, to: u64, conn: &mut db::Connection<'_>) -> Result<usize> {
let range_mapping = PrunableRange::get_range(conn, from, to).await?;
let (from_tx, to_tx) = range_mapping.tx_interval();
let filter = ev_struct_inst::table
.filter(ev_struct_inst::tx_sequence_number.between(from as i64, to as i64 - 1));
.filter(ev_struct_inst::tx_sequence_number.between(from_tx as i64, to_tx as i64 - 1));

Ok(diesel::delete(filter).execute(conn).await?)
}
Expand Down
5 changes: 2 additions & 3 deletions crates/sui-indexer-alt/src/handlers/kv_checkpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::sync::Arc;
use anyhow::{Context, Result};
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor, PrunableRange};
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_schema::{checkpoints::StoredCheckpoint, schema::kv_checkpoints};
use sui_pg_db as db;
use sui_types::full_checkpoint_content::CheckpointData;
Expand Down Expand Up @@ -40,8 +40,7 @@ impl Handler for KvCheckpoints {
.await?)
}

async fn prune(range: PrunableRange, conn: &mut db::Connection<'_>) -> Result<usize> {
let (from, to) = range.checkpoint_interval();
async fn prune(from: u64, to: u64, conn: &mut db::Connection<'_>) -> Result<usize> {
let filter = kv_checkpoints::table
.filter(kv_checkpoints::sequence_number.between(from as i64, to as i64 - 1));

Expand Down
12 changes: 7 additions & 5 deletions crates/sui-indexer-alt/src/handlers/kv_epoch_ends.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use std::sync::Arc;
use anyhow::{bail, Context, Result};
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor, PrunableRange};
use sui_indexer_alt_framework::handlers::cp_sequence_numbers::PrunableRange;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_schema::{epochs::StoredEpochEnd, schema::kv_epoch_ends};
use sui_pg_db as db;
use sui_types::{
Expand Down Expand Up @@ -127,10 +128,11 @@ impl Handler for KvEpochEnds {
.await?)
}

async fn prune(range: PrunableRange, conn: &mut db::Connection<'_>) -> Result<usize> {
let (from, to) = range.containing_epochs();
let filter =
kv_epoch_ends::table.filter(kv_epoch_ends::epoch.between(from as i64, to as i64 - 1));
async fn prune(from: u64, to: u64, conn: &mut db::Connection<'_>) -> Result<usize> {
let range_mapping = PrunableRange::get_range(conn, from, to).await?;
let (from_epoch, to_epoch) = range_mapping.epoch_interval();
let filter = kv_epoch_ends::table
.filter(kv_epoch_ends::epoch.between(from_epoch as i64, to_epoch as i64 - 1));

Ok(diesel::delete(filter).execute(conn).await?)
}
Expand Down
10 changes: 6 additions & 4 deletions crates/sui-indexer-alt/src/handlers/kv_epoch_starts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use std::sync::Arc;
use anyhow::{bail, Context, Result};
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor, PrunableRange};
use sui_indexer_alt_framework::handlers::cp_sequence_numbers::PrunableRange;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_schema::{epochs::StoredEpochStart, schema::kv_epoch_starts};
use sui_pg_db as db;
use sui_types::{
Expand Down Expand Up @@ -74,10 +75,11 @@ impl Handler for KvEpochStarts {
.await?)
}

async fn prune(range: PrunableRange, conn: &mut db::Connection<'_>) -> Result<usize> {
let (from, to) = range.containing_epochs();
async fn prune(from: u64, to: u64, conn: &mut db::Connection<'_>) -> Result<usize> {
let range_mapping = PrunableRange::get_range(conn, from, to).await?;
let (from_epoch, to_epoch) = range_mapping.epoch_interval();
let filter = kv_epoch_starts::table
.filter(kv_epoch_starts::epoch.between(from as i64, to as i64 - 1));
.filter(kv_epoch_starts::epoch.between(from_epoch as i64, to_epoch as i64 - 1));

Ok(diesel::delete(filter).execute(conn).await?)
}
Expand Down
10 changes: 6 additions & 4 deletions crates/sui-indexer-alt/src/handlers/kv_transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use std::sync::Arc;
use anyhow::{Context, Result};
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor, PrunableRange};
use sui_indexer_alt_framework::handlers::cp_sequence_numbers::PrunableRange;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_schema::{schema::kv_transactions, transactions::StoredTransaction};
use sui_pg_db as db;
use sui_types::full_checkpoint_content::CheckpointData;
Expand Down Expand Up @@ -68,10 +69,11 @@ impl Handler for KvTransactions {
.await?)
}

async fn prune(range: PrunableRange, conn: &mut db::Connection<'_>) -> Result<usize> {
let (from, to) = range.checkpoint_interval();
async fn prune(from: u64, to: u64, conn: &mut db::Connection<'_>) -> Result<usize> {
let range_mapping = PrunableRange::get_range(conn, from, to).await?;
let (from_tx, to_tx) = range_mapping.tx_interval();
let filter = kv_transactions::table
.filter(kv_transactions::cp_sequence_number.between(from as i64, to as i64 - 1));
.filter(kv_transactions::cp_sequence_number.between(from_tx as i64, to_tx as i64 - 1));

Ok(diesel::delete(filter).execute(conn).await?)
}
Expand Down
13 changes: 8 additions & 5 deletions crates/sui-indexer-alt/src/handlers/tx_affected_addresses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use anyhow::Result;
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use itertools::Itertools;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor, PrunableRange};
use sui_indexer_alt_framework::handlers::cp_sequence_numbers::PrunableRange;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_schema::{
schema::tx_affected_addresses, transactions::StoredTxAffectedAddress,
};
Expand Down Expand Up @@ -71,10 +72,12 @@ impl Handler for TxAffectedAddresses {
.await?)
}

async fn prune(range: PrunableRange, conn: &mut db::Connection<'_>) -> Result<usize> {
let (from, to) = range.tx_interval();
let filter = tx_affected_addresses::table
.filter(tx_affected_addresses::tx_sequence_number.between(from as i64, to as i64 - 1));
async fn prune(from: u64, to: u64, conn: &mut db::Connection<'_>) -> Result<usize> {
let range_mapping = PrunableRange::get_range(conn, from, to).await?;
let (from_tx, to_tx) = range_mapping.tx_interval();
let filter = tx_affected_addresses::table.filter(
tx_affected_addresses::tx_sequence_number.between(from_tx as i64, to_tx as i64 - 1),
);

Ok(diesel::delete(filter).execute(conn).await?)
}
Expand Down
13 changes: 8 additions & 5 deletions crates/sui-indexer-alt/src/handlers/tx_affected_objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use std::sync::Arc;
use anyhow::Result;
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor, PrunableRange};
use sui_indexer_alt_framework::handlers::cp_sequence_numbers::PrunableRange;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_schema::{schema::tx_affected_objects, transactions::StoredTxAffectedObject};
use sui_pg_db as db;
use sui_types::{effects::TransactionEffectsAPI, full_checkpoint_content::CheckpointData};
Expand Down Expand Up @@ -61,10 +62,12 @@ impl Handler for TxAffectedObjects {
.await?)
}

async fn prune(range: PrunableRange, conn: &mut db::Connection<'_>) -> Result<usize> {
let (from, to) = range.tx_interval();
let filter = tx_affected_objects::table
.filter(tx_affected_objects::tx_sequence_number.between(from as i64, to as i64 - 1));
async fn prune(from: u64, to: u64, conn: &mut db::Connection<'_>) -> Result<usize> {
let range_mapping = PrunableRange::get_range(conn, from, to).await?;
let (from_tx, to_tx) = range_mapping.tx_interval();
let filter = tx_affected_objects::table.filter(
tx_affected_objects::tx_sequence_number.between(from_tx as i64, to_tx as i64 - 1),
);

Ok(diesel::delete(filter).execute(conn).await?)
}
Expand Down
13 changes: 8 additions & 5 deletions crates/sui-indexer-alt/src/handlers/tx_balance_changes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use std::{collections::BTreeMap, sync::Arc};
use anyhow::{Context, Result};
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor, PrunableRange};
use sui_indexer_alt_framework::handlers::cp_sequence_numbers::PrunableRange;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_schema::{
schema::tx_balance_changes,
transactions::{BalanceChange, StoredTxBalanceChange},
Expand Down Expand Up @@ -67,10 +68,12 @@ impl Handler for TxBalanceChanges {
.await?)
}

async fn prune(range: PrunableRange, conn: &mut db::Connection<'_>) -> Result<usize> {
let (from, to) = range.tx_interval();
let filter = tx_balance_changes::table
.filter(tx_balance_changes::tx_sequence_number.between(from as i64, to as i64 - 1));
async fn prune(from: u64, to: u64, conn: &mut db::Connection<'_>) -> Result<usize> {
let range_mapping = PrunableRange::get_range(conn, from, to).await?;
let (from_tx, to_tx) = range_mapping.tx_interval();
let filter = tx_balance_changes::table.filter(
tx_balance_changes::tx_sequence_number.between(from_tx as i64, to_tx as i64 - 1),
);

Ok(diesel::delete(filter).execute(conn).await?)
}
Expand Down
10 changes: 6 additions & 4 deletions crates/sui-indexer-alt/src/handlers/tx_calls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use std::sync::Arc;
use anyhow::{Ok, Result};
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor, PrunableRange};
use sui_indexer_alt_framework::handlers::cp_sequence_numbers::PrunableRange;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_schema::{schema::tx_calls, transactions::StoredTxCalls};
use sui_pg_db as db;
use sui_types::full_checkpoint_content::CheckpointData;
Expand Down Expand Up @@ -64,10 +65,11 @@ impl Handler for TxCalls {
.await?)
}

async fn prune(range: PrunableRange, conn: &mut db::Connection<'_>) -> Result<usize> {
let (from, to) = range.tx_interval();
async fn prune(from: u64, to: u64, conn: &mut db::Connection<'_>) -> Result<usize> {
let range_mapping = PrunableRange::get_range(conn, from, to).await?;
let (from_tx, to_tx) = range_mapping.tx_interval();
let filter = tx_calls::table
.filter(tx_calls::tx_sequence_number.between(from as i64, to as i64 - 1));
.filter(tx_calls::tx_sequence_number.between(from_tx as i64, to_tx as i64 - 1));

Ok(diesel::delete(filter).execute(conn).await?)
}
Expand Down
10 changes: 6 additions & 4 deletions crates/sui-indexer-alt/src/handlers/tx_digests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use std::sync::Arc;
use anyhow::Result;
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor, PrunableRange};
use sui_indexer_alt_framework::handlers::cp_sequence_numbers::PrunableRange;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_schema::{schema::tx_digests, transactions::StoredTxDigest};
use sui_pg_db as db;
use sui_types::full_checkpoint_content::CheckpointData;
Expand Down Expand Up @@ -51,10 +52,11 @@ impl Handler for TxDigests {
.await?)
}

async fn prune(range: PrunableRange, conn: &mut db::Connection<'_>) -> Result<usize> {
let (from, to) = range.tx_interval();
async fn prune(from: u64, to: u64, conn: &mut db::Connection<'_>) -> Result<usize> {
let range_mapping = PrunableRange::get_range(conn, from, to).await?;
let (from_tx, to_tx) = range_mapping.tx_interval();
let filter = tx_digests::table
.filter(tx_digests::tx_sequence_number.between(from as i64, to as i64 - 1));
.filter(tx_digests::tx_sequence_number.between(from_tx as i64, to_tx as i64 - 1));

Ok(diesel::delete(filter).execute(conn).await?)
}
Expand Down
10 changes: 6 additions & 4 deletions crates/sui-indexer-alt/src/handlers/tx_kinds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use std::sync::Arc;
use anyhow::Result;
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor, PrunableRange};
use sui_indexer_alt_framework::handlers::cp_sequence_numbers::PrunableRange;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_schema::{
schema::tx_kinds,
transactions::{StoredKind, StoredTxKind},
Expand Down Expand Up @@ -62,10 +63,11 @@ impl Handler for TxKinds {
.await?)
}

async fn prune(range: PrunableRange, conn: &mut db::Connection<'_>) -> Result<usize> {
let (from, to) = range.tx_interval();
async fn prune(from: u64, to: u64, conn: &mut db::Connection<'_>) -> Result<usize> {
let range_mapping = PrunableRange::get_range(conn, from, to).await?;
let (from_tx, to_tx) = range_mapping.tx_interval();
let filter = tx_kinds::table
.filter(tx_kinds::tx_sequence_number.between(from as i64, to as i64 - 1));
.filter(tx_kinds::tx_sequence_number.between(from_tx as i64, to_tx as i64 - 1));

Ok(diesel::delete(filter).execute(conn).await?)
}
Expand Down

0 comments on commit 4726c70

Please sign in to comment.